You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by am...@apache.org on 2016/10/24 04:51:26 UTC
svn commit: r1766338 - in /jackrabbit/oak/trunk:
oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/
oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/
oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob...
Author: amitj
Date: Mon Oct 24 04:51:26 2016
New Revision: 1766338
URL: http://svn.apache.org/viewvc?rev=1766338&view=rev
Log:
OAK-4870: Implement caching for S3DataStore
S3DataStore implementation using AbstractCachingDataStore
Added:
jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/AbstractS3DataStoreService.java (contents, props changed)
- copied, changed from r1766337, jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3DataStoreService.java
jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3Backend.java (with props)
jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3DataStore.java (with props)
jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3DataStoreService.java (with props)
jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/SharedS3DataStoreService.java (with props)
jackrabbit/oak/trunk/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3DataStoreServiceTest.java (with props)
Removed:
jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3DataStoreService.java
jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/SharedS3DataStoreService.java
Modified:
jackrabbit/oak/trunk/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/TestS3DSAsyncTouch.java
jackrabbit/oak/trunk/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3DataStoreUtils.java
jackrabbit/oak/trunk/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/s3/TestS3DataStore.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/AbstractDataStoreService.java
Copied: jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/AbstractS3DataStoreService.java (from r1766337, jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3DataStoreService.java)
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/AbstractS3DataStoreService.java?p2=jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/AbstractS3DataStoreService.java&p1=jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3DataStoreService.java&r1=1766337&r2=1766338&rev=1766338&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3DataStoreService.java (original)
+++ jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/AbstractS3DataStoreService.java Mon Oct 24 04:51:26 2016
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.jackrabbit.oak.blob.cloud.aws.s3;
+package org.apache.jackrabbit.oak.blob.cloud.s3;
import java.util.Dictionary;
import java.util.Hashtable;
@@ -25,40 +25,55 @@ import java.util.Map;
import java.util.Properties;
import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.ConfigurationPolicy;
import org.apache.jackrabbit.core.data.DataStore;
import org.apache.jackrabbit.core.data.DataStoreException;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.SharedS3DataStore;
+import org.apache.jackrabbit.oak.plugins.blob.AbstractSharedCachingDataStore;
import org.apache.jackrabbit.oak.plugins.blob.datastore.AbstractDataStoreService;
import org.osgi.framework.Constants;
import org.osgi.framework.ServiceRegistration;
import org.osgi.service.component.ComponentContext;
-@Component(policy = ConfigurationPolicy.REQUIRE, name = S3DataStoreService.NAME)
-public class S3DataStoreService extends AbstractDataStoreService {
- public static final String NAME = "org.apache.jackrabbit.oak.plugins.blob.datastore.S3DataStore";
+@Component(componentAbstract = true)
+public class AbstractS3DataStoreService extends AbstractDataStoreService {
private static final String DESCRIPTION = "oak.datastore.description";
private ServiceRegistration delegateReg;
@Override
protected DataStore createDataStore(ComponentContext context, Map<String, Object> config) {
- SharedS3DataStore dataStore = new SharedS3DataStore();
-
Properties properties = new Properties();
properties.putAll(config);
- dataStore.setProperties(properties);
-
- Dictionary<String, Object> props = new Hashtable<String, Object>();
- props.put(Constants.SERVICE_PID, dataStore.getClass().getName());
- props.put(DESCRIPTION, getDescription());
-
- delegateReg = context.getBundleContext().registerService(new String[] {
- SharedS3DataStore.class.getName(),
- SharedS3DataStore.class.getName()
- }, dataStore , props);
+ if (JR2_CACHING) {
+ SharedS3DataStore dataStore = new SharedS3DataStore();
+ dataStore.setProperties(properties);
+
+ Dictionary<String, Object> props = new Hashtable<String, Object>();
+ props.put(Constants.SERVICE_PID, dataStore.getClass().getName());
+ props.put(DESCRIPTION, getDescription());
+
+ delegateReg = context.getBundleContext().registerService(new String[] {
+ SharedS3DataStore.class.getName(),
+ SharedS3DataStore.class.getName()
+ }, dataStore , props);
+ return dataStore;
+ } else {
+ S3DataStore dataStore = new S3DataStore();
+ dataStore.setStatisticsProvider(getStatisticsProvider());
+ dataStore.setProperties(properties);
+
+ Dictionary<String, Object> props = new Hashtable<String, Object>();
+ props.put(Constants.SERVICE_PID, dataStore.getClass().getName());
+ props.put(DESCRIPTION, getDescription());
+
+ delegateReg = context.getBundleContext().registerService(new String[] {
+ AbstractSharedCachingDataStore.class.getName(),
+ AbstractSharedCachingDataStore.class.getName()
+ }, dataStore , props);
- return dataStore;
+ return dataStore;
+ }
}
protected void deactivate() throws DataStoreException {
Propchange: jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/AbstractS3DataStoreService.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3Backend.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3Backend.java?rev=1766338&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3Backend.java (added)
+++ jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3Backend.java Mon Oct 24 04:51:26 2016
@@ -0,0 +1,818 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.blob.cloud.s3;
+
+import java.io.File;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Queue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.CopyObjectRequest;
+import com.amazonaws.services.s3.model.DeleteObjectsRequest;
+import com.amazonaws.services.s3.model.DeleteObjectsResult;
+import com.amazonaws.services.s3.model.ListObjectsRequest;
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.model.Region;
+import com.amazonaws.services.s3.model.S3Object;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.amazonaws.services.s3.transfer.Copy;
+import com.amazonaws.services.s3.transfer.TransferManager;
+import com.amazonaws.services.s3.transfer.Upload;
+import com.amazonaws.util.StringUtils;
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.jackrabbit.core.data.DataIdentifier;
+import org.apache.jackrabbit.core.data.DataRecord;
+import org.apache.jackrabbit.core.data.DataStoreException;
+import org.apache.jackrabbit.core.data.util.NamedThreadFactory;
+import org.apache.jackrabbit.oak.spi.blob.SharedBackend;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.google.common.collect.Iterables.filter;
+import static java.lang.Thread.currentThread;
+
+/**
+ * A data store backend that stores data on Amazon S3.
+ */
+public class S3Backend implements SharedBackend {
+
+ /**
+ * Logger instance.
+ */
+ private static final Logger LOG = LoggerFactory.getLogger(S3Backend.class);
+
+ private static final String KEY_PREFIX = "dataStore_";
+
+ private static final String META_KEY_PREFIX = "META/";
+
+ private AmazonS3Client s3service;
+
+ private String bucket;
+
+ private TransferManager tmx;
+
+ private Properties properties;
+
+ private Date startTime;
+
+ private S3RequestDecorator s3ReqDecorator;
+
+ public void init() throws DataStoreException {
+ ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+
+ try {
+ startTime = new Date();
+ Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+
+ LOG.debug("init");
+
+ s3ReqDecorator = new S3RequestDecorator(properties);
+ s3service = Utils.openService(properties);
+ if (bucket == null || "".equals(bucket.trim())) {
+ bucket = properties.getProperty(S3Constants.S3_BUCKET);
+ }
+
+ String region = properties.getProperty(S3Constants.S3_REGION);
+ Region s3Region = null;
+ if (StringUtils.isNullOrEmpty(region)) {
+ com.amazonaws.regions.Region ec2Region = Regions.getCurrentRegion();
+ if (ec2Region != null) {
+ s3Region = Region.fromValue(ec2Region.getName());
+ } else {
+ throw new AmazonClientException(
+ "parameter ["
+ + S3Constants.S3_REGION
+ + "] not configured and cannot be derived from environment");
+ }
+ } else {
+ if (Utils.DEFAULT_AWS_BUCKET_REGION.equals(region)) {
+ s3Region = Region.US_Standard;
+ } else if (Region.EU_Ireland.toString().equals(region)) {
+ s3Region = Region.EU_Ireland;
+ } else {
+ s3Region = Region.fromValue(region);
+ }
+ }
+
+ if (!s3service.doesBucketExist(bucket)) {
+ s3service.createBucket(bucket, s3Region);
+ LOG.info("Created bucket [{}] in [{}] ", bucket, region);
+ } else {
+ LOG.info("Using bucket [{}] in [{}] ", bucket, region);
+ }
+
+ int writeThreads = 10;
+ String writeThreadsStr = properties.getProperty(S3Constants.S3_WRITE_THREADS);
+ if (writeThreadsStr != null) {
+ writeThreads = Integer.parseInt(writeThreadsStr);
+ }
+ LOG.info("Using thread pool of [{}] threads in S3 transfer manager.", writeThreads);
+ tmx = new TransferManager(s3service, Executors.newFixedThreadPool(writeThreads,
+ new NamedThreadFactory("s3-transfer-manager-worker")));
+
+ String renameKeyProp = properties.getProperty(S3Constants.S3_RENAME_KEYS);
+ boolean renameKeyBool = (renameKeyProp == null || "".equals(renameKeyProp))
+ ? false
+ : Boolean.parseBoolean(renameKeyProp);
+ LOG.info("Rename keys [{}]", renameKeyBool);
+ if (renameKeyBool) {
+ renameKeys();
+ }
+ LOG.debug("S3 Backend initialized in [{}] ms",
+ +(System.currentTimeMillis() - startTime.getTime()));
+ } catch (Exception e) {
+ LOG.debug(" error ", e);
+ Map<String, String> filteredMap = Maps.newHashMap();
+ if (properties != null) {
+ filteredMap = Maps.filterKeys(Maps.fromProperties(properties), new Predicate<String>() {
+ @Override public boolean apply(String input) {
+ return !input.equals(S3Constants.ACCESS_KEY) && !input.equals(S3Constants
+ .SECRET_KEY);
+ }
+ });
+ }
+ throw new DataStoreException("Could not initialize S3 from " + filteredMap, e);
+ } finally {
+ if (contextClassLoader != null) {
+ Thread.currentThread().setContextClassLoader(contextClassLoader);
+ }
+ }
+ }
+
+    /**
+     * Uploads the file to Amazon S3. If the file size is greater than 5MB,
+     * this method uses parallel concurrent connections to upload.
+     */
+ @Override
+ public void write(DataIdentifier identifier, File file)
+ throws DataStoreException {
+ String key = getKeyName(identifier);
+ ObjectMetadata objectMetaData = null;
+ long start = System.currentTimeMillis();
+ ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+ try {
+ Thread.currentThread().setContextClassLoader(
+ getClass().getClassLoader());
+ // check if the same record already exists
+ try {
+ objectMetaData = s3service.getObjectMetadata(bucket, key);
+ } catch (AmazonServiceException ase) {
+ if (!(ase.getStatusCode() == 404 || ase.getStatusCode() == 403)) {
+ throw ase;
+ }
+ }
+ if (objectMetaData != null) {
+ long l = objectMetaData.getContentLength();
+ if (l != file.length()) {
+ throw new DataStoreException("Collision: " + key
+ + " new length: " + file.length() + " old length: " + l);
+ }
+ LOG.debug("[{}]'s exists, lastmodified = [{}]", key,
+ objectMetaData.getLastModified().getTime());
+ CopyObjectRequest copReq = new CopyObjectRequest(bucket, key,
+ bucket, key);
+ copReq.setNewObjectMetadata(objectMetaData);
+ Copy copy = tmx.copy(s3ReqDecorator.decorate(copReq));
+ try {
+ copy.waitForCopyResult();
+ LOG.debug("lastModified of [{}] updated successfully.", identifier);
+ }catch (Exception e2) {
+ throw new DataStoreException("Could not upload " + key, e2);
+ }
+ }
+
+ if (objectMetaData == null) {
+ try {
+ // start multipart parallel upload using amazon sdk
+ Upload up = tmx.upload(s3ReqDecorator.decorate(new PutObjectRequest(
+ bucket, key, file)));
+ // wait for upload to finish
+ up.waitForUploadResult();
+ LOG.debug("synchronous upload to identifier [{}] completed.", identifier);
+ } catch (Exception e2 ) {
+ throw new DataStoreException("Could not upload " + key, e2);
+ }
+ }
+ } finally {
+ if (contextClassLoader != null) {
+ Thread.currentThread().setContextClassLoader(contextClassLoader);
+ }
+ }
+ LOG.debug(
+ "write of [{}], length=[{}], in [{}]ms",
+ new Object[] { identifier, file.length(), (System.currentTimeMillis() - start) });
+ }
+
+ /**
+ * Check if record identified by identifier exists in Amazon S3.
+ */
+ @Override
+ public boolean exists(DataIdentifier identifier) throws DataStoreException {
+ long start = System.currentTimeMillis();
+ String key = getKeyName(identifier);
+ ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+ try {
+ Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+ ObjectMetadata objectMetaData = s3service.getObjectMetadata(bucket,
+ key);
+ if (objectMetaData != null) {
+ LOG.trace("exists [{}]: [true] took [{}] ms.",
+ identifier, (System.currentTimeMillis() - start) );
+ return true;
+ }
+ return false;
+ } catch (AmazonServiceException e) {
+ if (e.getStatusCode() == 404 || e.getStatusCode() == 403) {
+ LOG.debug("exists [{}]: [false] took [{}] ms.",
+ identifier, (System.currentTimeMillis() - start) );
+ return false;
+ }
+ throw new DataStoreException(
+ "Error occured to getObjectMetadata for key [" + identifier.toString() + "]", e);
+ } finally {
+ if (contextClassLoader != null) {
+ Thread.currentThread().setContextClassLoader(contextClassLoader);
+ }
+ }
+ }
+
+ @Override
+ public InputStream read(DataIdentifier identifier)
+ throws DataStoreException {
+ long start = System.currentTimeMillis();
+ String key = getKeyName(identifier);
+ ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+ try {
+ Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+ S3Object object = s3service.getObject(bucket, key);
+ InputStream in = object.getObjectContent();
+ LOG.debug("[{}] read took [{}]ms", identifier, (System.currentTimeMillis() - start));
+ return in;
+ } catch (AmazonServiceException e) {
+ throw new DataStoreException("Object not found: " + key, e);
+ } finally {
+ if (contextClassLoader != null) {
+ Thread.currentThread().setContextClassLoader(contextClassLoader);
+ }
+ }
+ }
+
+ @Override
+ public Iterator<DataIdentifier> getAllIdentifiers()
+ throws DataStoreException {
+ return new RecordsIterator<DataIdentifier>(
+ new Function<S3ObjectSummary, DataIdentifier>() {
+ @Override
+ public DataIdentifier apply(S3ObjectSummary input) {
+ return new DataIdentifier(getIdentifierName(input.getKey()));
+ }
+ });
+ }
+
+ @Override
+ public void deleteRecord(DataIdentifier identifier)
+ throws DataStoreException {
+ long start = System.currentTimeMillis();
+ String key = getKeyName(identifier);
+ ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+ try {
+ Thread.currentThread().setContextClassLoader(
+ getClass().getClassLoader());
+ s3service.deleteObject(bucket, key);
+ LOG.debug("Identifier [{}] deleted. It took [{}]ms.", new Object[] {
+ identifier, (System.currentTimeMillis() - start) });
+ } catch (AmazonServiceException e) {
+ throw new DataStoreException(
+ "Could not delete dataIdentifier " + identifier, e);
+ } finally {
+ if (contextClassLoader != null) {
+ Thread.currentThread().setContextClassLoader(contextClassLoader);
+ }
+ }
+ }
+
+ @Override
+ public void close() {
+        // backend is closing. abort all multipart uploads from start.
+ if(s3service.doesBucketExist(bucket)) {
+ tmx.abortMultipartUploads(bucket, startTime);
+ }
+ tmx.shutdownNow();
+ s3service.shutdown();
+ LOG.info("S3Backend closed.");
+ }
+
+ public String getBucket() {
+ return bucket;
+ }
+
+ public void setBucket(String bucket) {
+ this.bucket = bucket;
+ }
+
+ /**
+ * Properties used to configure the backend. If provided explicitly
+ * before init is invoked then these take precedence
+ *
+ * @param properties to configure S3Backend
+ */
+ public void setProperties(Properties properties) {
+ this.properties = properties;
+ }
+
+    /**
+     * Uploads the given stream as a metadata record, stored under the
+     * metadata key prefix, and waits for the upload to finish.
+     *
+     * @param input stream with the content of the metadata record
+     * @param name name of the metadata record
+     * @throws DataStoreException if the thread is interrupted while waiting
+     *             for the upload to complete
+     */
+    @Override
+    public void addMetadataRecord(final InputStream input, final String name) throws DataStoreException {
+        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+
+        try {
+            Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+
+            Upload upload = tmx.upload(s3ReqDecorator
+                .decorate(new PutObjectRequest(bucket, addMetaKeyPrefix(name), input, new ObjectMetadata())));
+            upload.waitForUploadResult();
+        } catch (InterruptedException e) {
+            // Keep the interrupt status visible to callers further up the stack.
+            Thread.currentThread().interrupt();
+            LOG.error("Error in uploading", e);
+            throw new DataStoreException("Error in uploading", e);
+        } finally {
+            if (contextClassLoader != null) {
+                Thread.currentThread().setContextClassLoader(contextClassLoader);
+            }
+        }
+    }
+
+    /**
+     * Uploads the given file as a metadata record, stored under the
+     * metadata key prefix, and waits for the upload to finish.
+     *
+     * @param input file with the content of the metadata record
+     * @param name name of the metadata record
+     * @throws DataStoreException if the thread is interrupted while waiting
+     *             for the upload to complete
+     */
+    @Override
+    public void addMetadataRecord(File input, String name) throws DataStoreException {
+        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+        try {
+            Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+
+            Upload upload = tmx.upload(s3ReqDecorator
+                .decorate(new PutObjectRequest(bucket, addMetaKeyPrefix(name), input)));
+            upload.waitForUploadResult();
+        } catch (InterruptedException e) {
+            // Keep the interrupt status visible to callers further up the stack.
+            Thread.currentThread().interrupt();
+            LOG.error("Exception in uploading metadata file {}", new Object[] {input, e});
+            throw new DataStoreException("Error in uploading metadata file", e);
+        } finally {
+            if (contextClassLoader != null) {
+                Thread.currentThread().setContextClassLoader(contextClassLoader);
+            }
+        }
+    }
+
+ @Override
+ public DataRecord getMetadataRecord(String name) {
+ ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+ try {
+ Thread.currentThread().setContextClassLoader(
+ getClass().getClassLoader());
+ ObjectMetadata meta = s3service.getObjectMetadata(bucket, addMetaKeyPrefix(name));
+ return new S3DataRecord(s3service, bucket, name,
+ meta.getLastModified().getTime(), meta.getContentLength(), true);
+ } finally {
+ if (contextClassLoader != null) {
+ Thread.currentThread().setContextClassLoader(contextClassLoader);
+ }
+ }
+ }
+
+ @Override
+ public List<DataRecord> getAllMetadataRecords(String prefix) {
+ List<DataRecord> metadataList = new ArrayList<DataRecord>();
+ ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+ try {
+ Thread.currentThread().setContextClassLoader(
+ getClass().getClassLoader());
+ ListObjectsRequest listObjectsRequest =
+ new ListObjectsRequest().withBucketName(bucket).withPrefix(addMetaKeyPrefix(prefix));
+ ObjectListing prevObjectListing = s3service.listObjects(listObjectsRequest);
+ for (final S3ObjectSummary s3ObjSumm : prevObjectListing.getObjectSummaries()) {
+ metadataList.add(new S3DataRecord(s3service, bucket, stripMetaKeyPrefix(s3ObjSumm.getKey()),
+ s3ObjSumm.getLastModified().getTime(), s3ObjSumm.getSize(), true));
+ }
+ } finally {
+ if (contextClassLoader != null) {
+ Thread.currentThread().setContextClassLoader(contextClassLoader);
+ }
+ }
+ return metadataList;
+ }
+
+ @Override
+ public boolean deleteMetadataRecord(String name) {
+ ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+ try {
+ Thread.currentThread().setContextClassLoader(
+ getClass().getClassLoader());
+ s3service.deleteObject(bucket, addMetaKeyPrefix(name));
+ } finally {
+ if (contextClassLoader != null) {
+ Thread.currentThread().setContextClassLoader(contextClassLoader);
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public void deleteAllMetadataRecords(String prefix) {
+ ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+ try {
+ Thread.currentThread().setContextClassLoader(
+ getClass().getClassLoader());
+
+ ListObjectsRequest listObjectsRequest =
+ new ListObjectsRequest().withBucketName(bucket).withPrefix(addMetaKeyPrefix(prefix));
+ ObjectListing metaList = s3service.listObjects(listObjectsRequest);
+ List<DeleteObjectsRequest.KeyVersion> deleteList = new ArrayList<DeleteObjectsRequest.KeyVersion>();
+ for (S3ObjectSummary s3ObjSumm : metaList.getObjectSummaries()) {
+ deleteList.add(new DeleteObjectsRequest.KeyVersion(s3ObjSumm.getKey()));
+ }
+ if (deleteList.size() > 0) {
+ DeleteObjectsRequest delObjsReq = new DeleteObjectsRequest(bucket);
+ delObjsReq.setKeys(deleteList);
+ DeleteObjectsResult dobjs = s3service.deleteObjects(delObjsReq);
+ }
+ } finally {
+ if (contextClassLoader != null) {
+ Thread.currentThread().setContextClassLoader(contextClassLoader);
+ }
+ }
+ }
+
+ @Override
+ public Iterator<DataRecord> getAllRecords() {
+ return new RecordsIterator<DataRecord>(
+ new Function<S3ObjectSummary, DataRecord>() {
+ @Override
+ public DataRecord apply(S3ObjectSummary input) {
+ return new S3DataRecord(s3service, bucket, getIdentifierName(input.getKey()),
+ input.getLastModified().getTime(), input.getSize());
+ }
+ });
+ }
+
+ @Override
+ public DataRecord getRecord(DataIdentifier identifier) throws DataStoreException {
+ long start = System.currentTimeMillis();
+ String key = getKeyName(identifier);
+ ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+ try {
+ Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+
+ ObjectMetadata object = s3service.getObjectMetadata(bucket, key);
+ S3DataRecord record = new S3DataRecord(s3service, bucket, identifier.toString(),
+ object.getLastModified().getTime(), object.getContentLength());
+ LOG.debug("Identifier [{}]'s getRecord = [{}] took [{}]ms.",
+ new Object[] {identifier, record, (System.currentTimeMillis() - start)});
+
+ return record;
+ } catch (AmazonServiceException e) {
+ if (e.getStatusCode() == 404 || e.getStatusCode() == 403) {
+ LOG.info(
+ "getRecord:Identifier [{}] not found. Took [{}] ms.",
+ identifier, (System.currentTimeMillis() - start));
+ }
+ throw new DataStoreException(e);
+ } finally {
+ if (contextClassLoader != null) {
+ Thread.currentThread().setContextClassLoader(contextClassLoader);
+ }
+ }
+ }
+
+    /**
+     * Lazily iterates over the objects of the bucket, skipping metadata
+     * records (keys starting with the metadata key prefix), and converts each
+     * remaining {@link S3ObjectSummary} with the supplied transformer.
+     *
+     * @param <T> type of the elements produced by the transformer
+     */
+    class RecordsIterator<T> extends AbstractIterator<T> {
+        ObjectListing prevObjectListing;
+        Queue<S3ObjectSummary> queue;
+        long size;
+        Function<S3ObjectSummary, T> transformer;
+
+        public RecordsIterator (Function<S3ObjectSummary, T> transformer) {
+            queue = Lists.newLinkedList();
+            this.transformer = transformer;
+        }
+
+        @Override
+        protected T computeNext() {
+            // A fetched page can be empty once metadata keys are filtered out,
+            // so keep loading until a record is queued or the listing ends.
+            while (queue.isEmpty()) {
+                if (!loadBatch()) {
+                    return endOfData();
+                }
+            }
+            return transformer.apply(queue.remove());
+        }
+
+        /**
+         * Fetches the next page of the bucket listing and queues its
+         * non-metadata entries.
+         *
+         * @return true if a page was fetched (possibly with nothing queued),
+         *         false if the listing is exhausted or could not be read
+         */
+        private boolean loadBatch() {
+            ClassLoader contextClassLoader = currentThread().getContextClassLoader();
+            long start = System.currentTimeMillis();
+            try {
+                currentThread().setContextClassLoader(getClass().getClassLoader());
+
+                // initialize the listing the first time
+                if (prevObjectListing == null) {
+                    prevObjectListing = s3service.listObjects(bucket);
+                } else if (prevObjectListing.isTruncated()) { // already initialized, more objects available
+                    prevObjectListing = s3service.listNextBatchOfObjects(prevObjectListing);
+                } else { // no more available
+                    return false;
+                }
+
+                List<S3ObjectSummary> listing = Lists.newArrayList(
+                    filter(prevObjectListing.getObjectSummaries(),
+                        new Predicate<S3ObjectSummary>() {
+                            @Override
+                            public boolean apply(S3ObjectSummary input) {
+                                return !input.getKey().startsWith(META_KEY_PREFIX);
+                            }
+                        }));
+
+                // An empty page after filtering is not the end of the listing;
+                // the caller asks for the next page while more are available.
+                size += listing.size();
+                queue.addAll(listing);
+
+                LOG.info("Loaded batch of size [{}] in [{}] ms.",
+                    listing.size(), (System.currentTimeMillis() - start));
+
+                return true;
+            } catch (AmazonServiceException e) {
+                LOG.warn("Could not list objects", e);
+            } finally {
+                if (contextClassLoader != null) {
+                    currentThread().setContextClassLoader(contextClassLoader);
+                }
+            }
+            return false;
+        }
+    }
+
+ private static String addMetaKeyPrefix(String key) {
+ return META_KEY_PREFIX + key;
+ }
+
+ private static String stripMetaKeyPrefix(String name) {
+ if (name.startsWith(META_KEY_PREFIX)) {
+ return name.substring(META_KEY_PREFIX.length());
+ }
+ return name;
+ }
+
+ /**
+ * S3DataRecord which lazily retrieves the input stream of the record.
+ */
+ static class S3DataRecord implements DataRecord {
+ private AmazonS3Client s3service;
+ private DataIdentifier identifier;
+ private long length;
+ private long lastModified;
+ private String bucket;
+ private boolean isMeta;
+
+ public S3DataRecord(AmazonS3Client s3service, String bucket, String key, long lastModified,
+ long length) {
+ this(s3service, bucket, key, lastModified, length, false);
+ }
+
+ public S3DataRecord(AmazonS3Client s3service, String bucket, String key, long lastModified,
+ long length, boolean isMeta) {
+ this.s3service = s3service;
+ this.identifier = new DataIdentifier(key);
+ this.lastModified = lastModified;
+ this.length = length;
+ this.bucket = bucket;
+ this.isMeta = isMeta;
+ }
+
+ @Override
+ public DataIdentifier getIdentifier() {
+ return identifier;
+ }
+
+ @Override
+ public String getReference() {
+ return identifier.toString();
+ }
+
+ @Override
+ public long getLength() throws DataStoreException {
+ return length;
+ }
+
+ @Override
+ public InputStream getStream() throws DataStoreException {
+ String id = getKeyName(identifier);
+ if (isMeta) {
+ id = addMetaKeyPrefix(identifier.toString());
+ }
+ return s3service.getObject(bucket, id).getObjectContent();
+ }
+
+ @Override
+ public long getLastModified() {
+ return lastModified;
+ }
+
+ @Override
+ public String toString() {
+ return "S3DataRecord{" +
+ "identifier=" + identifier +
+ ", length=" + length +
+ ", lastModified=" + lastModified +
+ ", bucket='" + bucket + '\'' +
+ '}';
+ }
+ }
+
+    /**
+     * Renames object keys in S3 concurrently. The number of concurrent
+     * threads is defined by the 'maxConnections' property. As S3 doesn't
+     * have a "move" command, the move is simulated by copying each object to
+     * its new key and then deleting the old key.
+     *
+     * @throws DataStoreException if the thread is interrupted while waiting
+     *             for the rename tasks; old keys are not deleted in that case
+     */
+    private void renameKeys() throws DataStoreException {
+        long startTime = System.currentTimeMillis();
+        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+        long count = 0;
+        try {
+            Thread.currentThread().setContextClassLoader(
+                getClass().getClassLoader());
+            ObjectListing prevObjectListing = s3service.listObjects(bucket);
+            List<DeleteObjectsRequest.KeyVersion> deleteList = new ArrayList<DeleteObjectsRequest.KeyVersion>();
+            int nThreads = Integer.parseInt(properties.getProperty("maxConnections"));
+            ExecutorService executor = Executors.newFixedThreadPool(nThreads,
+                new NamedThreadFactory("s3-object-rename-worker"));
+            boolean taskAdded = false;
+            while (true) {
+                for (S3ObjectSummary s3ObjSumm : prevObjectListing.getObjectSummaries()) {
+                    executor.execute(new KeyRenameThread(s3ObjSumm.getKey()));
+                    taskAdded = true;
+                    count++;
+                    // delete the object if it follows old key name format
+                    if( s3ObjSumm.getKey().startsWith(KEY_PREFIX)) {
+                        deleteList.add(new DeleteObjectsRequest.KeyVersion(
+                            s3ObjSumm.getKey()));
+                    }
+                }
+                if (!prevObjectListing.isTruncated()) break;
+                prevObjectListing = s3service.listNextBatchOfObjects(prevObjectListing);
+            }
+            // This will make the executor accept no new tasks
+            // and finish all existing tasks in the queue
+            executor.shutdown();
+
+            try {
+                // Wait until all tasks are finished
+                while (taskAdded
+                    && !executor.awaitTermination(10, TimeUnit.SECONDS)) {
+                    LOG.info("Rename S3 keys tasks timedout. Waiting again");
+                }
+            } catch (InterruptedException ie) {
+                // Renames may be incomplete: stop the workers, keep the
+                // interrupt status and do not go on to delete the old keys.
+                executor.shutdownNow();
+                Thread.currentThread().interrupt();
+                throw new DataStoreException("Interrupted while renaming S3 keys", ie);
+            }
+            LOG.info("Renamed [{}] keys, time taken [{}]sec", count,
+                ((System.currentTimeMillis() - startTime) / 1000));
+            // Delete older keys.
+            if (deleteList.size() > 0) {
+                DeleteObjectsRequest delObjsReq = new DeleteObjectsRequest(
+                    bucket);
+                // Keys are deleted in batches of at most batchSize per request.
+                int batchSize = 500, startIndex = 0, size = deleteList.size();
+                int endIndex = batchSize < size ? batchSize : size;
+                while (endIndex <= size) {
+                    delObjsReq.setKeys(Collections.unmodifiableList(deleteList.subList(
+                        startIndex, endIndex)));
+                    DeleteObjectsResult dobjs = s3service.deleteObjects(delObjsReq);
+                    LOG.info(
+                        "Records[{}] deleted in datastore from index [{}] to [{}]",
+                        new Object[] { dobjs.getDeletedObjects().size(),
+                            startIndex, (endIndex - 1) });
+                    if (endIndex == size) {
+                        break;
+                    } else {
+                        startIndex = endIndex;
+                        endIndex = (startIndex + batchSize) < size
+                                ? (startIndex + batchSize)
+                                : size;
+                    }
+                }
+            }
+        } finally {
+            if (contextClassLoader != null) {
+                Thread.currentThread().setContextClassLoader(contextClassLoader);
+            }
+        }
+    }
+
+    /**
+     * Converts a key from the old format to the new format. For example,
+     * the old key dataStore_004cb70c8f87d78f04da41e7547cb434094089ea becomes
+     * 004c-b70c8f87d78f04da41e7547cb434094089ea.
+     */
+ private static String convertKey(String oldKey)
+ throws IllegalArgumentException {
+ if (!oldKey.startsWith(KEY_PREFIX)) {
+ return oldKey;
+ }
+ String key = oldKey.substring(KEY_PREFIX.length());
+ return key.substring(0, 4) + Utils.DASH + key.substring(4);
+ }
+
+ /**
+ * Get key from data identifier. Object is stored with key in S3.
+ */
+ private static String getKeyName(DataIdentifier identifier) {
+ String key = identifier.toString();
+ return key.substring(0, 4) + Utils.DASH + key.substring(4);
+ }
+
+ /**
+ * Get data identifier from key.
+ */
+ private static String getIdentifierName(String key) {
+ if (!key.contains(Utils.DASH)) {
+ return null;
+ } else if (key.contains(META_KEY_PREFIX)) {
+ return key;
+ }
+ return key.substring(0, 4) + key.substring(5);
+ }
+
+
+    /**
+     * Task that renames a single object key in S3 by copying the object to
+     * its converted key. It does not delete the old key.
+     */
+    private class KeyRenameThread implements Runnable {
+
+        private String oldKey;
+
+        public void run() {
+            ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+            try {
+                Thread.currentThread().setContextClassLoader(
+                    getClass().getClassLoader());
+                String newS3Key = convertKey(oldKey);
+                CopyObjectRequest copReq = new CopyObjectRequest(bucket,
+                    oldKey, bucket, newS3Key);
+                Copy copy = tmx.copy(s3ReqDecorator.decorate(copReq));
+                try {
+                    copy.waitForCopyResult();
+                    LOG.debug("[{}] renamed to [{}] ", oldKey, newS3Key);
+                } catch (InterruptedException ie) {
+                    // Keep the interrupt status for the executor, and pass the
+                    // exception last so both keys fill the placeholders and
+                    // the stack trace is logged.
+                    Thread.currentThread().interrupt();
+                    LOG.error(" Exception in renaming [{}] to [{}] ",
+                        new Object[] { oldKey, newS3Key, ie });
+                }
+
+            } finally {
+                if (contextClassLoader != null) {
+                    Thread.currentThread().setContextClassLoader(
+                        contextClassLoader);
+                }
+            }
+        }
+
+        public KeyRenameThread(String oldKey) {
+            this.oldKey = oldKey;
+        }
+    }
+}
Propchange: jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3Backend.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3DataStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3DataStore.java?rev=1766338&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3DataStore.java (added)
+++ jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3DataStore.java Mon Oct 24 04:51:26 2016
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.blob.cloud.s3;
+
+import java.util.Properties;
+
+import com.google.common.base.Strings;
+import org.apache.jackrabbit.core.data.DataIdentifier;
+import org.apache.jackrabbit.core.data.DataStoreException;
+import org.apache.jackrabbit.oak.plugins.blob.AbstractSharedCachingDataStore;
+import org.apache.jackrabbit.oak.spi.blob.SharedBackend;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Amazon S3 data store extending from {@link AbstractSharedCachingDataStore}.
+ * <p>
+ * The backend is an {@link S3Backend} which receives the {@link Properties} set
+ * through {@link #setProperties(Properties)}, if any.
+ */
+public class S3DataStore extends AbstractSharedCachingDataStore {
+    /**
+     * Logger instance.
+     */
+    private static final Logger LOG = LoggerFactory.getLogger(S3DataStore.class);
+
+    /**
+     * Properties handed to the {@link S3Backend}; may be {@code null}.
+     */
+    protected Properties properties;
+
+    /**
+     * The minimum size of an object that should be stored in this data store.
+     */
+    private int minRecordLength = 16 * 1024;
+
+    /**
+     * Secret whose UTF-8 bytes are used as the reference key.
+     */
+    private String secret;
+
+    /**
+     * Creates the {@link S3Backend}, passing on the configured properties when present.
+     *
+     * @return the newly created backend
+     */
+    @Override
+    protected SharedBackend createBackend() {
+        S3Backend backend = new S3Backend();
+        if (properties != null) {
+            backend.setProperties(properties);
+        }
+        return backend;
+    }
+
+    /**
+     * Returns the UTF-8 encoded bytes of the configured secret.
+     *
+     * @return the reference key bytes
+     * @throws DataStoreException if no secret has been set or it cannot be encoded
+     */
+    @Override
+    protected byte[] getOrCreateReferenceKey() throws DataStoreException {
+        // Fail with an explicit message instead of a wrapped NullPointerException.
+        if (secret == null) {
+            throw new DataStoreException(
+                "Error in creating reference key: no secret has been configured");
+        }
+        try {
+            return secret.getBytes("UTF-8");
+        } catch (java.io.UnsupportedEncodingException e) {
+            LOG.error("Error in creating reference key", e);
+            throw new DataStoreException(e);
+        }
+    }
+
+    /**
+     * Look in the backend for a record matching the given identifier. Returns true
+     * if such a record exists.
+     *
+     * @param identifier - An identifier for the record.
+     * @return true if a record for the provided identifier can be found; false if the
+     *         identifier is null or empty, no record exists, or the backend lookup fails.
+     */
+    public boolean haveRecordForIdentifier(final String identifier) {
+        if (Strings.isNullOrEmpty(identifier)) {
+            return false;
+        }
+        try {
+            return backend.exists(new DataIdentifier(identifier));
+        } catch (DataStoreException e) {
+            // Best effort: a failed lookup is reported as "not found".
+            LOG.warn(String.format("Data Store Exception caught checking for %s in the backend",
+                identifier), e);
+            return false;
+        }
+    }
+
+    /**------------------------------------------- Getters & Setters-----------------------------**/
+
+    /**
+     * Properties required to configure the S3Backend
+     */
+    public void setProperties(Properties properties) {
+        this.properties = properties;
+    }
+
+    public SharedBackend getBackend() {
+        return backend;
+    }
+
+    @Override
+    public int getMinRecordLength() {
+        return minRecordLength;
+    }
+
+    public void setMinRecordLength(int minRecordLength) {
+        this.minRecordLength = minRecordLength;
+    }
+
+    public void setSecret(String secret) {
+        this.secret = secret;
+    }
+}
Propchange: jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3DataStore.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3DataStoreService.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3DataStoreService.java?rev=1766338&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3DataStoreService.java (added)
+++ jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3DataStoreService.java Mon Oct 24 04:51:26 2016
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.blob.cloud.s3;
+
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.ConfigurationPolicy;
+
+/**
+ * Component declared under {@link #NAME} that requires a configuration
+ * ({@link ConfigurationPolicy#REQUIRE}) and exposes metatype information.
+ * It adds nothing of its own; everything else comes from
+ * {@link AbstractS3DataStoreService}.
+ */
+@Component(name = S3DataStoreService.NAME, metatype = true, policy = ConfigurationPolicy.REQUIRE)
+public class S3DataStoreService extends AbstractS3DataStoreService {
+    /** Name under which this component is declared. */
+    public static final String NAME = "org.apache.jackrabbit.oak.plugins.blob.datastore.S3DataStore";
+}
Propchange: jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3DataStoreService.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/SharedS3DataStoreService.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/SharedS3DataStoreService.java?rev=1766338&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/SharedS3DataStoreService.java (added)
+++ jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/SharedS3DataStoreService.java Mon Oct 24 04:51:26 2016
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.blob.cloud.s3;
+
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.ConfigurationPolicy;
+
+/**
+ * Component declared under {@link #NAME} that requires a configuration
+ * ({@link ConfigurationPolicy#REQUIRE}) and exposes metatype information.
+ * It adds nothing of its own; everything else comes from
+ * {@link AbstractS3DataStoreService}.
+ */
+@Component(name = SharedS3DataStoreService.NAME, metatype = true, policy = ConfigurationPolicy.REQUIRE)
+public class SharedS3DataStoreService extends AbstractS3DataStoreService {
+    /** Name under which this component is declared. */
+    public static final String NAME = "org.apache.jackrabbit.oak.plugins.blob.datastore.SharedS3DataStore";
+}
Propchange: jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/SharedS3DataStoreService.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: jackrabbit/oak/trunk/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/TestS3DSAsyncTouch.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/TestS3DSAsyncTouch.java?rev=1766338&r1=1766337&r2=1766338&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/TestS3DSAsyncTouch.java (original)
+++ jackrabbit/oak/trunk/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/TestS3DSAsyncTouch.java Mon Oct 24 04:51:26 2016
@@ -44,7 +44,7 @@ public class TestS3DSAsyncTouch extends
@Parameterized.Parameters(name = "{index}: ({0})")
public static List<String> fixtures() {
- return Lists.newArrayList(getFixtures().get(0));
+ return Lists.newArrayList(getFixtures().get(1));
}
@Override
Added: jackrabbit/oak/trunk/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3DataStoreServiceTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3DataStoreServiceTest.java?rev=1766338&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3DataStoreServiceTest.java (added)
+++ jackrabbit/oak/trunk/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3DataStoreServiceTest.java Mon Oct 24 04:51:26 2016
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.blob.cloud.s3;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+
+import com.google.common.collect.Maps;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.SharedS3DataStore;
+import org.apache.jackrabbit.oak.plugins.blob.AbstractSharedCachingDataStore;
+import org.apache.jackrabbit.oak.stats.StatisticsProvider;
+import org.apache.sling.testing.mock.osgi.junit.OsgiContext;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+
+import static com.google.common.collect.Maps.newHashMap;
+import static org.apache.jackrabbit.oak.blob.cloud.s3.S3DataStoreUtils.isS3Configured;
+import static org.apache.jackrabbit.oak.plugins.blob.datastore.AbstractDataStoreService
+ .JR2_CACHING_PROP;
+import static org.apache.sling.testing.mock.osgi.MockOsgi.deactivate;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assume.assumeTrue;
+
+/**
+ * Tests the registration of the S3DataStore.
+ * <p>
+ * All tests are skipped unless S3 is configured (see {@link #assumptions()}).
+ */
+public class S3DataStoreServiceTest {
+
+    @Rule
+    public OsgiContext context = new OsgiContext();
+
+    @Rule
+    public ExpectedException expectedEx = ExpectedException.none();
+
+    @Rule
+    public TemporaryFolder folder = new TemporaryFolder(new File("target"));
+
+    /** Service activated by the current test, or {@code null} if none is active. */
+    private S3DataStoreService service;
+
+    @BeforeClass
+    public static void assumptions() {
+        assumeTrue(isS3Configured());
+    }
+
+    @Before
+    public void setUp() {
+        context.registerService(StatisticsProvider.class, StatisticsProvider.NOOP);
+    }
+
+    /**
+     * Deactivates the service here rather than at the end of each test so that it
+     * is also released when an assertion fails, then clears the caching property.
+     */
+    @After
+    public void tear() {
+        unregisterBlobStore();
+        System.clearProperty(JR2_CACHING_PROP);
+    }
+
+    /** Without the JR2 caching property an AbstractSharedCachingDataStore is expected. */
+    @Test
+    public void testDefaultS3Implementation() throws IOException {
+        registerBlobStore();
+        assertNotNull(context.getService(AbstractSharedCachingDataStore.class));
+    }
+
+    /** With the JR2 caching property set to "true" a SharedS3DataStore is expected. */
+    @Test
+    public void testJR2Caching() throws IOException {
+        System.setProperty(JR2_CACHING_PROP, "true");
+        registerBlobStore();
+        assertNotNull(context.getService(SharedS3DataStore.class));
+    }
+
+    /**
+     * Activates a new {@link S3DataStoreService} with the S3 test configuration and
+     * a fresh temporary folder as {@code repository.home}.
+     */
+    private void registerBlobStore() throws IOException {
+        Map<String, Object> properties = newHashMap();
+        properties.putAll(Maps.fromProperties(S3DataStoreUtils.getS3Config()));
+        properties.put("repository.home", folder.newFolder().getAbsolutePath());
+        service = context
+            .registerInjectActivateService(new S3DataStoreService(), properties);
+    }
+
+    /** Deactivates the service of the current test, if one was activated. */
+    private void unregisterBlobStore() {
+        if (service != null) {
+            deactivate(service);
+            service = null;
+        }
+    }
+}
Propchange: jackrabbit/oak/trunk/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3DataStoreServiceTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: jackrabbit/oak/trunk/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3DataStoreUtils.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3DataStoreUtils.java?rev=1766338&r1=1766337&r2=1766338&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3DataStoreUtils.java (original)
+++ jackrabbit/oak/trunk/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3DataStoreUtils.java Mon Oct 24 04:51:26 2016
@@ -36,10 +36,7 @@ import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import org.apache.commons.io.IOUtils;
-import org.apache.jackrabbit.core.data.Backend;
import org.apache.jackrabbit.core.data.DataStore;
-import org.apache.jackrabbit.oak.blob.cloud.aws.s3.S3Backend;
-import org.apache.jackrabbit.oak.blob.cloud.aws.s3.S3DataStore;
import org.apache.jackrabbit.oak.blob.cloud.aws.s3.SharedS3DataStore;
import org.apache.jackrabbit.oak.commons.PropertiesUtil;
import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreUtils;
@@ -47,21 +44,26 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Extension to {@link DataStoreUtils} to enable S3 extensions for cleaning.
+ * Extension to {@link DataStoreUtils} to enable S3 extensions for cleaning and initialization.
*/
public class S3DataStoreUtils extends DataStoreUtils {
private static final Logger log = LoggerFactory.getLogger(S3DataStoreUtils.class);
private static final String DEFAULT_CONFIG_PATH = "./src/test/resources/aws.properties";
+ private static Class JR2_S3 = SharedS3DataStore.class;
+ private static Class S3 = S3DataStore.class;
+
public static List<String> getFixtures() {
- return ImmutableList.of(SharedS3DataStore.class.getName());
+ return ImmutableList.of(
+ S3.getName(),
+ JR2_S3.getName());
}
public static boolean isS3DataStore() {
String dsName = System.getProperty(DS_CLASS_NAME);
- boolean s3Class = (dsName != null) && (dsName.equals(S3DataStore.class.getName()) || dsName
- .equals(SharedS3DataStore.class.getName()));
+ boolean s3Class =
+ (dsName != null) && (dsName.equals(S3.getName()) || dsName.equals(JR2_S3.getName()));
if (!isS3Configured()) {
return false;
}
@@ -120,8 +122,10 @@ public class S3DataStoreUtils extends Da
DataStore ds = Class.forName(className).asSubclass(DataStore.class).newInstance();
PropertiesUtil.populate(ds, Maps.fromProperties(props), false);
// Set the props object
- if (SharedS3DataStore.class.getName().equals(className)) {
- ((SharedS3DataStore) ds).setProperties(props);
+ if (S3.getName().equals(className)) {
+ ((S3DataStore) ds).setProperties(props);
+ } else if (JR2_S3.getName().equals(className)) {
+ ((org.apache.jackrabbit.oak.blob.cloud.aws.s3.SharedS3DataStore) ds).setProperties(props);
}
ds.init(homeDir);
@@ -132,23 +136,6 @@ public class S3DataStoreUtils extends Da
return getS3DataStore(className, getS3Config(), homeDir);
}
- /**
- * S3 specific cleanup
- *
- * @param dataStore the underlying DataStore instance
- * @param date the date of initialization
- * @throws Exception
- */
- public static void cleanup(DataStore dataStore, Date date) throws Exception {
- if (dataStore instanceof S3DataStore) {
- Backend backend = ((S3DataStore) dataStore).getBackend();
- if (backend instanceof S3Backend) {
- String bucket = ((S3Backend) backend).getBucket();
- deleteBucket(bucket, date);
- }
- }
- }
-
public static void deleteBucket(String bucket, Date date) throws Exception {
log.info("cleaning bucket [" + bucket + "]");
Properties props = getS3Config();
Modified: jackrabbit/oak/trunk/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/s3/TestS3DataStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/s3/TestS3DataStore.java?rev=1766338&r1=1766337&r2=1766338&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/s3/TestS3DataStore.java (original)
+++ jackrabbit/oak/trunk/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/s3/TestS3DataStore.java Mon Oct 24 04:51:26 2016
@@ -23,9 +23,7 @@ import java.util.Properties;
import javax.jcr.RepositoryException;
-import com.google.common.base.Strings;
import org.apache.jackrabbit.core.data.DataStore;
-import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -61,8 +59,6 @@ public class TestS3DataStore {
private File dataStoreDir;
- private String bucket;
-
private DataStore ds;
@Parameterized.Parameters(name = "{index}: ({0})")
@@ -77,17 +73,6 @@ public class TestS3DataStore {
startTime = new Date();
}
- @After
- public void tearDown() {
- try {
- if (ds != null && !Strings.isNullOrEmpty(bucket)) {
- S3DataStoreUtils.deleteBucket(bucket, startTime);
- }
- } catch (Exception ignore) {
-
- }
- }
-
@Test
public void testAccessParamLeakOnError() throws Exception {
expectedEx.expect(RepositoryException.class);
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/AbstractDataStoreService.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/AbstractDataStoreService.java?rev=1766338&r1=1766337&r2=1766338&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/AbstractDataStoreService.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/AbstractDataStoreService.java Mon Oct 24 04:51:26 2016
@@ -53,6 +53,9 @@ import static org.apache.jackrabbit.oak.
@Component(componentAbstract = true)
public abstract class AbstractDataStoreService {
+ public static final String JR2_CACHING_PROP = "oak.datastore.jr2caching";
+ protected final boolean JR2_CACHING = Boolean.getBoolean(JR2_CACHING_PROP);
+
private static final String PROP_HOME = "repository.home";
public static final String PROP_ENCODE_LENGTH = "encodeLengthInId";