Posted to oak-commits@jackrabbit.apache.org by am...@apache.org on 2018/07/30 11:05:11 UTC

svn commit: r1837047 [1/3] - in /jackrabbit/oak/trunk: oak-blob-cloud-azure/ oak-blob-cloud-azure/src/main/java/org/apache/jackrabbit/oak/blob/cloud/azure/blobstorage/ oak-blob-cloud-azure/src/test/java/org/apache/jackrabbit/oak/blob/cloud/azure/blobst...

Author: amitj
Date: Mon Jul 30 11:05:10 2018
New Revision: 1837047

URL: http://svn.apache.org/viewvc?rev=1837047&view=rev
Log:
OAK-7569: Direct Binary Access

Blob APIs and S3/Azure implementation for direct binary upload/download. Modified patch from Matt Ryan
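
From the caller's side, the new flow looks roughly like this (a minimal sketch, assuming an initialized AzureDataStore; the S3DataStore changes in this commit follow the same pattern, and exception handling plus the HTTP client that performs the part PUTs are omitted):

    static DataRecord directUpload(ConfigurableDataRecordAccessProvider provider,
                                   long sizeInBytes)
            throws DataRecordUploadException, DataStoreException {
        // Request presigned upload URIs; -1 lets the backend pick the part count.
        // (Returns null if presigned uploads are disabled.)
        DataRecordUpload upload = provider.initiateDataRecordUpload(sizeInBytes, -1);
        for (URI partURI : upload.getUploadURIs()) {
            // PUT one part of the binary (between getMinPartSize() and
            // getMaxPartSize() bytes) directly to partURI over HTTPS.
        }
        // Finalize with the opaque token; for Azure this commits the block list.
        return provider.completeDataRecordUpload(upload.getUploadToken());
    }

Downloads are the mirror image: getDownloadURI(identifier, downloadOptions) returns a presigned GET URI (or null when the feature is disabled) that the client dereferences directly.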

Added:
    jackrabbit/oak/trunk/oak-blob-cloud-azure/src/test/java/org/apache/jackrabbit/oak/blob/cloud/azure/blobstorage/AzureDataRecordAccessProviderIT.java   (with props)
    jackrabbit/oak/trunk/oak-blob-cloud-azure/src/test/java/org/apache/jackrabbit/oak/blob/cloud/azure/blobstorage/AzureDataRecordAccessProviderTest.java   (with props)
    jackrabbit/oak/trunk/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3DataRecordAccessProviderIT.java   (with props)
    jackrabbit/oak/trunk/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3DataRecordAccessProviderTest.java   (with props)
    jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/directaccess/
    jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/directaccess/ConfigurableDataRecordAccessProvider.java   (with props)
    jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/directaccess/DataRecordAccessProvider.java   (with props)
    jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/directaccess/DataRecordDownloadOptions.java   (with props)
    jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/directaccess/DataRecordUpload.java   (with props)
    jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/directaccess/DataRecordUploadException.java   (with props)
    jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/directaccess/DataRecordUploadToken.java   (with props)
    jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/directaccess/
    jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/directaccess/AbstractDataRecordAccessProviderIT.java   (with props)
    jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/directaccess/AbstractDataRecordAccessProviderTest.java   (with props)
    jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/directaccess/DataRecordDownloadOptionsTest.java   (with props)
Modified:
    jackrabbit/oak/trunk/oak-blob-cloud-azure/pom.xml
    jackrabbit/oak/trunk/oak-blob-cloud-azure/src/main/java/org/apache/jackrabbit/oak/blob/cloud/azure/blobstorage/AzureBlobStoreBackend.java
    jackrabbit/oak/trunk/oak-blob-cloud-azure/src/main/java/org/apache/jackrabbit/oak/blob/cloud/azure/blobstorage/AzureConstants.java
    jackrabbit/oak/trunk/oak-blob-cloud-azure/src/main/java/org/apache/jackrabbit/oak/blob/cloud/azure/blobstorage/AzureDataStore.java
    jackrabbit/oak/trunk/oak-blob-cloud-azure/src/test/java/org/apache/jackrabbit/oak/blob/cloud/azure/blobstorage/AzureDataStoreUtils.java
    jackrabbit/oak/trunk/oak-blob-cloud/pom.xml
    jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3Backend.java
    jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3Constants.java
    jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3DataStore.java
    jackrabbit/oak/trunk/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3DataStoreUtils.java
    jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/AbstractDataStoreService.java
    jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreBlobStore.java
    jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreUtils.java

Modified: jackrabbit/oak/trunk/oak-blob-cloud-azure/pom.xml
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-cloud-azure/pom.xml?rev=1837047&r1=1837046&r2=1837047&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-blob-cloud-azure/pom.xml (original)
+++ jackrabbit/oak/trunk/oak-blob-cloud-azure/pom.xml Mon Jul 30 11:05:10 2018
@@ -139,6 +139,13 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>org.apache.jackrabbit</groupId>
+            <artifactId>oak-blob</artifactId>
+            <version>${project.version}</version>
+            <classifier>tests</classifier>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
             <scope>test</scope>

Modified: jackrabbit/oak/trunk/oak-blob-cloud-azure/src/main/java/org/apache/jackrabbit/oak/blob/cloud/azure/blobstorage/AzureBlobStoreBackend.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-cloud-azure/src/main/java/org/apache/jackrabbit/oak/blob/cloud/azure/blobstorage/AzureBlobStoreBackend.java?rev=1837047&r1=1837046&r2=1837047&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-blob-cloud-azure/src/main/java/org/apache/jackrabbit/oak/blob/cloud/azure/blobstorage/AzureBlobStoreBackend.java (original)
+++ jackrabbit/oak/trunk/oak-blob-cloud-azure/src/main/java/org/apache/jackrabbit/oak/blob/cloud/azure/blobstorage/AzureBlobStoreBackend.java Mon Jul 30 11:05:10 2018
@@ -19,10 +19,41 @@
 
 package org.apache.jackrabbit.oak.blob.cloud.azure.blobstorage;
 
+import static java.lang.Thread.currentThread;
+
+import java.io.BufferedInputStream;
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UnsupportedEncodingException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URLEncoder;
+import java.security.InvalidKeyException;
+import java.time.Instant;
+import java.util.Collection;
+import java.util.Date;
+import java.util.EnumSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.base.Charsets;
 import com.google.common.base.Function;
 import com.google.common.base.Strings;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
 import com.google.common.collect.AbstractIterator;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.microsoft.azure.storage.AccessCondition;
 import com.microsoft.azure.storage.RequestOptions;
 import com.microsoft.azure.storage.ResultContinuation;
 import com.microsoft.azure.storage.ResultSegment;
@@ -30,38 +61,33 @@ import com.microsoft.azure.storage.Retry
 import com.microsoft.azure.storage.StorageException;
 import com.microsoft.azure.storage.blob.BlobListingDetails;
 import com.microsoft.azure.storage.blob.BlobRequestOptions;
+import com.microsoft.azure.storage.blob.BlockEntry;
+import com.microsoft.azure.storage.blob.BlockListingFilter;
 import com.microsoft.azure.storage.blob.CloudBlob;
 import com.microsoft.azure.storage.blob.CloudBlobContainer;
 import com.microsoft.azure.storage.blob.CloudBlobDirectory;
 import com.microsoft.azure.storage.blob.CloudBlockBlob;
 import com.microsoft.azure.storage.blob.CopyStatus;
 import com.microsoft.azure.storage.blob.ListBlobItem;
+import com.microsoft.azure.storage.blob.SharedAccessBlobHeaders;
+import com.microsoft.azure.storage.blob.SharedAccessBlobPermissions;
+import com.microsoft.azure.storage.blob.SharedAccessBlobPolicy;
 import org.apache.commons.io.IOUtils;
 import org.apache.jackrabbit.core.data.DataIdentifier;
 import org.apache.jackrabbit.core.data.DataRecord;
 import org.apache.jackrabbit.core.data.DataStoreException;
 import org.apache.jackrabbit.oak.commons.PropertiesUtil;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.DataRecordUploadException;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.DataRecordUploadToken;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.DataRecordDownloadOptions;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.DataRecordUpload;
 import org.apache.jackrabbit.oak.spi.blob.AbstractDataRecord;
 import org.apache.jackrabbit.oak.spi.blob.AbstractSharedBackend;
+import org.apache.jackrabbit.util.Base64;
+import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedInputStream;
-import java.io.ByteArrayInputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URISyntaxException;
-import java.util.EnumSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Properties;
-import java.util.Queue;
-
-import static java.lang.Thread.currentThread;
-
 public class AzureBlobStoreBackend extends AbstractSharedBackend {
 
     private static final Logger LOG = LoggerFactory.getLogger(AzureBlobStoreBackend.class);
@@ -72,6 +98,12 @@ public class AzureBlobStoreBackend exten
     private static final String REF_KEY = "reference.key";
 
     private static final long BUFFERED_STREAM_THRESHHOLD = 1024 * 1024;
+    static final long MIN_MULTIPART_UPLOAD_PART_SIZE = 1024 * 1024 * 10; // 10MB
+    static final long MAX_MULTIPART_UPLOAD_PART_SIZE = 1024 * 1024 * 100; // 100MB
+    static final long MAX_SINGLE_PUT_UPLOAD_SIZE = 1024 * 1024 * 256; // 256MB, Azure limit
+    static final long MAX_BINARY_UPLOAD_SIZE = (long) Math.floor(1024L * 1024L * 1024L * 1024L * 4.75); // 4.75TB, Azure limit
+    private static final int MAX_ALLOWABLE_UPLOAD_URIS = 50000; // Azure limit
+    private static final int MAX_UNIQUE_RECORD_TRIES = 10;
 
     private Properties properties;
     private String containerName;
@@ -79,6 +111,10 @@ public class AzureBlobStoreBackend exten
     private int concurrentRequestCount = 1;
     private RetryPolicy retryPolicy;
     private Integer requestTimeout;
+    private int httpDownloadURIExpirySeconds = 0; // disabled by default
+    private int httpUploadURIExpirySeconds = 0; // disabled by default
+
+    private Cache<DataIdentifier, URI> httpDownloadURICache;
 
     private byte[] secret;
 
@@ -135,6 +171,23 @@ public class AzureBlobStoreBackend exten
                 }
                 LOG.debug("Backend initialized. duration={}",
                           +(System.currentTimeMillis() - start));
+
+                // settings pertaining to DataRecordAccessProvider functionality
+                String putExpiry = properties.getProperty(AzureConstants.PRESIGNED_HTTP_UPLOAD_URI_EXPIRY_SECONDS);
+                if (null != putExpiry) {
+                    this.setHttpUploadURIExpirySeconds(Integer.parseInt(putExpiry));
+                }
+                String getExpiry = properties.getProperty(AzureConstants.PRESIGNED_HTTP_DOWNLOAD_URI_EXPIRY_SECONDS);
+                if (null != getExpiry) {
+                    this.setHttpDownloadURIExpirySeconds(Integer.parseInt(getExpiry));
+                    String cacheMaxSize = properties.getProperty(AzureConstants.PRESIGNED_HTTP_DOWNLOAD_URI_CACHE_MAX_SIZE);
+                    if (null != cacheMaxSize) {
+                        this.setHttpDownloadURICacheSize(Integer.parseInt(cacheMaxSize));
+                    }
+                    else {
+                        this.setHttpDownloadURICacheSize(0); // default
+                    }
+                }
             }
             catch (StorageException e) {
                 throw new DataStoreException(e);
@@ -667,6 +720,300 @@ public class AzureBlobStoreBackend exten
         return name;
     }
 
+    void setHttpDownloadURIExpirySeconds(int seconds) {
+        httpDownloadURIExpirySeconds = seconds;
+    }
+
+    void setHttpDownloadURICacheSize(int maxSize) {
+        // max size 0 or smaller is used to turn off the cache
+        if (maxSize > 0) {
+            LOG.info("presigned GET URI cache enabled, maxSize = {} items, expiry = {} seconds", maxSize, httpDownloadURIExpirySeconds / 2);
+            httpDownloadURICache = CacheBuilder.newBuilder()
+                    .maximumSize(maxSize)
+                    .expireAfterWrite(httpDownloadURIExpirySeconds / 2, TimeUnit.SECONDS)
+                    .build();
+        } else {
+            LOG.info("presigned GET URI cache disabled");
+            httpDownloadURICache = null;
+        }
+    }
+
+    URI createHttpDownloadURI(@NotNull DataIdentifier identifier,
+                              @NotNull DataRecordDownloadOptions downloadOptions) {
+        URI uri = null;
+        if (httpDownloadURIExpirySeconds > 0) {
+            if (null != httpDownloadURICache) {
+                uri = httpDownloadURICache.getIfPresent(identifier);
+            }
+            if (null == uri) {
+                String key = getKeyName(identifier);
+                SharedAccessBlobHeaders headers = new SharedAccessBlobHeaders();
+                headers.setCacheControl(String.format("private, max-age=%d, immutable", httpDownloadURIExpirySeconds));
+
+                String contentType = downloadOptions.getContentTypeHeader();
+                if (! Strings.isNullOrEmpty(contentType)) {
+                    headers.setContentType(contentType);
+                }
+
+                String contentDisposition =
+                        downloadOptions.getContentDispositionHeader();
+                if (! Strings.isNullOrEmpty(contentDisposition)) {
+                    headers.setContentDisposition(contentDisposition);
+                }
+
+                uri = createPresignedURI(key,
+                        EnumSet.of(SharedAccessBlobPermissions.READ),
+                        httpDownloadURIExpirySeconds,
+                        headers);
+                if (uri != null && httpDownloadURICache != null) {
+                    httpDownloadURICache.put(identifier, uri);
+                }
+            }
+        }
+        return uri;
+    }
+
+    void setHttpUploadURIExpirySeconds(int seconds) { httpUploadURIExpirySeconds = seconds; }
+
+    private DataIdentifier generateSafeRandomIdentifier() {
+        return new DataIdentifier(
+                String.format("%s-%d",
+                        UUID.randomUUID().toString(),
+                        Instant.now().toEpochMilli()
+                )
+        );
+    }
+
+    DataRecordUpload initiateHttpUpload(long maxUploadSizeInBytes, int maxNumberOfURIs) {
+        List<URI> uploadPartURIs = Lists.newArrayList();
+        long minPartSize = MIN_MULTIPART_UPLOAD_PART_SIZE;
+        long maxPartSize = MAX_MULTIPART_UPLOAD_PART_SIZE;
+
+        if (0L >= maxUploadSizeInBytes) {
+            throw new IllegalArgumentException("maxUploadSizeInBytes must be > 0");
+        }
+        else if (0 == maxNumberOfURIs) {
+            throw new IllegalArgumentException("maxNumberOfURIs must either be > 0 or -1");
+        }
+        else if (-1 > maxNumberOfURIs) {
+            throw new IllegalArgumentException("maxNumberOfURIs must either be > 0 or -1");
+        }
+        else if (maxUploadSizeInBytes > MAX_SINGLE_PUT_UPLOAD_SIZE &&
+                maxNumberOfURIs == 1) {
+            throw new IllegalArgumentException(
+                    String.format("Cannot do single-put upload with file size %d - exceeds max single-put upload size of %d",
+                            maxUploadSizeInBytes,
+                            MAX_SINGLE_PUT_UPLOAD_SIZE)
+            );
+        }
+        else if (maxUploadSizeInBytes > MAX_BINARY_UPLOAD_SIZE) {
+            throw new IllegalArgumentException(
+                    String.format("Cannot do upload with file size %d - exceeds max upload size of %d",
+                            maxUploadSizeInBytes,
+                            MAX_BINARY_UPLOAD_SIZE)
+            );
+        }
+
+        DataIdentifier newIdentifier = generateSafeRandomIdentifier();
+        String blobId = getKeyName(newIdentifier);
+        String uploadId = null;
+
+        if (httpUploadURIExpirySeconds > 0) {
+            // Always do multi-part uploads for Azure, even for small binaries.
+            //
+            // This is because Azure requires a specific header, "x-ms-blob-type: BlockBlob",
+            // to be set for single-put uploads but not for multi-part uploads.
+            // Honoring that would require clients to know not only which service provider
+            // is being used but also which type of upload (single-put vs. multi-part) is in
+            // progress, which breaks the abstraction.
+            // Instead we insist that clients always do multi-part uploads to Azure, even if
+            // the upload consists of only one part.  This requires no additional work from
+            // the client, since the "complete" request must be sent regardless, and it
+            // spares the client from having to know which provider is in use or which
+            // provider-specific headers to set.
+
+            // Azure doesn't use upload IDs the way AWS does, so generate a placeholder
+            // one for compatibility - it is used later to determine whether this is a
+            // multi-part or a single-put upload.
+            uploadId = Base64.encode(UUID.randomUUID().toString());
+
+            long numParts = 0L;
+            if (maxNumberOfURIs > 0) {
+                long requestedPartSize = (long) Math.ceil(((double) maxUploadSizeInBytes) / ((double) maxNumberOfURIs));
+                if (requestedPartSize <= maxPartSize) {
+                    numParts = Math.min(
+                            maxNumberOfURIs,
+                            Math.min(
+                                    (long) Math.ceil(((double) maxUploadSizeInBytes) / ((double) minPartSize)),
+                                    MAX_ALLOWABLE_UPLOAD_URIS
+                            )
+                    );
+                } else {
+                    throw new IllegalArgumentException(
+                            String.format("Cannot do multi-part upload with requested part size %d", requestedPartSize)
+                    );
+                }
+            }
+            else {
+                long maximalNumParts = (long) Math.ceil(((double) maxUploadSizeInBytes) / ((double) MIN_MULTIPART_UPLOAD_PART_SIZE));
+                numParts = Math.min(maximalNumParts, MAX_ALLOWABLE_UPLOAD_URIS);
+            }
+
+            String key = getKeyName(newIdentifier);
+            EnumSet<SharedAccessBlobPermissions> perms = EnumSet.of(SharedAccessBlobPermissions.WRITE);
+            Map<String, String> presignedURIRequestParams = Maps.newHashMap();
+            presignedURIRequestParams.put("comp", "block");
+            for (long blockId = 1; blockId <= numParts; ++blockId) {
+                presignedURIRequestParams.put("blockId",
+                        Base64.encode(String.format("%06d", blockId)));
+                uploadPartURIs.add(createPresignedURI(key, perms, httpUploadURIExpirySeconds, presignedURIRequestParams));
+            }
+        }
+
+        try {
+            byte[] secret = getOrCreateReferenceKey();
+            String uploadToken = new DataRecordUploadToken(blobId, uploadId).getEncodedToken(secret);
+            return new DataRecordUpload() {
+                @Override
+                @NotNull
+                public String getUploadToken() { return uploadToken; }
+
+                @Override
+                public long getMinPartSize() { return minPartSize; }
+
+                @Override
+                public long getMaxPartSize() { return maxPartSize; }
+
+                @Override
+                @NotNull
+                public Collection<URI> getUploadURIs() { return uploadPartURIs; }
+            };
+        }
+        catch (DataStoreException e) {
+            LOG.warn("Unable to obtain data store key");
+        }
+
+        return null;
+    }
+
+    DataRecord completeHttpUpload(@NotNull String uploadTokenStr)
+            throws DataRecordUploadException, DataStoreException {
+
+        if (Strings.isNullOrEmpty(uploadTokenStr)) {
+            throw new IllegalArgumentException("uploadToken required");
+        }
+
+        DataRecordUploadToken uploadToken = DataRecordUploadToken.fromEncodedToken(uploadTokenStr, getOrCreateReferenceKey());
+        String key = uploadToken.getBlobId();
+        DataIdentifier blobId = new DataIdentifier(getIdentifierName(key));
+        try {
+            if (uploadToken.getUploadId().isPresent()) {
+                // An existing upload ID means this is a multi-part upload
+                CloudBlockBlob blob = getAzureContainer().getBlockBlobReference(key);
+                List<BlockEntry> blocks = blob.downloadBlockList(
+                        BlockListingFilter.UNCOMMITTED,
+                        AccessCondition.generateEmptyCondition(),
+                        null,
+                        null);
+                blob.commitBlockList(blocks);
+            }
+            // else do nothing - single put is already complete
+
+            if (! exists(blobId)) {
+            //if (! getAzureContainer().getBlockBlobReference(blobId).exists()) {
+                throw new DataRecordUploadException(
+                        String.format("Unable to finalize direct write of binary %s", blobId));
+            }
+        }
+        catch (URISyntaxException | StorageException e) {
+            throw new DataRecordUploadException(
+                    String.format("Unable to finalize direct write of binary %s", blobId));
+        }
+
+        return getRecord(blobId);
+    }
+
+    private URI createPresignedURI(String key,
+                                   EnumSet<SharedAccessBlobPermissions> permissions,
+                                   int expirySeconds,
+                                   SharedAccessBlobHeaders optionalHeaders) {
+        return createPresignedURI(key, permissions, expirySeconds, Maps.newHashMap(), optionalHeaders);
+    }
+
+    private URI createPresignedURI(String key,
+                                   EnumSet<SharedAccessBlobPermissions> permissions,
+                                   int expirySeconds,
+                                   Map<String, String> additionalQueryParams) {
+        return createPresignedURI(key, permissions, expirySeconds, additionalQueryParams, null);
+    }
+
+    private URI createPresignedURI(String key,
+                                   EnumSet<SharedAccessBlobPermissions> permissions,
+                                   int expirySeconds,
+                                   Map<String, String> additionalQueryParams,
+                                   SharedAccessBlobHeaders optionalHeaders) {
+        SharedAccessBlobPolicy policy = new SharedAccessBlobPolicy();
+        Date expiry = Date.from(Instant.now().plusSeconds(expirySeconds));
+        policy.setSharedAccessExpiryTime(expiry);
+        policy.setPermissions(permissions);
+
+        String accountName = properties.getProperty(AzureConstants.AZURE_STORAGE_ACCOUNT_NAME, "");
+        if (Strings.isNullOrEmpty(accountName)) {
+            LOG.warn("Can't generate presigned URI - Azure account name not found in properties");
+            return null;
+        }
+
+        URI presignedURI = null;
+        try {
+            CloudBlockBlob blob = getAzureContainer().getBlockBlobReference(key);
+            String sharedAccessSignature =
+                    null == optionalHeaders ?
+                            blob.generateSharedAccessSignature(policy,
+                                    null) :
+                            blob.generateSharedAccessSignature(policy,
+                                    optionalHeaders,
+                                    null);
+            // The shared access signature is returned already URL-encoded.
+
+            String uriString = String.format("https://%s.blob.core.windows.net/%s/%s?%s",
+                    accountName,
+                    containerName,
+                    key,
+                    sharedAccessSignature);
+
+            if (! additionalQueryParams.isEmpty()) {
+                StringBuilder builder = new StringBuilder();
+                for (Map.Entry<String, String> e : additionalQueryParams.entrySet()) {
+                    builder.append("&");
+                    builder.append(URLEncoder.encode(e.getKey(), Charsets.UTF_8.name()));
+                    builder.append("=");
+                    builder.append(URLEncoder.encode(e.getValue(), Charsets.UTF_8.name()));
+                }
+                uriString += builder.toString();
+            }
+
+            presignedURI = new URI(uriString);
+        }
+        catch (DataStoreException e) {
+            LOG.error("No connection to Azure Blob Storage", e);
+        }
+        catch (URISyntaxException | InvalidKeyException | UnsupportedEncodingException e) {
+            LOG.error("Can't generate a presigned URI for key {}", key, e);
+        }
+        catch (StorageException e) {
+            LOG.error("Azure request to create presigned Azure Blob Storage {} URI failed. " +
+                            "Key: {}, Error: {}, HTTP Code: {}, Azure Error Code: {}",
+                    permissions.contains(SharedAccessBlobPermissions.READ) ? "GET" :
+                            (permissions.contains(SharedAccessBlobPermissions.WRITE) ? "PUT" : ""),
+                    key,
+                    e.getMessage(),
+                    e.getHttpStatusCode(),
+                    e.getErrorCode());
+        }
+
+        return presignedURI;
+    }
+
     private static class AzureBlobInfo {
         private final String name;
         private final long lastModified;
@@ -790,6 +1137,12 @@ public class AzureBlobStoreBackend exten
             if (isMeta) {
                 id = addMetaKeyPrefix(getIdentifier().toString());
             }
+            if (LOG.isDebugEnabled()) {
+                // Log with an exception so the stack trace shows where the call
+                // came from
+                LOG.debug("binary downloaded from Azure Blob Storage: " + getIdentifier(),
+                        new Exception());
+            }
             try {
                 return container.getBlockBlobReference(id).openInputStream();
             } catch (StorageException | URISyntaxException e) {

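To make the part-count arithmetic in initiateHttpUpload above concrete, the same computation as a standalone sketch (the sizes are illustrative; the constants mirror those defined at the top of the class):

    // For maxNumberOfURIs == -1, parts are sized at the 10MB minimum and the
    // count is capped at Azure's 50,000-block limit:
    long minPartSize = 1024L * 1024 * 10;            // MIN_MULTIPART_UPLOAD_PART_SIZE
    long maxUploadSizeInBytes = 1024L * 1024 * 1024; // a 1 GiB binary
    long numParts = Math.min(
            (long) Math.ceil((double) maxUploadSizeInBytes / minPartSize), // 103
            50000);                                  // MAX_ALLOWABLE_UPLOAD_URIS
    // 1 GiB -> 103 URIs; anything above roughly 488 GiB hits the 50,000 cap.

Each generated URI is the blob's SAS URI with comp=block and a zero-padded, Base64-encoded blockId appended, so the client's only job per part is a plain HTTPS PUT of the raw bytes.
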
Modified: jackrabbit/oak/trunk/oak-blob-cloud-azure/src/main/java/org/apache/jackrabbit/oak/blob/cloud/azure/blobstorage/AzureConstants.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-cloud-azure/src/main/java/org/apache/jackrabbit/oak/blob/cloud/azure/blobstorage/AzureConstants.java?rev=1837047&r1=1837046&r2=1837047&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-blob-cloud-azure/src/main/java/org/apache/jackrabbit/oak/blob/cloud/azure/blobstorage/AzureConstants.java (original)
+++ jackrabbit/oak/trunk/oak-blob-cloud-azure/src/main/java/org/apache/jackrabbit/oak/blob/cloud/azure/blobstorage/AzureConstants.java Mon Jul 30 11:05:10 2018
@@ -70,5 +70,20 @@ public final class AzureConstants {
      */
     public static final String PROXY_PORT = "proxyPort";
 
+    /**
+     * TTL for presigned HTTP upload URIs - default is 0 (disabled)
+     */
+    public static final String PRESIGNED_HTTP_UPLOAD_URI_EXPIRY_SECONDS = "presignedHttpUploadURIExpirySeconds";
+
+    /**
+     * TTL for presigned HTTP download URIs - default is 0 (disabled)
+     */
+    public static final String PRESIGNED_HTTP_DOWNLOAD_URI_EXPIRY_SECONDS = "presignedHttpDownloadURIExpirySeconds";
+
+    /**
+     * Maximum size of presigned HTTP download URI cache - default is 0 (no cache)
+     */
+    public static final String PRESIGNED_HTTP_DOWNLOAD_URI_CACHE_MAX_SIZE = "presignedHttpDownloadURICacheMaxSize";
+
     private AzureConstants() { }
 }

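Both expiry settings default to 0, which leaves direct binary access disabled. A sketch of enabling it through the existing Properties-based configuration (the values are illustrative, not defaults):

    Properties props = new Properties();
    // one-hour TTL for presigned upload and download URIs
    props.setProperty(AzureConstants.PRESIGNED_HTTP_UPLOAD_URI_EXPIRY_SECONDS, "3600");
    props.setProperty(AzureConstants.PRESIGNED_HTTP_DOWNLOAD_URI_EXPIRY_SECONDS, "3600");
    // cache up to 1000 download URIs; cached entries expire at half the URI TTL
    props.setProperty(AzureConstants.PRESIGNED_HTTP_DOWNLOAD_URI_CACHE_MAX_SIZE, "1000");

    AzureDataStore ds = new AzureDataStore();
    ds.setProperties(props); // read by AzureBlobStoreBackend.init() above
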
Modified: jackrabbit/oak/trunk/oak-blob-cloud-azure/src/main/java/org/apache/jackrabbit/oak/blob/cloud/azure/blobstorage/AzureDataStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-cloud-azure/src/main/java/org/apache/jackrabbit/oak/blob/cloud/azure/blobstorage/AzureDataStore.java?rev=1837047&r1=1837046&r2=1837047&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-blob-cloud-azure/src/main/java/org/apache/jackrabbit/oak/blob/cloud/azure/blobstorage/AzureDataStore.java (original)
+++ jackrabbit/oak/trunk/oak-blob-cloud-azure/src/main/java/org/apache/jackrabbit/oak/blob/cloud/azure/blobstorage/AzureDataStore.java Mon Jul 30 11:05:10 2018
@@ -19,25 +19,36 @@
 
 package org.apache.jackrabbit.oak.blob.cloud.azure.blobstorage;
 
+import java.net.URI;
+import java.util.Properties;
+
+import org.apache.jackrabbit.core.data.DataIdentifier;
+import org.apache.jackrabbit.core.data.DataRecord;
+import org.apache.jackrabbit.core.data.DataStoreException;
 import org.apache.jackrabbit.oak.plugins.blob.AbstractSharedCachingDataStore;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.ConfigurableDataRecordAccessProvider;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.DataRecordUploadException;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.DataRecordDownloadOptions;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.DataRecordUpload;
 import org.apache.jackrabbit.oak.spi.blob.AbstractSharedBackend;
 import org.apache.jackrabbit.oak.spi.blob.SharedBackend;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
 
-import java.util.Properties;
-
-public class AzureDataStore extends AbstractSharedCachingDataStore {
-
+public class AzureDataStore extends AbstractSharedCachingDataStore implements ConfigurableDataRecordAccessProvider {
     private int minRecordLength = 16*1024;
 
     protected Properties properties;
 
+    private AzureBlobStoreBackend azureBlobStoreBackend;
+
     @Override
     protected AbstractSharedBackend createBackend() {
-        AzureBlobStoreBackend backend = new AzureBlobStoreBackend();
+        azureBlobStoreBackend = new AzureBlobStoreBackend();
         if (null != properties) {
-            backend.setProperties(properties);
+            azureBlobStoreBackend.setProperties(properties);
         }
-        return backend;
+        return azureBlobStoreBackend;
     }
 
     public void setProperties(final Properties properties) {
@@ -56,4 +67,61 @@ public class AzureDataStore extends Abst
     public void setMinRecordLength(int minRecordLength) {
         this.minRecordLength = minRecordLength;
     }
+
+    //
+    // ConfigurableDataRecordAccessProvider Implementation
+    //
+    @Override
+    public void setDirectUploadURIExpirySeconds(int seconds) {
+        if (null != azureBlobStoreBackend) {
+            azureBlobStoreBackend.setHttpUploadURIExpirySeconds(seconds);
+        }
+    }
+
+    @Override
+    public void setBinaryTransferAccelerationEnabled(boolean enabled) {
+        // NOOP - not a feature of Azure Blob Storage
+    }
+
+    @Nullable
+    @Override
+    public DataRecordUpload initiateDataRecordUpload(long maxUploadSizeInBytes, int maxNumberOfURIs)
+            throws IllegalArgumentException, DataRecordUploadException {
+        if (null == azureBlobStoreBackend) {
+            throw new DataRecordUploadException("Backend not initialized");
+        }
+        return azureBlobStoreBackend.initiateHttpUpload(maxUploadSizeInBytes, maxNumberOfURIs);
+    }
+
+    @NotNull
+    @Override
+    public DataRecord completeDataRecordUpload(String uploadToken)
+            throws IllegalArgumentException, DataRecordUploadException, DataStoreException {
+        if (null == azureBlobStoreBackend) {
+            throw new DataRecordUploadException("Backend not initialized");
+        }
+        return azureBlobStoreBackend.completeHttpUpload(uploadToken);
+    }
+
+    @Override
+    public void setDirectDownloadURIExpirySeconds(int seconds) {
+        if (null != azureBlobStoreBackend) {
+            azureBlobStoreBackend.setHttpDownloadURIExpirySeconds(seconds);
+        }
+    }
+
+    @Override
+    public void setDirectDownloadURICacheSize(int maxSize) {
+        azureBlobStoreBackend.setHttpDownloadURICacheSize(maxSize);
+    }
+
+    @Nullable
+    @Override
+    public URI getDownloadURI(@NotNull DataIdentifier identifier,
+                              @NotNull DataRecordDownloadOptions downloadOptions) {
+        if (null != azureBlobStoreBackend) {
+            return azureBlobStoreBackend.createHttpDownloadURI(identifier, downloadOptions);
+        }
+        return null;
+    }
 }

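Note that getDownloadURI above returns null whenever the feature is disabled or a SAS cannot be generated, so callers need a fallback to the regular stream. A minimal sketch ("identifier" is illustrative, and DataRecordDownloadOptions.DEFAULT is assumed to be provided by the new DataRecordDownloadOptions class added in this commit):

    // Prefer a presigned GET URI; fall back to streaming through the
    // repository when none is available.
    URI uri = ds.getDownloadURI(identifier, DataRecordDownloadOptions.DEFAULT);
    if (uri != null) {
        // hand the URI to the client, which downloads directly from Azure
    } else {
        InputStream in = ds.getRecord(identifier).getStream();
        // serve the binary the traditional way
    }
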
Added: jackrabbit/oak/trunk/oak-blob-cloud-azure/src/test/java/org/apache/jackrabbit/oak/blob/cloud/azure/blobstorage/AzureDataRecordAccessProviderIT.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-cloud-azure/src/test/java/org/apache/jackrabbit/oak/blob/cloud/azure/blobstorage/AzureDataRecordAccessProviderIT.java?rev=1837047&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-blob-cloud-azure/src/test/java/org/apache/jackrabbit/oak/blob/cloud/azure/blobstorage/AzureDataRecordAccessProviderIT.java (added)
+++ jackrabbit/oak/trunk/oak-blob-cloud-azure/src/test/java/org/apache/jackrabbit/oak/blob/cloud/azure/blobstorage/AzureDataRecordAccessProviderIT.java Mon Jul 30 11:05:10 2018
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.blob.cloud.azure.blobstorage;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+
+import javax.net.ssl.HttpsURLConnection;
+
+import com.google.common.base.Strings;
+import org.apache.jackrabbit.core.data.DataIdentifier;
+import org.apache.jackrabbit.core.data.DataRecord;
+import org.apache.jackrabbit.core.data.DataStore;
+import org.apache.jackrabbit.core.data.DataStoreException;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.AbstractDataRecordAccessProviderIT;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.ConfigurableDataRecordAccessProvider;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.jackrabbit.oak.blob.cloud.azure.blobstorage.AzureDataStoreUtils.getAzureConfig;
+import static org.apache.jackrabbit.oak.blob.cloud.azure.blobstorage.AzureDataStoreUtils.getAzureDataStore;
+import static org.apache.jackrabbit.oak.blob.cloud.azure.blobstorage.AzureDataStoreUtils.isAzureConfigured;
+import static org.junit.Assume.assumeTrue;
+
+/**
+ * As the test is memory intensive, it requires -Dtest.opts.memory=-Xmx2G
+ */
+public class AzureDataRecordAccessProviderIT extends AbstractDataRecordAccessProviderIT {
+
+    @ClassRule
+    public static TemporaryFolder homeDir = new TemporaryFolder(new File("target"));
+
+    private static AzureDataStore dataStore;
+
+    @BeforeClass
+    public static void setupDataStore() throws Exception {
+        assumeTrue(isAzureConfigured() && !Strings.isNullOrEmpty(System.getProperty("test.opts.memory")));
+
+        dataStore = (AzureDataStore) getAzureDataStore(getAzureConfig(), homeDir.newFolder().getAbsolutePath());
+        dataStore.setDirectDownloadURIExpirySeconds(expirySeconds);
+        dataStore.setDirectUploadURIExpirySeconds(expirySeconds);
+    }
+
+    @Override
+    protected ConfigurableDataRecordAccessProvider getDataStore() {
+        return dataStore;
+    }
+
+    @Override
+    protected DataRecord doGetRecord(DataStore ds, DataIdentifier identifier) throws DataStoreException {
+        return ds.getRecord(identifier);
+    }
+
+    @Override
+    protected void doDeleteRecord(DataStore ds, DataIdentifier identifier) throws DataStoreException {
+        ((AzureDataStore)ds).deleteRecord(identifier);
+    }
+
+    @Override
+    protected long getProviderMaxPartSize() {
+        return AzureBlobStoreBackend.MAX_MULTIPART_UPLOAD_PART_SIZE;
+    }
+
+    @Override
+    protected HttpsURLConnection getHttpsConnection(long length, URI uri) throws IOException {
+        return AzureDataStoreUtils.getHttpsConnection(length, uri);
+    }
+}

Propchange: jackrabbit/oak/trunk/oak-blob-cloud-azure/src/test/java/org/apache/jackrabbit/oak/blob/cloud/azure/blobstorage/AzureDataRecordAccessProviderIT.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: jackrabbit/oak/trunk/oak-blob-cloud-azure/src/test/java/org/apache/jackrabbit/oak/blob/cloud/azure/blobstorage/AzureDataRecordAccessProviderTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-cloud-azure/src/test/java/org/apache/jackrabbit/oak/blob/cloud/azure/blobstorage/AzureDataRecordAccessProviderTest.java?rev=1837047&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-blob-cloud-azure/src/test/java/org/apache/jackrabbit/oak/blob/cloud/azure/blobstorage/AzureDataRecordAccessProviderTest.java (added)
+++ jackrabbit/oak/trunk/oak-blob-cloud-azure/src/test/java/org/apache/jackrabbit/oak/blob/cloud/azure/blobstorage/AzureDataRecordAccessProviderTest.java Mon Jul 30 11:05:10 2018
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.blob.cloud.azure.blobstorage;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.Map;
+import java.util.Properties;
+
+import javax.net.ssl.HttpsURLConnection;
+
+import org.apache.jackrabbit.core.data.DataIdentifier;
+import org.apache.jackrabbit.core.data.DataRecord;
+import org.apache.jackrabbit.core.data.DataStore;
+import org.apache.jackrabbit.core.data.DataStoreException;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.AbstractDataRecordAccessProviderTest;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.ConfigurableDataRecordAccessProvider;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.DataRecordUpload;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.DataRecordUploadException;
+import org.apache.jackrabbit.oak.spi.blob.BlobOptions;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assume.assumeTrue;
+
+public class AzureDataRecordAccessProviderTest extends AbstractDataRecordAccessProviderTest {
+    @ClassRule
+    public static TemporaryFolder homeDir = new TemporaryFolder(new File("target"));
+
+    private static AzureDataStore dataStore;
+
+    @BeforeClass
+    public static void setupDataStore() throws Exception {
+        assumeTrue(AzureDataStoreUtils.isAzureConfigured());
+        Properties props = AzureDataStoreUtils.getAzureConfig();
+        props.setProperty("cacheSize", "0");
+        dataStore = (AzureDataStore) AzureDataStoreUtils
+            .getAzureDataStore(props, homeDir.newFolder().getAbsolutePath());
+        dataStore.setDirectDownloadURIExpirySeconds(expirySeconds);
+        dataStore.setDirectUploadURIExpirySeconds(expirySeconds);
+    }
+
+    @Override
+    protected ConfigurableDataRecordAccessProvider getDataStore() {
+        return dataStore;
+    }
+
+    @Override
+    protected DataRecord doGetRecord(DataStore ds, DataIdentifier identifier) throws DataStoreException {
+        return ds.getRecord(identifier);
+    }
+
+    @Override
+    protected DataRecord doSynchronousAddRecord(DataStore ds, InputStream in) throws DataStoreException {
+        return ((AzureDataStore)ds).addRecord(in, new BlobOptions().setUpload(BlobOptions.UploadType.SYNCHRONOUS));
+    }
+
+    @Override
+    protected void doDeleteRecord(DataStore ds, DataIdentifier identifier) throws DataStoreException {
+        ((AzureDataStore)ds).deleteRecord(identifier);
+    }
+
+    @Override
+    protected long getProviderMinPartSize() {
+        return Math.max(0L, AzureBlobStoreBackend.MIN_MULTIPART_UPLOAD_PART_SIZE);
+    }
+
+    @Override
+    protected long getProviderMaxPartSize() {
+        return AzureBlobStoreBackend.MAX_MULTIPART_UPLOAD_PART_SIZE;
+    }
+
+    @Override
+    protected long getProviderMaxSinglePutSize() { return AzureBlobStoreBackend.MAX_SINGLE_PUT_UPLOAD_SIZE; }
+
+    @Override
+    protected long getProviderMaxBinaryUploadSize() { return AzureBlobStoreBackend.MAX_BINARY_UPLOAD_SIZE; }
+
+    @Override
+    protected boolean isSinglePutURI(URI uri) {
+        // Strictly speaking we don't support single-put for Azure, because of the
+        // header required only for single-put uploads, so always return true here
+        // to avoid failing tests on this check.
+        return true;
+    }
+
+    @Override
+    protected HttpsURLConnection getHttpsConnection(long length, URI uri) throws IOException {
+        return AzureDataStoreUtils.getHttpsConnection(length, uri);
+    }
+
+    @Test
+    public void testInitDirectUploadURIHonorsExpiryTime() throws DataRecordUploadException {
+        ConfigurableDataRecordAccessProvider ds = getDataStore();
+        try {
+            Instant now = Instant.now().truncatedTo(ChronoUnit.SECONDS);
+            ds.setDirectUploadURIExpirySeconds(60);
+            DataRecordUpload uploadContext = ds.initiateDataRecordUpload(ONE_MB, 1);
+            URI uploadURI = uploadContext.getUploadURIs().iterator().next();
+            Map<String, String> params = parseQueryString(uploadURI);
+            String expiryDateStr = params.get("se");
+            Instant expiry = Instant.parse(expiryDateStr);
+            assertEquals(now, expiry.minusSeconds(60));
+        }
+        finally {
+            ds.setDirectUploadURIExpirySeconds(expirySeconds);
+        }
+    }
+
+    @Test
+    public void testInitiateDirectUploadUnlimitedURIs() throws DataRecordUploadException {
+        ConfigurableDataRecordAccessProvider ds = getDataStore();
+        long uploadSize = ONE_GB * 100;
+        int expectedNumURIs = 10000;
+        DataRecordUpload upload = ds.initiateDataRecordUpload(uploadSize, -1);
+        assertEquals(expectedNumURIs, upload.getUploadURIs().size());
+
+        uploadSize = ONE_GB * 500;
+        expectedNumURIs = 50000;
+        upload = ds.initiateDataRecordUpload(uploadSize, -1);
+        assertEquals(expectedNumURIs, upload.getUploadURIs().size());
+
+        uploadSize = ONE_GB * 1000;
+        // expectedNumURIs still 50000, Azure limit
+        upload = ds.initiateDataRecordUpload(uploadSize, -1);
+        assertEquals(expectedNumURIs, upload.getUploadURIs().size());
+    }
+}

Propchange: jackrabbit/oak/trunk/oak-blob-cloud-azure/src/test/java/org/apache/jackrabbit/oak/blob/cloud/azure/blobstorage/AzureDataRecordAccessProviderTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: jackrabbit/oak/trunk/oak-blob-cloud-azure/src/test/java/org/apache/jackrabbit/oak/blob/cloud/azure/blobstorage/AzureDataStoreUtils.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-cloud-azure/src/test/java/org/apache/jackrabbit/oak/blob/cloud/azure/blobstorage/AzureDataStoreUtils.java?rev=1837047&r1=1837046&r2=1837047&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-blob-cloud-azure/src/test/java/org/apache/jackrabbit/oak/blob/cloud/azure/blobstorage/AzureDataStoreUtils.java (original)
+++ jackrabbit/oak/trunk/oak-blob-cloud-azure/src/test/java/org/apache/jackrabbit/oak/blob/cloud/azure/blobstorage/AzureDataStoreUtils.java Mon Jul 30 11:05:10 2018
@@ -18,11 +18,26 @@
  */
 package org.apache.jackrabbit.oak.blob.cloud.azure.blobstorage;
 
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.Map;
+import java.util.Properties;
+
+import javax.net.ssl.HttpsURLConnection;
+
 import com.google.common.base.Predicate;
+import com.google.common.base.StandardSystemProperty;
 import com.google.common.base.Strings;
 import com.google.common.collect.Maps;
 import com.microsoft.azure.storage.blob.CloudBlobContainer;
-
 import org.apache.commons.io.IOUtils;
 import org.apache.jackrabbit.core.data.DataStore;
 import org.apache.jackrabbit.oak.commons.PropertiesUtil;
@@ -30,11 +45,7 @@ import org.apache.jackrabbit.oak.plugins
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.InputStream;
-import java.util.Map;
-import java.util.Properties;
+import static com.google.common.base.StandardSystemProperty.USER_HOME;
 
 /**
  * Extension to {@link DataStoreUtils} to enable Azure extensions for cleaning and initialization.
@@ -43,7 +54,8 @@ public class AzureDataStoreUtils extends
     private static final Logger log = LoggerFactory.getLogger(AzureDataStoreUtils.class);
 
     private static final String DEFAULT_CONFIG_PATH = "./src/test/resources/azure.properties";
-
+    private static final String DEFAULT_PROPERTY_FILE = "azure.properties";
+    private static final String SYS_PROP_NAME = "azure.config";
 
     /**
      * Check for presence of mandatory properties.
@@ -71,10 +83,17 @@ public class AzureDataStoreUtils extends
      * @return Properties instance
      */
     public static Properties getAzureConfig() {
-        String config = System.getProperty("azure.config");
+        String config = System.getProperty(SYS_PROP_NAME);
+        if (Strings.isNullOrEmpty(config)) {
+            File cfgFile = new File(USER_HOME.value(), DEFAULT_PROPERTY_FILE);
+            if (cfgFile.exists()) {
+                config = cfgFile.getAbsolutePath();
+            }
+        }
         if (Strings.isNullOrEmpty(config)) {
             config = DEFAULT_CONFIG_PATH;
         }
+
         Properties props = new Properties();
         if (new File(config).exists()) {
             InputStream is = null;
@@ -118,4 +137,17 @@ public class AzureDataStoreUtils extends
         boolean result = container.deleteIfExists();
         log.info("Container deleted. containerName={} existed={}", containerName, result);
     }
+
+    protected static HttpsURLConnection getHttpsConnection(long length, URI uri) throws IOException {
+        HttpsURLConnection conn = (HttpsURLConnection) uri.toURL().openConnection();
+        conn.setDoOutput(true);
+        conn.setRequestMethod("PUT");
+        conn.setRequestProperty("Content-Length", String.valueOf(length));
+        conn.setRequestProperty("Date", DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssX")
+            .withZone(ZoneOffset.UTC)
+            .format(Instant.now()));
+        conn.setRequestProperty("x-ms-version", "2017-11-09");
+
+        return conn;
+    }
 }

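The helper above only prepares the connection; test code still writes the body and checks the status. A minimal usage sketch ("data" and "uri" are illustrative; 201 Created is what Azure returns for a successfully staged block):

    HttpsURLConnection conn = AzureDataStoreUtils.getHttpsConnection(data.length, uri);
    try (OutputStream out = conn.getOutputStream()) {
        out.write(data); // one upload part, streamed straight to the presigned URI
    }
    assertEquals(201, conn.getResponseCode());
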
Modified: jackrabbit/oak/trunk/oak-blob-cloud/pom.xml
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-cloud/pom.xml?rev=1837047&r1=1837046&r2=1837047&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-blob-cloud/pom.xml (original)
+++ jackrabbit/oak/trunk/oak-blob-cloud/pom.xml Mon Jul 30 11:05:10 2018
@@ -196,6 +196,13 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>org.apache.jackrabbit</groupId>
+            <artifactId>oak-blob</artifactId>
+            <version>${project.version}</version>
+            <classifier>tests</classifier>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
             <scope>test</scope>

Modified: jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3Backend.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3Backend.java?rev=1837047&r1=1837046&r2=1837047&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3Backend.java (original)
+++ jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3Backend.java Mon Jul 30 11:05:10 2018
@@ -17,11 +17,19 @@
 
 package org.apache.jackrabbit.oak.blob.cloud.s3;
 
+import static com.google.common.collect.Iterables.filter;
+import static java.lang.Thread.currentThread;
+
 import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.time.Instant;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
 import java.util.Iterator;
@@ -29,20 +37,33 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Queue;
+import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import com.amazonaws.AmazonClientException;
 import com.amazonaws.AmazonServiceException;
+import com.amazonaws.HttpMethod;
 import com.amazonaws.regions.Regions;
 import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.S3ClientOptions;
+import com.amazonaws.services.s3.model.BucketAccelerateConfiguration;
+import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
 import com.amazonaws.services.s3.model.CopyObjectRequest;
 import com.amazonaws.services.s3.model.DeleteObjectsRequest;
 import com.amazonaws.services.s3.model.DeleteObjectsResult;
+import com.amazonaws.services.s3.model.GeneratePresignedUrlRequest;
+import com.amazonaws.services.s3.model.GetBucketAccelerateConfigurationRequest;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
 import com.amazonaws.services.s3.model.ListObjectsRequest;
+import com.amazonaws.services.s3.model.ListPartsRequest;
 import com.amazonaws.services.s3.model.ObjectListing;
 import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PartETag;
+import com.amazonaws.services.s3.model.PartListing;
+import com.amazonaws.services.s3.model.PartSummary;
 import com.amazonaws.services.s3.model.PutObjectRequest;
 import com.amazonaws.services.s3.model.Region;
 import com.amazonaws.services.s3.model.S3Object;
@@ -54,6 +75,8 @@ import com.amazonaws.util.StringUtils;
 import com.google.common.base.Function;
 import com.google.common.base.Predicate;
 import com.google.common.base.Strings;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
 import com.google.common.collect.AbstractIterator;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -62,14 +85,16 @@ import org.apache.jackrabbit.core.data.D
 import org.apache.jackrabbit.core.data.DataRecord;
 import org.apache.jackrabbit.core.data.DataStoreException;
 import org.apache.jackrabbit.core.data.util.NamedThreadFactory;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.DataRecordDownloadOptions;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.DataRecordUpload;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.DataRecordUploadException;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.DataRecordUploadToken;
 import org.apache.jackrabbit.oak.spi.blob.AbstractDataRecord;
 import org.apache.jackrabbit.oak.spi.blob.AbstractSharedBackend;
+import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static com.google.common.collect.Iterables.filter;
-import static java.lang.Thread.currentThread;
-
 /**
  * A data store backend that stores data on Amazon S3.
  */
@@ -86,8 +111,23 @@ public class S3Backend extends AbstractS
 
     private static final String REF_KEY = "reference.key";
 
+    private static final int MAX_UNIQUE_RECORD_TRIES = 10;
+
+    static final String PART_NUMBER = "partNumber";
+    static final String UPLOAD_ID = "uploadId";
+
+    private static final int ONE_MB = 1024*1024;
+    static final long MIN_MULTIPART_UPLOAD_PART_SIZE = 1024 * 1024 * 10; // 10MB
+    static final long MAX_MULTIPART_UPLOAD_PART_SIZE = 1024 * 1024 * 256; // 256MB
+    static final long MAX_SINGLE_PUT_UPLOAD_SIZE = 1024L * 1024L * 1024L * 5L; // 5GB, AWS limitation
+    static final long MAX_BINARY_UPLOAD_SIZE = 1024L * 1024L * 1024L * 1024L * 5L; // 5TB, AWS limitation
+    private static final int MAX_ALLOWABLE_UPLOAD_URIS = 10000; // AWS limitation
+
     private AmazonS3Client s3service;
 
+    // needed only when transfer acceleration is enabled for presigned URIs
+    private AmazonS3Client s3PresignService;
+
     private String bucket;
 
     private byte[] secret;
@@ -100,6 +140,12 @@ public class S3Backend extends AbstractS
 
     private S3RequestDecorator s3ReqDecorator;
 
+    private Cache<DataIdentifier, URI> httpDownloadURICache;
+
+    // 0 = off by default
+    private int httpUploadURIExpirySeconds = 0;
+    private int httpDownloadURIExpirySeconds = 0;
+
     public void init() throws DataStoreException {
         ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
 
@@ -111,6 +157,8 @@ public class S3Backend extends AbstractS
 
             s3ReqDecorator = new S3RequestDecorator(properties);
             s3service = Utils.openService(properties);
+            s3PresignService = s3service;
+
             if (bucket == null || "".equals(bucket.trim())) {
                 bucket = properties.getProperty(S3Constants.S3_BUCKET);
                 // Alternately check if the 'container' property is set
@@ -164,6 +212,31 @@ public class S3Backend extends AbstractS
             if (renameKeyBool) {
                 renameKeys();
             }
+
+            // settings around pre-signing
+
+            String putExpiry = properties.getProperty(S3Constants.PRESIGNED_PUT_EXPIRY_SEC);
+            if (putExpiry != null) {
+                setHttpUploadURIExpirySeconds(Integer.parseInt(putExpiry));
+            }
+
+            String getExpiry = properties.getProperty(S3Constants.PRESIGNED_GET_EXPIRY_SEC);
+            if (getExpiry != null) {
+                final int getExpirySeconds = Integer.parseInt(getExpiry);
+                setHttpDownloadURIExpirySeconds(getExpirySeconds);
+
+                int cacheMaxSize = 0; // off by default
+                String cacheMaxSizeStr = properties.getProperty(S3Constants.PRESIGNED_GET_URI_CACHE_MAX_SIZE);
+                if (cacheMaxSizeStr != null) {
+                    cacheMaxSize = Integer.parseInt(cacheMaxSizeStr);
+                }
+
+                setHttpDownloadURICacheSize(cacheMaxSize);
+            }
+
+            String enablePresignedAccelerationStr = properties.getProperty(S3Constants.PRESIGNED_URI_ENABLE_ACCELERATION);
+            setBinaryTransferAccelerationEnabled("true".equals(enablePresignedAccelerationStr));
+
             LOG.debug("S3 Backend initialized in [{}] ms",
                 +(System.currentTimeMillis() - startTime.getTime()));
         } catch (Exception e) {
@@ -185,6 +258,27 @@ public class S3Backend extends AbstractS
         }
     }
 
+    void setBinaryTransferAccelerationEnabled(boolean enabled) {
+        if (enabled) {
+            // verify acceleration is enabled on the bucket
+            BucketAccelerateConfiguration accelerateConfig = s3service.getBucketAccelerateConfiguration(new GetBucketAccelerateConfigurationRequest(bucket));
+            if (accelerateConfig.isAccelerateEnabled()) {
+                // If transfer acceleration is enabled for presigned URIs, we need a separate AmazonS3Client
+                // instance with the acceleration mode enabled, because we don't want the requests from the
+                // data store itself to S3 to use acceleration
+                s3PresignService = Utils.openService(properties);
+                s3PresignService.setS3ClientOptions(S3ClientOptions.builder().setAccelerateModeEnabled(true).build());
+                LOG.info("S3 Transfer Acceleration enabled for presigned URIs.");
+
+            } else {
+                LOG.warn("S3 Transfer Acceleration is not enabled on the bucket {}. Will create normal, non-accelerated presigned URIs.",
+                    bucket, S3Constants.PRESIGNED_URI_ENABLE_ACCELERATION);
+            }
+        } else {
+            s3PresignService = s3service;
+        }
+    }
+
     /**
      * Uploads a file to Amazon S3. If the file size is greater than 5MB, this
      * method uses parallel concurrent connections to upload.
@@ -292,6 +386,9 @@ public class S3Backend extends AbstractS
             S3Object object = s3service.getObject(bucket, key);
             InputStream in = object.getObjectContent();
             LOG.debug("[{}] read took [{}]ms", identifier, (System.currentTimeMillis() - start));
+            if (LOG.isDebugEnabled()) {
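+                // Logged with an exception so the debug output includes a stack
+                // trace showing where the download was triggered from.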
+                LOG.debug("binary downloaded from S3: " + identifier, new Exception());
+            }
             return in;
         } catch (AmazonServiceException e) {
             throw new DataStoreException("Object not found: " + key, e);
@@ -516,8 +613,8 @@ public class S3Backend extends AbstractS
         } catch (AmazonServiceException e) {
             if (e.getStatusCode() == 404 || e.getStatusCode() == 403) {
                 LOG.info(
-                    "getRecord:Identifier [{}] not found. Took [{}] ms.",
-                    identifier, (System.currentTimeMillis() - start));
+                        "getRecord:Identifier [{}] not found. Took [{}] ms.",
+                        identifier, (System.currentTimeMillis() - start));
             }
             throw new DataStoreException(e);
         } finally {
@@ -575,6 +672,286 @@ public class S3Backend extends AbstractS
         }
     }
 
+    void setHttpUploadURIExpirySeconds(int seconds) {
+        this.httpUploadURIExpirySeconds = seconds;
+    }
+
+    private DataIdentifier generateSafeRandomIdentifier() {
+        return new DataIdentifier(
+                String.format("%s-%d",
+                        UUID.randomUUID().toString(),
+                        Instant.now().toEpochMilli()
+                )
+        );
+    }
+
+    private URI createPresignedPutURI(DataIdentifier identifier) {
+        if (httpUploadURIExpirySeconds <= 0) {
+            // feature disabled
+            return null;
+        }
+
+        return createPresignedURI(identifier, HttpMethod.PUT, httpUploadURIExpirySeconds);
+    }
+
+    void setHttpDownloadURIExpirySeconds(int seconds) {
+        this.httpDownloadURIExpirySeconds = seconds;
+    }
+
+    void setHttpDownloadURICacheSize(int maxSize) {
+        // max size 0 or smaller is used to turn off the cache
+        if (maxSize > 0) {
+            LOG.info("presigned GET URI cache enabled, maxSize = {} items, expiry = {} seconds", maxSize, httpDownloadURIExpirySeconds / 2);
+            httpDownloadURICache = CacheBuilder.newBuilder()
+                    .maximumSize(maxSize)
+                    // cache for half the expiry time of the URIs before giving out new ones
+                    .expireAfterWrite(httpDownloadURIExpirySeconds / 2, TimeUnit.SECONDS)
+                    .build();
+        } else {
+            LOG.info("presigned GET URI cache disabled");
+            httpDownloadURICache = null;
+        }
+    }
+
+    URI createHttpDownloadURI(@NotNull DataIdentifier identifier,
+                              @NotNull DataRecordDownloadOptions downloadOptions) {
+        if (httpDownloadURIExpirySeconds <= 0) {
+            // feature disabled
+            return null;
+        }
+
+        URI uri = null;
+        // if cache is enabled, check the cache
+        if (httpDownloadURICache != null) {
+            uri = httpDownloadURICache.getIfPresent(identifier);
+        }
+        if (uri == null) {
+            Map<String, String> requestParams = Maps.newHashMap();
+            requestParams.put("response-cache-control",
+                    String.format("private, max-age=%d, immutable",
+                            httpDownloadURIExpirySeconds)
+            );
+
+            String contentType = downloadOptions.getContentTypeHeader();
+            if (! Strings.isNullOrEmpty(contentType)) {
+                requestParams.put("response-content-type", contentType);
+            }
+            String contentDisposition =
+                    downloadOptions.getContentDispositionHeader();
+
+            if (! Strings.isNullOrEmpty(contentDisposition)) {
+                requestParams.put("response-content-disposition",
+                        contentDisposition);
+            }
+
+            uri = createPresignedURI(identifier,
+                    HttpMethod.GET,
+                    httpDownloadURIExpirySeconds,
+                    requestParams);
+            if (uri != null && httpDownloadURICache != null) {
+                httpDownloadURICache.put(identifier, uri);
+            }
+        }
+        return uri;
+    }
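+
+    // For illustration only (the values are hypothetical and the exact parameter
+    // set depends on the SDK signature version), a presigned GET URI created
+    // above roughly has this shape, with the response-* overrides appended:
+    //
+    //   https://<bucket>.s3.amazonaws.com/<key>?X-Amz-Algorithm=AWS4-HMAC-SHA256
+    //       &X-Amz-Expires=3600&X-Amz-Signature=...
+    //       &response-cache-control=private%2C%20max-age%3D3600%2C%20immutable
+    //       &response-content-type=application%2Foctet-stream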
+
+    DataRecordUpload initiateHttpUpload(long maxUploadSizeInBytes, int maxNumberOfURIs) {
+        List<URI> uploadPartURIs = Lists.newArrayList();
+        long minPartSize = MIN_MULTIPART_UPLOAD_PART_SIZE;
+        long maxPartSize = MAX_MULTIPART_UPLOAD_PART_SIZE;
+
+        if (0L >= maxUploadSizeInBytes) {
+            throw new IllegalArgumentException("maxUploadSizeInBytes must be > 0");
+        }
+        else if (0 == maxNumberOfURIs || -1 > maxNumberOfURIs) {
+            throw new IllegalArgumentException("maxNumberOfURIs must either be > 0 or -1");
+        }
+        else if (maxUploadSizeInBytes > MAX_SINGLE_PUT_UPLOAD_SIZE &&
+                maxNumberOfURIs == 1) {
+            throw new IllegalArgumentException(
+                    String.format("Cannot do single-put upload with file size %d - exceeds max single-put upload size of %d",
+                            maxUploadSizeInBytes,
+                            MAX_SINGLE_PUT_UPLOAD_SIZE)
+            );
+        }
+        else if (maxUploadSizeInBytes > MAX_BINARY_UPLOAD_SIZE) {
+            throw new IllegalArgumentException(
+                    String.format("Cannot do upload with file size %d - exceeds max upload size of %d",
+                            maxUploadSizeInBytes,
+                            MAX_BINARY_UPLOAD_SIZE)
+            );
+        }
+
+        DataIdentifier newIdentifier = generateSafeRandomIdentifier();
+        String blobId = getKeyName(newIdentifier);
+        String uploadId = null;
+
+        if (httpUploadURIExpirySeconds > 0) {
+            if (maxNumberOfURIs == 1 ||
+                    maxUploadSizeInBytes <= minPartSize) {
+                // single put
+                uploadPartURIs.add(createPresignedPutURI(newIdentifier));
+            }
+            else {
+                // multi-part
+                InitiateMultipartUploadRequest req = new InitiateMultipartUploadRequest(bucket, blobId);
+                InitiateMultipartUploadResult res = s3service.initiateMultipartUpload(req);
+                uploadId = res.getUploadId();
+
+                long numParts;
+                if (maxNumberOfURIs > 1) {
+                    long requestedPartSize = (long) Math.ceil(((double) maxUploadSizeInBytes) / ((double) maxNumberOfURIs));
+                    if (requestedPartSize <= maxPartSize) {
+                        numParts = Math.min(
+                                maxNumberOfURIs,
+                                Math.min(
+                                        (long) Math.ceil(((double) maxUploadSizeInBytes) / ((double) minPartSize)),
+                                        MAX_ALLOWABLE_UPLOAD_URIS
+                                )
+                        );
+                    } else {
+                        throw new IllegalArgumentException(
+                                String.format("Cannot do multi-part upload with requested part size %d", requestedPartSize)
+                        );
+                    }
+                }
+                else {
+                    long maximalNumParts = (long) Math.ceil(((double) maxUploadSizeInBytes) / ((double) MIN_MULTIPART_UPLOAD_PART_SIZE));
+                    numParts = Math.min(maximalNumParts, MAX_ALLOWABLE_UPLOAD_URIS);
+                }
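+
+                // Worked example (hypothetical sizes): for maxUploadSizeInBytes of
+                // 100MB and maxNumberOfURIs = 5, requestedPartSize = 20MB <= 256MB,
+                // so numParts = min(5, min(ceil(100MB / 10MB) = 10, 10000)) = 5 URIs
+                // of 20MB each; with maxNumberOfURIs = -1 it is 10 URIs of 10MB.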
+
+                Map<String, String> presignedURIRequestParams = Maps.newHashMap();
+                for (long blockId = 1; blockId <= numParts; ++blockId) {
+                    presignedURIRequestParams.put("partNumber", String.valueOf(blockId));
+                    presignedURIRequestParams.put("uploadId", uploadId);
+                    uploadPartURIs.add(createPresignedURI(newIdentifier,
+                            HttpMethod.PUT,
+                            httpUploadURIExpirySeconds,
+                            presignedURIRequestParams));
+                }
+            }
+        }
+
+        try {
+            byte[] secret = getOrCreateReferenceKey();
+            String uploadToken = new DataRecordUploadToken(blobId, uploadId).getEncodedToken(secret);
+
+            return new DataRecordUpload() {
+                @Override
+                @NotNull
+                public String getUploadToken() { return uploadToken; }
+
+                @Override
+                public long getMinPartSize() { return minPartSize; }
+
+                @Override
+                public long getMaxPartSize() { return maxPartSize; }
+
+                @Override
+                @NotNull
+                public Collection<URI> getUploadURIs() { return uploadPartURIs; }
+            };
+        }
+        catch (DataStoreException e) {
+            LOG.warn("Unable to obtain data store key");
+        }
+
+        return null;
+    }
+
+    DataRecord completeHttpUpload(@NotNull String uploadTokenStr)
+            throws DataRecordUploadException, DataStoreException {
+
+        if (Strings.isNullOrEmpty(uploadTokenStr)) {
+            throw new IllegalArgumentException("uploadToken required");
+        }
+
+        DataRecordUploadToken uploadToken = DataRecordUploadToken.fromEncodedToken(uploadTokenStr, getOrCreateReferenceKey());
+        String blobId = uploadToken.getBlobId();
+        if (uploadToken.getUploadId().isPresent()) {
+            // An existing upload ID means this is a multi-part upload
+            String uploadId = uploadToken.getUploadId().get();
+            ListPartsRequest listPartsRequest = new ListPartsRequest(bucket, blobId, uploadId);
+            PartListing listing = s3service.listParts(listPartsRequest);
+            List<PartETag> eTags = Lists.newArrayList();
+            for (PartSummary partSummary : listing.getParts()) {
+                PartETag eTag = new PartETag(partSummary.getPartNumber(), partSummary.getETag());
+                eTags.add(eTag);
+            }
+
+            CompleteMultipartUploadRequest completeReq = new CompleteMultipartUploadRequest(
+                    bucket,
+                    blobId,
+                    uploadId,
+                    eTags
+            );
+
+            s3service.completeMultipartUpload(completeReq);
+        }
+        // else do nothing - single-put upload is already complete
+
+        if (! s3service.doesObjectExist(bucket, blobId)) {
+            throw new DataRecordUploadException(
+                    String.format("Unable to finalize direct write of binary %s", blobId)
+            );
+        }
+
+        return getRecord(new DataIdentifier(getIdentifierName(blobId)));
+    }
+
+    private URI createPresignedURI(DataIdentifier identifier,
+                                   HttpMethod method,
+                                   int expirySeconds) {
+        return createPresignedURI(identifier, method, expirySeconds, Maps.newHashMap());
+    }
+
+    private URI createPresignedURI(DataIdentifier identifier,
+                                   HttpMethod method,
+                                   int expirySeconds,
+                                   Map<String, String> reqParams) {
+        final String key = getKeyName(identifier);
+
+        try {
+            final Date expiration = new Date();
+            expiration.setTime(expiration.getTime() + expirySeconds * 1000L);
+
+            GeneratePresignedUrlRequest request = new GeneratePresignedUrlRequest(bucket, key)
+                    .withMethod(method)
+                    .withExpiration(expiration);
+
+            for (Map.Entry<String, String> e : reqParams.entrySet()) {
+                request.addRequestParameter(e.getKey(), e.getValue());
+            }
+
+            URI uri = null;
+            URL presignedURL = null;
+            try {
+                presignedURL = s3PresignService.generatePresignedUrl(request);
+                uri = presignedURL.toURI();
+
+                LOG.debug("Presigned {} URI for key {}: {}", method.name(), key, uri.toString());
+            }
+            catch (URISyntaxException e) {
+                LOG.error("AWS request to create presigned S3 URI failed - could not convert '{}' to URI",
+                        (null != presignedURL ? presignedURL.toString() : ""), e
+                );
+            }
+
+            return uri;
+
+        } catch (AmazonServiceException e) {
+            LOG.error("AWS request to create presigned S3 {} URI failed. " +
+                            "Key: {}, Error: {}, HTTP Code: {}, AWS Error Code: {}, Error Type: {}, Request ID: {}",
+                    method.name(), key, e.getMessage(), e.getStatusCode(), e.getErrorCode(), e.getErrorType(), e.getRequestId());
+
+            return null;
+        }
+    }
+
     /**
      * Returns an iterator over the S3 objects
      * @param <T>
@@ -699,6 +1076,11 @@ public class S3Backend extends AbstractS
             if (isMeta) {
                 id = addMetaKeyPrefix(getIdentifier().toString());
             }
+            if (LOG.isDebugEnabled()) {
+                // Log message, with exception so we can get a trace to see where the call
+                // came from
+                LOG.debug("binary downloaded from S3: " + getIdentifier(), new Exception());
+            }
             return s3service.getObject(bucket, id).getObjectContent();
         }
 

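For context, a minimal client-side sketch (not part of this commit; error
handling and names are illustrative) of consuming one of the presigned PUT
URIs handed out by initiateHttpUpload() above:

    import java.io.OutputStream;
    import java.net.HttpURLConnection;
    import java.net.URI;

    public class PresignedPutClient {
        // PUTs the given bytes directly to S3 via a presigned URI; no AWS
        // credentials are needed on the client side.
        public static int put(URI presignedUri, byte[] data) throws Exception {
            HttpURLConnection conn =
                    (HttpURLConnection) presignedUri.toURL().openConnection();
            conn.setDoOutput(true);
            conn.setRequestMethod("PUT");
            conn.setFixedLengthStreamingMode(data.length);
            try (OutputStream out = conn.getOutputStream()) {
                out.write(data);
            }
            return conn.getResponseCode(); // 200 indicates success
        }
    }
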
Modified: jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3Constants.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3Constants.java?rev=1837047&r1=1837046&r2=1837047&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3Constants.java (original)
+++ jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3Constants.java Mon Jul 30 11:05:10 2018
@@ -112,6 +112,14 @@ public final class S3Constants {
      */
     public static final String PROXY_PORT = "proxyPort";
 
+    public static final String PRESIGNED_PUT_EXPIRY_SEC = "presignedPutExpirySeconds";
+
+    public static final String PRESIGNED_GET_EXPIRY_SEC = "presignedGetExpirySeconds";
+
+    public static final String PRESIGNED_GET_URI_CACHE_MAX_SIZE = "presignedGetURICacheMaxSize";
+
+    public static final String PRESIGNED_URI_ENABLE_ACCELERATION = "presignedURIEnableTransferAcceleration";
+
     /**
      * Private constructor so that the class cannot be initialized from outside.
      */

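As a configuration sketch (the values are examples only), the new keys are set
on the data store properties like any other S3 connection property:

    java.util.Properties props = new java.util.Properties();
    props.setProperty(S3Constants.PRESIGNED_PUT_EXPIRY_SEC, "3600");         // 1 hour
    props.setProperty(S3Constants.PRESIGNED_GET_EXPIRY_SEC, "3600");
    props.setProperty(S3Constants.PRESIGNED_GET_URI_CACHE_MAX_SIZE, "1000");
    props.setProperty(S3Constants.PRESIGNED_URI_ENABLE_ACCELERATION, "true");
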
Modified: jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3DataStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3DataStore.java?rev=1837047&r1=1837046&r2=1837047&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3DataStore.java (original)
+++ jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3DataStore.java Mon Jul 30 11:05:10 2018
@@ -16,22 +16,32 @@
  */
 package org.apache.jackrabbit.oak.blob.cloud.s3;
 
+import java.net.URI;
 import java.util.Properties;
 
+import org.apache.jackrabbit.core.data.DataIdentifier;
+import org.apache.jackrabbit.core.data.DataRecord;
+import org.apache.jackrabbit.core.data.DataStoreException;
 import org.apache.jackrabbit.oak.plugins.blob.AbstractSharedCachingDataStore;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.ConfigurableDataRecordAccessProvider;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.DataRecordUploadException;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.DataRecordDownloadOptions;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.DataRecordUpload;
 import org.apache.jackrabbit.oak.spi.blob.AbstractSharedBackend;
 import org.apache.jackrabbit.oak.spi.blob.SharedBackend;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
 
 
 /**
  * Amazon S3 data store extending from {@link AbstractSharedCachingDataStore}.
  */
-public class S3DataStore extends AbstractSharedCachingDataStore {
+public class S3DataStore extends AbstractSharedCachingDataStore implements ConfigurableDataRecordAccessProvider {
 
     protected Properties properties;
 
+    private S3Backend s3Backend;
+
     /**
      * The minimum size of an object that should be stored in this data store.
      */
@@ -39,11 +49,11 @@ public class S3DataStore extends Abstrac
 
     @Override
     protected AbstractSharedBackend createBackend() {
-        S3Backend backend = new S3Backend();
+        s3Backend = new S3Backend();
         if(properties != null){
-            backend.setProperties(properties);
+            s3Backend.setProperties(properties);
         }
-        return backend;
+        return s3Backend;
     }
 
     /**------------------------------------------- Getters & Setters-----------------------------**/
@@ -67,4 +77,65 @@ public class S3DataStore extends Abstrac
     public void setMinRecordLength(int minRecordLength) {
         this.minRecordLength = minRecordLength;
     }
+
+    //
+    // ConfigurableDataRecordAccessProvider implementation
+    //
+    @Override
+    public void setDirectUploadURIExpirySeconds(int seconds) {
+        if (s3Backend != null) {
+            s3Backend.setHttpUploadURIExpirySeconds(seconds);
+        }
+    }
+
+    @Override
+    public void setBinaryTransferAccelerationEnabled(boolean enabled) {
+        if (s3Backend != null) {
+            s3Backend.setBinaryTransferAccelerationEnabled(enabled);
+        }
+    }
+
+    @Nullable
+    @Override
+    public DataRecordUpload initiateDataRecordUpload(long maxUploadSizeInBytes, int maxNumberOfURIs)
+            throws IllegalArgumentException, DataRecordUploadException {
+        if (null == s3Backend) {
+            throw new DataRecordUploadException("Backend not initialized");
+        }
+        return s3Backend.initiateHttpUpload(maxUploadSizeInBytes, maxNumberOfURIs);
+    }
+
+    @NotNull
+    @Override
+    public DataRecord completeDataRecordUpload(@NotNull String uploadToken)
+            throws IllegalArgumentException, DataRecordUploadException, DataStoreException {
+        if (null == s3Backend) {
+            throw new DataRecordUploadException("Backend not initialized");
+        }
+        return s3Backend.completeHttpUpload(uploadToken);
+    }
+
+    @Override
+    public void setDirectDownloadURIExpirySeconds(int seconds) {
+        if (s3Backend != null) {
+            s3Backend.setHttpDownloadURIExpirySeconds(seconds);
+        }
+    }
+
+    @Override
+    public void setDirectDownloadURICacheSize(int maxSize) {
+        if (s3Backend != null) {
+            s3Backend.setHttpDownloadURICacheSize(maxSize);
+        }
+    }
+
+    @Nullable
+    @Override
+    public URI getDownloadURI(@NotNull DataIdentifier identifier,
+                              @NotNull DataRecordDownloadOptions downloadOptions) {
+        if (s3Backend == null) {
+            return null;
+        }
+        return s3Backend.createHttpDownloadURI(identifier, downloadOptions);
+    }
 }

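Putting the pieces together, a hedged end-to-end sketch of the new
ConfigurableDataRecordAccessProvider surface on S3DataStore (initialization,
error handling, and the remote client's transfer are elided; the DEFAULT
download options instance is assumed):

    S3DataStore ds = /* initialized and started elsewhere */ null;
    // Request presigned URIs covering a 100MB upload, letting the backend
    // choose the number of parts (maxNumberOfURIs = -1).
    DataRecordUpload upload = ds.initiateDataRecordUpload(100L * 1024 * 1024, -1);
    for (URI uri : upload.getUploadURIs()) {
        // hand each URI to the remote client, which PUTs its part directly to S3
    }
    DataRecord record = ds.completeDataRecordUpload(upload.getUploadToken());
    URI downloadURI = ds.getDownloadURI(record.getIdentifier(),
            DataRecordDownloadOptions.DEFAULT); // assumed default options
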
Added: jackrabbit/oak/trunk/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3DataRecordAccessProviderIT.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3DataRecordAccessProviderIT.java?rev=1837047&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3DataRecordAccessProviderIT.java (added)
+++ jackrabbit/oak/trunk/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3DataRecordAccessProviderIT.java Mon Jul 30 11:05:10 2018
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.blob.cloud.s3;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+
+import javax.net.ssl.HttpsURLConnection;
+
+import org.apache.jackrabbit.core.data.DataIdentifier;
+import org.apache.jackrabbit.core.data.DataRecord;
+import org.apache.jackrabbit.core.data.DataStore;
+import org.apache.jackrabbit.core.data.DataStoreException;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.AbstractDataRecordAccessProviderIT;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.ConfigurableDataRecordAccessProvider;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.rules.TemporaryFolder;
+
+import static com.google.common.base.Strings.isNullOrEmpty;
+import static org.apache.jackrabbit.oak.blob.cloud.s3.S3DataStoreUtils.getFixtures;
+import static org.apache.jackrabbit.oak.blob.cloud.s3.S3DataStoreUtils.getS3DataStore;
+import static org.apache.jackrabbit.oak.blob.cloud.s3.S3DataStoreUtils.isS3Configured;
+import static org.junit.Assume.assumeTrue;
+
+/**
+ * This test is memory-intensive and requires -Dtest.opts.memory=-Xmx2G.
+ */
+public class S3DataRecordAccessProviderIT extends AbstractDataRecordAccessProviderIT {
+    @ClassRule
+    public static TemporaryFolder homeDir = new TemporaryFolder(new File("target"));
+
+    private static S3DataStore dataStore;
+
+    @BeforeClass
+    public static void setupDataStore() throws Exception {
+        assumeTrue(isS3Configured() && !isNullOrEmpty(System.getProperty("test.opts.memory")));
+        dataStore = (S3DataStore) getS3DataStore(getFixtures().get(0), S3DataStoreUtils.getS3Config(),
+            homeDir.newFolder().getAbsolutePath());
+
+        dataStore.setDirectDownloadURIExpirySeconds(expirySeconds);
+        dataStore.setDirectUploadURIExpirySeconds(expirySeconds);
+    }
+
+    @Override
+    protected ConfigurableDataRecordAccessProvider getDataStore() {
+        return dataStore;
+    }
+
+    @Override
+    protected DataRecord doGetRecord(DataStore ds, DataIdentifier identifier) throws DataStoreException {
+        return ds.getRecord(identifier);
+    }
+
+    @Override
+    protected void doDeleteRecord(DataStore ds, DataIdentifier identifier) throws DataStoreException {
+        ((S3DataStore)ds).deleteRecord(identifier);
+    }
+
+    @Override
+    protected long getProviderMaxPartSize() {
+        return S3Backend.MAX_MULTIPART_UPLOAD_PART_SIZE;
+    }
+
+    @Override
+    protected HttpsURLConnection getHttpsConnection(long length, URI uri) throws IOException {
+        return S3DataStoreUtils.getHttpsConnection(length, uri);
+    }
+}

Propchange: jackrabbit/oak/trunk/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3DataRecordAccessProviderIT.java
------------------------------------------------------------------------------
    svn:eol-style = native