You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by am...@apache.org on 2018/07/30 11:05:11 UTC
svn commit: r1837047 [1/3] - in /jackrabbit/oak/trunk: oak-blob-cloud-azure/
oak-blob-cloud-azure/src/main/java/org/apache/jackrabbit/oak/blob/cloud/azure/blobstorage/
oak-blob-cloud-azure/src/test/java/org/apache/jackrabbit/oak/blob/cloud/azure/blobst...
Author: amitj
Date: Mon Jul 30 11:05:10 2018
New Revision: 1837047
URL: http://svn.apache.org/viewvc?rev=1837047&view=rev
Log:
OAK-7569: Direct Binary Access
Blob APIs and S3/Azure implementation for direct binary upload/download. Modified patch from Matt Ryan
Added:
jackrabbit/oak/trunk/oak-blob-cloud-azure/src/test/java/org/apache/jackrabbit/oak/blob/cloud/azure/blobstorage/AzureDataRecordAccessProviderIT.java (with props)
jackrabbit/oak/trunk/oak-blob-cloud-azure/src/test/java/org/apache/jackrabbit/oak/blob/cloud/azure/blobstorage/AzureDataRecordAccessProviderTest.java (with props)
jackrabbit/oak/trunk/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3DataRecordAccessProviderIT.java (with props)
jackrabbit/oak/trunk/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3DataRecordAccessProviderTest.java (with props)
jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/directaccess/
jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/directaccess/ConfigurableDataRecordAccessProvider.java (with props)
jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/directaccess/DataRecordAccessProvider.java (with props)
jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/directaccess/DataRecordDownloadOptions.java (with props)
jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/directaccess/DataRecordUpload.java (with props)
jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/directaccess/DataRecordUploadException.java (with props)
jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/directaccess/DataRecordUploadToken.java (with props)
jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/directaccess/
jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/directaccess/AbstractDataRecordAccessProviderIT.java (with props)
jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/directaccess/AbstractDataRecordAccessProviderTest.java (with props)
jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/directaccess/DataRecordDownloadOptionsTest.java (with props)
Modified:
jackrabbit/oak/trunk/oak-blob-cloud-azure/pom.xml
jackrabbit/oak/trunk/oak-blob-cloud-azure/src/main/java/org/apache/jackrabbit/oak/blob/cloud/azure/blobstorage/AzureBlobStoreBackend.java
jackrabbit/oak/trunk/oak-blob-cloud-azure/src/main/java/org/apache/jackrabbit/oak/blob/cloud/azure/blobstorage/AzureConstants.java
jackrabbit/oak/trunk/oak-blob-cloud-azure/src/main/java/org/apache/jackrabbit/oak/blob/cloud/azure/blobstorage/AzureDataStore.java
jackrabbit/oak/trunk/oak-blob-cloud-azure/src/test/java/org/apache/jackrabbit/oak/blob/cloud/azure/blobstorage/AzureDataStoreUtils.java
jackrabbit/oak/trunk/oak-blob-cloud/pom.xml
jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3Backend.java
jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3Constants.java
jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3DataStore.java
jackrabbit/oak/trunk/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3DataStoreUtils.java
jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/AbstractDataStoreService.java
jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreBlobStore.java
jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreUtils.java
Modified: jackrabbit/oak/trunk/oak-blob-cloud-azure/pom.xml
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-cloud-azure/pom.xml?rev=1837047&r1=1837046&r2=1837047&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-blob-cloud-azure/pom.xml (original)
+++ jackrabbit/oak/trunk/oak-blob-cloud-azure/pom.xml Mon Jul 30 11:05:10 2018
@@ -139,6 +139,13 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.apache.jackrabbit</groupId>
+ <artifactId>oak-blob</artifactId>
+ <version>${project.version}</version>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
Modified: jackrabbit/oak/trunk/oak-blob-cloud-azure/src/main/java/org/apache/jackrabbit/oak/blob/cloud/azure/blobstorage/AzureBlobStoreBackend.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-cloud-azure/src/main/java/org/apache/jackrabbit/oak/blob/cloud/azure/blobstorage/AzureBlobStoreBackend.java?rev=1837047&r1=1837046&r2=1837047&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-blob-cloud-azure/src/main/java/org/apache/jackrabbit/oak/blob/cloud/azure/blobstorage/AzureBlobStoreBackend.java (original)
+++ jackrabbit/oak/trunk/oak-blob-cloud-azure/src/main/java/org/apache/jackrabbit/oak/blob/cloud/azure/blobstorage/AzureBlobStoreBackend.java Mon Jul 30 11:05:10 2018
@@ -19,10 +19,41 @@
package org.apache.jackrabbit.oak.blob.cloud.azure.blobstorage;
+import static java.lang.Thread.currentThread;
+
+import java.io.BufferedInputStream;
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UnsupportedEncodingException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URLEncoder;
+import java.security.InvalidKeyException;
+import java.time.Instant;
+import java.util.Collection;
+import java.util.Date;
+import java.util.EnumSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.base.Charsets;
import com.google.common.base.Function;
import com.google.common.base.Strings;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.microsoft.azure.storage.AccessCondition;
import com.microsoft.azure.storage.RequestOptions;
import com.microsoft.azure.storage.ResultContinuation;
import com.microsoft.azure.storage.ResultSegment;
@@ -30,38 +61,33 @@ import com.microsoft.azure.storage.Retry
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.BlobListingDetails;
import com.microsoft.azure.storage.blob.BlobRequestOptions;
+import com.microsoft.azure.storage.blob.BlockEntry;
+import com.microsoft.azure.storage.blob.BlockListingFilter;
import com.microsoft.azure.storage.blob.CloudBlob;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import com.microsoft.azure.storage.blob.CloudBlobDirectory;
import com.microsoft.azure.storage.blob.CloudBlockBlob;
import com.microsoft.azure.storage.blob.CopyStatus;
import com.microsoft.azure.storage.blob.ListBlobItem;
+import com.microsoft.azure.storage.blob.SharedAccessBlobHeaders;
+import com.microsoft.azure.storage.blob.SharedAccessBlobPermissions;
+import com.microsoft.azure.storage.blob.SharedAccessBlobPolicy;
import org.apache.commons.io.IOUtils;
import org.apache.jackrabbit.core.data.DataIdentifier;
import org.apache.jackrabbit.core.data.DataRecord;
import org.apache.jackrabbit.core.data.DataStoreException;
import org.apache.jackrabbit.oak.commons.PropertiesUtil;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.DataRecordUploadException;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.DataRecordUploadToken;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.DataRecordDownloadOptions;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.DataRecordUpload;
import org.apache.jackrabbit.oak.spi.blob.AbstractDataRecord;
import org.apache.jackrabbit.oak.spi.blob.AbstractSharedBackend;
+import org.apache.jackrabbit.util.Base64;
+import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.BufferedInputStream;
-import java.io.ByteArrayInputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URISyntaxException;
-import java.util.EnumSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Properties;
-import java.util.Queue;
-
-import static java.lang.Thread.currentThread;
-
public class AzureBlobStoreBackend extends AbstractSharedBackend {
private static final Logger LOG = LoggerFactory.getLogger(AzureBlobStoreBackend.class);
@@ -72,6 +98,12 @@ public class AzureBlobStoreBackend exten
private static final String REF_KEY = "reference.key";
private static final long BUFFERED_STREAM_THRESHHOLD = 1024 * 1024;
+ static final long MIN_MULTIPART_UPLOAD_PART_SIZE = 1024 * 1024 * 10; // 10MB
+ static final long MAX_MULTIPART_UPLOAD_PART_SIZE = 1024 * 1024 * 100; // 100MB
+ static final long MAX_SINGLE_PUT_UPLOAD_SIZE = 1024 * 1024 * 256; // 256MB, Azure limit
+ static final long MAX_BINARY_UPLOAD_SIZE = (long) Math.floor(1024L * 1024L * 1024L * 1024L * 4.75); // 4.75TB, Azure limit
+ private static final int MAX_ALLOWABLE_UPLOAD_URIS = 50000; // Azure limit
+ private static final int MAX_UNIQUE_RECORD_TRIES = 10;
private Properties properties;
private String containerName;
@@ -79,6 +111,10 @@ public class AzureBlobStoreBackend exten
private int concurrentRequestCount = 1;
private RetryPolicy retryPolicy;
private Integer requestTimeout;
+ private int httpDownloadURIExpirySeconds = 0; // disabled by default
+ private int httpUploadURIExpirySeconds = 0; // disabled by default
+
+ private Cache<DataIdentifier, URI> httpDownloadURICache;
private byte[] secret;
@@ -135,6 +171,23 @@ public class AzureBlobStoreBackend exten
}
LOG.debug("Backend initialized. duration={}",
+(System.currentTimeMillis() - start));
+
+ // settings pertaining to DataRecordAccessProvider functionality
+ String putExpiry = properties.getProperty(AzureConstants.PRESIGNED_HTTP_UPLOAD_URI_EXPIRY_SECONDS);
+ if (null != putExpiry) {
+ this.setHttpUploadURIExpirySeconds(Integer.parseInt(putExpiry));
+ }
+ String getExpiry = properties.getProperty(AzureConstants.PRESIGNED_HTTP_DOWNLOAD_URI_EXPIRY_SECONDS);
+ if (null != getExpiry) {
+ this.setHttpDownloadURIExpirySeconds(Integer.parseInt(getExpiry));
+ String cacheMaxSize = properties.getProperty(AzureConstants.PRESIGNED_HTTP_DOWNLOAD_URI_CACHE_MAX_SIZE);
+ if (null != cacheMaxSize) {
+ this.setHttpDownloadURICacheSize(Integer.parseInt(cacheMaxSize));
+ }
+ else {
+ this.setHttpDownloadURICacheSize(0); // default
+ }
+ }
}
catch (StorageException e) {
throw new DataStoreException(e);
@@ -667,6 +720,300 @@ public class AzureBlobStoreBackend exten
return name;
}
+ void setHttpDownloadURIExpirySeconds(int seconds) {
+ httpDownloadURIExpirySeconds = seconds;
+ }
+
+ void setHttpDownloadURICacheSize(int maxSize) {
+ // max size 0 or smaller is used to turn off the cache
+ if (maxSize > 0) {
+ LOG.info("presigned GET URI cache enabled, maxSize = {} items, expiry = {} seconds", maxSize, httpDownloadURIExpirySeconds / 2);
+ httpDownloadURICache = CacheBuilder.newBuilder()
+ .maximumSize(maxSize)
+ .expireAfterWrite(httpDownloadURIExpirySeconds / 2, TimeUnit.SECONDS)
+ .build();
+ } else {
+ LOG.info("presigned GET URI cache disabled");
+ httpDownloadURICache = null;
+ }
+ }
+
+ URI createHttpDownloadURI(@NotNull DataIdentifier identifier,
+ @NotNull DataRecordDownloadOptions downloadOptions) {
+ URI uri = null;
+ if (httpDownloadURIExpirySeconds > 0) {
+ if (null != httpDownloadURICache) {
+ uri = httpDownloadURICache.getIfPresent(identifier);
+ }
+ if (null == uri) {
+ String key = getKeyName(identifier);
+ SharedAccessBlobHeaders headers = new SharedAccessBlobHeaders();
+ headers.setCacheControl(String.format("private, max-age=%d, immutable", httpDownloadURIExpirySeconds));
+
+ String contentType = downloadOptions.getContentTypeHeader();
+ if (! Strings.isNullOrEmpty(contentType)) {
+ headers.setContentType(contentType);
+ }
+
+ String contentDisposition =
+ downloadOptions.getContentDispositionHeader();
+ if (! Strings.isNullOrEmpty(contentDisposition)) {
+ headers.setContentDisposition(contentDisposition);
+ }
+
+ uri = createPresignedURI(key,
+ EnumSet.of(SharedAccessBlobPermissions.READ),
+ httpDownloadURIExpirySeconds,
+ headers);
+ if (uri != null && httpDownloadURICache != null) {
+ httpDownloadURICache.put(identifier, uri);
+ }
+ }
+ }
+ return uri;
+ }
+
+ void setHttpUploadURIExpirySeconds(int seconds) { httpUploadURIExpirySeconds = seconds; }
+
+ private DataIdentifier generateSafeRandomIdentifier() {
+ return new DataIdentifier(
+ String.format("%s-%d",
+ UUID.randomUUID().toString(),
+ Instant.now().toEpochMilli()
+ )
+ );
+ }
+
+ DataRecordUpload initiateHttpUpload(long maxUploadSizeInBytes, int maxNumberOfURIs) {
+ List<URI> uploadPartURIs = Lists.newArrayList();
+ long minPartSize = MIN_MULTIPART_UPLOAD_PART_SIZE;
+ long maxPartSize = MAX_MULTIPART_UPLOAD_PART_SIZE;
+
+ if (0L >= maxUploadSizeInBytes) {
+ throw new IllegalArgumentException("maxUploadSizeInBytes must be > 0");
+ }
+ else if (0 == maxNumberOfURIs) {
+ throw new IllegalArgumentException("maxNumberOfURIs must either be > 0 or -1");
+ }
+ else if (-1 > maxNumberOfURIs) {
+ throw new IllegalArgumentException("maxNumberOfURIs must either be > 0 or -1");
+ }
+ else if (maxUploadSizeInBytes > MAX_SINGLE_PUT_UPLOAD_SIZE &&
+ maxNumberOfURIs == 1) {
+ throw new IllegalArgumentException(
+ String.format("Cannot do single-put upload with file size %d - exceeds max single-put upload size of %d",
+ maxUploadSizeInBytes,
+ MAX_SINGLE_PUT_UPLOAD_SIZE)
+ );
+ }
+ else if (maxUploadSizeInBytes > MAX_BINARY_UPLOAD_SIZE) {
+ throw new IllegalArgumentException(
+ String.format("Cannot do upload with file size %d - exceeds max upload size of %d",
+ maxUploadSizeInBytes,
+ MAX_BINARY_UPLOAD_SIZE)
+ );
+ }
+
+ DataIdentifier newIdentifier = generateSafeRandomIdentifier();
+ String blobId = getKeyName(newIdentifier);
+ String uploadId = null;
+
+ if (httpUploadURIExpirySeconds > 0) {
+ // Always do multi-part uploads for Azure, even for small binaries.
+ //
+ // This is because Azure requires a unique header, "x-ms-blob-type=BlockBlob", to be
+ // set but only for single-put uploads, not multi-part.
+ // This would require clients to know not only the type of service provider being used
+ // but also the type of upload (single-put vs multi-part), which breaks abstraction.
+ // Instead we can insist that clients always do multi-part uploads to Azure, even
+ // if the multi-part upload consists of only one upload part. This doesn't require
+ // additional work on the part of the client since the "complete" request must always
+ // be sent regardless, but it helps us avoid the client having to know what type
+ // of provider is being used, or us having to instruct the client to use specific
+ // types of headers, etc.
+
+ // Azure doesn't use upload IDs like AWS does
+ // Generate a fake one for compatibility - we use them to determine whether we are
+ // doing multi-part or single-put upload
+ uploadId = Base64.encode(UUID.randomUUID().toString());
+
+ long numParts = 0L;
+ if (maxNumberOfURIs > 0) {
+ long requestedPartSize = (long) Math.ceil(((double) maxUploadSizeInBytes) / ((double) maxNumberOfURIs));
+ if (requestedPartSize <= maxPartSize) {
+ numParts = Math.min(
+ maxNumberOfURIs,
+ Math.min(
+ (long) Math.ceil(((double) maxUploadSizeInBytes) / ((double) minPartSize)),
+ MAX_ALLOWABLE_UPLOAD_URIS
+ )
+ );
+ } else {
+ throw new IllegalArgumentException(
+ String.format("Cannot do multi-part upload with requested part size %d", requestedPartSize)
+ );
+ }
+ }
+ else {
+ long maximalNumParts = (long) Math.ceil(((double) maxUploadSizeInBytes) / ((double) MIN_MULTIPART_UPLOAD_PART_SIZE));
+ numParts = Math.min(maximalNumParts, MAX_ALLOWABLE_UPLOAD_URIS);
+ }
+
+ String key = getKeyName(newIdentifier);
+ EnumSet<SharedAccessBlobPermissions> perms = EnumSet.of(SharedAccessBlobPermissions.WRITE);
+ Map<String, String> presignedURIRequestParams = Maps.newHashMap();
+ presignedURIRequestParams.put("comp", "block");
+ for (long blockId = 1; blockId <= numParts; ++blockId) {
+ presignedURIRequestParams.put("blockId",
+ Base64.encode(String.format("%06d", blockId)));
+ uploadPartURIs.add(createPresignedURI(key, perms, httpUploadURIExpirySeconds, presignedURIRequestParams));
+ }
+ }
+
+ try {
+ byte[] secret = getOrCreateReferenceKey();
+ String uploadToken = new DataRecordUploadToken(blobId, uploadId).getEncodedToken(secret);
+ return new DataRecordUpload() {
+ @Override
+ @NotNull
+ public String getUploadToken() { return uploadToken; }
+
+ @Override
+ public long getMinPartSize() { return minPartSize; }
+
+ @Override
+ public long getMaxPartSize() { return maxPartSize; }
+
+ @Override
+ @NotNull
+ public Collection<URI> getUploadURIs() { return uploadPartURIs; }
+ };
+ }
+ catch (DataStoreException e) {
+ LOG.warn("Unable to obtain data store key");
+ }
+
+ return null;
+ }
+
+ DataRecord completeHttpUpload(@NotNull String uploadTokenStr)
+ throws DataRecordUploadException, DataStoreException {
+
+ if (Strings.isNullOrEmpty(uploadTokenStr)) {
+ throw new IllegalArgumentException("uploadToken required");
+ }
+
+ DataRecordUploadToken uploadToken = DataRecordUploadToken.fromEncodedToken(uploadTokenStr, getOrCreateReferenceKey());
+ String key = uploadToken.getBlobId();
+ DataIdentifier blobId = new DataIdentifier(getIdentifierName(key));
+ try {
+ if (uploadToken.getUploadId().isPresent()) {
+ // An existing upload ID means this is a multi-part upload
+ CloudBlockBlob blob = getAzureContainer().getBlockBlobReference(key);
+ List<BlockEntry> blocks = blob.downloadBlockList(
+ BlockListingFilter.UNCOMMITTED,
+ AccessCondition.generateEmptyCondition(),
+ null,
+ null);
+ blob.commitBlockList(blocks);
+ }
+ // else do nothing - single put is already complete
+
+ if (! exists(blobId)) {
+ //if (! getAzureContainer().getBlockBlobReference(blobId).exists()) {
+ throw new DataRecordUploadException(
+ String.format("Unable to finalize direct write of binary %s", blobId));
+ }
+ }
+ catch (URISyntaxException | StorageException e) {
+ throw new DataRecordUploadException(
+ String.format("Unable to finalize direct write of binary %s", blobId));
+ }
+
+ return getRecord(blobId);
+ }
+
+ private URI createPresignedURI(String key,
+ EnumSet<SharedAccessBlobPermissions> permissions,
+ int expirySeconds,
+ SharedAccessBlobHeaders optionalHeaders) {
+ return createPresignedURI(key, permissions, expirySeconds, Maps.newHashMap(), optionalHeaders);
+ }
+
+ private URI createPresignedURI(String key,
+ EnumSet<SharedAccessBlobPermissions> permissions,
+ int expirySeconds,
+ Map<String, String> additionalQueryParams) {
+ return createPresignedURI(key, permissions, expirySeconds, additionalQueryParams, null);
+ }
+
+ private URI createPresignedURI(String key,
+ EnumSet<SharedAccessBlobPermissions> permissions,
+ int expirySeconds,
+ Map<String, String> additionalQueryParams,
+ SharedAccessBlobHeaders optionalHeaders) {
+ SharedAccessBlobPolicy policy = new SharedAccessBlobPolicy();
+ Date expiry = Date.from(Instant.now().plusSeconds(expirySeconds));
+ policy.setSharedAccessExpiryTime(expiry);
+ policy.setPermissions(permissions);
+
+ String accountName = properties.getProperty(AzureConstants.AZURE_STORAGE_ACCOUNT_NAME, "");
+ if (Strings.isNullOrEmpty(accountName)) {
+ LOG.warn("Can't generate presigned URI - Azure account name not found in properties");
+ return null;
+ }
+
+ URI presignedURI = null;
+ try {
+ CloudBlockBlob blob = getAzureContainer().getBlockBlobReference(key);
+ String sharedAccessSignature =
+ null == optionalHeaders ?
+ blob.generateSharedAccessSignature(policy,
+ null) :
+ blob.generateSharedAccessSignature(policy,
+ optionalHeaders,
+ null);
+ // Shared access signature is returned encoded already.
+
+ String uriString = String.format("https://%s.blob.core.windows.net/%s/%s?%s",
+ accountName,
+ containerName,
+ key,
+ sharedAccessSignature);
+
+ if (! additionalQueryParams.isEmpty()) {
+ StringBuilder builder = new StringBuilder();
+ for (Map.Entry<String, String> e : additionalQueryParams.entrySet()) {
+ builder.append("&");
+ builder.append(URLEncoder.encode(e.getKey(), Charsets.UTF_8.name()));
+ builder.append("=");
+ builder.append(URLEncoder.encode(e.getValue(), Charsets.UTF_8.name()));
+ }
+ uriString += builder.toString();
+ }
+
+ presignedURI = new URI(uriString);
+ }
+ catch (DataStoreException e) {
+ LOG.error("No connection to Azure Blob Storage", e);
+ }
+ catch (URISyntaxException | InvalidKeyException | UnsupportedEncodingException e) {
+ LOG.error("Can't generate a presigned URI for key {}", key, e);
+ }
+ catch (StorageException e) {
+ LOG.error("Azure request to create presigned Azure Blob Storage {} URI failed. " +
+ "Key: {}, Error: {}, HTTP Code: {}, Azure Error Code: {}",
+ permissions.contains(SharedAccessBlobPermissions.READ) ? "GET" :
+ (permissions.contains(SharedAccessBlobPermissions.WRITE) ? "PUT" : ""),
+ key,
+ e.getMessage(),
+ e.getHttpStatusCode(),
+ e.getErrorCode());
+ }
+
+ return presignedURI;
+ }
+
private static class AzureBlobInfo {
private final String name;
private final long lastModified;
@@ -790,6 +1137,12 @@ public class AzureBlobStoreBackend exten
if (isMeta) {
id = addMetaKeyPrefix(getIdentifier().toString());
}
+ if (LOG.isDebugEnabled()) {
+ // Log message, with exception so we can get a trace to see where the call
+ // came from
+ LOG.debug("binary downloaded from Azure Blob Storage: " + getIdentifier(),
+ new Exception());
+ }
try {
return container.getBlockBlobReference(id).openInputStream();
} catch (StorageException | URISyntaxException e) {
Modified: jackrabbit/oak/trunk/oak-blob-cloud-azure/src/main/java/org/apache/jackrabbit/oak/blob/cloud/azure/blobstorage/AzureConstants.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-cloud-azure/src/main/java/org/apache/jackrabbit/oak/blob/cloud/azure/blobstorage/AzureConstants.java?rev=1837047&r1=1837046&r2=1837047&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-blob-cloud-azure/src/main/java/org/apache/jackrabbit/oak/blob/cloud/azure/blobstorage/AzureConstants.java (original)
+++ jackrabbit/oak/trunk/oak-blob-cloud-azure/src/main/java/org/apache/jackrabbit/oak/blob/cloud/azure/blobstorage/AzureConstants.java Mon Jul 30 11:05:10 2018
@@ -70,5 +70,20 @@ public final class AzureConstants {
*/
public static final String PROXY_PORT = "proxyPort";
+ /**
+ * TTL for presigned HTTP upload URIs - default is 0 (disabled)
+ */
+ public static final String PRESIGNED_HTTP_UPLOAD_URI_EXPIRY_SECONDS = "presignedHttpUploadURIExpirySeconds";
+
+ /**
+ * TTL for presigned HTTP download URIs - default is 0 (disabled)
+ */
+ public static final String PRESIGNED_HTTP_DOWNLOAD_URI_EXPIRY_SECONDS = "presignedHttpDownloadURIExpirySeconds";
+
+ /**
+ * Maximum size of presigned HTTP download URI cache - default is 0 (no cache)
+ */
+ public static final String PRESIGNED_HTTP_DOWNLOAD_URI_CACHE_MAX_SIZE = "presignedHttpDownloadURICacheMaxSize";
+
private AzureConstants() { }
}
Modified: jackrabbit/oak/trunk/oak-blob-cloud-azure/src/main/java/org/apache/jackrabbit/oak/blob/cloud/azure/blobstorage/AzureDataStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-cloud-azure/src/main/java/org/apache/jackrabbit/oak/blob/cloud/azure/blobstorage/AzureDataStore.java?rev=1837047&r1=1837046&r2=1837047&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-blob-cloud-azure/src/main/java/org/apache/jackrabbit/oak/blob/cloud/azure/blobstorage/AzureDataStore.java (original)
+++ jackrabbit/oak/trunk/oak-blob-cloud-azure/src/main/java/org/apache/jackrabbit/oak/blob/cloud/azure/blobstorage/AzureDataStore.java Mon Jul 30 11:05:10 2018
@@ -19,25 +19,36 @@
package org.apache.jackrabbit.oak.blob.cloud.azure.blobstorage;
+import java.net.URI;
+import java.util.Properties;
+
+import org.apache.jackrabbit.core.data.DataIdentifier;
+import org.apache.jackrabbit.core.data.DataRecord;
+import org.apache.jackrabbit.core.data.DataStoreException;
import org.apache.jackrabbit.oak.plugins.blob.AbstractSharedCachingDataStore;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.ConfigurableDataRecordAccessProvider;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.DataRecordUploadException;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.DataRecordDownloadOptions;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.DataRecordUpload;
import org.apache.jackrabbit.oak.spi.blob.AbstractSharedBackend;
import org.apache.jackrabbit.oak.spi.blob.SharedBackend;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
-import java.util.Properties;
-
-public class AzureDataStore extends AbstractSharedCachingDataStore {
-
+public class AzureDataStore extends AbstractSharedCachingDataStore implements ConfigurableDataRecordAccessProvider {
private int minRecordLength = 16*1024;
protected Properties properties;
+ private AzureBlobStoreBackend azureBlobStoreBackend;
+
@Override
protected AbstractSharedBackend createBackend() {
- AzureBlobStoreBackend backend = new AzureBlobStoreBackend();
+ azureBlobStoreBackend = new AzureBlobStoreBackend();
if (null != properties) {
- backend.setProperties(properties);
+ azureBlobStoreBackend.setProperties(properties);
}
- return backend;
+ return azureBlobStoreBackend;
}
public void setProperties(final Properties properties) {
@@ -56,4 +67,61 @@ public class AzureDataStore extends Abst
public void setMinRecordLength(int minRecordLength) {
this.minRecordLength = minRecordLength;
}
+
+ //
+ // ConfigurableDataRecordAccessProvider Implementation
+ //
+ @Override
+ public void setDirectUploadURIExpirySeconds(int seconds) {
+ if (null != azureBlobStoreBackend) {
+ azureBlobStoreBackend.setHttpUploadURIExpirySeconds(seconds);
+ }
+ }
+
+ @Override
+ public void setBinaryTransferAccelerationEnabled(boolean enabled) {
+ // NOOP - not a feature of Azure Blob Storage
+ }
+
+ @Nullable
+ @Override
+ public DataRecordUpload initiateDataRecordUpload(long maxUploadSizeInBytes, int maxNumberOfURIs)
+ throws IllegalArgumentException, DataRecordUploadException {
+ if (null == azureBlobStoreBackend) {
+ throw new DataRecordUploadException("Backend not initialized");
+ }
+ return azureBlobStoreBackend.initiateHttpUpload(maxUploadSizeInBytes, maxNumberOfURIs);
+ }
+
+ @NotNull
+ @Override
+ public DataRecord completeDataRecordUpload(String uploadToken)
+ throws IllegalArgumentException, DataRecordUploadException, DataStoreException {
+ if (null == azureBlobStoreBackend) {
+ throw new DataRecordUploadException("Backend not initialized");
+ }
+ return azureBlobStoreBackend.completeHttpUpload(uploadToken);
+ }
+
+ @Override
+ public void setDirectDownloadURIExpirySeconds(int seconds) {
+ if (null != azureBlobStoreBackend) {
+ azureBlobStoreBackend.setHttpDownloadURIExpirySeconds(seconds);
+ }
+ }
+
+ @Override
+ public void setDirectDownloadURICacheSize(int maxSize) {
+ azureBlobStoreBackend.setHttpDownloadURICacheSize(maxSize);
+ }
+
+ @Nullable
+ @Override
+ public URI getDownloadURI(@NotNull DataIdentifier identifier,
+ @NotNull DataRecordDownloadOptions downloadOptions) {
+ if (null != azureBlobStoreBackend) {
+ return azureBlobStoreBackend.createHttpDownloadURI(identifier, downloadOptions);
+ }
+ return null;
+ }
}
Added: jackrabbit/oak/trunk/oak-blob-cloud-azure/src/test/java/org/apache/jackrabbit/oak/blob/cloud/azure/blobstorage/AzureDataRecordAccessProviderIT.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-cloud-azure/src/test/java/org/apache/jackrabbit/oak/blob/cloud/azure/blobstorage/AzureDataRecordAccessProviderIT.java?rev=1837047&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-blob-cloud-azure/src/test/java/org/apache/jackrabbit/oak/blob/cloud/azure/blobstorage/AzureDataRecordAccessProviderIT.java (added)
+++ jackrabbit/oak/trunk/oak-blob-cloud-azure/src/test/java/org/apache/jackrabbit/oak/blob/cloud/azure/blobstorage/AzureDataRecordAccessProviderIT.java Mon Jul 30 11:05:10 2018
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.blob.cloud.azure.blobstorage;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+
+import javax.net.ssl.HttpsURLConnection;
+
+import com.google.common.base.Strings;
+import org.apache.jackrabbit.core.data.DataIdentifier;
+import org.apache.jackrabbit.core.data.DataRecord;
+import org.apache.jackrabbit.core.data.DataStore;
+import org.apache.jackrabbit.core.data.DataStoreException;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.AbstractDataRecordAccessProviderIT;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.ConfigurableDataRecordAccessProvider;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.jackrabbit.oak.blob.cloud.azure.blobstorage.AzureDataStoreUtils.getAzureConfig;
+import static org.apache.jackrabbit.oak.blob.cloud.azure.blobstorage.AzureDataStoreUtils.getAzureDataStore;
+import static org.apache.jackrabbit.oak.blob.cloud.azure.blobstorage.AzureDataStoreUtils.isAzureConfigured;
+import static org.junit.Assume.assumeTrue;
+
+/**
+ * As the test is memory intensive requires -Dtest.opts.memory=-Xmx2G
+ */
+public class AzureDataRecordAccessProviderIT extends AbstractDataRecordAccessProviderIT {
+
+ @ClassRule
+ public static TemporaryFolder homeDir = new TemporaryFolder(new File("target"));
+
+ private static AzureDataStore dataStore;
+
+ @BeforeClass
+ public static void setupDataStore() throws Exception {
+ assumeTrue(isAzureConfigured() && !Strings.isNullOrEmpty(System.getProperty("test.opts.memory")));
+
+ dataStore = (AzureDataStore) getAzureDataStore(getAzureConfig(), homeDir.newFolder().getAbsolutePath());
+ dataStore.setDirectDownloadURIExpirySeconds(expirySeconds);
+ dataStore.setDirectUploadURIExpirySeconds(expirySeconds);
+ }
+
+ @Override
+ protected ConfigurableDataRecordAccessProvider getDataStore() {
+ return dataStore;
+ }
+
+ @Override
+ protected DataRecord doGetRecord(DataStore ds, DataIdentifier identifier) throws DataStoreException {
+ return ds.getRecord(identifier);
+ }
+
+ @Override
+ protected void doDeleteRecord(DataStore ds, DataIdentifier identifier) throws DataStoreException {
+ ((AzureDataStore)ds).deleteRecord(identifier);
+ }
+
+ @Override
+ protected long getProviderMaxPartSize() {
+ return AzureBlobStoreBackend.MAX_MULTIPART_UPLOAD_PART_SIZE;
+ }
+
+ @Override
+ protected HttpsURLConnection getHttpsConnection(long length, URI uri) throws IOException {
+ return AzureDataStoreUtils.getHttpsConnection(length, uri);
+ }
+}
Propchange: jackrabbit/oak/trunk/oak-blob-cloud-azure/src/test/java/org/apache/jackrabbit/oak/blob/cloud/azure/blobstorage/AzureDataRecordAccessProviderIT.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: jackrabbit/oak/trunk/oak-blob-cloud-azure/src/test/java/org/apache/jackrabbit/oak/blob/cloud/azure/blobstorage/AzureDataRecordAccessProviderTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-cloud-azure/src/test/java/org/apache/jackrabbit/oak/blob/cloud/azure/blobstorage/AzureDataRecordAccessProviderTest.java?rev=1837047&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-blob-cloud-azure/src/test/java/org/apache/jackrabbit/oak/blob/cloud/azure/blobstorage/AzureDataRecordAccessProviderTest.java (added)
+++ jackrabbit/oak/trunk/oak-blob-cloud-azure/src/test/java/org/apache/jackrabbit/oak/blob/cloud/azure/blobstorage/AzureDataRecordAccessProviderTest.java Mon Jul 30 11:05:10 2018
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.blob.cloud.azure.blobstorage;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.Map;
+import java.util.Properties;
+
+import javax.net.ssl.HttpsURLConnection;
+
+import org.apache.jackrabbit.core.data.DataIdentifier;
+import org.apache.jackrabbit.core.data.DataRecord;
+import org.apache.jackrabbit.core.data.DataStore;
+import org.apache.jackrabbit.core.data.DataStoreException;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.AbstractDataRecordAccessProviderTest;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.ConfigurableDataRecordAccessProvider;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.DataRecordUpload;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.DataRecordUploadException;
+import org.apache.jackrabbit.oak.spi.blob.BlobOptions;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assume.assumeTrue;
+
+public class AzureDataRecordAccessProviderTest extends AbstractDataRecordAccessProviderTest {
+ @ClassRule
+ public static TemporaryFolder homeDir = new TemporaryFolder(new File("target"));
+
+ private static AzureDataStore dataStore;
+
+ @BeforeClass
+ public static void setupDataStore() throws Exception {
+ assumeTrue(AzureDataStoreUtils.isAzureConfigured());
+ Properties props = AzureDataStoreUtils.getAzureConfig();
+ props.setProperty("cacheSize", "0");
+ dataStore = (AzureDataStore) AzureDataStoreUtils
+ .getAzureDataStore(props, homeDir.newFolder().getAbsolutePath());
+ dataStore.setDirectDownloadURIExpirySeconds(expirySeconds);
+ dataStore.setDirectUploadURIExpirySeconds(expirySeconds);
+ }
+
+ @Override
+ protected ConfigurableDataRecordAccessProvider getDataStore() {
+ return dataStore;
+ }
+
+ @Override
+ protected DataRecord doGetRecord(DataStore ds, DataIdentifier identifier) throws DataStoreException {
+ return ds.getRecord(identifier);
+ }
+
+ @Override
+ protected DataRecord doSynchronousAddRecord(DataStore ds, InputStream in) throws DataStoreException {
+ return ((AzureDataStore)ds).addRecord(in, new BlobOptions().setUpload(BlobOptions.UploadType.SYNCHRONOUS));
+ }
+
+ @Override
+ protected void doDeleteRecord(DataStore ds, DataIdentifier identifier) throws DataStoreException {
+ ((AzureDataStore)ds).deleteRecord(identifier);
+ }
+
+ @Override
+ protected long getProviderMinPartSize() {
+ return Math.max(0L, AzureBlobStoreBackend.MIN_MULTIPART_UPLOAD_PART_SIZE);
+ }
+
+ @Override
+ protected long getProviderMaxPartSize() {
+ return AzureBlobStoreBackend.MAX_MULTIPART_UPLOAD_PART_SIZE;
+ }
+
+ @Override
+ protected long getProviderMaxSinglePutSize() { return AzureBlobStoreBackend.MAX_SINGLE_PUT_UPLOAD_SIZE; }
+
+ @Override
+ protected long getProviderMaxBinaryUploadSize() { return AzureBlobStoreBackend.MAX_BINARY_UPLOAD_SIZE; }
+
+ @Override
+ protected boolean isSinglePutURI(URI uri) {
+ // Since strictly speaking we don't support single-put for Azure due to the odd
+ // required header for single-put uploads, we don't care and just always return true
+ // here to avoid failing tests for this.
+ return true;
+ }
+
+ @Override
+ protected HttpsURLConnection getHttpsConnection(long length, URI uri) throws IOException {
+ return AzureDataStoreUtils.getHttpsConnection(length, uri);
+ }
+
+ @Test
+ public void testInitDirectUploadURIHonorsExpiryTime() throws DataRecordUploadException {
+ ConfigurableDataRecordAccessProvider ds = getDataStore();
+ try {
+ Instant now = Instant.now().truncatedTo(ChronoUnit.SECONDS);
+ ds.setDirectUploadURIExpirySeconds(60);
+ DataRecordUpload uploadContext = ds.initiateDataRecordUpload(ONE_MB, 1);
+ URI uploadURI = uploadContext.getUploadURIs().iterator().next();
+ Map<String, String> params = parseQueryString(uploadURI);
+ String expiryDateStr = params.get("se");
+ Instant expiry = Instant.parse(expiryDateStr);
+ assertEquals(now, expiry.minusSeconds(60));
+ }
+ finally {
+ ds.setDirectUploadURIExpirySeconds(expirySeconds);
+ }
+ }
+
+ @Test
+ public void testInitiateDirectUploadUnlimitedURIs() throws DataRecordUploadException {
+ ConfigurableDataRecordAccessProvider ds = getDataStore();
+ long uploadSize = ONE_GB * 100;
+ int expectedNumURIs = 10000;
+ DataRecordUpload upload = ds.initiateDataRecordUpload(uploadSize, -1);
+ assertEquals(expectedNumURIs, upload.getUploadURIs().size());
+
+ uploadSize = ONE_GB * 500;
+ expectedNumURIs = 50000;
+ upload = ds.initiateDataRecordUpload(uploadSize, -1);
+ assertEquals(expectedNumURIs, upload.getUploadURIs().size());
+
+ uploadSize = ONE_GB * 1000;
+ // expectedNumURIs still 50000, Azure limit
+ upload = ds.initiateDataRecordUpload(uploadSize, -1);
+ assertEquals(expectedNumURIs, upload.getUploadURIs().size());
+ }
+}
Propchange: jackrabbit/oak/trunk/oak-blob-cloud-azure/src/test/java/org/apache/jackrabbit/oak/blob/cloud/azure/blobstorage/AzureDataRecordAccessProviderTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: jackrabbit/oak/trunk/oak-blob-cloud-azure/src/test/java/org/apache/jackrabbit/oak/blob/cloud/azure/blobstorage/AzureDataStoreUtils.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-cloud-azure/src/test/java/org/apache/jackrabbit/oak/blob/cloud/azure/blobstorage/AzureDataStoreUtils.java?rev=1837047&r1=1837046&r2=1837047&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-blob-cloud-azure/src/test/java/org/apache/jackrabbit/oak/blob/cloud/azure/blobstorage/AzureDataStoreUtils.java (original)
+++ jackrabbit/oak/trunk/oak-blob-cloud-azure/src/test/java/org/apache/jackrabbit/oak/blob/cloud/azure/blobstorage/AzureDataStoreUtils.java Mon Jul 30 11:05:10 2018
@@ -18,11 +18,26 @@
*/
package org.apache.jackrabbit.oak.blob.cloud.azure.blobstorage;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.Map;
+import java.util.Properties;
+
+import javax.net.ssl.HttpsURLConnection;
+
import com.google.common.base.Predicate;
+import com.google.common.base.StandardSystemProperty;
import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
-
import org.apache.commons.io.IOUtils;
import org.apache.jackrabbit.core.data.DataStore;
import org.apache.jackrabbit.oak.commons.PropertiesUtil;
@@ -30,11 +45,7 @@ import org.apache.jackrabbit.oak.plugins
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.InputStream;
-import java.util.Map;
-import java.util.Properties;
+import static com.google.common.base.StandardSystemProperty.USER_HOME;
/**
* Extension to {@link DataStoreUtils} to enable Azure extensions for cleaning and initialization.
@@ -43,7 +54,8 @@ public class AzureDataStoreUtils extends
private static final Logger log = LoggerFactory.getLogger(AzureDataStoreUtils.class);
private static final String DEFAULT_CONFIG_PATH = "./src/test/resources/azure.properties";
-
+ private static final String DEFAULT_PROPERTY_FILE = "azure.properties";
+ private static final String SYS_PROP_NAME = "azure.config";
/**
* Check for presence of mandatory properties.
@@ -71,10 +83,17 @@ public class AzureDataStoreUtils extends
* @return Properties instance
*/
public static Properties getAzureConfig() {
- String config = System.getProperty("azure.config");
+ String config = System.getProperty(SYS_PROP_NAME);
+ if (Strings.isNullOrEmpty(config)) {
+ File cfgFile = new File(USER_HOME.value(), DEFAULT_PROPERTY_FILE);
+ if (cfgFile.exists()) {
+ config = cfgFile.getAbsolutePath();
+ }
+ }
if (Strings.isNullOrEmpty(config)) {
config = DEFAULT_CONFIG_PATH;
}
+
Properties props = new Properties();
if (new File(config).exists()) {
InputStream is = null;
@@ -118,4 +137,17 @@ public class AzureDataStoreUtils extends
boolean result = container.deleteIfExists();
log.info("Container deleted. containerName={} existed={}", containerName, result);
}
+
+ protected static HttpsURLConnection getHttpsConnection(long length, URI uri) throws IOException {
+ HttpsURLConnection conn = (HttpsURLConnection) uri.toURL().openConnection();
+ conn.setDoOutput(true);
+ conn.setRequestMethod("PUT");
+ conn.setRequestProperty("Content-Length", String.valueOf(length));
+ conn.setRequestProperty("Date", DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssX")
+ .withZone(ZoneOffset.UTC)
+ .format(Instant.now()));
+ conn.setRequestProperty("x-ms-version", "2017-11-09");
+
+ return conn;
+ }
}
Modified: jackrabbit/oak/trunk/oak-blob-cloud/pom.xml
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-cloud/pom.xml?rev=1837047&r1=1837046&r2=1837047&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-blob-cloud/pom.xml (original)
+++ jackrabbit/oak/trunk/oak-blob-cloud/pom.xml Mon Jul 30 11:05:10 2018
@@ -196,6 +196,13 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.apache.jackrabbit</groupId>
+ <artifactId>oak-blob</artifactId>
+ <version>${project.version}</version>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
Modified: jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3Backend.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3Backend.java?rev=1837047&r1=1837046&r2=1837047&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3Backend.java (original)
+++ jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3Backend.java Mon Jul 30 11:05:10 2018
@@ -17,11 +17,19 @@
package org.apache.jackrabbit.oak.blob.cloud.s3;
+import static com.google.common.collect.Iterables.filter;
+import static java.lang.Thread.currentThread;
+
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.time.Instant;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
@@ -29,20 +37,33 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Queue;
+import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
+import com.amazonaws.HttpMethod;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.S3ClientOptions;
+import com.amazonaws.services.s3.model.BucketAccelerateConfiguration;
+import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
import com.amazonaws.services.s3.model.CopyObjectRequest;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.DeleteObjectsResult;
+import com.amazonaws.services.s3.model.GeneratePresignedUrlRequest;
+import com.amazonaws.services.s3.model.GetBucketAccelerateConfigurationRequest;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
import com.amazonaws.services.s3.model.ListObjectsRequest;
+import com.amazonaws.services.s3.model.ListPartsRequest;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PartETag;
+import com.amazonaws.services.s3.model.PartListing;
+import com.amazonaws.services.s3.model.PartSummary;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.Region;
import com.amazonaws.services.s3.model.S3Object;
@@ -54,6 +75,8 @@ import com.amazonaws.util.StringUtils;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.base.Strings;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -62,14 +85,16 @@ import org.apache.jackrabbit.core.data.D
import org.apache.jackrabbit.core.data.DataRecord;
import org.apache.jackrabbit.core.data.DataStoreException;
import org.apache.jackrabbit.core.data.util.NamedThreadFactory;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.DataRecordDownloadOptions;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.DataRecordUpload;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.DataRecordUploadException;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.DataRecordUploadToken;
import org.apache.jackrabbit.oak.spi.blob.AbstractDataRecord;
import org.apache.jackrabbit.oak.spi.blob.AbstractSharedBackend;
+import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static com.google.common.collect.Iterables.filter;
-import static java.lang.Thread.currentThread;
-
/**
* A data store backend that stores data on Amazon S3.
*/
@@ -86,8 +111,23 @@ public class S3Backend extends AbstractS
private static final String REF_KEY = "reference.key";
+ private static final int MAX_UNIQUE_RECORD_TRIES = 10;
+
+ static final String PART_NUMBER = "partNumber";
+ static final String UPLOAD_ID = "uploadId";
+
+ private static final int ONE_MB = 1024*1024;
+ static final long MIN_MULTIPART_UPLOAD_PART_SIZE = 1024 * 1024 * 10; // 10MB
+ static final long MAX_MULTIPART_UPLOAD_PART_SIZE = 1024 * 1024 * 256; // 256MB
+ static final long MAX_SINGLE_PUT_UPLOAD_SIZE = 1024L * 1024L * 1024L * 5L; // 5GB, AWS limitation
+ static final long MAX_BINARY_UPLOAD_SIZE = 1024L * 1024L * 1024L * 1024L * 5L; // 5TB, AWS limitation
+ private static final int MAX_ALLOWABLE_UPLOAD_URIS = 10000; // AWS limitation
+
private AmazonS3Client s3service;
+ // needed only in case of transfer acceleration is enabled for presigned URIs
+ private AmazonS3Client s3PresignService;
+
private String bucket;
private byte[] secret;
@@ -100,6 +140,12 @@ public class S3Backend extends AbstractS
private S3RequestDecorator s3ReqDecorator;
+ private Cache<DataIdentifier, URI> httpDownloadURICache;
+
+ // 0 = off by default
+ private int httpUploadURIExpirySeconds = 0;
+ private int httpDownloadURIExpirySeconds = 0;
+
public void init() throws DataStoreException {
ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
@@ -111,6 +157,8 @@ public class S3Backend extends AbstractS
s3ReqDecorator = new S3RequestDecorator(properties);
s3service = Utils.openService(properties);
+ s3PresignService = s3service;
+
if (bucket == null || "".equals(bucket.trim())) {
bucket = properties.getProperty(S3Constants.S3_BUCKET);
// Alternately check if the 'container' property is set
@@ -164,6 +212,31 @@ public class S3Backend extends AbstractS
if (renameKeyBool) {
renameKeys();
}
+
+ // settings around pre-signing
+
+ String putExpiry = properties.getProperty(S3Constants.PRESIGNED_PUT_EXPIRY_SEC);
+ if (putExpiry != null) {
+ setHttpUploadURIExpirySeconds(Integer.parseInt(putExpiry));
+ }
+
+ String getExpiry = properties.getProperty(S3Constants.PRESIGNED_GET_EXPIRY_SEC);
+ if (getExpiry != null) {
+ final int getExpirySeconds = Integer.parseInt(getExpiry);
+ setHttpDownloadURIExpirySeconds(getExpirySeconds);
+
+ int cacheMaxSize = 0; // off by default
+ String cacheMaxSizeStr = properties.getProperty(S3Constants.PRESIGNED_GET_URI_CACHE_MAX_SIZE);
+ if (cacheMaxSizeStr != null) {
+ cacheMaxSize = Integer.parseInt(cacheMaxSizeStr);
+ }
+
+ setHttpDownloadURICacheSize(cacheMaxSize);
+ }
+
+ String enablePresignedAccelerationStr = properties.getProperty(S3Constants.PRESIGNED_URI_ENABLE_ACCELERATION);
+ setBinaryTransferAccelerationEnabled(enablePresignedAccelerationStr != null && "true".equals(enablePresignedAccelerationStr));
+
LOG.debug("S3 Backend initialized in [{}] ms",
+(System.currentTimeMillis() - startTime.getTime()));
} catch (Exception e) {
@@ -185,6 +258,27 @@ public class S3Backend extends AbstractS
}
}
+ void setBinaryTransferAccelerationEnabled(boolean enabled) {
+ if (enabled) {
+ // verify acceleration is enabled on the bucket
+ BucketAccelerateConfiguration accelerateConfig = s3service.getBucketAccelerateConfiguration(new GetBucketAccelerateConfigurationRequest(bucket));
+ if (accelerateConfig.isAccelerateEnabled()) {
+ // If transfer acceleration is enabled for presigned URIs, we need a separate AmazonS3Client
+ // instance with the acceleration mode enabled, because we don't want the requests from the
+ // data store itself to S3 to use acceleration
+ s3PresignService = Utils.openService(properties);
+ s3PresignService.setS3ClientOptions(S3ClientOptions.builder().setAccelerateModeEnabled(true).build());
+ LOG.info("S3 Transfer Acceleration enabled for presigned URIs.");
+
+ } else {
+ LOG.warn("S3 Transfer Acceleration is not enabled on the bucket {}. Will create normal, non-accelerated presigned URIs.",
+ bucket, S3Constants.PRESIGNED_URI_ENABLE_ACCELERATION);
+ }
+ } else {
+ s3PresignService = s3service;
+ }
+ }
+
/**
* It uploads file to Amazon S3. If file size is greater than 5MB, this
* method uses parallel concurrent connections to upload.
@@ -292,6 +386,9 @@ public class S3Backend extends AbstractS
S3Object object = s3service.getObject(bucket, key);
InputStream in = object.getObjectContent();
LOG.debug("[{}] read took [{}]ms", identifier, (System.currentTimeMillis() - start));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("binary downloaded from S3: " + identifier, new Exception());
+ }
return in;
} catch (AmazonServiceException e) {
throw new DataStoreException("Object not found: " + key, e);
@@ -516,8 +613,8 @@ public class S3Backend extends AbstractS
} catch (AmazonServiceException e) {
if (e.getStatusCode() == 404 || e.getStatusCode() == 403) {
LOG.info(
- "getRecord:Identifier [{}] not found. Took [{}] ms.",
- identifier, (System.currentTimeMillis() - start));
+ "getRecord:Identifier [{}] not found. Took [{}] ms.",
+ identifier, (System.currentTimeMillis() - start));
}
throw new DataStoreException(e);
} finally {
@@ -575,6 +672,286 @@ public class S3Backend extends AbstractS
}
}
+ void setHttpUploadURIExpirySeconds(int seconds) {
+ this.httpUploadURIExpirySeconds = seconds;
+ }
+
+ private DataIdentifier generateSafeRandomIdentifier() {
+ return new DataIdentifier(
+ String.format("%s-%d",
+ UUID.randomUUID().toString(),
+ Instant.now().toEpochMilli()
+ )
+ );
+ }
+
+ private URI createPresignedPutURI(DataIdentifier identifier) {
+ if (httpUploadURIExpirySeconds <= 0) {
+ // feature disabled
+ return null;
+ }
+
+ return createPresignedURI(identifier, HttpMethod.PUT, httpUploadURIExpirySeconds);
+ }
+
+ void setHttpDownloadURIExpirySeconds(int seconds) {
+ this.httpDownloadURIExpirySeconds = seconds;
+ }
+
+ void setHttpDownloadURICacheSize(int maxSize) {
+ // max size 0 or smaller is used to turn off the cache
+ if (maxSize > 0) {
+ LOG.info("presigned GET URI cache enabled, maxSize = {} items, expiry = {} seconds", maxSize, httpDownloadURIExpirySeconds / 2);
+ httpDownloadURICache = CacheBuilder.newBuilder()
+ .maximumSize(maxSize)
+ // cache for half the expiry time of the URIs before giving out new ones
+ .expireAfterWrite(httpDownloadURIExpirySeconds / 2, TimeUnit.SECONDS)
+ .build();
+ } else {
+ LOG.info("presigned GET URI cache disabled");
+ httpDownloadURICache = null;
+ }
+ }
+
+ URI createHttpDownloadURI(@NotNull DataIdentifier identifier,
+ @NotNull DataRecordDownloadOptions downloadOptions) {
+ if (httpDownloadURIExpirySeconds <= 0) {
+ // feature disabled
+ return null;
+ }
+
+ URI uri = null;
+ // if cache is enabled, check the cache
+ if (httpDownloadURICache != null) {
+ uri = httpDownloadURICache.getIfPresent(identifier);
+ }
+ if (uri == null) {
+ Map<String, String> requestParams = Maps.newHashMap();
+ requestParams.put("response-cache-control",
+ String.format("private, max-age=%d, immutable",
+ httpDownloadURIExpirySeconds)
+ );
+
+ String contentType = downloadOptions.getContentTypeHeader();
+ if (! Strings.isNullOrEmpty(contentType)) {
+ requestParams.put("response-content-type", contentType);
+ }
+ String contentDisposition =
+ downloadOptions.getContentDispositionHeader();
+
+ if (! Strings.isNullOrEmpty(contentDisposition)) {
+ requestParams.put("response-content-disposition",
+ contentDisposition);
+ }
+
+ uri = createPresignedURI(identifier,
+ HttpMethod.GET,
+ httpDownloadURIExpirySeconds,
+ requestParams);
+ if (uri != null && httpDownloadURICache != null) {
+ httpDownloadURICache.put(identifier, uri);
+ }
+ }
+ return uri;
+ }
+
+ DataRecordUpload initiateHttpUpload(long maxUploadSizeInBytes, int maxNumberOfURIs) {
+ List<URI> uploadPartURIs = Lists.newArrayList();
+ long minPartSize = MIN_MULTIPART_UPLOAD_PART_SIZE;
+ long maxPartSize = MAX_MULTIPART_UPLOAD_PART_SIZE;
+
+ if (0L >= maxUploadSizeInBytes) {
+ throw new IllegalArgumentException("maxUploadSizeInBytes must be > 0");
+ }
+ else if (0 == maxNumberOfURIs) {
+ throw new IllegalArgumentException("maxNumberOfURIs must either be > 0 or -1");
+ }
+ else if (-1 > maxNumberOfURIs) {
+ throw new IllegalArgumentException("maxNumberOfURIs must either be > 0 or -1");
+ }
+ else if (maxUploadSizeInBytes > MAX_SINGLE_PUT_UPLOAD_SIZE &&
+ maxNumberOfURIs == 1) {
+ throw new IllegalArgumentException(
+ String.format("Cannot do single-put upload with file size %d - exceeds max single-put upload size of %d",
+ maxUploadSizeInBytes,
+ MAX_SINGLE_PUT_UPLOAD_SIZE)
+ );
+ }
+ else if (maxUploadSizeInBytes > MAX_BINARY_UPLOAD_SIZE) {
+ throw new IllegalArgumentException(
+ String.format("Cannot do upload with file size %d - exceeds max upload size of %d",
+ maxUploadSizeInBytes,
+ MAX_BINARY_UPLOAD_SIZE)
+ );
+ }
+
+ DataIdentifier newIdentifier = generateSafeRandomIdentifier();
+ String blobId = getKeyName(newIdentifier);
+ String uploadId = null;
+
+ if (httpUploadURIExpirySeconds > 0) {
+ if (maxNumberOfURIs == 1 ||
+ maxUploadSizeInBytes <= minPartSize) {
+ // single put
+ uploadPartURIs.add(createPresignedPutURI(newIdentifier));
+ }
+ else {
+ // multi-part
+ InitiateMultipartUploadRequest req = new InitiateMultipartUploadRequest(bucket, blobId);
+ InitiateMultipartUploadResult res = s3service.initiateMultipartUpload(req);
+ uploadId = res.getUploadId();
+
+ long numParts;
+ if (maxNumberOfURIs > 1) {
+ long requestedPartSize = (long) Math.ceil(((double) maxUploadSizeInBytes) / ((double) maxNumberOfURIs));
+ if (requestedPartSize <= maxPartSize) {
+ numParts = Math.min(
+ maxNumberOfURIs,
+ Math.min(
+ (long) Math.ceil(((double) maxUploadSizeInBytes) / ((double) minPartSize)),
+ MAX_ALLOWABLE_UPLOAD_URIS
+ )
+ );
+ } else {
+ throw new IllegalArgumentException(
+ String.format("Cannot do multi-part upload with requested part size %d", requestedPartSize)
+ );
+ }
+ }
+ else {
+ long maximalNumParts = (long) Math.ceil(((double) maxUploadSizeInBytes) / ((double) MIN_MULTIPART_UPLOAD_PART_SIZE));
+ numParts = Math.min(maximalNumParts, MAX_ALLOWABLE_UPLOAD_URIS);
+ }
+
+ Map<String, String> presignedURIRequestParams = Maps.newHashMap();
+ for (long blockId = 1; blockId <= numParts; ++blockId) {
+ presignedURIRequestParams.put("partNumber", String.valueOf(blockId));
+ presignedURIRequestParams.put("uploadId", uploadId);
+ uploadPartURIs.add(createPresignedURI(newIdentifier,
+ HttpMethod.PUT,
+ httpUploadURIExpirySeconds,
+ presignedURIRequestParams));
+ }
+ }
+ }
+
+ try {
+ byte[] secret = getOrCreateReferenceKey();
+ String uploadToken = new DataRecordUploadToken(blobId, uploadId).getEncodedToken(secret);
+
+ return new DataRecordUpload() {
+ @Override
+ @NotNull
+ public String getUploadToken() { return uploadToken; }
+
+ @Override
+ public long getMinPartSize() { return minPartSize; }
+
+ @Override
+ public long getMaxPartSize() { return maxPartSize; }
+
+ @Override
+ @NotNull
+ public Collection<URI> getUploadURIs() { return uploadPartURIs; }
+ };
+ }
+ catch (DataStoreException e) {
+ LOG.warn("Unable to obtain data store key");
+ }
+
+ return null;
+ }
+
+ DataRecord completeHttpUpload(@NotNull String uploadTokenStr)
+ throws DataRecordUploadException, DataStoreException {
+
+ if (Strings.isNullOrEmpty(uploadTokenStr)) {
+ throw new IllegalArgumentException("uploadToken required");
+ }
+
+ DataRecordUploadToken uploadToken = DataRecordUploadToken.fromEncodedToken(uploadTokenStr, getOrCreateReferenceKey());
+ String blobId = uploadToken.getBlobId();
+ if (uploadToken.getUploadId().isPresent()) {
+ // An existing upload ID means this is a multi-part upload
+ String uploadId = uploadToken.getUploadId().get();
+ ListPartsRequest listPartsRequest = new ListPartsRequest(bucket, blobId, uploadId);
+ PartListing listing = s3service.listParts(listPartsRequest);
+ List<PartETag> eTags = Lists.newArrayList();
+ for (PartSummary partSummary : listing.getParts()) {
+ PartETag eTag = new PartETag(partSummary.getPartNumber(), partSummary.getETag());
+ eTags.add(eTag);
+ }
+
+ CompleteMultipartUploadRequest completeReq = new CompleteMultipartUploadRequest(
+ bucket,
+ blobId,
+ uploadId,
+ eTags
+ );
+
+ s3service.completeMultipartUpload(completeReq);
+ }
+ // else do nothing - single-put upload is already complete
+
+
+ if (! s3service.doesObjectExist(bucket, blobId)) {
+ throw new DataRecordUploadException(
+ String.format("Unable to finalize direct write of binary %s", blobId)
+ );
+ }
+
+ return getRecord(new DataIdentifier(getIdentifierName(blobId)));
+ }
+
+ private URI createPresignedURI(DataIdentifier identifier,
+ HttpMethod method,
+ int expirySeconds) {
+ return createPresignedURI(identifier, method, expirySeconds, Maps.newHashMap());
+ }
+
+ private URI createPresignedURI(DataIdentifier identifier,
+ HttpMethod method,
+ int expirySeconds,
+ Map<String, String> reqParams) {
+ final String key = getKeyName(identifier);
+
+ try {
+ final Date expiration = new Date();
+ expiration.setTime(expiration.getTime() + expirySeconds * 1000);
+
+ GeneratePresignedUrlRequest request = new GeneratePresignedUrlRequest(bucket, key)
+ .withMethod(method)
+ .withExpiration(expiration);
+
+ for (Map.Entry<String, String> e : reqParams.entrySet()) {
+ request.addRequestParameter(e.getKey(), e.getValue());
+ }
+
+ URI uri = null;
+ URL presignedURL = null;
+ try {
+ presignedURL = s3PresignService.generatePresignedUrl(request);
+ uri = presignedURL.toURI();
+
+ LOG.debug("Presigned {} URI for key {}: {}", method.name(), key, uri.toString());
+ }
+ catch (URISyntaxException e) {
+ LOG.error("AWS request to create presigned S3 URI failed - could not convert '{}' to URI",
+ (null != presignedURL ? presignedURL.toString() : "")
+ );
+ }
+
+ return uri;
+
+ } catch (AmazonServiceException e) {
+ LOG.error("AWS request to create presigned S3 {} URI failed. " +
+ "Key: {}, Error: {}, HTTP Code: {}, AWS Error Code: {}, Error Type: {}, Request ID: {}",
+ method.name(), key, e.getMessage(), e.getStatusCode(), e.getErrorCode(), e.getErrorType(), e.getRequestId());
+
+ return null;
+ }
+ }
+
/**
* Returns an iterator over the S3 objects
* @param <T>
@@ -699,6 +1076,11 @@ public class S3Backend extends AbstractS
if (isMeta) {
id = addMetaKeyPrefix(getIdentifier().toString());
}
+ if (LOG.isDebugEnabled()) {
+ // Log message, with exception so we can get a trace to see where the call
+ // came from
+ LOG.debug("binary downloaded from S3: " + getIdentifier(), new Exception());
+ }
return s3service.getObject(bucket, id).getObjectContent();
}
Modified: jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3Constants.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3Constants.java?rev=1837047&r1=1837046&r2=1837047&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3Constants.java (original)
+++ jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3Constants.java Mon Jul 30 11:05:10 2018
@@ -112,6 +112,14 @@ public final class S3Constants {
*/
public static final String PROXY_PORT = "proxyPort";
+ public static final String PRESIGNED_PUT_EXPIRY_SEC = "presignedPutExpirySeconds";
+
+ public static final String PRESIGNED_GET_EXPIRY_SEC = "presignedGetExpirySeconds";
+
+ public static final String PRESIGNED_GET_URI_CACHE_MAX_SIZE = "presignedGetURICacheMaxSize";
+
+ public static final String PRESIGNED_URI_ENABLE_ACCELERATION = "presignedURIEnableTransferAcceleration";
+
/**
* private constructor so that class cannot initialized from outside.
*/
Modified: jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3DataStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3DataStore.java?rev=1837047&r1=1837046&r2=1837047&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3DataStore.java (original)
+++ jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3DataStore.java Mon Jul 30 11:05:10 2018
@@ -16,22 +16,32 @@
*/
package org.apache.jackrabbit.oak.blob.cloud.s3;
+import java.net.URI;
import java.util.Properties;
+import org.apache.jackrabbit.core.data.DataIdentifier;
+import org.apache.jackrabbit.core.data.DataRecord;
+import org.apache.jackrabbit.core.data.DataStoreException;
import org.apache.jackrabbit.oak.plugins.blob.AbstractSharedCachingDataStore;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.ConfigurableDataRecordAccessProvider;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.DataRecordUploadException;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.DataRecordDownloadOptions;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.DataRecordUpload;
import org.apache.jackrabbit.oak.spi.blob.AbstractSharedBackend;
import org.apache.jackrabbit.oak.spi.blob.SharedBackend;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
/**
* Amazon S3 data store extending from {@link AbstractSharedCachingDataStore}.
*/
-public class S3DataStore extends AbstractSharedCachingDataStore {
+public class S3DataStore extends AbstractSharedCachingDataStore implements ConfigurableDataRecordAccessProvider {
protected Properties properties;
+ private S3Backend s3Backend;
+
/**
* The minimum size of an object that should be stored in this data store.
*/
@@ -39,11 +49,11 @@ public class S3DataStore extends Abstrac
@Override
protected AbstractSharedBackend createBackend() {
- S3Backend backend = new S3Backend();
+ s3Backend = new S3Backend();
if(properties != null){
- backend.setProperties(properties);
+ s3Backend.setProperties(properties);
}
- return backend;
+ return s3Backend;
}
/**------------------------------------------- Getters & Setters-----------------------------**/
@@ -67,4 +77,65 @@ public class S3DataStore extends Abstrac
public void setMinRecordLength(int minRecordLength) {
this.minRecordLength = minRecordLength;
}
+
+ //
+ // ConfigurableDataRecordAccessProvider implementation
+ //
+ @Override
+ public void setDirectUploadURIExpirySeconds(int seconds) {
+ if (s3Backend != null) {
+ s3Backend.setHttpUploadURIExpirySeconds(seconds);
+ }
+ }
+
+ @Override
+ public void setBinaryTransferAccelerationEnabled(boolean enabled) {
+ if (s3Backend != null) {
+ s3Backend.setBinaryTransferAccelerationEnabled(enabled);
+ }
+ }
+
+ @Nullable
+ @Override
+ public DataRecordUpload initiateDataRecordUpload(long maxUploadSizeInBytes, int maxNumberOfURIs)
+ throws IllegalArgumentException, DataRecordUploadException {
+ if (null == s3Backend) {
+ throw new DataRecordUploadException("Backend not initialized");
+ }
+ return s3Backend.initiateHttpUpload(maxUploadSizeInBytes, maxNumberOfURIs);
+ }
+
+ @NotNull
+ @Override
+ public DataRecord completeDataRecordUpload(@NotNull String uploadToken)
+ throws IllegalArgumentException, DataRecordUploadException, DataStoreException {
+ if (null == s3Backend) {
+ throw new DataRecordUploadException("Backend not initialized");
+ }
+ return s3Backend.completeHttpUpload(uploadToken);
+ }
+
+ @Override
+ public void setDirectDownloadURIExpirySeconds(int seconds) {
+ if (s3Backend != null) {
+ s3Backend.setHttpDownloadURIExpirySeconds(seconds);
+ }
+ }
+
+ @Override
+ public void setDirectDownloadURICacheSize(int maxSize) {
+ if (s3Backend != null) {
+ s3Backend.setHttpDownloadURICacheSize(maxSize);
+ }
+ }
+
+ @Nullable
+ @Override
+ public URI getDownloadURI(@NotNull DataIdentifier identifier,
+ @NotNull DataRecordDownloadOptions downloadOptions) {
+ if (s3Backend == null) {
+ return null;
+ }
+ return s3Backend.createHttpDownloadURI(identifier, downloadOptions);
+ }
}
Added: jackrabbit/oak/trunk/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3DataRecordAccessProviderIT.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3DataRecordAccessProviderIT.java?rev=1837047&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3DataRecordAccessProviderIT.java (added)
+++ jackrabbit/oak/trunk/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3DataRecordAccessProviderIT.java Mon Jul 30 11:05:10 2018
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.blob.cloud.s3;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+
+import javax.net.ssl.HttpsURLConnection;
+
+import org.apache.jackrabbit.core.data.DataIdentifier;
+import org.apache.jackrabbit.core.data.DataRecord;
+import org.apache.jackrabbit.core.data.DataStore;
+import org.apache.jackrabbit.core.data.DataStoreException;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.AbstractDataRecordAccessProviderIT;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.ConfigurableDataRecordAccessProvider;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.rules.TemporaryFolder;
+
+import static com.google.common.base.Strings.isNullOrEmpty;
+import static org.apache.jackrabbit.oak.blob.cloud.s3.S3DataStoreUtils.getFixtures;
+import static org.apache.jackrabbit.oak.blob.cloud.s3.S3DataStoreUtils.getS3DataStore;
+import static org.apache.jackrabbit.oak.blob.cloud.s3.S3DataStoreUtils.isS3Configured;
+import static org.junit.Assume.assumeTrue;
+
+/**
+ * The test is memory intensive and requires -Dtest.opts.memory=-Xmx2G
+ */
+public class S3DataRecordAccessProviderIT extends AbstractDataRecordAccessProviderIT {
+ @ClassRule
+ public static TemporaryFolder homeDir = new TemporaryFolder(new File("target"));
+
+ private static S3DataStore dataStore;
+
+ @BeforeClass
+ public static void setupDataStore() throws Exception {
+ assumeTrue(isS3Configured() && !isNullOrEmpty(System.getProperty("test.opts.memory")));
+ dataStore = (S3DataStore) getS3DataStore(getFixtures().get(0), S3DataStoreUtils.getS3Config(),
+ homeDir.newFolder().getAbsolutePath());
+
+ dataStore.setDirectDownloadURIExpirySeconds(expirySeconds);
+ dataStore.setDirectUploadURIExpirySeconds(expirySeconds);
+ }
+
+ @Override
+ protected ConfigurableDataRecordAccessProvider getDataStore() {
+ return dataStore;
+ }
+
+ @Override
+ protected DataRecord doGetRecord(DataStore ds, DataIdentifier identifier) throws DataStoreException {
+ return ds.getRecord(identifier);
+ }
+
+ @Override
+ protected void doDeleteRecord(DataStore ds, DataIdentifier identifier) throws DataStoreException {
+ ((S3DataStore)ds).deleteRecord(identifier);
+ }
+
+ @Override
+ protected long getProviderMaxPartSize() {
+ return S3Backend.MAX_MULTIPART_UPLOAD_PART_SIZE;
+ }
+
+ @Override
+ protected HttpsURLConnection getHttpsConnection(long length, URI uri) throws IOException {
+ return S3DataStoreUtils.getHttpsConnection(length, uri);
+ }
+}
Propchange: jackrabbit/oak/trunk/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3DataRecordAccessProviderIT.java
------------------------------------------------------------------------------
svn:eol-style = native