You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by ma...@apache.org on 2021/04/26 20:55:00 UTC

[airavata-django-portal-sdk] 02/07: AIRAVATA-3420 Implements DjangoFileSystemProvider, refactored user_storage module to use UserStorageProvider

This is an automated email from the ASF dual-hosted git repository.

machristie pushed a commit to branch mft-integration
in repository https://gitbox.apache.org/repos/asf/airavata-django-portal-sdk.git

commit 58ff63c4acff0034a70558005601d83888a52714
Author: Marcus Christie <ma...@apache.org>
AuthorDate: Wed Apr 7 18:17:16 2021 -0400

    AIRAVATA-3420 Implements DjangoFileSystemProvider, refactored user_storage module to use UserStorageProvider
---
 airavata_django_portal_sdk/user_storage.py         | 1196 --------------------
 .../user_storage/__init__.py                       |   51 +
 airavata_django_portal_sdk/user_storage/api.py     |  758 +++++++++++++
 .../user_storage/backends/__init__.py              |    3 +
 .../user_storage/backends/base.py                  |   64 ++
 .../backends/django_filesystem_provider.py         |  306 +++++
 .../user_storage/backends/mft_provider.py          |  152 +++
 .../user_storage/backends/remote_api_provider.py   |    0
 .../user_storage_provider.py                       |    3 +-
 9 files changed, 1336 insertions(+), 1197 deletions(-)

diff --git a/airavata_django_portal_sdk/user_storage.py b/airavata_django_portal_sdk/user_storage.py
deleted file mode 100644
index 5fb07c5..0000000
--- a/airavata_django_portal_sdk/user_storage.py
+++ /dev/null
@@ -1,1196 +0,0 @@
-import cgi
-import copy
-import io
-import logging
-import mimetypes
-import os
-import shutil
-import warnings
-from datetime import datetime
-from http import HTTPStatus
-from urllib.parse import quote, unquote, urlparse
-
-import grpc
-import requests
-from airavata.model.data.replica.ttypes import (
-    DataProductModel,
-    DataProductType,
-    DataReplicaLocationModel,
-    ReplicaLocationCategory,
-    ReplicaPersistentType
-)
-from django.conf import settings
-from django.core.exceptions import ObjectDoesNotExist, SuspiciousFileOperation
-from django.core.files import File
-from django.core.files.move import file_move_safe
-from django.core.files.storage import FileSystemStorage
-
-from . import MFTApi_pb2, MFTApi_pb2_grpc
-from .util import convert_iso8601_to_datetime
-
-logger = logging.getLogger(__name__)
-
-TMP_INPUT_FILE_UPLOAD_DIR = "tmp"
-
-
-def save(request, path, file, name=None, content_type=None):
-    "Save file in path in the user's storage and return DataProduct."
-    if _is_remote_api():
-        if name is None and hasattr(file, 'name'):
-            name = os.path.basename(file.name)
-        files = {'file': (name, file, content_type)
-                 if content_type is not None else file, }
-        resp = _call_remote_api(request,
-                                "/user-storage/~/{path}",
-                                path_params={"path": path},
-                                method="post",
-                                files=files)
-        data = resp.json()
-        product_uri = data['uploaded']['productUri']
-        data_product = request.airavata_client.getDataProduct(
-            request.authz_token, product_uri)
-        return data_product
-    else:
-        username = request.user.username
-        full_path = _Datastore().save(username, path, file, name=name)
-        data_product = _save_data_product(
-            request, full_path, name=name, content_type=content_type
-        )
-        return data_product
-
-
-def move_from_filepath(
-        request,
-        source_path,
-        target_path,
-        name=None,
-        content_type=None):
-    "Move a file from filesystem into user's storage."
-    # TODO: deprecate this method
-    username = request.user.username
-    file_name = name if name is not None else os.path.basename(source_path)
-    full_path = _Datastore().move_external(
-        source_path, username, target_path, file_name)
-    data_product = _save_data_product(
-        request, full_path, name=file_name, content_type=content_type
-    )
-    return data_product
-
-
-def save_input_file(request, file, name=None, content_type=None):
-    """Save input file in staging area for input files."""
-    if _is_remote_api():
-        if name is None and hasattr(file, 'name'):
-            name = os.path.basename(file.name)
-        files = {'file': (name, file, content_type)
-                 if content_type is not None else file, }
-        resp = _call_remote_api(request,
-                                "/upload",
-                                method="post",
-                                files=files)
-        data = resp.json()
-        product_uri = data['data-product']['productUri']
-        data_product = request.airavata_client.getDataProduct(
-            request.authz_token, product_uri)
-        return data_product
-    else:
-        username = request.user.username
-        file_name = name if name is not None else os.path.basename(file.name)
-        full_path = _Datastore().save(username, TMP_INPUT_FILE_UPLOAD_DIR, file)
-        data_product = _save_data_product(
-            request, full_path, name=file_name, content_type=content_type
-        )
-        return data_product
-
-
-def copy_input_file(request, data_product=None, data_product_uri=None):
-    # TODO: we could probably deprecate this as well, since we do an open/save
-    # to copy instead. Or at least, we don't need it in UserStorageProvider.
-    if data_product is None:
-        data_product = _get_data_product(request, data_product_uri)
-    path = _get_replica_filepath(data_product)
-    name = data_product.productName
-    full_path = _Datastore().copy(
-        data_product.ownerName,
-        path,
-        request.user.username,
-        TMP_INPUT_FILE_UPLOAD_DIR,
-        name=name,
-    )
-    return _save_copy_of_data_product(request, full_path, data_product)
-
-
-def is_input_file(request, data_product=None, data_product_uri=None):
-    # TODO: don't need this in UserStorageProvider
-    if data_product is None:
-        data_product = _get_data_product(request, data_product_uri)
-    if _is_remote_api():
-        resp = _call_remote_api(
-            request,
-            "/data-products/",
-            params={'product-uri': data_product.productUri})
-        data = resp.json()
-        return data['isInputFileUpload']
-    # Check if file is one of user's files and in TMP_INPUT_FILE_UPLOAD_DIR
-    path = _get_replica_filepath(data_product)
-    if _Datastore().exists(request.user.username, path):
-        rel_path = _Datastore().rel_path(request.user.username, path)
-        return os.path.dirname(rel_path) == TMP_INPUT_FILE_UPLOAD_DIR
-    else:
-        return False
-
-
-def move_input_file(request, data_product=None, path=None, data_product_uri=None):
-    # TODO: don't need this in UserStorageProvider
-    if data_product is None:
-        data_product = _get_data_product(request, data_product_uri)
-    source_path = _get_replica_filepath(data_product)
-    file_name = data_product.productName
-    full_path = _Datastore().move(
-        data_product.ownerName,
-        source_path,
-        request.user.username,
-        path,
-        file_name)
-    _delete_data_product(data_product.ownerName, source_path)
-    data_product = _save_copy_of_data_product(request, full_path, data_product)
-    return data_product
-
-
-def move_input_file_from_filepath(
-    request, source_path, name=None, content_type=None
-):
-    # TODO: don't need this in UserStorageProvider
-    "Move a file from filesystem into user's input file staging area."
-    username = request.user.username
-    file_name = name if name is not None else os.path.basename(source_path)
-    full_path = _Datastore().move_external(
-        source_path, username, TMP_INPUT_FILE_UPLOAD_DIR, file_name
-    )
-    data_product = _save_data_product(
-        request, full_path, name=file_name, content_type=content_type
-    )
-    return data_product
-
-
-def open_file(request, data_product=None, data_product_uri=None):
-    """
-    Return file object for replica if it exists in user storage. One of
-    `data_product` or `data_product_uri` is required.
-    """
-    if data_product is None:
-        data_product = _get_data_product(request, data_product_uri)
-    if _is_remote_api():
-        resp = _call_remote_api(
-            request,
-            "/download",
-            params={'data-product-uri': data_product.productUri})
-        file = io.BytesIO(resp.content)
-        disposition = resp.headers['Content-Disposition']
-        disp_value, disp_params = cgi.parse_header(disposition)
-        # Give the file object a name just like a real opened file object
-        file.name = disp_params['filename']
-        return file
-    else:
-        path = _get_replica_filepath(data_product)
-        return _Datastore().open(data_product.ownerName, path)
-
-
-def exists(request, data_product=None, data_product_uri=None):
-    """
-    Return True if replica for data_product exists in user storage. One of
-    `data_product` or `data_product_uri` is required.
-    """
-    if data_product is None:
-        data_product = _get_data_product(request, data_product_uri)
-    if _is_remote_api():
-        resp = _call_remote_api(
-            request,
-            "/data-products/",
-            params={'product-uri': data_product.productUri})
-        data = resp.json()
-        return data['downloadURL'] is not None
-    else:
-        path = _get_replica_filepath(data_product)
-        return _Datastore().exists(data_product.ownerName, path)
-
-
-def dir_exists(request, path):
-    "Return True if path exists in user's data store."
-    if _is_remote_api():
-        resp = _call_remote_api(request,
-                                "/user-storage/~/{path}",
-                                path_params={"path": path},
-                                raise_for_status=False)
-        if resp.status_code == HTTPStatus.NOT_FOUND:
-            return False
-        resp.raise_for_status()
-        return resp.json()['isDir']
-    else:
-        user_storage_provider = MFTApiUserStorageProvider()
-        return user_storage_provider.dir_exists(request, path)
-
-
-def user_file_exists(request, path):
-    """If file exists, return data product URI, else None."""
-    if _is_remote_api():
-        resp = _call_remote_api(request,
-                                "/user-storage/~/{path}",
-                                path_params={"path": path},
-                                raise_for_status=False)
-        if resp.status_code == HTTPStatus.NOT_FOUND or resp.json()['isDir']:
-            return None
-        resp.raise_for_status()
-        return resp.json()['files'][0]['dataProductURI']
-    elif _Datastore().exists(request.user.username, path):
-        full_path = _Datastore().path(request.user.username, path)
-        data_product_uri = _get_data_product_uri(request, full_path)
-        return data_product_uri
-    else:
-        return None
-
-
-def delete_dir(request, path):
-    """Delete path in user's data store, if it exists."""
-    if _is_remote_api():
-        resp = _call_remote_api(request,
-                                "/user-storage/~/{path}",
-                                path_params={"path": path},
-                                method="delete",
-                                raise_for_status=False)
-        _raise_404(resp, f"File path does not exist {path}")
-        resp.raise_for_status()
-        return
-    _Datastore().delete_dir(request.user.username, path)
-
-
-def delete_user_file(request, path):
-    """Delete file in user's data store, if it exists."""
-    if _is_remote_api():
-        resp = _call_remote_api(request,
-                                "/user-storage/~/{path}",
-                                path_params={"path": path},
-                                method="delete",
-                                raise_for_status=False)
-        _raise_404(resp, f"File path does not exist {path}")
-        resp.raise_for_status()
-        return
-    return _Datastore().delete(request.user.username, path)
-
-
-def update_file_content(request, path, fileContentText):
-    if _is_remote_api():
-        _call_remote_api(request,
-                         "/user-storage/~/{path}",
-                         path_params={"path": path},
-                         method="put",
-                         data={"fileContentText": fileContentText}
-                         )
-        return
-    else:
-        full_path = _Datastore().path(request.user.username, path)
-        with open(full_path, 'w') as f:
-            myfile = File(f)
-            myfile.write(fileContentText)
-
-
-def update_data_product_content(request, data_product=None, fileContentText="", data_product_uri=None):
-    if data_product is None:
-        data_product = _get_data_product(request, data_product_uri)
-    # TODO: implement remote api support (DataProductView.put())
-    path = _get_replica_filepath(data_product)
-    full_path = _Datastore().path(request.user.username, path)
-    with open(full_path, 'w') as f:
-        myfile = File(f)
-        myfile.write(fileContentText)
-
-
-def get_file(request, path):
-    if _is_remote_api():
-        resp = _call_remote_api(request,
-                                "/user-storage/~/{path}",
-                                path_params={"path": path},
-                                raise_for_status=False
-                                )
-        _raise_404(resp, "User storage file path does not exist")
-        data = resp.json()
-        if data["isDir"]:
-            raise Exception("User storage path is a directory, not a file")
-        file = data['files'][0]
-        file['created_time'] = convert_iso8601_to_datetime(file['createdTime'])
-        file['mime_type'] = file['mimeType']
-        file['data-product-uri'] = file['dataProductURI']
-        return file
-
-    user_storage_provider = MFTApiUserStorageProvider()
-    return user_storage_provider.get_file(request, path)
-
-
-def delete(request, data_product=None, data_product_uri=None):
-    """
-    Delete replica for data product in this data store. One of `data_product`
-    or `data_product_uri` is required.
-    """
-    if data_product is None:
-        data_product = _get_data_product(request, data_product_uri)
-    if _is_remote_api():
-        _call_remote_api(
-            request,
-            "/delete-file",
-            params={'data-product-uri': data_product.productUri},
-            method="delete")
-        return
-    else:
-        path = _get_replica_filepath(data_product)
-        try:
-            _Datastore().delete(data_product.ownerName, path)
-            _delete_data_product(data_product.ownerName, path)
-        except Exception:
-            logger.exception(
-                "Unable to delete file {} for data product uri {}".format(
-                    path, data_product.productUri
-                )
-            )
-            raise
-
-
-def listdir(request, path):
-    """Return a tuple of two lists, one for directories, the second for files."""
-
-    if _is_remote_api():
-        resp = _call_remote_api(request,
-                                "/user-storage/~/{path}",
-                                path_params={"path": path},
-                                )
-        data = resp.json()
-        for directory in data['directories']:
-            # Convert JSON ISO8601 timestamp to datetime instance
-            directory['created_time'] = convert_iso8601_to_datetime(
-                directory['createdTime'])
-        for file in data['files']:
-            # Convert JSON ISO8601 timestamp to datetime instance
-            file['created_time'] = convert_iso8601_to_datetime(
-                file['createdTime'])
-            file['mime_type'] = file['mimeType']
-            file['data-product-uri'] = file['dataProductURI']
-        return data['directories'], data['files']
-
-    user_storage_provider = MFTApiUserStorageProvider()
-    return user_storage_provider.listdir(request, path)
-
-
-def list_experiment_dir(request, experiment_id, path=""):
-    """
-    List files, directories in experiment data directory. Returns a tuple,
-    see `listdir`.
-    """
-    if _is_remote_api():
-        resp = _call_remote_api(request,
-                                "/experiment-storage/{experiment_id}/{path}",
-                                path_params={"path": path,
-                                             "experiment_id": experiment_id},
-                                )
-        data = resp.json()
-        for directory in data['directories']:
-            # Convert JSON ISO8601 timestamp to datetime instance
-            directory['created_time'] = convert_iso8601_to_datetime(
-                directory['createdTime'])
-        for file in data['files']:
-            # Convert JSON ISO8601 timestamp to datetime instance
-            file['created_time'] = convert_iso8601_to_datetime(
-                file['createdTime'])
-            file['mime_type'] = file['mimeType']
-            file['data-product-uri'] = file['dataProductURI']
-        return data['directories'], data['files']
-
-    experiment = request.airavata_client.getExperiment(
-        request.authz_token, experiment_id)
-    datastore = _Datastore()
-    exp_data_path = experiment.userConfigurationData.experimentDataDir
-    exp_data_path = os.path.join(exp_data_path, path)
-    exp_owner = experiment.userName
-    if datastore.dir_exists(exp_owner, exp_data_path):
-        directories, files = datastore.list_user_dir(
-            exp_owner, exp_data_path)
-        directories_data = []
-        for d in directories:
-            dpath = os.path.join(exp_data_path, d)
-            rel_path = os.path.join(path, d)
-            created_time = datastore.get_created_time(
-                exp_owner, dpath)
-            size = datastore.size(exp_owner, dpath)
-            directories_data.append(
-                {
-                    "name": d,
-                    "path": rel_path,
-                    "created_time": created_time,
-                    "size": size,
-                }
-            )
-        files_data = []
-        for f in files:
-            user_rel_path = os.path.join(exp_data_path, f)
-            if not datastore.exists(exp_owner, user_rel_path):
-                logger.warning(
-                    f"list_experiment_dir skipping {exp_owner}:{user_rel_path}, "
-                    "does not exist (broken symlink?)")
-                continue
-            created_time = datastore.get_created_time(
-                exp_owner, user_rel_path
-            )
-            size = datastore.size(exp_owner, user_rel_path)
-            full_path = datastore.path(exp_owner, user_rel_path)
-            data_product_uri = _get_data_product_uri(request, full_path, owner=exp_owner)
-
-            data_product = request.airavata_client.getDataProduct(
-                request.authz_token, data_product_uri)
-            mime_type = None
-            if 'mime-type' in data_product.productMetadata:
-                mime_type = data_product.productMetadata['mime-type']
-            files_data.append(
-                {
-                    "name": f,
-                    "path": user_rel_path,
-                    "data-product-uri": data_product_uri,
-                    "created_time": created_time,
-                    "mime_type": mime_type,
-                    "size": size,
-                    "hidden": False,
-                }
-            )
-        return directories_data, files_data
-    else:
-        raise ObjectDoesNotExist("Experiment data directory does not exist")
-
-
-def experiment_dir_exists(request, experiment_id, path=""):
-
-    if _is_remote_api():
-        resp = _call_remote_api(request,
-                                "/experiment-storage/{experiment_id}/{path}",
-                                path_params={"path": path,
-                                             "experiment_id": experiment_id},
-                                raise_for_status=False)
-        if resp.status_code == HTTPStatus.NOT_FOUND:
-            return False
-        resp.raise_for_status()
-        return resp.json()['isDir']
-
-    experiment = request.airavata_client.getExperiment(
-        request.authz_token, experiment_id)
-    datastore = _Datastore()
-    exp_data_path = experiment.userConfigurationData.experimentDataDir
-    if exp_data_path is None:
-        return False
-    exp_data_path = os.path.join(exp_data_path, path)
-    exp_owner = experiment.userName
-    return datastore.dir_exists(exp_owner, exp_data_path)
-
-
-def get_experiment_dir(
-        request,
-        project_name=None,
-        experiment_name=None,
-        path=None):
-    return _Datastore().get_experiment_dir(
-        request.user.username, project_name, experiment_name, path
-    )
-
-
-def create_user_dir(request, path):
-    if _is_remote_api():
-        logger.debug(f"path={path}")
-        _call_remote_api(request,
-                         "/user-storage/~/{path}",
-                         path_params={"path": path},
-                         method="post")
-        return
-    _Datastore().create_user_dir(request.user.username, path)
-
-
-def get_rel_path(request, path):
-    return _Datastore().rel_path(request.user.username, path)
-
-
-def get_rel_experiment_dir(request, experiment_id):
-    """Return experiment data dir path relative to user's directory."""
-    warnings.warn("Use list_experiment_dir instead.", DeprecationWarning)
-    if _is_remote_api():
-        resp = _call_remote_api(request,
-                                "/experiments/{experimentId}/",
-                                path_params={"experimentId": experiment_id})
-        resp.raise_for_status()
-        return resp.json()['relativeExperimentDataDir']
-
-    experiment = request.airavata_client.getExperiment(
-        request.authz_token, experiment_id)
-    if (experiment.userConfigurationData and
-            experiment.userConfigurationData.experimentDataDir):
-        datastore = _Datastore()
-        data_dir = experiment.userConfigurationData.experimentDataDir
-        if datastore.dir_exists(request.user.username, data_dir):
-            return datastore.rel_path(request.user.username, data_dir)
-        else:
-            return None
-    else:
-        return None
-
-
-def _get_data_product_uri(request, full_path, owner=None):
-
-    from airavata_django_portal_sdk import models
-    if owner is None:
-        owner = request.user.username
-    user_file = models.UserFiles.objects.filter(
-        username=owner, file_path=full_path)
-    if user_file.exists():
-        product_uri = user_file[0].file_dpu
-    else:
-        data_product = _save_data_product(request, full_path, owner=owner)
-        product_uri = data_product.productUri
-    return product_uri
-
-
-def _get_data_product(request, data_product_uri):
-    return request.airavata_client.getDataProduct(
-        request.authz_token, data_product_uri)
-
-
-def _save_data_product(request, full_path, name=None, content_type=None, owner=None):
-    "Create, register and record in DB a data product for full_path."
-    if owner is None:
-        owner = request.user.username
-    data_product = _create_data_product(
-        owner, full_path, name=name, content_type=content_type
-    )
-    product_uri = _register_data_product(request, full_path, data_product, owner=owner)
-    data_product.productUri = product_uri
-    return data_product
-
-
-def _register_data_product(request, full_path, data_product, owner=None):
-    if owner is None:
-        owner = request.user.username
-    product_uri = request.airavata_client.registerDataProduct(
-        request.authz_token, data_product
-    )
-    from airavata_django_portal_sdk import models
-    user_file_instance = models.UserFiles(
-        username=owner,
-        file_path=full_path,
-        file_dpu=product_uri)
-    user_file_instance.save()
-    return product_uri
-
-
-def _save_copy_of_data_product(request, full_path, data_product):
-    """Save copy of a data product with a different path."""
-    data_product_copy = _copy_data_product(request, data_product, full_path)
-    product_uri = _register_data_product(request, full_path, data_product_copy)
-    data_product_copy.productUri = product_uri
-    return data_product_copy
-
-
-def _copy_data_product(request, data_product, full_path):
-    """Create an unsaved copy of a data product with different path."""
-    data_product_copy = copy.copy(data_product)
-    data_product_copy.productUri = None
-    data_product_copy.ownerName = request.user.username
-    data_replica_location = _create_replica_location(
-        full_path, data_product_copy.productName
-    )
-    data_product_copy.replicaLocations = [data_replica_location]
-    return data_product_copy
-
-
-def _delete_data_product(username, full_path):
-    # TODO: call API to delete data product from replica catalog when it is
-    # available (not currently implemented)
-    from airavata_django_portal_sdk import models
-    user_file = models.UserFiles.objects.filter(
-        username=username, file_path=full_path)
-    if user_file.exists():
-        user_file.delete()
-
-
-def _create_data_product(username, full_path, name=None, content_type=None):
-    data_product = DataProductModel()
-    data_product.gatewayId = settings.GATEWAY_ID
-    data_product.ownerName = username
-    if name is not None:
-        file_name = name
-    else:
-        file_name = os.path.basename(full_path)
-    data_product.productName = file_name
-    data_product.dataProductType = DataProductType.FILE
-    final_content_type = _determine_content_type(full_path, content_type)
-    if final_content_type is not None:
-        data_product.productMetadata = {"mime-type": final_content_type}
-    data_replica_location = _create_replica_location(full_path, file_name)
-    data_product.replicaLocations = [data_replica_location]
-    return data_product
-
-
-def _determine_content_type(full_path, content_type=None):
-    result = content_type
-    if result is None:
-        # Try to guess the content-type from file extension
-        guessed_type, encoding = mimetypes.guess_type(full_path)
-        result = guessed_type
-    if result is None or result == "application/octet-stream":
-        # Check if file is Unicode text by trying to read some of it
-        try:
-            open(full_path, "r").read(1024)
-            result = "text/plain"
-        except UnicodeDecodeError:
-            logger.debug(f"Failed to read as Unicode text: {full_path}")
-    return result
-
-
-def _create_replica_location(full_path, file_name):
-    data_replica_location = DataReplicaLocationModel()
-    data_replica_location.storageResourceId = settings.GATEWAY_DATA_STORE_RESOURCE_ID
-    data_replica_location.replicaName = "{} gateway data store copy".format(
-        file_name)
-    data_replica_location.replicaLocationCategory = (
-        ReplicaLocationCategory.GATEWAY_DATA_STORE
-    )
-    data_replica_location.replicaPersistentType = ReplicaPersistentType.TRANSIENT
-    data_replica_location.filePath = "file://{}:{}".format(
-        settings.GATEWAY_DATA_STORE_HOSTNAME, quote(full_path)
-    )
-    return data_replica_location
-
-
-def _get_replica_filepath(data_product):
-    replica_filepaths = [
-        rep.filePath
-        for rep in data_product.replicaLocations
-        if rep.replicaLocationCategory == ReplicaLocationCategory.GATEWAY_DATA_STORE
-    ]
-    replica_filepath = replica_filepaths[0] if len(
-        replica_filepaths) > 0 else None
-    if replica_filepath:
-        return unquote(urlparse(replica_filepath).path)
-    return None
-
-
-def _is_remote_api():
-    return getattr(settings, 'GATEWAY_DATA_STORE_REMOTE_API', None) is not None
-
-
-def _call_remote_api(
-        request,
-        path,
-        path_params=None,
-        method="get",
-        raise_for_status=True,
-        **kwargs):
-
-    headers = {
-        'Authorization': f'Bearer {request.authz_token.accessToken}'}
-    encoded_path_params = {}
-    if path_params is not None:
-        for pk, pv in path_params.items():
-            encoded_path_params[pk] = quote(pv)
-    encoded_path = path.format(**encoded_path_params)
-    logger.debug(f"encoded_path={encoded_path}")
-    r = requests.request(
-        method,
-        f'{settings.GATEWAY_DATA_STORE_REMOTE_API}{encoded_path}',
-        headers=headers,
-        **kwargs,
-    )
-    if raise_for_status:
-        r.raise_for_status()
-    return r
-
-
-def _raise_404(response, msg, exception_class=ObjectDoesNotExist):
-    if response.status_code == 404:
-        raise exception_class(msg)
-
-
-class _Datastore:
-    """Internal datastore abstraction."""
-
-    def __init__(self, directory=None):
-        if getattr(
-            settings,
-            'GATEWAY_DATA_STORE_REMOTE_API',
-                None) is not None:
-            raise Exception(
-                f"This Django portal instance is configured to connect to a "
-                f"remote data store via API (settings.GATEWAY_DATA_STORE_REMOTE_API="
-                f"{settings.GATEWAY_DATA_STORE_REMOTE_API}). This local "
-                f"Datastore instance is not available in remote data store mode.")
-        if directory:
-            self.directory = directory
-        else:
-            self.directory = settings.GATEWAY_DATA_STORE_DIR
-
-    def exists(self, username, path):
-        """Check if file path exists in this data store."""
-        try:
-            return self._user_data_storage(username).exists(
-                path) and os.path.isfile(self.path(username, path))
-        except SuspiciousFileOperation as e:
-            logger.warning(
-                "Invalid path for user {}: {}".format(
-                    username, str(e)))
-            return False
-
-    def dir_exists(self, username, path):
-        """Check if directory path exists in this data store."""
-        try:
-            return self._user_data_storage(username).exists(
-                path) and os.path.isdir(self.path(username, path))
-        except SuspiciousFileOperation as e:
-            logger.warning(
-                "Invalid path for user {}: {}".format(
-                    username, str(e)))
-            return False
-
-    def open(self, username, path):
-        """Open path for user if it exists in this data store."""
-        if self.exists(username, path):
-            return self._user_data_storage(username).open(path)
-        else:
-            raise ObjectDoesNotExist(
-                "File path does not exist: {}".format(path))
-
-    def save(self, username, path, file, name=None):
-        """Save file to username/path in data store."""
-        # file.name may be full path, so get just the name of the file
-        file_name = name if name is not None else os.path.basename(file.name)
-        user_data_storage = self._user_data_storage(username)
-        file_path = os.path.join(
-            path, user_data_storage.get_valid_name(file_name))
-        input_file_name = user_data_storage.save(file_path, file)
-        input_file_fullpath = user_data_storage.path(input_file_name)
-        return input_file_fullpath
-
-    def move(
-            self,
-            source_username,
-            source_path,
-            target_username,
-            target_dir,
-            file_name):
-        source_full_path = self.path(source_username, source_path)
-        user_data_storage = self._user_data_storage(target_username)
-        # Make file_name a valid filename
-        target_path = os.path.join(
-            target_dir, user_data_storage.get_valid_name(file_name)
-        )
-        # Get available file path: if there is an existing file at target_path
-        # create a uniquely named path
-        target_path = user_data_storage.get_available_name(target_path)
-        target_full_path = self.path(target_username, target_path)
-        file_move_safe(source_full_path, target_full_path)
-        return target_full_path
-
-    def move_external(
-            self,
-            external_path,
-            target_username,
-            target_dir,
-            file_name):
-        user_data_storage = self._user_data_storage(target_username)
-        # Make file_name a valid filename
-        target_path = os.path.join(
-            target_dir, user_data_storage.get_valid_name(file_name)
-        )
-        # Get available file path: if there is an existing file at target_path
-        # create a uniquely named path
-        target_path = user_data_storage.get_available_name(target_path)
-        if not self.dir_exists(target_username, target_dir):
-            self.create_user_dir(target_username, target_dir)
-        target_full_path = self.path(target_username, target_path)
-        file_move_safe(external_path, target_full_path)
-        return target_full_path
-
-    def create_user_dir(self, username, path):
-        user_data_storage = self._user_data_storage(username)
-        if not user_data_storage.exists(path):
-            self._makedirs(username, path)
-        else:
-            raise Exception("Directory {} already exists".format(path))
-
-    def copy(
-            self,
-            source_username,
-            source_path,
-            target_username,
-            target_path,
-            name=None):
-        """Copy a user file into target_path dir."""
-        f = self.open(source_username, source_path)
-        return self.save(target_username, target_path, f, name=name)
-
-    def delete(self, username, path):
-        """Delete file in this data store."""
-        if self.exists(username, path):
-            user_data_storage = self._user_data_storage(username)
-            user_data_storage.delete(path)
-        else:
-            raise ObjectDoesNotExist(
-                "File path does not exist: {}".format(path))
-
-    def delete_dir(self, username, path):
-        """Delete entire directory in this data store."""
-        if self.dir_exists(username, path):
-            user_path = self.path(username, path)
-            shutil.rmtree(user_path)
-        else:
-            raise ObjectDoesNotExist(
-                "File path does not exist: {}".format(path))
-
-    def get_experiment_dir(
-        self, username, project_name=None, experiment_name=None, path=None
-    ):
-        """Return an experiment directory (full path) for the given experiment."""
-        user_experiment_data_storage = self._user_data_storage(username)
-        if path is None:
-            proj_dir_name = user_experiment_data_storage.get_valid_name(
-                project_name)
-            # AIRAVATA-3245 Make project directory with correct permissions
-            if not user_experiment_data_storage.exists(proj_dir_name):
-                self._makedirs(username, proj_dir_name)
-            experiment_dir_name = os.path.join(
-                proj_dir_name,
-                user_experiment_data_storage.get_valid_name(experiment_name),
-            )
-            # Since there may already be another experiment with the same name in
-            # this project, we need to check for available name
-            experiment_dir_name = user_experiment_data_storage.get_available_name(
-                experiment_dir_name)
-            experiment_dir = user_experiment_data_storage.path(
-                experiment_dir_name)
-        else:
-            # path can be relative to the user's storage space or absolute (as long
-            # as it is still inside the user's storage space)
-            # if path is passed in, assumption is that it has already been
-            # created
-            user_experiment_data_storage = self._user_data_storage(username)
-            experiment_dir = user_experiment_data_storage.path(path)
-        if not user_experiment_data_storage.exists(experiment_dir):
-            self._makedirs(username, experiment_dir)
-        return experiment_dir
-
-    def _makedirs(self, username, dir_path):
-        user_experiment_data_storage = self._user_data_storage(username)
-        full_path = user_experiment_data_storage.path(dir_path)
-        os.makedirs(
-            full_path,
-            mode=user_experiment_data_storage.directory_permissions_mode)
-        # os.makedirs mode isn't always respected so need to chmod to be sure
-        os.chmod(
-            full_path,
-            mode=user_experiment_data_storage.directory_permissions_mode)
-
-    def list_user_dir(self, username, file_path):
-        logger.debug("file_path={}".format(file_path))
-        user_data_storage = self._user_data_storage(username)
-        return user_data_storage.listdir(file_path)
-
-    def get_created_time(self, username, file_path):
-        user_data_storage = self._user_data_storage(username)
-        return user_data_storage.get_created_time(file_path)
-
-    def size(self, username, file_path):
-        user_data_storage = self._user_data_storage(username)
-        full_path = self.path(username, file_path)
-        if os.path.isdir(full_path):
-            return self._get_dir_size(full_path)
-        else:
-            return user_data_storage.size(file_path)
-
-    def path(self, username, file_path):
-        user_data_storage = self._user_data_storage(username)
-        return user_data_storage.path(file_path)
-
-    def rel_path(self, username, file_path):
-        full_path = self.path(username, file_path)
-        return os.path.relpath(full_path, self.path(username, ""))
-
-    def _user_data_storage(self, username):
-        return FileSystemStorage(
-            location=os.path.join(
-                self.directory, username))
-
-    # from https://stackoverflow.com/a/1392549
-    def _get_dir_size(self, start_path="."):
-        total_size = 0
-        for dirpath, dirnames, filenames in os.walk(start_path):
-            for f in filenames:
-                fp = os.path.join(dirpath, f)
-                # Check for broken symlinks (.exists return False for broken
-                # symlinks)
-                if os.path.exists(fp):
-                    total_size += os.path.getsize(fp)
-        return total_size
-
-
-class UserStorageProvider:
-    def dir_exists(self, request, path):
-        raise NotImplementedError()
-
-    def listdir(self, request, path):
-        raise NotImplementedError()
-
-    def get_file(self, request, path):
-        raise NotImplementedError()
-
-
-class FileSystemUserStorageProvider(UserStorageProvider):
-    def dir_exists(self, request, path):
-        return _Datastore().dir_exists(request.user.username, path)
-
-    def listdir(self, request, path):
-        datastore = _Datastore()
-        if datastore.dir_exists(request.user.username, path):
-            directories, files = datastore.list_user_dir(
-                request.user.username, path)
-            directories_data = []
-            for d in directories:
-                dpath = os.path.join(path, d)
-                created_time = datastore.get_created_time(
-                    request.user.username, dpath)
-                size = datastore.size(request.user.username, dpath)
-                directories_data.append(
-                    {
-                        "name": d,
-                        "path": dpath,
-                        "created_time": created_time,
-                        "size": size,
-                        "hidden": dpath == TMP_INPUT_FILE_UPLOAD_DIR,
-                    }
-                )
-            files_data = []
-            for f in files:
-                user_rel_path = os.path.join(path, f)
-                if not datastore.exists(request.user.username, user_rel_path):
-                    logger.warning(f"listdir skipping {request.user.username}:{user_rel_path}, "
-                                   "does not exist (broken symlink?)")
-                    continue
-                created_time = datastore.get_created_time(
-                    request.user.username, user_rel_path
-                )
-                size = datastore.size(request.user.username, user_rel_path)
-                full_path = datastore.path(request.user.username, user_rel_path)
-                data_product_uri = _get_data_product_uri(request, full_path)
-
-                data_product = request.airavata_client.getDataProduct(
-                    request.authz_token, data_product_uri)
-                mime_type = None
-                if 'mime-type' in data_product.productMetadata:
-                    mime_type = data_product.productMetadata['mime-type']
-                files_data.append(
-                    {
-                        "name": f,
-                        "path": user_rel_path,
-                        "data-product-uri": data_product_uri,
-                        "created_time": created_time,
-                        "mime_type": mime_type,
-                        "size": size,
-                        "hidden": False,
-                    }
-                )
-            return directories_data, files_data
-        else:
-            raise ObjectDoesNotExist("User storage path does not exist")
-
-    def get_file(self, request, path):
-
-        if _is_remote_api():
-            resp = _call_remote_api(request,
-                                    "/user-storage/~/{path}",
-                                    path_params={"path": path},
-                                    raise_for_status=False
-                                    )
-            _raise_404(resp, "User storage file path does not exist")
-            data = resp.json()
-            if data["isDir"]:
-                raise Exception("User storage path is a directory, not a file")
-            file = data['files'][0]
-            file['created_time'] = convert_iso8601_to_datetime(file['createdTime'])
-            file['mime_type'] = file['mimeType']
-            file['data-product-uri'] = file['dataProductURI']
-            return file
-        datastore = _Datastore()
-        if datastore.exists(request.user.username, path):
-            created_time = datastore.get_created_time(
-                request.user.username, path)
-            size = datastore.size(request.user.username, path)
-            full_path = datastore.path(request.user.username, path)
-            data_product_uri = _get_data_product_uri(request, full_path)
-            dir_path, file_name = os.path.split(path)
-
-            data_product = request.airavata_client.getDataProduct(
-                request.authz_token, data_product_uri)
-            mime_type = None
-            if 'mime-type' in data_product.productMetadata:
-                mime_type = data_product.productMetadata['mime-type']
-
-            return {
-                'name': full_path,
-                'path': dir_path,
-                'data-product-uri': data_product_uri,
-                'created_time': created_time,
-                'mime_type': mime_type,
-                'size': size,
-                'hidden': False
-            }
-        else:
-            raise ObjectDoesNotExist("User storage file path does not exist")
-
-
-class MFTApiUserStorageProvider(UserStorageProvider):
-    def __init__(self) -> None:
-        super().__init__()
-
-    def dir_exists(self, request, path):
-        with grpc.insecure_channel('localhost:7004') as channel:
-            # remove trailing slash and figure out parent path
-            # FIXME remove the hard coded /tmp path
-            parent_path, child_path = os.path.split(f"/tmp/{path}".rstrip("/"))
-            logger.debug(f"parent_path={parent_path}, child_path={child_path}")
-            stub = MFTApi_pb2_grpc.MFTApiServiceStub(channel)
-            # Get metadata for parent directory and see if child_path exists
-            request = MFTApi_pb2.FetchResourceMetadataRequest(
-                resourceId="remote-ssh-dir-resource",
-                resourceType="SCP",
-                resourceToken="local-ssh-cred",
-                resourceBackend="FILE",
-                resourceCredentialBackend="FILE",
-                targetAgentId="agent0",
-                childPath=parent_path,
-                mftAuthorizationToken="user token")
-            response = stub.getDirectoryResourceMetadata(request)
-            # if not child_path, then return True since the response was
-            # successful and we just need to confirm the existence of the root dir
-            if child_path == '':
-                return True
-            return child_path in map(lambda f: f.friendlyName, response.directories)
-
-    def listdir(self, request, path):
-        # TODO setup resourceId, etc from __init__ arguments
-        channel = grpc.insecure_channel('localhost:7004')
-        stub = MFTApi_pb2_grpc.MFTApiServiceStub(channel)
-        request = MFTApi_pb2.FetchResourceMetadataRequest(
-            resourceId="remote-ssh-dir-resource",
-            resourceType="SCP",
-            resourceToken="local-ssh-cred",
-            resourceBackend="FILE",
-            resourceCredentialBackend="FILE",
-            targetAgentId="agent0",
-            childPath=f"/tmp/{path}",
-            mftAuthorizationToken="user token")
-        response = stub.getDirectoryResourceMetadata(request)
-        directories_data = []
-        for d in response.directories:
-
-            dpath = os.path.join(path, d.friendlyName)
-            created_time = datetime.fromtimestamp(d.createdTime)
-            # TODO MFT API doesn't report size
-            size = 0
-            directories_data.append(
-                {
-                    "name": d.friendlyName,
-                    "path": dpath,
-                    "created_time": created_time,
-                    "size": size,
-                    # TODO how to handle hidden directories or directories for
-                    # staging input file uploads
-                    "hidden": False
-                }
-            )
-        files_data = []
-        for f in response.files:
-            user_rel_path = os.path.join(path, f.friendlyName)
-            # TODO do we need to check for broken symlinks?
-            created_time = datetime.fromtimestamp(f.createdTime)
-            # TODO get the size as well
-            size = 0
-            # full_path = datastore.path(request.user.username, user_rel_path)
-            # TODO how do we register these as data products, do we need to?
-            # data_product_uri = _get_data_product_uri(request, full_path)
-
-            # data_product = request.airavata_client.getDataProduct(
-            #     request.authz_token, data_product_uri)
-            # mime_type = None
-            # if 'mime-type' in data_product.productMetadata:
-            #     mime_type = data_product.productMetadata['mime-type']
-            files_data.append(
-                {
-                    "name": f.friendlyName,
-                    "path": user_rel_path,
-                    "data-product-uri": None,
-                    "created_time": created_time,
-                    "mime_type": None,
-                    "size": size,
-                    "hidden": False,
-                }
-            )
-        return directories_data, files_data
-
-    def get_file(self, request, path):
-        # FIXME remove hard coded /tmp path
-        path = f"/tmp/{path}".rstrip("/")
-        file_metadata = self._get_file(path)
-        if file_metadata is not None:
-            user_rel_path = os.path.join(path, file_metadata.friendlyName)
-            created_time = datetime.fromtimestamp(file_metadata.createdTime)
-            # TODO get the size as well
-            size = 0
-
-            return {
-                "name": file_metadata.friendlyName,
-                "path": user_rel_path,
-                "data-product-uri": None,
-                "created_time": created_time,
-                "mime_type": None,
-                "size": size,
-                "hidden": False,
-            }
-        else:
-            raise ObjectDoesNotExist("User storage file path does not exist")
-
-    def _get_file(self, path):
-        with grpc.insecure_channel('localhost:7004') as channel:
-            stub = MFTApi_pb2_grpc.MFTApiServiceStub(channel)
-            # Get metadata for parent directory and see if child_path exists
-            request = MFTApi_pb2.FetchResourceMetadataRequest(
-                resourceId="remote-ssh-dir-resource",
-                resourceType="SCP",
-                resourceToken="local-ssh-cred",
-                resourceBackend="FILE",
-                resourceCredentialBackend="FILE",
-                targetAgentId="agent0",
-                childPath=path,
-                mftAuthorizationToken="user token")
-            try:
-                # TODO is there a better way to check if file exists than catching exception?
-                return stub.getFileResourceMetadata(request)
-            except Exception:
-                logger.exception(f"_get_file({path})")
-                return None
-
-    def _get_download_url(self, path):
-
-        with grpc.insecure_channel('localhost:7004') as channel:
-            stub = MFTApi_pb2_grpc.MFTApiServiceStub(channel)
-            download_request = MFTApi_pb2.HttpDownloadApiRequest(sourceStoreId="remote-ssh-storage",
-                                                                 sourcePath="/tmp/a.txt",
-                                                                 sourceToken="local-ssh-cred",
-                                                                 sourceType="SCP",
-                                                                 targetAgent="agent0",
-                                                                 mftAuthorizationToken="")
-            try:
-                # TODO is there a better way to check if file exists than catching exception?
-                # response stub.submitHttpDownload(request)
-                pass
-            except Exception:
-                logger.exception(f"_get_file({path})")
-                return None
diff --git a/airavata_django_portal_sdk/user_storage/__init__.py b/airavata_django_portal_sdk/user_storage/__init__.py
new file mode 100644
index 0000000..21a22b4
--- /dev/null
+++ b/airavata_django_portal_sdk/user_storage/__init__.py
@@ -0,0 +1,51 @@
+from .api import (
+    copy_input_file,
+    create_user_dir,
+    delete,
+    delete_dir,
+    delete_user_file,
+    dir_exists,
+    exists,
+    experiment_dir_exists,
+    get_experiment_dir,
+    get_file,
+    get_file_metadata,
+    get_rel_experiment_dir,
+    is_input_file,
+    list_experiment_dir,
+    listdir,
+    move,
+    move_input_file,
+    open_file,
+    save,
+    save_input_file,
+    update_data_product_content,
+    update_file_content,
+    user_file_exists
+)
+
+__all__ = [
+    'copy_input_file',
+    'create_user_dir',
+    'delete',
+    'delete_dir',
+    'delete_user_file',
+    'dir_exists',
+    'exists',
+    'experiment_dir_exists',
+    'get_experiment_dir',
+    'get_file',
+    'get_file_metadata',
+    'get_rel_experiment_dir',
+    'is_input_file',
+    'list_experiment_dir',
+    'listdir',
+    'move',
+    'move_input_file',
+    'open_file',
+    'save',
+    'save_input_file',
+    'update_data_product_content',
+    'update_file_content',
+    'user_file_exists'
+]
diff --git a/airavata_django_portal_sdk/user_storage/api.py b/airavata_django_portal_sdk/user_storage/api.py
new file mode 100644
index 0000000..4a21498
--- /dev/null
+++ b/airavata_django_portal_sdk/user_storage/api.py
@@ -0,0 +1,758 @@
+import cgi
+import copy
+import importlib
+import io
+import logging
+import mimetypes
+import os
+import warnings
+from http import HTTPStatus
+from urllib.parse import quote, unquote, urlparse
+
+import requests
+from airavata.model.data.replica.ttypes import (
+    DataProductModel,
+    DataProductType,
+    DataReplicaLocationModel,
+    ReplicaLocationCategory,
+    ReplicaPersistentType
+)
+from django.conf import settings
+from django.core.exceptions import ObjectDoesNotExist
+
+from ..util import convert_iso8601_to_datetime
+
+logger = logging.getLogger(__name__)
+
+TMP_INPUT_FILE_UPLOAD_DIR = "tmp"
+
+
+def get_user_storage_provider(request, owner_username=None, storage_resource_id=None):
+    """
+    Instantiate the user storage backend for a storage resource.
+
+    With no `storage_resource_id`, the "default" entry of the USER_STORAGES
+    setting is used, otherwise the entry with a matching STORAGE_RESOURCE_ID.
+    Without a USER_STORAGES setting, the legacy GATEWAY_DATA_STORE_* settings
+    always configure a DjangoFileSystemProvider. Raises LookupError when no
+    USER_STORAGES entry matches `storage_resource_id`.
+    """
+    # TODO: default the module_class_name to MFT provider
+    module_class_name = None
+    if not hasattr(settings, 'USER_STORAGES'):
+        # make this backward compatible with the older settings
+        module_class_name = 'airavata_django_portal_sdk.user_storage.backends.DjangoFileSystemProvider'
+        storage_resource_id = settings.GATEWAY_DATA_STORE_RESOURCE_ID
+        options = dict(directory=settings.GATEWAY_DATA_STORE_DIR)
+        logger.warning("Please add the USER_STORAGES setting. Using legacy GATEWAY_DATA_STORE_RESOURCE_ID and GATEWAY_DATA_STORE_DIR settings.")
+    elif storage_resource_id is None:
+        conf = settings.USER_STORAGES["default"]
+        module_class_name = conf['BACKEND']
+        storage_resource_id = conf['STORAGE_RESOURCE_ID']
+        options = conf.get('OPTIONS', {})
+    else:
+        # USER_STORAGES is keyed by alias, so search its values, not its keys
+        for conf in settings.USER_STORAGES.values():
+            if conf['STORAGE_RESOURCE_ID'] == storage_resource_id:
+                module_class_name = conf['BACKEND']
+                options = conf.get('OPTIONS', {})
+                break
+        else:
+            raise LookupError(f"No USER_STORAGES entry for storage resource {storage_resource_id}")
+    module_name, class_name = module_class_name.rsplit(".", 1)
+    BackendClass = getattr(importlib.import_module(module_name), class_name)
+    context = {
+        'request': request,
+        'owner_username': owner_username,
+    }
+    return BackendClass(request.authz_token, storage_resource_id, context=context, **options)
+
+
+def save(request, path, file, name=None, content_type=None, storage_resource_id=None):
+    """Save file under path in the user's storage and return its DataProduct."""
+    if not _is_remote_api():
+        provider = get_user_storage_provider(
+            request, storage_resource_id=storage_resource_id)
+        resource_id, resource_path = provider.save(
+            path, file, name=name, content_type=content_type)
+        return _save_data_product(
+            request, resource_path, resource_id, name=name, content_type=content_type)
+    # Remote API mode: POST the file to the remote user-storage endpoint
+    if name is None and hasattr(file, 'name'):
+        name = os.path.basename(file.name)
+    # Only send the (name, file, content_type) tuple when a content type is given
+    upload = file if content_type is None else (name, file, content_type)
+    resp = _call_remote_api(
+        request,
+        "/user-storage/~/{path}",
+        path_params={"path": path},
+        method="post",
+        files={'file': upload})
+    product_uri = resp.json()['uploaded']['productUri']
+    # Fetch the data product that the response reports for the upload
+    return request.airavata_client.getDataProduct(request.authz_token, product_uri)
+
+
+def save_input_file(request, file, name=None, content_type=None, storage_resource_id=None):
+    """Save an uploaded input file in the staging area for input files."""
+    if not _is_remote_api():
+        provider = get_user_storage_provider(
+            request, storage_resource_id=storage_resource_id)
+        # Fall back to the file's own base name when no name is given
+        if name is not None:
+            upload_name = name
+        else:
+            upload_name = os.path.basename(file.name)
+        resource_id, resource_path = provider.save(
+            TMP_INPUT_FILE_UPLOAD_DIR, file, name=upload_name)
+        # NOTE: the data product is saved with the caller's name, which may be None
+        return _save_data_product(
+            request, resource_path, resource_id, name=name, content_type=content_type)
+    # Remote API mode: POST the file to the remote upload endpoint
+    if name is None and hasattr(file, 'name'):
+        name = os.path.basename(file.name)
+    # Only send the (name, file, content_type) tuple when a content type is given
+    upload = file if content_type is None else (name, file, content_type)
+    resp = _call_remote_api(
+        request, "/upload", method="post", files={'file': upload})
+    product_uri = resp.json()['data-product']['productUri']
+    # Fetch the data product that the response reports for the upload
+    return request.airavata_client.getDataProduct(request.authz_token, product_uri)
+
+
+def copy_input_file(request, data_product=None, data_product_uri=None, storage_resource_id=None):
+    """Copy a data product's file into the staging area for input files."""
+    if data_product is None:
+        data_product = _get_data_product(request, data_product_uri)
+    src_resource_id, src_path = _get_replica_resource_id_and_filepath(data_product)
+    src_provider = get_user_storage_provider(
+        request, owner_username=data_product.ownerName, storage_resource_id=src_resource_id)
+    src_file = src_provider.open(src_path)
+    dest_provider = get_user_storage_provider(request, storage_resource_id=storage_resource_id)
+    dest_resource_id, dest_path = dest_provider.save(
+        TMP_INPUT_FILE_UPLOAD_DIR, src_file, name=data_product.productName)
+    return _save_copy_of_data_product(request, dest_path, data_product, dest_resource_id)
+
+
+def is_input_file(request, data_product=None, data_product_uri=None):
+    """Return whether the data product's file is in TMP_INPUT_FILE_UPLOAD_DIR."""
+    if data_product is None:
+        data_product = _get_data_product(request, data_product_uri)
+    if _is_remote_api():
+        # Remote API mode: the data product endpoint reports the flag directly
+        resp = _call_remote_api(
+            request,
+            "/data-products/",
+            params={'product-uri': data_product.productUri})
+        return resp.json()['isInputFileUpload']
+    resource_id, resource_path = _get_replica_resource_id_and_filepath(data_product)
+    provider = get_user_storage_provider(
+        request,
+        owner_username=data_product.ownerName,
+        storage_resource_id=resource_id)
+    if not provider.exists(resource_path):
+        return False
+    _, files_metadata = provider.get_metadata(resource_path)
+    # An input file is one whose parent directory is TMP_INPUT_FILE_UPLOAD_DIR
+    return os.path.dirname(files_metadata[0]['path']) == TMP_INPUT_FILE_UPLOAD_DIR
+
+
+def move(request, data_product=None, path=None, data_product_uri=None, storage_resource_id=None):
+    """Move a data product's file to path: save a copy, then delete the source."""
+    if data_product is None:
+        data_product = _get_data_product(request, data_product_uri)
+    src_resource_id, src_path = _get_replica_resource_id_and_filepath(data_product)
+    src_provider = get_user_storage_provider(
+        request, owner_username=data_product.ownerName, storage_resource_id=src_resource_id)
+    src_file = src_provider.open(src_path)
+    dest_provider = get_user_storage_provider(request, storage_resource_id=storage_resource_id)
+    dest_resource_id, dest_path = dest_provider.save(
+        path, src_file, name=data_product.productName)
+    moved_data_product = _save_copy_of_data_product(request, dest_path, data_product, dest_resource_id)
+    # With the copy saved, remove the source file and data product metadata
+    src_provider.delete(src_path)
+    _delete_data_product(data_product.ownerName, src_path)
+    return moved_data_product
+
+
+def move_input_file(request, data_product=None, path=None, data_product_uri=None, storage_resource_id=None):
+    warnings.warn("Use 'move' instead.", DeprecationWarning, stacklevel=2)  # deprecated alias of move(); stacklevel=2 attributes the warning to the caller
+    return move(request, data_product=data_product, path=path, data_product_uri=data_product_uri, storage_resource_id=storage_resource_id)
+
+
+def open_file(request, data_product=None, data_product_uri=None):
+    """
+    Open the data product's replica in user storage and return the file
+    object. One of `data_product` or `data_product_uri` is required.
+    """
+    if data_product is None:
+        data_product = _get_data_product(request, data_product_uri)
+    if not _is_remote_api():
+        resource_id, resource_path = _get_replica_resource_id_and_filepath(data_product)
+        provider = get_user_storage_provider(
+            request,
+            owner_username=data_product.ownerName,
+            storage_resource_id=resource_id)
+        return provider.open(resource_path)
+    # Remote API mode: download the content and wrap it in an in-memory file
+    resp = _call_remote_api(
+        request,
+        "/download",
+        params={'data-product-uri': data_product.productUri})
+    downloaded = io.BytesIO(resp.content)
+    _, header_params = cgi.parse_header(resp.headers['Content-Disposition'])
+    # Give the file object a name just like a real opened file object
+    downloaded.name = header_params['filename']
+    return downloaded
+
+
+def exists(request, data_product=None, data_product_uri=None):
+    """
+    Check whether the replica for data_product exists in user storage. One of
+    `data_product` or `data_product_uri` is required.
+    """
+    if data_product is None:
+        data_product = _get_data_product(request, data_product_uri)
+    if not _is_remote_api():
+        resource_id, resource_path = _get_replica_resource_id_and_filepath(data_product)
+        provider = get_user_storage_provider(
+            request,
+            owner_username=data_product.ownerName,
+            storage_resource_id=resource_id)
+        return provider.exists(resource_path)
+    # Remote API mode: the replica exists when a download URL is reported
+    resp = _call_remote_api(
+        request,
+        "/data-products/",
+        params={'product-uri': data_product.productUri})
+    return resp.json()['downloadURL'] is not None
+
+
+def dir_exists(request, path, storage_resource_id=None):
+    "Return True if path exists and is a directory in user's data store."
+    if _is_remote_api():
+        resp = _call_remote_api(request,
+                                "/user-storage/~/{path}",
+                                path_params={"path": path},
+                                raise_for_status=False)
+        if resp.status_code == HTTPStatus.NOT_FOUND:
+            return False
+        resp.raise_for_status()
+        return resp.json()['isDir']
+    backend = get_user_storage_provider(request, storage_resource_id=storage_resource_id)
+    # A file at path must not count as a directory (matches the remote 'isDir' result)
+    return backend.exists(path) and not backend.is_file(path)
+
+
+def user_file_exists(request, path, storage_resource_id=None):
+    """If path is an existing file, return its data product URI, else None."""
+    if _is_remote_api():
+        resp = _call_remote_api(request,
+                                "/user-storage/~/{path}",
+                                path_params={"path": path},
+                                raise_for_status=False)
+        if resp.status_code == HTTPStatus.NOT_FOUND:
+            return None
+        # Raise on other HTTP errors before parsing: error bodies may not be JSON
+        resp.raise_for_status()
+        data = resp.json()
+        return None if data['isDir'] else data['files'][0]['dataProductURI']
+    backend = get_user_storage_provider(request, storage_resource_id=storage_resource_id)
+    if not (backend.exists(path) and backend.is_file(path)):
+        return None
+    _, files = backend.get_metadata(path)
+    resource_path = files[0]['resource_path']
+    return _get_data_product_uri(request, resource_path, backend.resource_id)
+
+
+def delete_dir(request, path, storage_resource_id=None):
+    """Delete path in user's data store, if it exists."""
+    if _is_remote_api():
+        resp = _call_remote_api(request,
+                                "/user-storage/~/{path}",
+                                path_params={"path": path},
+                                method="delete",
+                                raise_for_status=False)
+        _raise_404(resp, f"File path does not exist {path}")
+        resp.raise_for_status()
+        return
+    backend = get_user_storage_provider(request, storage_resource_id=storage_resource_id)
+    backend.delete(path)
+
+
+def delete_user_file(request, path, storage_resource_id=None):
+    """Delete file in user's data store, if it exists."""
+    if _is_remote_api():
+        resp = _call_remote_api(request,
+                                "/user-storage/~/{path}",
+                                path_params={"path": path},
+                                method="delete",
+                                raise_for_status=False)
+        _raise_404(resp, f"File path does not exist {path}")
+        resp.raise_for_status()
+        return
+    backend = get_user_storage_provider(request, storage_resource_id=storage_resource_id)
+    backend.delete(path)
+
+
+def update_file_content(request, path, fileContentText, storage_resource_id=None):
+    if _is_remote_api():
+        _call_remote_api(request,
+                         "/user-storage/~/{path}",
+                         path_params={"path": path},
+                         method="put",
+                         data={"fileContentText": fileContentText}
+                         )
+        return
+    else:
+        backend = get_user_storage_provider(request, storage_resource_id=storage_resource_id)
+        file = io.StringIO(fileContentText)
+        backend.update(path, file)
+
+
+def update_data_product_content(request, data_product=None, fileContentText="", data_product_uri=None):
+    if data_product is None:
+        data_product = _get_data_product(request, data_product_uri)
+    if _is_remote_api():
+        _call_remote_api(request,
+                         "/data-products/",
+                         params={'product-uri': data_product.productUri},
+                         method="put",
+                         data={"fileContentText": fileContentText},
+                         )
+        return
+    storage_resource_id, path = _get_replica_resource_id_and_filepath(data_product)
+    update_file_content(request, path, fileContentText, storage_resource_id=storage_resource_id)
+
+
+def get_file_metadata(request, path, storage_resource_id=None):
+    if _is_remote_api():
+        resp = _call_remote_api(request,
+                                "/user-storage/~/{path}",
+                                path_params={"path": path},
+                                raise_for_status=False
+                                )
+        _raise_404(resp, "User storage file path does not exist")
+        data = resp.json()
+        if data["isDir"]:
+            raise Exception("User storage path is a directory, not a file")
+        file = data['files'][0]
+        file['created_time'] = convert_iso8601_to_datetime(file['createdTime'])
+        file['mime_type'] = file['mimeType']
+        file['data-product-uri'] = file['dataProductURI']
+        return file
+
+    backend = get_user_storage_provider(request, storage_resource_id=storage_resource_id)
+    if backend.exists(path) and backend.is_file(path):
+        _, files = backend.get_metadata(path)
+        file = files[0]
+        data_product_uri = _get_data_product_uri(request, file['resource_path'],
+                                                 storage_resource_id=backend.resource_id)
+
+        data_product = request.airavata_client.getDataProduct(
+            request.authz_token, data_product_uri)
+        mime_type = None
+        if 'mime-type' in data_product.productMetadata:
+            mime_type = data_product.productMetadata['mime-type']
+        file['data-product-uri'] = data_product_uri
+        file['mime_type'] = mime_type
+        return file
+    else:
+        raise ObjectDoesNotExist("File does not exist at that path.")
+
+
+def get_file(request, path, storage_resource_id=None):
+    warnings.warn("Use 'get_file_metadata' instead.", DeprecationWarning)
+    return get_file_metadata(request, path, storage_resource_id)
+
+
+def delete(request, data_product=None, data_product_uri=None):
+    """
+    Delete replica for data product in this data store. One of `data_product`
+    or `data_product_uri` is required.
+    """
+    if data_product is None:
+        data_product = _get_data_product(request, data_product_uri)
+    if _is_remote_api():
+        _call_remote_api(
+            request,
+            "/delete-file",
+            params={'data-product-uri': data_product.productUri},
+            method="delete")
+        return
+    else:
+        storage_resource_id, path = _get_replica_resource_id_and_filepath(data_product)
+        backend = get_user_storage_provider(request, storage_resource_id=storage_resource_id)
+        try:
+            backend.delete(path)
+            _delete_data_product(data_product.ownerName, path)
+        except Exception:
+            logger.exception(
+                "Unable to delete file {} for data product uri {}".format(
+                    path, data_product.productUri
+                )
+            )
+            raise
+
+
+def listdir(request, path, storage_resource_id=None):
+    """Return a tuple of two lists, one for directories, the second for files."""
+
+    if _is_remote_api():
+        resp = _call_remote_api(request,
+                                "/user-storage/~/{path}",
+                                path_params={"path": path},
+                                )
+        data = resp.json()
+        for directory in data['directories']:
+            # Convert JSON ISO8601 timestamp to datetime instance
+            directory['created_time'] = convert_iso8601_to_datetime(
+                directory['createdTime'])
+        for file in data['files']:
+            # Convert JSON ISO8601 timestamp to datetime instance
+            file['created_time'] = convert_iso8601_to_datetime(
+                file['createdTime'])
+            file['mime_type'] = file['mimeType']
+            file['data-product-uri'] = file['dataProductURI']
+        return data['directories'], data['files']
+
+    backend = get_user_storage_provider(request, storage_resource_id=storage_resource_id)
+    directories, files = backend.get_metadata(path)
+    # for each file, lookup or register a data product and enrich the file
+    # metadata with data-product-uri and mime-type
+    for file in files:
+        data_product_uri = _get_data_product_uri(request, file['resource_path'],
+                                                 storage_resource_id=backend.resource_id)
+
+        data_product = request.airavata_client.getDataProduct(
+            request.authz_token, data_product_uri)
+        mime_type = None
+        if 'mime-type' in data_product.productMetadata:
+            mime_type = data_product.productMetadata['mime-type']
+        file['data-product-uri'] = data_product_uri
+        file['mime_type'] = mime_type
+    return directories, files
+
+
+def list_experiment_dir(request, experiment_id, path="", storage_resource_id=None):
+    """
+    List files, directories in experiment data directory. Returns a tuple,
+    see `listdir`.
+    """
+    if _is_remote_api():
+        resp = _call_remote_api(request,
+                                "/experiment-storage/{experiment_id}/{path}",
+                                path_params={"path": path,
+                                             "experiment_id": experiment_id},
+                                )
+        data = resp.json()
+        for directory in data['directories']:
+            # Convert JSON ISO8601 timestamp to datetime instance
+            directory['created_time'] = convert_iso8601_to_datetime(
+                directory['createdTime'])
+        for file in data['files']:
+            # Convert JSON ISO8601 timestamp to datetime instance
+            file['created_time'] = convert_iso8601_to_datetime(
+                file['createdTime'])
+            file['mime_type'] = file['mimeType']
+            file['data-product-uri'] = file['dataProductURI']
+        return data['directories'], data['files']
+
+    experiment = request.airavata_client.getExperiment(
+        request.authz_token, experiment_id)
+    backend = get_user_storage_provider(request, storage_resource_id=storage_resource_id)
+    exp_data_path = experiment.userConfigurationData.experimentDataDir
+    exp_data_path = os.path.join(exp_data_path, path)
+    # Implement username override with exp_owner
+    # exp_owner = experiment.userName
+    if backend.exists(exp_data_path):
+        directories, files = backend.get_metadata(exp_data_path)
+        # for each file, lookup or register a data product and enrich the file
+        # metadata with data-product-uri and mime-type
+        for file in files:
+            data_product_uri = _get_data_product_uri(request, file['resource_path'],
+                                                     storage_resource_id=backend.resource_id)
+
+            data_product = request.airavata_client.getDataProduct(
+                request.authz_token, data_product_uri)
+            mime_type = None
+            if 'mime-type' in data_product.productMetadata:
+                mime_type = data_product.productMetadata['mime-type']
+            file['data-product-uri'] = data_product_uri
+            file['mime_type'] = mime_type
+        return directories, files
+    else:
+        raise ObjectDoesNotExist("Experiment data directory does not exist")
+
+
+def experiment_dir_exists(request, experiment_id, path="", storage_resource_id=None):
+
+    if _is_remote_api():
+        resp = _call_remote_api(request,
+                                "/experiment-storage/{experiment_id}/{path}",
+                                path_params={"path": path,
+                                             "experiment_id": experiment_id},
+                                raise_for_status=False)
+        if resp.status_code == HTTPStatus.NOT_FOUND:
+            return False
+        resp.raise_for_status()
+        return resp.json()['isDir']
+
+    experiment = request.airavata_client.getExperiment(
+        request.authz_token, experiment_id)
+    exp_data_path = experiment.userConfigurationData.experimentDataDir
+    if exp_data_path is None:
+        return False
+    # Implement username overide with exp_owner
+    # exp_owner = experiment.userName
+    backend = get_user_storage_provider(request, storage_resource_id=storage_resource_id)
+    return backend.exists(exp_data_path)
+
+
+def get_experiment_dir(request, project_name=None, experiment_name=None, path=None, storage_resource_id=None):
+    warnings.warn("Use 'create_user_dir' instead.", DeprecationWarning)
+    storage_resource_id, resource_path = create_user_dir(request,
+                                                         dir_names=[project_name, experiment_name],
+                                                         create_unique=True,
+                                                         storage_resource_id=storage_resource_id)
+    return resource_path
+
+
+def create_user_dir(request, path="", dir_names=(), create_unique=False, storage_resource_id=None):
+    if _is_remote_api():
+        logger.debug(f"path={path}")
+        _call_remote_api(request,
+                         "/user-storage/~/{path}",
+                         path_params={"path": path},
+                         method="post")
+        return
+    backend = get_user_storage_provider(request, storage_resource_id=storage_resource_id)
+    # For backwards compatibility, manufacture the dir_names array as needed
+    if len(dir_names) == 0:
+        dir_names = []
+        while not backend.exists(path):
+            path, dir_name = os.path.split(path)
+            if dir_name == '':
+                raise Exception("Could not find a base directory in which to create directories.")
+            dir_names.insert(0, dir_name)
+    storage_resource_id, resource_path = backend.create_dirs(path, dir_names=dir_names, create_unique=create_unique)
+    return storage_resource_id, resource_path
+
+
+def get_rel_experiment_dir(request, experiment_id, storage_resource_id=None):
+    """Return experiment data dir path relative to user's directory."""
+    warnings.warn("Use 'list_experiment_dir' instead.", DeprecationWarning)
+    if _is_remote_api():
+        resp = _call_remote_api(request,
+                                "/experiments/{experimentId}/",
+                                path_params={"experimentId": experiment_id})
+        resp.raise_for_status()
+        return resp.json()['relativeExperimentDataDir']
+
+    experiment = request.airavata_client.getExperiment(
+        request.authz_token, experiment_id)
+    if (experiment.userConfigurationData and
+            experiment.userConfigurationData.experimentDataDir):
+        backend = get_user_storage_provider(request, storage_resource_id=storage_resource_id)
+        data_dir = experiment.userConfigurationData.experimentDataDir
+        if backend.exists(data_dir):
+            directories, _ = backend.get_metadata(os.path.dirname(data_dir))
+            for directory in directories:
+                if directory['name'] == os.path.basename(data_dir):
+                    return directory['path']
+            raise Exception(f"Could not find relative path to experiment data dir {data_dir}")
+        else:
+            return None
+    else:
+        return None
+
+
+def _get_data_product_uri(request, full_path, storage_resource_id, owner=None):
+
+    from airavata_django_portal_sdk import models
+    if owner is None:
+        owner = request.user.username
+    user_file = models.UserFiles.objects.filter(
+        username=owner, file_path=full_path)
+    if user_file.exists():
+        product_uri = user_file[0].file_dpu
+    else:
+        data_product = _save_data_product(request, full_path, storage_resource_id, owner=owner)
+        product_uri = data_product.productUri
+    return product_uri
+
+
+def _get_data_product(request, data_product_uri):
+    return request.airavata_client.getDataProduct(
+        request.authz_token, data_product_uri)
+
+
+def _save_data_product(request, full_path, storage_resource_id, name=None, content_type=None, owner=None):
+    "Create, register and record in DB a data product for full_path."
+    if owner is None:
+        owner = request.user.username
+    data_product = _create_data_product(
+        owner, full_path, storage_resource_id, name=name, content_type=content_type
+    )
+    product_uri = _register_data_product(request, full_path, data_product, owner=owner)
+    data_product.productUri = product_uri
+    return data_product
+
+
+def _register_data_product(request, full_path, data_product, owner=None):
+    if owner is None:
+        owner = request.user.username
+    product_uri = request.airavata_client.registerDataProduct(
+        request.authz_token, data_product
+    )
+    from airavata_django_portal_sdk import models
+    user_file_instance = models.UserFiles(
+        username=owner,
+        file_path=full_path,
+        file_dpu=product_uri)
+    user_file_instance.save()
+    return product_uri
+
+
+def _save_copy_of_data_product(request, full_path, data_product, storage_resource_id):
+    """Save copy of a data product with a different path."""
+    data_product_copy = _copy_data_product(request, data_product, full_path, storage_resource_id)
+    product_uri = _register_data_product(request, full_path, data_product_copy)
+    data_product_copy.productUri = product_uri
+    return data_product_copy
+
+
+def _copy_data_product(request, data_product, full_path, storage_resource_id):
+    """Create an unsaved copy of a data product with different path."""
+    data_product_copy = copy.copy(data_product)
+    data_product_copy.productUri = None
+    data_product_copy.ownerName = request.user.username
+    data_replica_location = _create_replica_location(
+        full_path, data_product_copy.productName, storage_resource_id
+    )
+    data_product_copy.replicaLocations = [data_replica_location]
+    return data_product_copy
+
+
+def _delete_data_product(username, full_path):
+    # TODO: call API to delete data product from replica catalog when it is
+    # available (not currently implemented)
+    from airavata_django_portal_sdk import models
+    user_file = models.UserFiles.objects.filter(
+        username=username, file_path=full_path)
+    if user_file.exists():
+        user_file.delete()
+
+
+def _create_data_product(username, full_path, storage_resource_id, name=None, content_type=None):
+    data_product = DataProductModel()
+    data_product.gatewayId = settings.GATEWAY_ID
+    data_product.ownerName = username
+    if name is not None:
+        file_name = name
+    else:
+        file_name = os.path.basename(full_path)
+    data_product.productName = file_name
+    data_product.dataProductType = DataProductType.FILE
+    final_content_type = _determine_content_type(full_path, content_type)
+    if final_content_type is not None:
+        data_product.productMetadata = {"mime-type": final_content_type}
+    data_replica_location = _create_replica_location(full_path, file_name, storage_resource_id)
+    data_product.replicaLocations = [data_replica_location]
+    return data_product
+
+
+def _determine_content_type(full_path, content_type=None):
+    result = content_type
+    if result is None:
+        # Try to guess the content-type from file extension
+        guessed_type, encoding = mimetypes.guess_type(full_path)
+        result = guessed_type
+    if result is None or result == "application/octet-stream":
+        # Check if file is Unicode text by trying to read some of it
+        try:
+            open(full_path, "r").read(1024)
+            result = "text/plain"
+        except UnicodeDecodeError:
+            logger.debug(f"Failed to read as Unicode text: {full_path}")
+    return result
+
+
+def _create_replica_location(full_path, file_name, storage_resource_id):
+    data_replica_location = DataReplicaLocationModel()
+    data_replica_location.storageResourceId = storage_resource_id
+    data_replica_location.replicaName = "{} gateway data store copy".format(
+        file_name)
+    data_replica_location.replicaLocationCategory = (
+        ReplicaLocationCategory.GATEWAY_DATA_STORE
+    )
+    data_replica_location.replicaPersistentType = ReplicaPersistentType.TRANSIENT
+    data_replica_location.filePath = quote(full_path)
+    return data_replica_location
+
+
+def _get_replica_filepath(data_product):
+    replica_filepaths = [
+        rep.filePath
+        for rep in data_product.replicaLocations
+        if rep.replicaLocationCategory == ReplicaLocationCategory.GATEWAY_DATA_STORE
+    ]
+    replica_filepath = replica_filepaths[0] if len(
+        replica_filepaths) > 0 else None
+    if replica_filepath:
+        return unquote(urlparse(replica_filepath).path)
+    return None
+
+
+def _get_replica_location(data_product, category=ReplicaLocationCategory.GATEWAY_DATA_STORE):
+    replica_locations = [
+        rep
+        for rep in data_product.replicaLocations
+        if rep.replicaLocationCategory == ReplicaLocationCategory.GATEWAY_DATA_STORE
+    ]
+    return replica_locations[0] if len(replica_locations) > 0 else None
+
+
+def _get_replica_resource_id_and_filepath(data_product):
+    replica_location = _get_replica_location(data_product)
+    if replica_location is not None:
+        return (replica_location.storageResourceId,
+                unquote(urlparse(replica_location.filePath).path))
+    else:
+        return None
+
+
+def _is_remote_api():
+    return getattr(settings, 'GATEWAY_DATA_STORE_REMOTE_API', None) is not None
+
+
+def _call_remote_api(
+        request,
+        path,
+        path_params=None,
+        method="get",
+        raise_for_status=True,
+        **kwargs):
+
+    headers = {
+        'Authorization': f'Bearer {request.authz_token.accessToken}'}
+    encoded_path_params = {}
+    if path_params is not None:
+        for pk, pv in path_params.items():
+            encoded_path_params[pk] = quote(pv)
+    encoded_path = path.format(**encoded_path_params)
+    logger.debug(f"encoded_path={encoded_path}")
+    r = requests.request(
+        method,
+        f'{settings.GATEWAY_DATA_STORE_REMOTE_API}{encoded_path}',
+        headers=headers,
+        **kwargs,
+    )
+    if raise_for_status:
+        r.raise_for_status()
+    return r
+
+
+def _raise_404(response, msg, exception_class=ObjectDoesNotExist):
+    if response.status_code == 404:
+        raise exception_class(msg)
diff --git a/airavata_django_portal_sdk/user_storage/backends/__init__.py b/airavata_django_portal_sdk/user_storage/backends/__init__.py
new file mode 100644
index 0000000..eefe980
--- /dev/null
+++ b/airavata_django_portal_sdk/user_storage/backends/__init__.py
@@ -0,0 +1,3 @@
+from .django_filesystem_provider import DjangoFileSystemProvider
+
+__all__ = ['DjangoFileSystemProvider']
diff --git a/airavata_django_portal_sdk/user_storage/backends/base.py b/airavata_django_portal_sdk/user_storage/backends/base.py
new file mode 100644
index 0000000..409a794
--- /dev/null
+++ b/airavata_django_portal_sdk/user_storage/backends/base.py
@@ -0,0 +1,64 @@
+
+class UserStorageProvider:
+    def __init__(self, authz_token, resource_id, context=None, **kwargs):
+        self.authz_token = authz_token
+        self.resource_id = resource_id
+        # TODO probably don't need context for passing 'request'
+        self.context = context
+
+    # TODO remove content_type
+    def save(self, resource_path, file, name=None, content_type=None):
+        """
+        Return a tuple of storage resource id and path, if any, to file.
+        """
+        raise NotImplementedError()
+
+    def get_upload_url(self, resource_path):
+        raise NotImplementedError()
+
+    def open(self, resource_path):
+        raise NotImplementedError()
+
+    def get_download_url(self, resource_path):
+        raise NotImplementedError()
+
+    def exists(self, resource_path):
+        raise NotImplementedError()
+
+    def is_file(self, resource_path):
+        # TODO: is this needed if we have get_metadata?
+        raise NotImplementedError()
+
+    def is_dir(self, resource_path):
+        # TODO: is this needed if we have get_metadata?
+        raise NotImplementedError()
+
+    def get_metadata(self, resource_path):
+        """
+        Return a tuple of two sequences: directories and files for the given
+        resource_path. If the resource_path represents a file, then the
+        directories sequence should be empty and the files sequence will only
+        have the one file.
+        """
+        raise NotImplementedError()
+
+    def delete(self, resource_path):
+        raise NotImplementedError()
+
+    def update(self, resource_path, file):
+        raise NotImplementedError()
+
+    def create_dirs(self, resource_path, dir_names=[], create_unique=False):
+        """
+        Create one or more named subdirectories inside the resource_path.
+        resource_path must exist. dir_names will potentially be normalized as
+        needed. The intermediate directories may already exist, but if the
+        final directory already exists, this method will raise an Exception,
+        unless create_unique is True in which the name will be modified until
+        a unique directory name is found.
+        """
+        raise NotImplementedError()
+
+    @property
+    def username(self):
+        return self.authz_token.claimsMap['userName']
diff --git a/airavata_django_portal_sdk/user_storage/backends/django_filesystem_provider.py b/airavata_django_portal_sdk/user_storage/backends/django_filesystem_provider.py
new file mode 100644
index 0000000..78812bd
--- /dev/null
+++ b/airavata_django_portal_sdk/user_storage/backends/django_filesystem_provider.py
@@ -0,0 +1,306 @@
+import logging
+import os
+import shutil
+
+from django.core.exceptions import ObjectDoesNotExist, SuspiciousFileOperation
+from django.core.files.storage import FileSystemStorage
+
+from .base import UserStorageProvider
+from django.core.files import File
+
+logger = logging.getLogger(__name__)
+
+TMP_INPUT_FILE_UPLOAD_DIR = "tmp"
+
+
+class DjangoFileSystemProvider(UserStorageProvider):
+    def __init__(self, authz_token, resource_id, context=None, directory=None, storage_resource_id=None, **kwargs):
+        super().__init__(authz_token, resource_id, context=context, **kwargs)
+        self.directory = directory
+        self.storage_resource_id = resource_id
+
+    def save(self, path, file, name=None, content_type=None):
+        full_path = self.datastore.save(path, file, name=name)
+        return self.storage_resource_id, full_path
+
+    def get_upload_url(self, resource_path):
+        # TODO: implement
+        return super().get_upload_url(resource_path)
+
+    def open(self, resource_path):
+        return self.datastore.open(resource_path)
+
+    def get_download_url(self, resource_path):
+        # TODO: implement
+        return super().get_download_url(resource_path)
+
+    def exists(self, resource_path):
+        return self.datastore.exists(resource_path)
+
+    def is_file(self, resource_path):
+        return self.datastore.file_exists(resource_path)
+
+    def is_dir(self, resource_path):
+        return self.datastore.dir_exists(resource_path)
+
+    def get_metadata(self, resource_path):
+        # TODO: also return an isDir boolean flag?
+        datastore = self.datastore
+        if datastore.dir_exists(resource_path):
+            directories, files = datastore.list_user_dir(
+                resource_path)
+            directories_data = []
+            for d in directories:
+                dpath = os.path.join(resource_path, d)
+                created_time = datastore.get_created_time(dpath)
+                size = datastore.size(dpath)
+                directories_data.append(
+                    {
+                        "name": d,
+                        "path": datastore.rel_path(dpath),
+                        "created_time": created_time,
+                        "size": size,
+                        "hidden": dpath == TMP_INPUT_FILE_UPLOAD_DIR,
+                    }
+                )
+            files_data = []
+            for f in files:
+                user_rel_path = os.path.join(resource_path, f)
+                if not datastore.exists(user_rel_path):
+                    logger.warning(f"listdir skipping {user_rel_path}, "
+                                   "does not exist (broken symlink?)")
+                    continue
+                created_time = datastore.get_created_time(user_rel_path)
+                size = datastore.size(user_rel_path)
+                full_path = datastore.path(user_rel_path)
+                files_data.append(
+                    {
+                        "name": f,
+                        "path": datastore.rel_path(full_path),
+                        "resource_path": full_path,
+                        "created_time": created_time,
+                        "size": size,
+                        "hidden": False,
+                    }
+                )
+            return directories_data, files_data
+        elif datastore.exists(resource_path):
+
+            created_time = datastore.get_created_time(resource_path)
+            size = datastore.size(resource_path)
+            full_path = datastore.path(resource_path)
+            return [], [
+                {
+                    "name": os.path.basename(resource_path),
+                    "path": datastore.rel_path(full_path),
+                    "resource_path": full_path,
+                    "created_time": created_time,
+                    "size": size,
+                    "hidden": False,
+                }
+            ]
+        else:
+            raise ObjectDoesNotExist(f"User storage path does not exist {resource_path}")
+
+    def delete(self, resource_path):
+        if self.datastore.file_exists(resource_path):
+            self.datastore.delete(resource_path)
+        elif self.datastore.dir_exists(resource_path):
+            self.datastore.delete_dir(resource_path)
+        else:
+            raise ObjectDoesNotExist(f"User resource_path does not exist {resource_path}")
+
+    def update(self, resource_path, file):
+        full_path = self.datastore.path(resource_path)
+        with open(full_path, 'w') as f:
+            f.write(file.read())
+
+    def create_dirs(self, resource_path, dir_names=[], create_unique=False):
+        datastore = self.datastore
+        if not datastore.exists(resource_path):
+            raise ObjectDoesNotExist(f"User resource_path does not exist {resource_path}")
+        valid_dir_names = []
+        for dir_name in dir_names:
+            valid_dir_names.append(datastore.get_valid_name(dir_name))
+        final_path = os.path.join(resource_path, *valid_dir_names)
+        if datastore.exists(final_path) and not create_unique:
+            raise Exception(f"Directory {final_path} already exists")
+        # Make sure path is unique if it already exists
+        final_path = datastore.get_available_name(final_path)
+        datastore.create_user_dir(final_path)
+        return self.storage_resource_id, final_path
+
+    @property
+    def datastore(self):
+        directory = os.path.join(self.directory, self.username)
+        owner_username = self.context.get('owner_username')
+        # When the current user isn't the owner, set the directory based on the owner's username
+        if owner_username:
+            directory = os.path.join(self.directory, owner_username)
+        return _Datastore(directory=directory)
+
+
+class _Datastore:
+    """Internal datastore abstraction.
+
+    Paths are resolved by a ``FileSystemStorage`` located at ``directory``.
+    """
+
+    def __init__(self, directory=None):
+        self.directory = directory
+        self.storage = self._user_data_storage(self.directory)
+
+    def exists(self, path):
+        """Return True if path exists (False, with a warning, if invalid)."""
+        try:
+            return self.storage.exists(path)
+        except SuspiciousFileOperation as e:
+            logger.warning(f"Invalid path: {e}")
+            return False
+
+    def file_exists(self, path):
+        """Return True if path exists in this data store and is a file."""
+        try:
+            return self.storage.exists(path) and os.path.isfile(self.path(path))
+        except SuspiciousFileOperation as e:
+            logger.warning(f"Invalid path: {e}")
+            return False
+
+    def dir_exists(self, path):
+        """Return True if path exists in this data store and is a directory."""
+        try:
+            # Resolve inside the try so that a path rejected by self.path() is
+            # reported as missing, consistent with exists() and file_exists()
+            full_path = self.path(path)
+            logger.debug(f"dir_exists: {path}, {full_path}")
+            return self.storage.exists(path) and os.path.isdir(full_path)
+        except SuspiciousFileOperation as e:
+            logger.warning(f"Invalid path: {e}")
+            return False
+
+    def open(self, path):
+        """Open path in this data store; raise ObjectDoesNotExist if absent."""
+        if not self.exists(path):
+            raise ObjectDoesNotExist(
+                "File path does not exist: {}".format(path))
+        return self.storage.open(path)
+
+    def save(self, path, file, name=None):
+        """Save file under path and return the full path it was stored at.
+
+        name overrides the stored file name; by default the basename of
+        file.name is used, since file.name may be a full path.
+        """
+        file_name = name if name is not None else os.path.basename(file.name)
+        file_path = os.path.join(path, self.storage.get_valid_name(file_name))
+        # storage.save() returns the name it stored the file under
+        return self.storage.path(self.storage.save(file_path, file))
+
+    def create_user_dir(self, path):
+        """Create directory path; raise Exception if path already exists."""
+        if self.storage.exists(path):
+            raise Exception("Directory {} already exists".format(path))
+        self._makedirs(path)
+
+    def delete(self, path):
+        """Delete file path; raise ObjectDoesNotExist if it is not a file."""
+        if not self.file_exists(path):
+            raise ObjectDoesNotExist(
+                "File path does not exist: {}".format(path))
+        self.storage.delete(path)
+
+    def delete_dir(self, path):
+        """Recursively delete path, which must be an existing directory."""
+        if not self.dir_exists(path):
+            raise ObjectDoesNotExist(
+                "File path does not exist: {}".format(path))
+        shutil.rmtree(self.path(path))
+
+    def get_experiment_dir(
+        self, project_name=None, experiment_name=None, path=None
+    ):
+        """Return an experiment directory (full path), creating it if missing.
+
+        With path=None the directory is derived from project_name and
+        experiment_name. Otherwise path is used; it can be relative to the
+        user's storage space or absolute (as long as it is inside it).
+        """
+        storage = self.storage
+        if path is None:
+            proj_dir_name = storage.get_valid_name(project_name)
+            # AIRAVATA-3245 Make project directory with correct permissions
+            if not storage.exists(proj_dir_name):
+                self._makedirs(proj_dir_name)
+            experiment_dir_name = os.path.join(
+                proj_dir_name, storage.get_valid_name(experiment_name))
+            # Since there may already be another experiment with the same name
+            # in this project, we need to check for available name
+            experiment_dir = storage.path(
+                storage.get_available_name(experiment_dir_name))
+        else:
+            # path is assumed to have already been created
+            experiment_dir = storage.path(path)
+        if not storage.exists(experiment_dir):
+            self._makedirs(experiment_dir)
+        return experiment_dir
+
+    def get_valid_name(self, name):
+        """Delegate to the storage's get_valid_name()."""
+        return self.storage.get_valid_name(name)
+
+    def get_available_name(self, name):
+        """Delegate to the storage's get_available_name()."""
+        return self.storage.get_available_name(name)
+
+    def _makedirs(self, dir_path):
+        """Create dir_path with the storage's directory permissions mode."""
+        full_path = self.storage.path(dir_path)
+        mode = self.storage.directory_permissions_mode
+        if mode is None:
+            # No mode configured; os.makedirs/os.chmod reject mode=None with a
+            # TypeError, so let the OS defaults apply instead
+            os.makedirs(full_path)
+            return
+        os.makedirs(full_path, mode=mode)
+        # os.makedirs mode isn't always respected so need to chmod to be sure
+        os.chmod(full_path, mode=mode)
+
+    def list_user_dir(self, file_path):
+        """Return the storage's listdir() result for file_path."""
+        logger.debug("file_path={}".format(file_path))
+        return self.storage.listdir(file_path)
+
+    def get_created_time(self, file_path):
+        """Return the storage's created time for file_path."""
+        return self.storage.get_created_time(file_path)
+
+    def size(self, file_path):
+        """Return the size of file_path; for a directory, sum its files."""
+        full_path = self.path(file_path)
+        if os.path.isdir(full_path):
+            return self._get_dir_size(full_path)
+        return self.storage.size(file_path)
+
+    def path(self, file_path):
+        """Return the full filesystem path of file_path in this data store."""
+        return self.storage.path(file_path)
+
+    def rel_path(self, file_path):
+        """Return file_path relative to the root of this data store."""
+        return os.path.relpath(self.path(file_path), self.path(""))
+
+    def _user_data_storage(self, directory):
+        """Build the FileSystemStorage rooted at directory."""
+        return FileSystemStorage(location=directory)
+
+    # from https://stackoverflow.com/a/1392549
+    def _get_dir_size(self, start_path="."):
+        """Return the total size of the files found under start_path."""
+        total_size = 0
+        for dirpath, _dirnames, filenames in os.walk(start_path):
+            for f in filenames:
+                fp = os.path.join(dirpath, f)
+                # Skip broken symlinks (os.path.exists is False for them)
+                if os.path.exists(fp):
+                    total_size += os.path.getsize(fp)
+        return total_size
diff --git a/airavata_django_portal_sdk/user_storage/backends/mft_provider.py b/airavata_django_portal_sdk/user_storage/backends/mft_provider.py
new file mode 100644
index 0000000..6678572
--- /dev/null
+++ b/airavata_django_portal_sdk/user_storage/backends/mft_provider.py
@@ -0,0 +1,152 @@
+from .base import UserStorageProvider
+
+
+class MFTUserStorageProvider(UserStorageProvider):
+    """UserStorageProvider whose overrides only defer to the base class."""
+
+    def exists(self, resource_path):
+        """Delegate to the base class's exists()."""
+        return super().exists(resource_path)
+# NOTE(review): everything commented out below looks like an MFT gRPC prototype
+#         with grpc.insecure_channel('localhost:7004') as channel:
+#             # remove trailing slash and figure out parent path
+#             # FIXME remove the hard coded /tmp path
+#             parent_path, child_path = os.path.split(f"/tmp/{path}".rstrip("/"))
+#             logger.debug(f"parent_path={parent_path}, child_path={child_path}")
+#             stub = MFTApi_pb2_grpc.MFTApiServiceStub(channel)
+#             # Get metadata for parent directory and see if child_path exists
+#             request = MFTApi_pb2.FetchResourceMetadataRequest(
+#                 resourceId="remote-ssh-dir-resource",
+#                 resourceType="SCP",
+#                 resourceToken="local-ssh-cred",
+#                 resourceBackend="FILE",
+#                 resourceCredentialBackend="FILE",
+#                 targetAgentId="agent0",
+#                 childPath=parent_path,
+#                 mftAuthorizationToken="user token")
+#             response = stub.getDirectoryResourceMetadata(request)
+#             # if not child_path, then return True since the response was
+#             # successful and we just need to confirm the existence of the root dir
+#             if child_path == '':
+#                 return True
+#             return child_path in map(lambda f: f.friendlyName, response.directories)
+
+    def get_metadata(self, resource_path):
+        """Delegate to the base class's get_metadata()."""
+        return super().get_metadata(resource_path)
+#     def listdir(self, request, path):
+#         # TODO setup resourceId, etc from __init__ arguments
+#         channel = grpc.insecure_channel('localhost:7004')
+#         stub = MFTApi_pb2_grpc.MFTApiServiceStub(channel)
+#         request = MFTApi_pb2.FetchResourceMetadataRequest(
+#             resourceId="remote-ssh-dir-resource",
+#             resourceType="SCP",
+#             resourceToken="local-ssh-cred",
+#             resourceBackend="FILE",
+#             resourceCredentialBackend="FILE",
+#             targetAgentId="agent0",
+#             childPath=f"/tmp/{path}",
+#             mftAuthorizationToken="user token")
+#         response = stub.getDirectoryResourceMetadata(request)
+#         directories_data = []
+#         for d in response.directories:
+#             dpath = os.path.join(path, d.friendlyName)
+#             created_time = datetime.fromtimestamp(d.createdTime)
+#             # TODO MFT API doesn't report size
+#             size = 0
+#             directories_data.append(
+#                 {
+#                     "name": d.friendlyName,
+#                     "path": dpath,
+#                     "created_time": created_time,
+#                     "size": size,
+#                     # TODO how to handle hidden directories or directories for
+#                     # staging input file uploads
+#                     "hidden": False
+#                 }
+#             )
+#         files_data = []
+#         for f in response.files:
+#             user_rel_path = os.path.join(path, f.friendlyName)
+#             # TODO do we need to check for broken symlinks?
+#             created_time = datetime.fromtimestamp(f.createdTime)
+#             # TODO get the size as well
+#             size = 0
+#             # full_path = datastore.path(request.user.username, user_rel_path)
+#             # TODO how do we register these as data products, do we need to?
+#             # data_product_uri = _get_data_product_uri(request, full_path)
+#             # data_product = request.airavata_client.getDataProduct(
+#             #     request.authz_token, data_product_uri)
+#             # mime_type = None
+#             # if 'mime-type' in data_product.productMetadata:
+#             #     mime_type = data_product.productMetadata['mime-type']
+#             files_data.append(
+#                 {
+#                     "name": f.friendlyName,
+#                     "path": user_rel_path,
+#                     "data-product-uri": None,
+#                     "created_time": created_time,
+#                     "mime_type": None,
+#                     "size": size,
+#                     "hidden": False,
+#                 }
+#             )
+#         return directories_data, files_data
+
+#     def get_file(self, request, path):
+#         # FIXME remove hard coded /tmp path
+#         path = f"/tmp/{path}".rstrip("/")
+#         file_metadata = self._get_file(path)
+#         if file_metadata is not None:
+#             user_rel_path = os.path.join(path, file_metadata.friendlyName)
+#             created_time = datetime.fromtimestamp(file_metadata.createdTime)
+#             # TODO get the size as well
+#             size = 0
+#             return {
+#                 "name": file_metadata.friendlyName,
+#                 "path": user_rel_path,
+#                 "data-product-uri": None,
+#                 "created_time": created_time,
+#                 "mime_type": None,
+#                 "size": size,
+#                 "hidden": False,
+#             }
+#         else:
+#             raise ObjectDoesNotExist("User storage file path does not exist")
+
+#     def _get_file(self, path):
+#         with grpc.insecure_channel('localhost:7004') as channel:
+#             stub = MFTApi_pb2_grpc.MFTApiServiceStub(channel)
+#             # Get metadata for parent directory and see if child_path exists
+#             request = MFTApi_pb2.FetchResourceMetadataRequest(
+#                 resourceId="remote-ssh-dir-resource",
+#                 resourceType="SCP",
+#                 resourceToken="local-ssh-cred",
+#                 resourceBackend="FILE",
+#                 resourceCredentialBackend="FILE",
+#                 targetAgentId="agent0",
+#                 childPath=path,
+#                 mftAuthorizationToken="user token")
+#             try:
+#                 # TODO is there a better way to check if file exists than catching exception?
+#                 return stub.getFileResourceMetadata(request)
+#             except Exception:
+#                 logger.exception(f"_get_file({path})")
+#                 return None
+
+#     def _get_download_url(self, path):
+#         with grpc.insecure_channel('localhost:7004') as channel:
+#             stub = MFTApi_pb2_grpc.MFTApiServiceStub(channel)
+#             download_request = MFTApi_pb2.HttpDownloadApiRequest(sourceStoreId="remote-ssh-storage",
+#                                                                  sourcePath="/tmp/a.txt",
+#                                                                  sourceToken="local-ssh-cred",
+#                                                                  sourceType="SCP",
+#                                                                  targetAgent="agent0",
+#                                                                  mftAuthorizationToken="")
+#             try:
+#                 # TODO is there a better way to check if file exists than catching exception?
+#                 # response stub.submitHttpDownload(request)
+#                 pass
+#             except Exception:
+#                 logger.exception(f"_get_file({path})")
+#                 return None
diff --git a/airavata_django_portal_sdk/user_storage/backends/remote_api_provider.py b/airavata_django_portal_sdk/user_storage/backends/remote_api_provider.py
new file mode 100644
index 0000000..e69de29
diff --git a/airavata_django_portal_sdk/user_storage_provider.py b/airavata_django_portal_sdk/user_storage_provider.py
index cc214db..b6b6842 100644
--- a/airavata_django_portal_sdk/user_storage_provider.py
+++ b/airavata_django_portal_sdk/user_storage_provider.py
@@ -13,8 +13,9 @@ logger = logging.getLogger(__name__)
 
 
 class UserStorageProvider:
-    def __init__(self, authz_token, *args, **kwargs):
+    def __init__(self, authz_token, context=None, *args, **kwargs):
         self.authz_token = authz_token
+        self.context = context
 
     def save(self, authz_token, path, file, name=None, content_type=None):
         raise NotImplementedError()