You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by am...@apache.org on 2016/10/24 04:51:26 UTC

svn commit: r1766338 - in /jackrabbit/oak/trunk: oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/ oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/ oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob...

Author: amitj
Date: Mon Oct 24 04:51:26 2016
New Revision: 1766338

URL: http://svn.apache.org/viewvc?rev=1766338&view=rev
Log:
OAK-4870: Implement caching for S3DataStore

S3DataStore implementation using AbstractCachingDataStore

Added:
    jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/AbstractS3DataStoreService.java   (contents, props changed)
      - copied, changed from r1766337, jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3DataStoreService.java
    jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3Backend.java   (with props)
    jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3DataStore.java   (with props)
    jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3DataStoreService.java   (with props)
    jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/SharedS3DataStoreService.java   (with props)
    jackrabbit/oak/trunk/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3DataStoreServiceTest.java   (with props)
Removed:
    jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3DataStoreService.java
    jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/SharedS3DataStoreService.java
Modified:
    jackrabbit/oak/trunk/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/TestS3DSAsyncTouch.java
    jackrabbit/oak/trunk/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3DataStoreUtils.java
    jackrabbit/oak/trunk/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/s3/TestS3DataStore.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/AbstractDataStoreService.java

Copied: jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/AbstractS3DataStoreService.java (from r1766337, jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3DataStoreService.java)
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/AbstractS3DataStoreService.java?p2=jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/AbstractS3DataStoreService.java&p1=jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3DataStoreService.java&r1=1766337&r2=1766338&rev=1766338&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3DataStoreService.java (original)
+++ jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/AbstractS3DataStoreService.java Mon Oct 24 04:51:26 2016
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.jackrabbit.oak.blob.cloud.aws.s3;
+package org.apache.jackrabbit.oak.blob.cloud.s3;
 
 import java.util.Dictionary;
 import java.util.Hashtable;
@@ -25,40 +25,55 @@ import java.util.Map;
 import java.util.Properties;
 
 import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.ConfigurationPolicy;
 import org.apache.jackrabbit.core.data.DataStore;
 import org.apache.jackrabbit.core.data.DataStoreException;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.SharedS3DataStore;
+import org.apache.jackrabbit.oak.plugins.blob.AbstractSharedCachingDataStore;
 import org.apache.jackrabbit.oak.plugins.blob.datastore.AbstractDataStoreService;
 import org.osgi.framework.Constants;
 import org.osgi.framework.ServiceRegistration;
 import org.osgi.service.component.ComponentContext;
 
-@Component(policy = ConfigurationPolicy.REQUIRE, name = S3DataStoreService.NAME)
-public class S3DataStoreService extends AbstractDataStoreService {
-    public static final String NAME = "org.apache.jackrabbit.oak.plugins.blob.datastore.S3DataStore";
+@Component(componentAbstract = true)
+public class AbstractS3DataStoreService extends AbstractDataStoreService {
     private static final String DESCRIPTION = "oak.datastore.description";
 
     private ServiceRegistration delegateReg;
 
     @Override
     protected DataStore createDataStore(ComponentContext context, Map<String, Object> config) {
-        SharedS3DataStore dataStore = new SharedS3DataStore();
-
         Properties properties = new Properties();
         properties.putAll(config);
 
-        dataStore.setProperties(properties);
-
-        Dictionary<String, Object> props = new Hashtable<String, Object>();
-        props.put(Constants.SERVICE_PID, dataStore.getClass().getName());
-        props.put(DESCRIPTION, getDescription());
-
-        delegateReg = context.getBundleContext().registerService(new String[] {
-            SharedS3DataStore.class.getName(),
-            SharedS3DataStore.class.getName()
-        }, dataStore , props);
+        if (JR2_CACHING) {
+            SharedS3DataStore dataStore = new SharedS3DataStore();
+            dataStore.setProperties(properties);
+
+            Dictionary<String, Object> props = new Hashtable<String, Object>();
+            props.put(Constants.SERVICE_PID, dataStore.getClass().getName());
+            props.put(DESCRIPTION, getDescription());
+
+            delegateReg = context.getBundleContext().registerService(new String[] {
+                SharedS3DataStore.class.getName(),
+                SharedS3DataStore.class.getName()
+            }, dataStore , props);
+            return dataStore;
+        } else {
+            S3DataStore dataStore = new S3DataStore();
+            dataStore.setStatisticsProvider(getStatisticsProvider());
+            dataStore.setProperties(properties);
+
+            Dictionary<String, Object> props = new Hashtable<String, Object>();
+            props.put(Constants.SERVICE_PID, dataStore.getClass().getName());
+            props.put(DESCRIPTION, getDescription());
+
+            delegateReg = context.getBundleContext().registerService(new String[] {
+                AbstractSharedCachingDataStore.class.getName(),
+                AbstractSharedCachingDataStore.class.getName()
+            }, dataStore , props);
 
-        return dataStore;
+            return dataStore;
+        }
     }
 
     protected void deactivate() throws DataStoreException {

Propchange: jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/AbstractS3DataStoreService.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3Backend.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3Backend.java?rev=1766338&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3Backend.java (added)
+++ jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3Backend.java Mon Oct 24 04:51:26 2016
@@ -0,0 +1,818 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.blob.cloud.s3;
+
+import java.io.File;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Queue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.CopyObjectRequest;
+import com.amazonaws.services.s3.model.DeleteObjectsRequest;
+import com.amazonaws.services.s3.model.DeleteObjectsResult;
+import com.amazonaws.services.s3.model.ListObjectsRequest;
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.model.Region;
+import com.amazonaws.services.s3.model.S3Object;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.amazonaws.services.s3.transfer.Copy;
+import com.amazonaws.services.s3.transfer.TransferManager;
+import com.amazonaws.services.s3.transfer.Upload;
+import com.amazonaws.util.StringUtils;
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.jackrabbit.core.data.DataIdentifier;
+import org.apache.jackrabbit.core.data.DataRecord;
+import org.apache.jackrabbit.core.data.DataStoreException;
+import org.apache.jackrabbit.core.data.util.NamedThreadFactory;
+import org.apache.jackrabbit.oak.spi.blob.SharedBackend;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.google.common.collect.Iterables.filter;
+import static java.lang.Thread.currentThread;
+
+/**
+ * A data store backend that stores data on Amazon S3.
+ */
+public class S3Backend implements SharedBackend {
+
+    /**
+     * Logger instance.
+     */
+    private static final Logger LOG = LoggerFactory.getLogger(S3Backend.class);
+
+    private static final String KEY_PREFIX = "dataStore_";
+
+    private static final String META_KEY_PREFIX = "META/";
+
+    private AmazonS3Client s3service;
+
+    private String bucket;
+
+    private TransferManager tmx;
+
+    private Properties properties;
+
+    private Date startTime;
+
+    private S3RequestDecorator s3ReqDecorator;
+
+    public void init() throws DataStoreException {
+        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+
+        try {
+            startTime = new Date();
+            Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+
+            LOG.debug("init");
+
+            s3ReqDecorator = new S3RequestDecorator(properties);
+            s3service = Utils.openService(properties);
+            if (bucket == null || "".equals(bucket.trim())) {
+                bucket = properties.getProperty(S3Constants.S3_BUCKET);
+            }
+
+            String region = properties.getProperty(S3Constants.S3_REGION);
+            Region s3Region = null;
+            if (StringUtils.isNullOrEmpty(region)) {
+                com.amazonaws.regions.Region ec2Region = Regions.getCurrentRegion();
+                if (ec2Region != null) {
+                    s3Region = Region.fromValue(ec2Region.getName());
+                } else {
+                    throw new AmazonClientException(
+                            "parameter ["
+                                    + S3Constants.S3_REGION
+                                    + "] not configured and cannot be derived from environment");
+                }
+            } else {
+                if (Utils.DEFAULT_AWS_BUCKET_REGION.equals(region)) {
+                    s3Region = Region.US_Standard;
+                } else if (Region.EU_Ireland.toString().equals(region)) {
+                    s3Region = Region.EU_Ireland;
+                } else {
+                    s3Region = Region.fromValue(region);
+                }
+            }
+
+            if (!s3service.doesBucketExist(bucket)) {
+                s3service.createBucket(bucket, s3Region);
+                LOG.info("Created bucket [{}] in [{}] ", bucket, region);
+            } else {
+                LOG.info("Using bucket [{}] in [{}] ", bucket, region);
+            }
+
+            int writeThreads = 10;
+            String writeThreadsStr = properties.getProperty(S3Constants.S3_WRITE_THREADS);
+            if (writeThreadsStr != null) {
+                writeThreads = Integer.parseInt(writeThreadsStr);
+            }
+            LOG.info("Using thread pool of [{}] threads in S3 transfer manager.", writeThreads);
+            tmx = new TransferManager(s3service, Executors.newFixedThreadPool(writeThreads,
+                new NamedThreadFactory("s3-transfer-manager-worker")));
+
+            String renameKeyProp = properties.getProperty(S3Constants.S3_RENAME_KEYS);
+            boolean renameKeyBool = (renameKeyProp == null || "".equals(renameKeyProp))
+                    ? false
+                    : Boolean.parseBoolean(renameKeyProp);
+            LOG.info("Rename keys [{}]", renameKeyBool);
+            if (renameKeyBool) {
+                renameKeys();
+            }
+            LOG.debug("S3 Backend initialized in [{}] ms",
+                +(System.currentTimeMillis() - startTime.getTime()));
+        } catch (Exception e) {
+            LOG.debug("  error ", e);
+            Map<String, String> filteredMap = Maps.newHashMap();
+            if (properties != null) {
+                filteredMap = Maps.filterKeys(Maps.fromProperties(properties), new Predicate<String>() {
+                    @Override public boolean apply(String input) {
+                        return !input.equals(S3Constants.ACCESS_KEY) && !input.equals(S3Constants
+                            .SECRET_KEY);
+                    }
+                });
+            }
+            throw new DataStoreException("Could not initialize S3 from " + filteredMap, e);
+        } finally {
+            if (contextClassLoader != null) {
+                Thread.currentThread().setContextClassLoader(contextClassLoader);
+            }
+        }
+    }
+
+    /**
+     * It uploads file to Amazon S3. If file size is greater than 5MB, this
+     * method uses parallel concurrent connections to upload.
+     */
+    @Override
+    public void write(DataIdentifier identifier, File file)
+        throws DataStoreException {
+        String key = getKeyName(identifier);
+        ObjectMetadata objectMetaData = null;
+        long start = System.currentTimeMillis();
+        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+        try {
+            Thread.currentThread().setContextClassLoader(
+                getClass().getClassLoader());
+            // check if the same record already exists
+            try {
+                objectMetaData = s3service.getObjectMetadata(bucket, key);
+            } catch (AmazonServiceException ase) {
+                if (!(ase.getStatusCode() == 404 || ase.getStatusCode() == 403)) {
+                    throw ase;
+                }
+            }
+            if (objectMetaData != null) {
+                long l = objectMetaData.getContentLength();
+                if (l != file.length()) {
+                    throw new DataStoreException("Collision: " + key
+                        + " new length: " + file.length() + " old length: " + l);
+                }
+                LOG.debug("[{}]'s exists, lastmodified = [{}]", key,
+                    objectMetaData.getLastModified().getTime());
+                CopyObjectRequest copReq = new CopyObjectRequest(bucket, key,
+                    bucket, key);
+                copReq.setNewObjectMetadata(objectMetaData);
+                Copy copy = tmx.copy(s3ReqDecorator.decorate(copReq));
+                try {
+                    copy.waitForCopyResult();
+                    LOG.debug("lastModified of [{}] updated successfully.", identifier);
+                }catch (Exception e2) {
+                    throw new DataStoreException("Could not upload " + key, e2);
+                }
+            }
+
+            if (objectMetaData == null) {
+                try {
+                    // start multipart parallel upload using amazon sdk
+                    Upload up = tmx.upload(s3ReqDecorator.decorate(new PutObjectRequest(
+                        bucket, key, file)));
+                    // wait for upload to finish
+                    up.waitForUploadResult();
+                    LOG.debug("synchronous upload to identifier [{}] completed.", identifier);
+                } catch (Exception e2 ) {
+                    throw new DataStoreException("Could not upload " + key, e2);
+                }
+            }
+        } finally {
+            if (contextClassLoader != null) {
+                Thread.currentThread().setContextClassLoader(contextClassLoader);
+            }
+        }
+        LOG.debug(
+            "write of [{}], length=[{}], in [{}]ms",
+            new Object[] { identifier, file.length(), (System.currentTimeMillis() - start) });
+    }
+
+    /**
+     * Check if record identified by identifier exists in Amazon S3.
+     */
+    @Override
+    public boolean exists(DataIdentifier identifier) throws DataStoreException {
+        long start = System.currentTimeMillis();
+        String key = getKeyName(identifier);
+        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+        try {
+            Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+            ObjectMetadata objectMetaData = s3service.getObjectMetadata(bucket,
+                key);
+            if (objectMetaData != null) {
+                LOG.trace("exists [{}]: [true] took [{}] ms.",
+                    identifier, (System.currentTimeMillis() - start) );
+                return true;
+            }
+            return false;
+        } catch (AmazonServiceException e) {
+            if (e.getStatusCode() == 404 || e.getStatusCode() == 403) {
+                LOG.debug("exists [{}]: [false] took [{}] ms.",
+                    identifier, (System.currentTimeMillis() - start) );
+                return false;
+            }
+            throw new DataStoreException(
+                "Error occured to getObjectMetadata for key [" + identifier.toString() + "]", e);
+        } finally {
+            if (contextClassLoader != null) {
+                Thread.currentThread().setContextClassLoader(contextClassLoader);
+            }
+        }
+    }
+
+    @Override
+    public InputStream read(DataIdentifier identifier)
+            throws DataStoreException {
+        long start = System.currentTimeMillis();
+        String key = getKeyName(identifier);
+        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+        try {
+            Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+            S3Object object = s3service.getObject(bucket, key);
+            InputStream in = object.getObjectContent();
+            LOG.debug("[{}] read took [{}]ms", identifier, (System.currentTimeMillis() - start));
+            return in;
+        } catch (AmazonServiceException e) {
+            throw new DataStoreException("Object not found: " + key, e);
+        } finally {
+            if (contextClassLoader != null) {
+                Thread.currentThread().setContextClassLoader(contextClassLoader);
+            }
+        }
+    }
+
+    @Override
+    public Iterator<DataIdentifier> getAllIdentifiers()
+            throws DataStoreException {
+        return new RecordsIterator<DataIdentifier>(
+            new Function<S3ObjectSummary, DataIdentifier>() {
+                @Override
+                public DataIdentifier apply(S3ObjectSummary input) {
+                    return new DataIdentifier(getIdentifierName(input.getKey()));
+                }
+        });
+    }
+
+    @Override
+    public void deleteRecord(DataIdentifier identifier)
+            throws DataStoreException {
+        long start = System.currentTimeMillis();
+        String key = getKeyName(identifier);
+        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+        try {
+            Thread.currentThread().setContextClassLoader(
+                getClass().getClassLoader());
+            s3service.deleteObject(bucket, key);
+            LOG.debug("Identifier [{}] deleted. It took [{}]ms.", new Object[] {
+                identifier, (System.currentTimeMillis() - start) });
+        } catch (AmazonServiceException e) {
+            throw new DataStoreException(
+                "Could not delete dataIdentifier " + identifier, e);
+        } finally {
+            if (contextClassLoader != null) {
+                Thread.currentThread().setContextClassLoader(contextClassLoader);
+            }
+        }
+    }
+
+    @Override
+    public void close() {
+        // backend is closing. abort all multipart uploads from start.
+        if(s3service.doesBucketExist(bucket)) {
+            tmx.abortMultipartUploads(bucket, startTime);
+        }
+        tmx.shutdownNow();
+        s3service.shutdown();
+        LOG.info("S3Backend closed.");
+    }
+
+    public String getBucket() {
+        return bucket;
+    }
+
+    public void setBucket(String bucket) {
+        this.bucket = bucket;
+    }
+
+    /**
+     * Properties used to configure the backend. If provided explicitly
+     * before init is invoked then these take precedence
+     *
+     * @param properties  to configure S3Backend
+     */
+    public void setProperties(Properties properties) {
+        this.properties = properties;
+    }
+
+    @Override
+    public void addMetadataRecord(final InputStream input, final String name) throws DataStoreException {
+        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+
+        try {
+            Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+
+            Upload upload = tmx.upload(s3ReqDecorator
+                .decorate(new PutObjectRequest(bucket, addMetaKeyPrefix(name), input, new ObjectMetadata())));
+            upload.waitForUploadResult();
+        } catch (InterruptedException e) {
+            LOG.error("Error in uploading", e);
+            throw new DataStoreException("Error in uploading", e);
+        } finally {
+            if (contextClassLoader != null) {
+                Thread.currentThread().setContextClassLoader(contextClassLoader);
+            }
+        }
+    }
+
+    @Override
+    public void addMetadataRecord(File input, String name) throws DataStoreException {
+        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+        try {
+            Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+
+            Upload upload = tmx.upload(s3ReqDecorator
+                .decorate(new PutObjectRequest(bucket, addMetaKeyPrefix(name), input)));
+            upload.waitForUploadResult();
+        } catch (InterruptedException e) {
+            LOG.error("Exception in uploading metadata file {}", new Object[] {input, e});
+            throw new DataStoreException("Error in uploading metadata file", e);
+        } finally {
+            if (contextClassLoader != null) {
+                Thread.currentThread().setContextClassLoader(contextClassLoader);
+            }
+        }
+    }
+
+    @Override
+    public DataRecord getMetadataRecord(String name) {
+        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+        try {
+            Thread.currentThread().setContextClassLoader(
+                getClass().getClassLoader());
+            ObjectMetadata meta = s3service.getObjectMetadata(bucket, addMetaKeyPrefix(name));
+            return new S3DataRecord(s3service, bucket, name,
+                meta.getLastModified().getTime(), meta.getContentLength(), true);
+        } finally {
+            if (contextClassLoader != null) {
+                Thread.currentThread().setContextClassLoader(contextClassLoader);
+            }
+        }
+    }
+
+    @Override
+    public List<DataRecord> getAllMetadataRecords(String prefix) {
+        List<DataRecord> metadataList = new ArrayList<DataRecord>();
+        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+        try {
+            Thread.currentThread().setContextClassLoader(
+                getClass().getClassLoader());
+            ListObjectsRequest listObjectsRequest =
+                new ListObjectsRequest().withBucketName(bucket).withPrefix(addMetaKeyPrefix(prefix));
+            ObjectListing prevObjectListing = s3service.listObjects(listObjectsRequest);
+            for (final S3ObjectSummary s3ObjSumm : prevObjectListing.getObjectSummaries()) {
+                metadataList.add(new S3DataRecord(s3service, bucket, stripMetaKeyPrefix(s3ObjSumm.getKey()),
+                    s3ObjSumm.getLastModified().getTime(), s3ObjSumm.getSize(), true));
+            }
+        } finally {
+            if (contextClassLoader != null) {
+                Thread.currentThread().setContextClassLoader(contextClassLoader);
+            }
+        }
+        return metadataList;
+    }
+
+    @Override
+    public boolean deleteMetadataRecord(String name) {
+        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+        try {
+            Thread.currentThread().setContextClassLoader(
+                getClass().getClassLoader());
+            s3service.deleteObject(bucket, addMetaKeyPrefix(name));
+        } finally {
+            if (contextClassLoader != null) {
+                Thread.currentThread().setContextClassLoader(contextClassLoader);
+            }
+        }
+        return true;
+    }
+
+    @Override
+    public void deleteAllMetadataRecords(String prefix) {
+        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+        try {
+            Thread.currentThread().setContextClassLoader(
+                getClass().getClassLoader());
+
+            ListObjectsRequest listObjectsRequest =
+                new ListObjectsRequest().withBucketName(bucket).withPrefix(addMetaKeyPrefix(prefix));
+            ObjectListing metaList = s3service.listObjects(listObjectsRequest);
+            List<DeleteObjectsRequest.KeyVersion> deleteList = new ArrayList<DeleteObjectsRequest.KeyVersion>();
+            for (S3ObjectSummary s3ObjSumm : metaList.getObjectSummaries()) {
+                deleteList.add(new DeleteObjectsRequest.KeyVersion(s3ObjSumm.getKey()));
+            }
+            if (deleteList.size() > 0) {
+                DeleteObjectsRequest delObjsReq = new DeleteObjectsRequest(bucket);
+                delObjsReq.setKeys(deleteList);
+                DeleteObjectsResult dobjs = s3service.deleteObjects(delObjsReq);
+            }
+        } finally {
+            if (contextClassLoader != null) {
+                Thread.currentThread().setContextClassLoader(contextClassLoader);
+            }
+        }
+    }
+
+    @Override
+    public Iterator<DataRecord> getAllRecords() {
+        return new RecordsIterator<DataRecord>(
+            new Function<S3ObjectSummary, DataRecord>() {
+                @Override
+                public DataRecord apply(S3ObjectSummary input) {
+                    return new S3DataRecord(s3service, bucket, getIdentifierName(input.getKey()),
+                        input.getLastModified().getTime(), input.getSize());
+                }
+            });
+    }
+
+    @Override
+    public DataRecord getRecord(DataIdentifier identifier) throws DataStoreException {
+        long start = System.currentTimeMillis();
+        String key = getKeyName(identifier);
+        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+        try {
+            Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+
+            ObjectMetadata object = s3service.getObjectMetadata(bucket, key);
+            S3DataRecord record = new S3DataRecord(s3service, bucket, identifier.toString(),
+                object.getLastModified().getTime(), object.getContentLength());
+            LOG.debug("Identifier [{}]'s getRecord = [{}] took [{}]ms.",
+                new Object[] {identifier, record, (System.currentTimeMillis() - start)});
+
+            return record;
+        } catch (AmazonServiceException e) {
+            if (e.getStatusCode() == 404 || e.getStatusCode() == 403) {
+                LOG.info(
+                    "getRecord:Identifier [{}] not found. Took [{}] ms.",
+                    identifier, (System.currentTimeMillis() - start));
+            }
+            throw new DataStoreException(e);
+        } finally {
+            if (contextClassLoader != null) {
+                Thread.currentThread().setContextClassLoader(contextClassLoader);
+            }
+        }
+    }
+
+    /**
+     * Returns an iterator over the S3 objects
+     * @param <T>
+     */
+    class RecordsIterator<T> extends AbstractIterator<T> {
+        ObjectListing prevObjectListing;
+        Queue<S3ObjectSummary> queue;
+        long size;
+        Function<S3ObjectSummary, T> transformer;
+
+        public RecordsIterator (Function<S3ObjectSummary, T> transformer) {
+            queue = Lists.newLinkedList();
+            this.transformer = transformer;
+        }
+
+        @Override
+        protected T computeNext() {
+            if (queue.isEmpty()) {
+                loadBatch();
+            }
+
+            if (!queue.isEmpty()) {
+                return transformer.apply(queue.remove());
+            }
+
+            return endOfData();
+        }
+
+        private boolean loadBatch() {
+            ClassLoader contextClassLoader = currentThread().getContextClassLoader();
+            long start = System.currentTimeMillis();
+            try {
+                currentThread().setContextClassLoader(getClass().getClassLoader());
+
+                // initialize the listing the first time
+                if (prevObjectListing == null) {
+                    prevObjectListing = s3service.listObjects(bucket);
+                } else if (prevObjectListing.isTruncated()) { // already initialized; more objects available
+                    prevObjectListing = s3service.listNextBatchOfObjects(prevObjectListing);
+                } else { // no more available
+                    return false;
+                }
+
+                List<S3ObjectSummary> listing = Lists.newArrayList(
+                    filter(prevObjectListing.getObjectSummaries(),
+                        new Predicate<S3ObjectSummary>() {
+                            @Override
+                            public boolean apply(S3ObjectSummary input) {
+                                return !input.getKey().startsWith(META_KEY_PREFIX);
+                            }
+                        }));
+
+                // After filtering no elements
+                if (listing.isEmpty()) {
+                    return false;
+                }
+
+                size += listing.size();
+                queue.addAll(listing);
+
+                LOG.info("Loaded batch of size [{}] in [{}] ms.",
+                    listing.size(), (System.currentTimeMillis() - start));
+
+                return true;
+            } catch (AmazonServiceException e) {
+                LOG.warn("Could not list objects", e);
+            } finally {
+                if (contextClassLoader != null) {
+                    currentThread().setContextClassLoader(contextClassLoader);
+                }
+            }
+            return false;
+        }
+    }
+
+    /**
+     * Prefixes {@code META_KEY_PREFIX} to the given key, forming the S3 key
+     * under which metadata records are stored.
+     */
+    private static String addMetaKeyPrefix(String key) {
+        return META_KEY_PREFIX + key;
+    }
+
+    /**
+     * Removes the leading {@code META_KEY_PREFIX} from the given name, if
+     * present; otherwise returns the name unchanged.
+     */
+    private static String stripMetaKeyPrefix(String name) {
+        if (name.startsWith(META_KEY_PREFIX)) {
+            return name.substring(META_KEY_PREFIX.length());
+        }
+        return name;
+    }
+
+    /**
+     * S3DataRecord which lazily retrieves the input stream of the record.
+     * Only metadata (identifier, length, last-modified, bucket) is held in
+     * memory; {@link #getStream()} issues a fresh S3 GET on each call.
+     */
+    static class S3DataRecord implements DataRecord {
+        // Client used to fetch the object content on demand.
+        private AmazonS3Client s3service;
+        private DataIdentifier identifier;
+        private long length;
+        private long lastModified;
+        private String bucket;
+        // True when the record is stored under the metadata key prefix.
+        private boolean isMeta;
+
+        public S3DataRecord(AmazonS3Client s3service, String bucket, String key, long lastModified,
+            long length) {
+            this(s3service, bucket, key, lastModified, length, false);
+        }
+
+        public S3DataRecord(AmazonS3Client s3service, String bucket, String key, long lastModified,
+            long length, boolean isMeta) {
+            this.s3service = s3service;
+            this.identifier = new DataIdentifier(key);
+            this.lastModified = lastModified;
+            this.length = length;
+            this.bucket = bucket;
+            this.isMeta = isMeta;
+        }
+
+        @Override
+        public DataIdentifier getIdentifier() {
+            return identifier;
+        }
+
+        @Override
+        public String getReference() {
+            // NOTE(review): returns the raw identifier rather than a keyed
+            // reference — confirm this is intended for this record type.
+            return identifier.toString();
+        }
+
+        @Override
+        public long getLength() throws DataStoreException {
+            return length;
+        }
+
+        @Override
+        public InputStream getStream() throws DataStoreException {
+            // Data records use the dash-separated key format; metadata
+            // records live under the META key prefix instead.
+            String id = getKeyName(identifier);
+            if (isMeta) {
+                id = addMetaKeyPrefix(identifier.toString());
+            }
+            return s3service.getObject(bucket, id).getObjectContent();
+        }
+
+        @Override
+        public long getLastModified() {
+            return lastModified;
+        }
+
+        @Override
+        public String toString() {
+            return "S3DataRecord{" +
+                "identifier=" + identifier +
+                ", length=" + length +
+                ", lastModified=" + lastModified +
+                ", bucket='" + bucket + '\'' +
+                '}';
+        }
+    }
+
+    /**
+     * Renames object keys in S3 from the old to the new key format
+     * concurrently. The number of concurrent threads is defined by the
+     * 'maxConnections' property in aws.properties. As S3 has no "move"
+     * operation, a move is simulated by copying the object to the new key
+     * and then deleting the old key.
+     */
+    private void renameKeys() throws DataStoreException {
+        long startTime = System.currentTimeMillis();
+        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+        long count = 0;
+        try {
+            // S3 client resolves classes via the context class loader.
+            Thread.currentThread().setContextClassLoader(
+                getClass().getClassLoader());
+            ObjectListing prevObjectListing = s3service.listObjects(bucket);
+            List<DeleteObjectsRequest.KeyVersion> deleteList = new ArrayList<DeleteObjectsRequest.KeyVersion>();
+            // NOTE(review): assumes 'maxConnections' is always present in the
+            // configured properties; parseInt fails fast otherwise.
+            int nThreads = Integer.parseInt(properties.getProperty("maxConnections"));
+            ExecutorService executor = Executors.newFixedThreadPool(nThreads,
+                new NamedThreadFactory("s3-object-rename-worker"));
+            boolean taskAdded = false;
+            while (true) {
+                for (S3ObjectSummary s3ObjSumm : prevObjectListing.getObjectSummaries()) {
+                    executor.execute(new KeyRenameThread(s3ObjSumm.getKey()));
+                    taskAdded = true;
+                    count++;
+                    // delete the object if it follows old key name format
+                    if (s3ObjSumm.getKey().startsWith(KEY_PREFIX)) {
+                        deleteList.add(new DeleteObjectsRequest.KeyVersion(
+                            s3ObjSumm.getKey()));
+                    }
+                }
+                if (!prevObjectListing.isTruncated()) {
+                    break;
+                }
+                prevObjectListing = s3service.listNextBatchOfObjects(prevObjectListing);
+            }
+            // This will make the executor accept no new threads
+            // and finish all existing threads in the queue
+            executor.shutdown();
+
+            try {
+                // Wait until all rename tasks have finished
+                while (taskAdded
+                    && !executor.awaitTermination(10, TimeUnit.SECONDS)) {
+                    LOG.info("Rename S3 keys tasks timedout. Waiting again");
+                }
+            } catch (InterruptedException ie) {
+                // Do not swallow the interrupt: restore the flag so callers
+                // further up the stack can observe it.
+                Thread.currentThread().interrupt();
+                LOG.warn("Interrupted while waiting for rename tasks to finish", ie);
+            }
+            LOG.info("Renamed [{}] keys, time taken [{}]sec", count,
+                ((System.currentTimeMillis() - startTime) / 1000));
+            // Delete the old-format keys in batches of at most 500.
+            if (deleteList.size() > 0) {
+                DeleteObjectsRequest delObjsReq = new DeleteObjectsRequest(
+                    bucket);
+                int batchSize = 500, startIndex = 0, size = deleteList.size();
+                int endIndex = batchSize < size ? batchSize : size;
+                while (endIndex <= size) {
+                    delObjsReq.setKeys(Collections.unmodifiableList(deleteList.subList(
+                        startIndex, endIndex)));
+                    DeleteObjectsResult dobjs = s3service.deleteObjects(delObjsReq);
+                    LOG.info(
+                        "Records[{}] deleted in datastore from index [{}] to [{}]",
+                        new Object[] { dobjs.getDeletedObjects().size(),
+                            startIndex, (endIndex - 1) });
+                    if (endIndex == size) {
+                        break;
+                    } else {
+                        // Advance to the next batch window.
+                        startIndex = endIndex;
+                        endIndex = (startIndex + batchSize) < size
+                                ? (startIndex + batchSize)
+                                : size;
+                    }
+                }
+            }
+        } finally {
+            if (contextClassLoader != null) {
+                Thread.currentThread().setContextClassLoader(contextClassLoader);
+            }
+        }
+    }
+
+    /**
+     * Converts a key from the old format to the new format. For example,
+     * dataStore_004cb70c8f87d78f04da41e7547cb434094089ea is converted to
+     * 004c-b70c8f87d78f04da41e7547cb434094089ea.
+     */
+    private static String convertKey(String oldKey)
+            throws IllegalArgumentException {
+        // Keys not carrying the old prefix are already in the new format.
+        if (!oldKey.startsWith(KEY_PREFIX)) {
+            return oldKey;
+        }
+        String key = oldKey.substring(KEY_PREFIX.length());
+        // Insert a dash after the first four characters.
+        return key.substring(0, 4) + Utils.DASH + key.substring(4);
+    }
+
+    /**
+     * Get key from data identifier. Object is stored with key in S3.
+     * The key is the identifier with a dash inserted after the first four
+     * characters.
+     */
+    private static String getKeyName(DataIdentifier identifier) {
+        String key = identifier.toString();
+        return key.substring(0, 4) + Utils.DASH + key.substring(4);
+    }
+
+    /**
+     * Get data identifier from key by removing the dash separator; the
+     * inverse of {@link #getKeyName(DataIdentifier)}.
+     */
+    private static String getIdentifierName(String key) {
+        // Keys without a dash are not data records in the new format.
+        if (!key.contains(Utils.DASH)) {
+            return null;
+        } else if (key.contains(META_KEY_PREFIX)) {
+            // Metadata keys are returned unchanged.
+            return key;
+        }
+        // assumes the dash sits at index 4 (see getKeyName) — strip it.
+        return key.substring(0, 4) + key.substring(5);
+    }
+
+
+    /**
+     * Renames a single object key in S3 on a worker thread: the object is
+     * copied to the new key format and the task waits for the copy to
+     * complete. Deletion of the old key is handled by {@code renameKeys}.
+     */
+    private class KeyRenameThread implements Runnable {
+
+        private String oldKey;
+
+        public void run() {
+            ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+            try {
+                // S3 client resolves classes via the context class loader.
+                Thread.currentThread().setContextClassLoader(
+                    getClass().getClassLoader());
+                String newS3Key = convertKey(oldKey);
+                CopyObjectRequest copReq = new CopyObjectRequest(bucket,
+                    oldKey, bucket, newS3Key);
+                Copy copy = tmx.copy(s3ReqDecorator.decorate(copReq));
+                try {
+                    copy.waitForCopyResult();
+                    LOG.debug("[{}] renamed to [{}] ", oldKey, newS3Key);
+                } catch (InterruptedException ie) {
+                    // Restore the interrupt status, and pass the exception as
+                    // the last argument so SLF4J fills the placeholders with
+                    // the keys and logs the stack trace (the original passed
+                    // the exception first, dropping newS3Key and the trace).
+                    Thread.currentThread().interrupt();
+                    LOG.error(" Exception in renaming [{}] to [{}] ",
+                        new Object[] { oldKey, newS3Key, ie });
+                }
+
+            } finally {
+                if (contextClassLoader != null) {
+                    Thread.currentThread().setContextClassLoader(
+                        contextClassLoader);
+                }
+            }
+        }
+
+        public KeyRenameThread(String oldKey) {
+            this.oldKey = oldKey;
+        }
+    }
+}

Propchange: jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3Backend.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3DataStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3DataStore.java?rev=1766338&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3DataStore.java (added)
+++ jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3DataStore.java Mon Oct 24 04:51:26 2016
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.blob.cloud.s3;
+
+import java.util.Properties;
+
+import com.google.common.base.Strings;
+import org.apache.jackrabbit.core.data.DataIdentifier;
+import org.apache.jackrabbit.core.data.DataStoreException;
+import org.apache.jackrabbit.oak.plugins.blob.AbstractSharedCachingDataStore;
+import org.apache.jackrabbit.oak.spi.blob.SharedBackend;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Amazon S3 data store extending from {@link AbstractSharedCachingDataStore}.
+ */
+public class S3DataStore extends AbstractSharedCachingDataStore {
+    /**
+     * Logger instance.
+     */
+    private static final Logger LOG = LoggerFactory.getLogger(S3DataStore.class);
+
+    /**
+     * Properties used to configure the {@link S3Backend}; may be null, in
+     * which case the backend reads its own configuration.
+     */
+    protected Properties properties;
+
+    /**
+     * The minimum size of an object that should be stored in this data store.
+     */
+    private int minRecordLength = 16 * 1024;
+
+    /**
+     * Shared secret from which the reference key bytes are derived; must be
+     * set before {@link #getOrCreateReferenceKey()} is called.
+     */
+    private String secret;
+
+    @Override
+    protected SharedBackend createBackend() {
+        S3Backend backend = new S3Backend();
+        if (properties != null) {
+            backend.setProperties(properties);
+        }
+        return backend;
+    }
+
+    @Override
+    protected byte[] getOrCreateReferenceKey() throws DataStoreException {
+        // Fail with a clear message instead of wrapping an obscure
+        // NullPointerException when no secret has been configured.
+        if (Strings.isNullOrEmpty(secret)) {
+            throw new DataStoreException(
+                "Reference key not available: 'secret' property not configured");
+        }
+        try {
+            return secret.getBytes("UTF-8");
+        } catch (Exception e) {
+            LOG.info("Error in creating reference key", e);
+            throw new DataStoreException(e);
+        }
+    }
+
+    /**
+     * Look in the backend for a record matching the given identifier.  Returns true
+     * if such a record exists.
+     *
+     * @param identifier - An identifier for the record.
+     * @return true if a record for the provided identifier can be found.
+     */
+    public boolean haveRecordForIdentifier(final String identifier) {
+        try {
+            if (!Strings.isNullOrEmpty(identifier)) {
+                return backend.exists(new DataIdentifier(identifier));
+            }
+        }
+        catch (DataStoreException e) {
+            LOG.warn(String.format("Data Store Exception caught checking for %s in pending uploads",
+                identifier), e);
+        }
+        return false;
+    }
+
+    /**------------------------------------------- Getters & Setters-----------------------------**/
+
+    /**
+     * Properties required to configure the S3Backend
+     */
+    public void setProperties(Properties properties) {
+        this.properties = properties;
+    }
+
+    public SharedBackend getBackend() {
+        return backend;
+    }
+
+    @Override
+    public int getMinRecordLength() {
+        return minRecordLength;
+    }
+
+    public void setMinRecordLength(int minRecordLength) {
+        this.minRecordLength = minRecordLength;
+    }
+
+    public void setSecret(String secret) {
+        this.secret = secret;
+    }
+}

Propchange: jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3DataStore.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3DataStoreService.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3DataStoreService.java?rev=1766338&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3DataStoreService.java (added)
+++ jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3DataStoreService.java Mon Oct 24 04:51:26 2016
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.blob.cloud.s3;
+
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.ConfigurationPolicy;
+
+/**
+ * OSGi component for the S3 data store. Activation requires explicit
+ * configuration (policy = REQUIRE) and is registered under the {@link #NAME}
+ * persistent identity.
+ */
+@Component(policy = ConfigurationPolicy.REQUIRE, name = S3DataStoreService.NAME, metatype = true)
+public class S3DataStoreService extends AbstractS3DataStoreService {
+    public static final String NAME = "org.apache.jackrabbit.oak.plugins.blob.datastore.S3DataStore";
+}

Propchange: jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3DataStoreService.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/SharedS3DataStoreService.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/SharedS3DataStoreService.java?rev=1766338&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/SharedS3DataStoreService.java (added)
+++ jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/SharedS3DataStoreService.java Mon Oct 24 04:51:26 2016
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.blob.cloud.s3;
+
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.ConfigurationPolicy;
+
+/**
+ * OSGi component for the shared S3 data store. Activation requires explicit
+ * configuration (policy = REQUIRE) and is registered under the {@link #NAME}
+ * persistent identity.
+ */
+@Component(policy = ConfigurationPolicy.REQUIRE, name = SharedS3DataStoreService.NAME, metatype = true)
+public class SharedS3DataStoreService extends AbstractS3DataStoreService {
+    public static final String NAME = "org.apache.jackrabbit.oak.plugins.blob.datastore.SharedS3DataStore";
+}

Propchange: jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/SharedS3DataStoreService.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: jackrabbit/oak/trunk/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/TestS3DSAsyncTouch.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/TestS3DSAsyncTouch.java?rev=1766338&r1=1766337&r2=1766338&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/TestS3DSAsyncTouch.java (original)
+++ jackrabbit/oak/trunk/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/TestS3DSAsyncTouch.java Mon Oct 24 04:51:26 2016
@@ -44,7 +44,7 @@ public class TestS3DSAsyncTouch extends
 
     @Parameterized.Parameters(name = "{index}: ({0})")
     public static List<String> fixtures() {
-        return Lists.newArrayList(getFixtures().get(0));
+        return Lists.newArrayList(getFixtures().get(1));
     }
 
     @Override

Added: jackrabbit/oak/trunk/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3DataStoreServiceTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3DataStoreServiceTest.java?rev=1766338&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3DataStoreServiceTest.java (added)
+++ jackrabbit/oak/trunk/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3DataStoreServiceTest.java Mon Oct 24 04:51:26 2016
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.blob.cloud.s3;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+
+import com.google.common.collect.Maps;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.SharedS3DataStore;
+import org.apache.jackrabbit.oak.plugins.blob.AbstractSharedCachingDataStore;
+import org.apache.jackrabbit.oak.stats.StatisticsProvider;
+import org.apache.sling.testing.mock.osgi.junit.OsgiContext;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+
+import static com.google.common.collect.Maps.newHashMap;
+import static org.apache.jackrabbit.oak.blob.cloud.s3.S3DataStoreUtils.isS3Configured;
+import static org.apache.jackrabbit.oak.plugins.blob.datastore.AbstractDataStoreService
+    .JR2_CACHING_PROP;
+import static org.apache.sling.testing.mock.osgi.MockOsgi.deactivate;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assume.assumeTrue;
+
+/**
+ * Tests the registration of the S3DataStore: both the default caching
+ * implementation and the JR2 caching fallback selected via the
+ * {@code JR2_CACHING_PROP} system property. All tests are skipped unless S3
+ * is configured for the test environment.
+ */
+public class S3DataStoreServiceTest {
+
+    @Rule
+    public OsgiContext context = new OsgiContext();
+
+    // NOTE(review): this rule is not used by any test in this class.
+    @Rule
+    public ExpectedException expectedEx = ExpectedException.none();
+
+    @Rule
+    public TemporaryFolder folder = new TemporaryFolder(new File("target"));
+
+    @BeforeClass
+    public static void assumptions() {
+        // Skip the whole class when no S3 configuration is available.
+        assumeTrue(isS3Configured());
+    }
+
+    @Before
+    public void setUp() {
+        context.registerService(StatisticsProvider.class, StatisticsProvider.NOOP);
+    }
+
+    @After
+    public void tear() {
+        // Reset the JR2 caching switch so tests do not leak into each other.
+        System.clearProperty(JR2_CACHING_PROP);
+    }
+
+    @Test
+    public void testDefaultS3Implementation() throws IOException {
+        registerBlobStore();
+        assertNotNull(context.getService(AbstractSharedCachingDataStore.class));
+
+        unregisterBlobStore();
+    }
+
+    @Test
+    public void testJR2Caching() throws IOException {
+        // With JR2 caching enabled the legacy SharedS3DataStore is registered.
+        System.setProperty(JR2_CACHING_PROP, "true");
+        registerBlobStore();
+        assertNotNull(context.getService(SharedS3DataStore.class));
+
+        unregisterBlobStore();
+    }
+
+    // Service under test, registered by registerBlobStore().
+    private S3DataStoreService service;
+
+    private void registerBlobStore() throws IOException {
+        // Configure from the S3 test config plus a fresh repository home.
+        Map<String, Object> properties = newHashMap();
+        properties.putAll(Maps.fromProperties(S3DataStoreUtils.getS3Config()));
+        properties.put("repository.home", folder.newFolder().getAbsolutePath());
+        service = context
+            .registerInjectActivateService(new S3DataStoreService(), properties);
+    }
+
+    private void unregisterBlobStore() {
+        deactivate(service);
+    }
+}

Propchange: jackrabbit/oak/trunk/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3DataStoreServiceTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: jackrabbit/oak/trunk/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3DataStoreUtils.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3DataStoreUtils.java?rev=1766338&r1=1766337&r2=1766338&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3DataStoreUtils.java (original)
+++ jackrabbit/oak/trunk/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3DataStoreUtils.java Mon Oct 24 04:51:26 2016
@@ -36,10 +36,7 @@ import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Maps;
 import org.apache.commons.io.IOUtils;
-import org.apache.jackrabbit.core.data.Backend;
 import org.apache.jackrabbit.core.data.DataStore;
-import org.apache.jackrabbit.oak.blob.cloud.aws.s3.S3Backend;
-import org.apache.jackrabbit.oak.blob.cloud.aws.s3.S3DataStore;
 import org.apache.jackrabbit.oak.blob.cloud.aws.s3.SharedS3DataStore;
 import org.apache.jackrabbit.oak.commons.PropertiesUtil;
 import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreUtils;
@@ -47,21 +44,26 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Extension to {@link DataStoreUtils} to enable S3 extensions for cleaning.
+ * Extension to {@link DataStoreUtils} to enable S3 extensions for cleaning and initialization.
  */
 public class S3DataStoreUtils extends DataStoreUtils {
     private static final Logger log = LoggerFactory.getLogger(S3DataStoreUtils.class);
 
     private static final String DEFAULT_CONFIG_PATH = "./src/test/resources/aws.properties";
 
+    private static Class JR2_S3 = SharedS3DataStore.class;
+    private static Class S3 = S3DataStore.class;
+
     public static List<String> getFixtures() {
-        return ImmutableList.of(SharedS3DataStore.class.getName());
+        return ImmutableList.of(
+            S3.getName(),
+            JR2_S3.getName());
     }
 
     public static boolean isS3DataStore() {
         String dsName = System.getProperty(DS_CLASS_NAME);
-        boolean s3Class =  (dsName != null) && (dsName.equals(S3DataStore.class.getName()) || dsName
-            .equals(SharedS3DataStore.class.getName()));
+        boolean s3Class =
+            (dsName != null) && (dsName.equals(S3.getName()) || dsName.equals(JR2_S3.getName()));
         if (!isS3Configured()) {
             return false;
         }
@@ -120,8 +122,10 @@ public class S3DataStoreUtils extends Da
         DataStore ds = Class.forName(className).asSubclass(DataStore.class).newInstance();
         PropertiesUtil.populate(ds, Maps.fromProperties(props), false);
         // Set the props object
-        if (SharedS3DataStore.class.getName().equals(className)) {
-            ((SharedS3DataStore) ds).setProperties(props);
+        if (S3.getName().equals(className)) {
+            ((S3DataStore) ds).setProperties(props);
+        } else if (JR2_S3.getName().equals(className)) {
+            ((org.apache.jackrabbit.oak.blob.cloud.aws.s3.SharedS3DataStore) ds).setProperties(props);
         }
         ds.init(homeDir);
 
@@ -132,23 +136,6 @@ public class S3DataStoreUtils extends Da
         return getS3DataStore(className, getS3Config(), homeDir);
     }
 
-    /**
-     * S3 specific cleanup
-     *
-     * @param dataStore the underlying DataStore instance
-     * @param date the date of initialization
-     * @throws Exception
-     */
-    public static void cleanup(DataStore dataStore, Date date) throws Exception {
-        if (dataStore instanceof S3DataStore) {
-            Backend backend = ((S3DataStore) dataStore).getBackend();
-            if (backend instanceof S3Backend) {
-                String bucket = ((S3Backend) backend).getBucket();
-                deleteBucket(bucket, date);
-            }
-        }
-    }
-
     public static void deleteBucket(String bucket, Date date) throws Exception {
         log.info("cleaning bucket [" + bucket + "]");
         Properties props = getS3Config();

Modified: jackrabbit/oak/trunk/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/s3/TestS3DataStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/s3/TestS3DataStore.java?rev=1766338&r1=1766337&r2=1766338&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/s3/TestS3DataStore.java (original)
+++ jackrabbit/oak/trunk/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/s3/TestS3DataStore.java Mon Oct 24 04:51:26 2016
@@ -23,9 +23,7 @@ import java.util.Properties;
 
 import javax.jcr.RepositoryException;
 
-import com.google.common.base.Strings;
 import org.apache.jackrabbit.core.data.DataStore;
-import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -61,8 +59,6 @@ public class TestS3DataStore {
 
     private File dataStoreDir;
 
-    private String bucket;
-
     private DataStore ds;
 
     @Parameterized.Parameters(name = "{index}: ({0})")
@@ -77,17 +73,6 @@ public class TestS3DataStore {
         startTime = new Date();
     }
 
-    @After
-    public void tearDown() {
-        try {
-            if (ds != null && !Strings.isNullOrEmpty(bucket)) {
-                S3DataStoreUtils.deleteBucket(bucket, startTime);
-            }
-        } catch (Exception ignore) {
-
-        }
-    }
-
     @Test
     public void testAccessParamLeakOnError() throws Exception {
         expectedEx.expect(RepositoryException.class);

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/AbstractDataStoreService.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/AbstractDataStoreService.java?rev=1766338&r1=1766337&r2=1766338&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/AbstractDataStoreService.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/AbstractDataStoreService.java Mon Oct 24 04:51:26 2016
@@ -53,6 +53,9 @@ import static org.apache.jackrabbit.oak.
 
 @Component(componentAbstract = true)
 public abstract class AbstractDataStoreService {
+    public static final String JR2_CACHING_PROP = "oak.datastore.jr2caching";
+    protected final boolean JR2_CACHING = Boolean.getBoolean(JR2_CACHING_PROP);
+
     private static final String PROP_HOME = "repository.home";
 
     public static final String PROP_ENCODE_LENGTH = "encodeLengthInId";