You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by am...@apache.org on 2015/03/16 05:57:22 UTC

svn commit: r1666862 - in /jackrabbit/oak/trunk: oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/ oak-core/ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins...

Author: amitj
Date: Mon Mar 16 04:57:22 2015
New Revision: 1666862

URL: http://svn.apache.org/r1666862
Log:
OAK-2494: Shared DataStore GC support for S3DataStore

New SharedS3DataStore implements methods for supporting Shared data store.
Added S3 cleanup to tests
Added optional dependency on oak-blob-cloud to oak-core/pom.xml
New SharedS3DataStoreService to use the SharedS3DataStore

Added:
    jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/SharedS3Backend.java   (with props)
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/SharedS3DataStore.java   (with props)
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/SharedS3DataStoreService.java   (with props)
Modified:
    jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3Backend.java
    jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3DataStore.java
    jackrabbit/oak/trunk/oak-core/pom.xml
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreUtils.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/SharedBlobStoreGCTest.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/blob/ds/MongoDataStoreBlobGCTest.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCTest.java

Modified: jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3Backend.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3Backend.java?rev=1666862&r1=1666861&r2=1666862&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3Backend.java (original)
+++ jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3Backend.java Mon Mar 16 04:57:22 2015
@@ -16,6 +16,7 @@
  */
 
 package org.apache.jackrabbit.oak.blob.cloud.aws.s3;
+
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
@@ -31,13 +32,15 @@ import java.util.concurrent.ExecutorServ
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+
+import com.amazonaws.services.s3.model.ListObjectsRequest;
 import org.apache.jackrabbit.core.data.AsyncTouchCallback;
 import org.apache.jackrabbit.core.data.AsyncTouchResult;
 import org.apache.jackrabbit.core.data.AsyncUploadCallback;
 import org.apache.jackrabbit.core.data.AsyncUploadResult;
-import org.apache.jackrabbit.core.data.Backend;
 import org.apache.jackrabbit.core.data.CachingDataStore;
 import org.apache.jackrabbit.core.data.DataIdentifier;
+import org.apache.jackrabbit.core.data.DataRecord;
 import org.apache.jackrabbit.core.data.DataStoreException;
 import org.apache.jackrabbit.core.data.util.NamedThreadFactory;
 import org.slf4j.Logger;
@@ -64,7 +67,7 @@ import com.amazonaws.services.s3.transfe
 /**
  * A data store backend that stores data on Amazon S3.
  */
-public class S3Backend implements Backend {
+public class S3Backend implements SharedS3Backend {
 
     /**
      * Logger instance.
@@ -73,6 +76,8 @@ public class S3Backend implements Backen
 
     private static final String KEY_PREFIX = "dataStore_";
 
+    private static final String META_KEY_PREFIX = "META/";
+
     private AmazonS3Client s3service;
 
     private String bucket;
@@ -400,7 +405,7 @@ public class S3Backend implements Backen
             while (true) {
                 for (S3ObjectSummary s3ObjSumm : prevObjectListing.getObjectSummaries()) {
                     String id = getIdentifierName(s3ObjSumm.getKey());
-                    if (id != null) {
+                    if (id != null && !id.startsWith(META_KEY_PREFIX)) {
                         ids.add(new DataIdentifier(id));
                     }
                 }
@@ -511,23 +516,21 @@ public class S3Backend implements Backen
             while (true) {
                 List<DeleteObjectsRequest.KeyVersion> deleteList = new ArrayList<DeleteObjectsRequest.KeyVersion>();
                 for (S3ObjectSummary s3ObjSumm : prevObjectListing.getObjectSummaries()) {
-                    DataIdentifier identifier = new DataIdentifier(
-                        getIdentifierName(s3ObjSumm.getKey()));
-                    long lastModified = s3ObjSumm.getLastModified().getTime();
-                    LOG.debug("Identifier [{}]'s lastModified = [{}]", identifier, lastModified);
-                    if (lastModified < min
-                        && store.confirmDelete(identifier)
-                         // confirm once more that record's lastModified < min
-                        //  order is important here
-                        && s3service.getObjectMetadata(bucket,
-                            s3ObjSumm.getKey()).getLastModified().getTime() < min) {
+                    if (!s3ObjSumm.getKey().startsWith(META_KEY_PREFIX)) {
+                        DataIdentifier identifier = new DataIdentifier(getIdentifierName(s3ObjSumm.getKey()));
+                        long lastModified = s3ObjSumm.getLastModified().getTime();
+                        LOG.debug("Identifier [{}]'s lastModified = [{}]", identifier, lastModified);
+                        if (lastModified < min && store.confirmDelete(identifier)
+                            // confirm once more that record's lastModified < min
+                            //  order is important here
+                            && s3service.getObjectMetadata(bucket, s3ObjSumm.getKey()).getLastModified().getTime() <
+                            min) {
 
 
-                        LOG.debug("add id [{}] to delete lists",
-                            s3ObjSumm.getKey());
-                        deleteList.add(new DeleteObjectsRequest.KeyVersion(
-                            s3ObjSumm.getKey()));
-                        deleteIdSet.add(identifier);
+                            LOG.debug("add id [{}] to delete lists", s3ObjSumm.getKey());
+                            deleteList.add(new DeleteObjectsRequest.KeyVersion(s3ObjSumm.getKey()));
+                            deleteIdSet.add(identifier);
+                        }
                     }
                 }
                 if (deleteList.size() > 0) {
@@ -592,6 +595,155 @@ public class S3Backend implements Backen
         this.properties = properties;
     }
 
+    public void addMetadataRecord(final InputStream input, final String name) throws DataStoreException {
+        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+
+        try {
+            Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+
+            Upload upload = tmx.upload(s3ReqDecorator
+                .decorate(new PutObjectRequest(bucket, addMetaKeyPrefix(name), input, new ObjectMetadata())));
+            upload.waitForUploadResult();
+        } catch (InterruptedException e) {
+            LOG.error("Error in uploading", e);
+            throw new DataStoreException("Error in uploading", e);
+        } finally {
+            if (contextClassLoader != null) {
+                Thread.currentThread().setContextClassLoader(contextClassLoader);
+            }
+        }
+    }
+
+    public DataRecord getMetadataRecord(String name) {
+        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+        try {
+            Thread.currentThread().setContextClassLoader(
+                getClass().getClassLoader());
+            ObjectMetadata meta = s3service.getObjectMetadata(bucket, addMetaKeyPrefix(name));
+            return new S3DataRecord(s3service, bucket, name,
+                meta.getLastModified().getTime(), meta.getContentLength());
+        } finally {
+            if (contextClassLoader != null) {
+                Thread.currentThread().setContextClassLoader(contextClassLoader);
+            }
+        }
+    }
+
+    public List<DataRecord> getAllMetadataRecords(String prefix) {
+        List<DataRecord> metadataList = new ArrayList<DataRecord>();
+        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+        try {
+            Thread.currentThread().setContextClassLoader(
+                getClass().getClassLoader());
+            ListObjectsRequest listObjectsRequest =
+                new ListObjectsRequest().withBucketName(bucket).withPrefix(addMetaKeyPrefix(prefix));
+            ObjectListing prevObjectListing = s3service.listObjects(listObjectsRequest);
+            for (final S3ObjectSummary s3ObjSumm : prevObjectListing.getObjectSummaries()) {
+                metadataList.add(new S3DataRecord(s3service, bucket, stripMetaKeyPrefix(s3ObjSumm.getKey()),
+                    s3ObjSumm.getLastModified().getTime(), s3ObjSumm.getSize()));
+            }
+        } finally {
+            if (contextClassLoader != null) {
+                Thread.currentThread().setContextClassLoader(contextClassLoader);
+            }
+        }
+        return metadataList;
+    }
+
+    public boolean deleteMetadataRecord(String name) {
+        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+        try {
+            Thread.currentThread().setContextClassLoader(
+                getClass().getClassLoader());
+            s3service.deleteObject(bucket, addMetaKeyPrefix(name));
+        } finally {
+            if (contextClassLoader != null) {
+                Thread.currentThread().setContextClassLoader(contextClassLoader);
+            }
+        }
+        return true;
+    }
+
+    public void deleteAllMetadataRecords(String prefix) {
+        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+        try {
+            Thread.currentThread().setContextClassLoader(
+                getClass().getClassLoader());
+
+            ListObjectsRequest listObjectsRequest =
+                new ListObjectsRequest().withBucketName(bucket).withPrefix(addMetaKeyPrefix(prefix));
+            ObjectListing metaList = s3service.listObjects(listObjectsRequest);
+            List<DeleteObjectsRequest.KeyVersion> deleteList = new ArrayList<DeleteObjectsRequest.KeyVersion>();
+            for (S3ObjectSummary s3ObjSumm : metaList.getObjectSummaries()) {
+                deleteList.add(new DeleteObjectsRequest.KeyVersion(s3ObjSumm.getKey()));
+            }
+            if (deleteList.size() > 0) {
+                DeleteObjectsRequest delObjsReq = new DeleteObjectsRequest(bucket);
+                delObjsReq.setKeys(deleteList);
+                DeleteObjectsResult dobjs = s3service.deleteObjects(delObjsReq);
+            }
+        } finally {
+            if (contextClassLoader != null) {
+                Thread.currentThread().setContextClassLoader(contextClassLoader);
+            }
+        }
+    }
+
+    private static String addMetaKeyPrefix(String key) {
+        return META_KEY_PREFIX + key;
+    }
+
+    private static String stripMetaKeyPrefix(String name) {
+        if (name.startsWith(META_KEY_PREFIX)) {
+            return name.substring(META_KEY_PREFIX.length());
+        }
+        return name;
+    }
+
+    /**
+     * S3DataRecord which lazily retrieves the input stream of the record.
+     */
+    static class S3DataRecord implements DataRecord {
+        private AmazonS3Client s3service;
+        private DataIdentifier identifier;
+        private long length;
+        private long lastModified;
+        private String bucket;
+
+        public S3DataRecord(AmazonS3Client s3service, String bucket, String key, long lastModified, long length) {
+            this.s3service = s3service;
+            this.identifier = new DataIdentifier(key);
+            this.lastModified = lastModified;
+            this.length = length;
+            this.bucket = bucket;
+        }
+
+        @Override
+        public DataIdentifier getIdentifier() {
+            return identifier;
+        }
+
+        @Override
+        public String getReference() {
+            return identifier.toString();
+        }
+
+        @Override
+        public long getLength() throws DataStoreException {
+            return length;
+        }
+
+        @Override
+        public InputStream getStream() throws DataStoreException {
+            return s3service.getObject(bucket, addMetaKeyPrefix(identifier.toString())).getObjectContent();
+        }
+
+        @Override
+        public long getLastModified() {
+            return lastModified;
+        }
+    }
+
     private void write(DataIdentifier identifier, File file,
             boolean asyncUpload, AsyncUploadCallback callback)
             throws DataStoreException {
@@ -785,6 +937,8 @@ public class S3Backend implements Backen
     private static String getIdentifierName(String key) {
         if (!key.contains(Utils.DASH)) {
             return null;
+        } else if (key.contains(META_KEY_PREFIX)) {
+            return key;
         }
         return key.substring(0, 4) + key.substring(5);
     }

Modified: jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3DataStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3DataStore.java?rev=1666862&r1=1666861&r2=1666862&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3DataStore.java (original)
+++ jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3DataStore.java Mon Mar 16 04:57:22 2015
@@ -25,7 +25,7 @@ import org.apache.jackrabbit.core.data.C
  * An Amazon S3 data store.
  */
 public class S3DataStore extends CachingDataStore {
-    private Properties properties;
+    protected Properties properties;
 
     @Override
     protected Backend createBackend() {

Added: jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/SharedS3Backend.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/SharedS3Backend.java?rev=1666862&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/SharedS3Backend.java (added)
+++ jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/SharedS3Backend.java Mon Mar 16 04:57:22 2015
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.blob.cloud.aws.s3;
+
+import org.apache.jackrabbit.core.data.Backend;
+import org.apache.jackrabbit.core.data.DataRecord;
+import org.apache.jackrabbit.core.data.DataStoreException;
+
+import java.io.InputStream;
+import java.util.List;
+
+/**
+ * Extension to the {@link org.apache.jackrabbit.core.data.Backend} for supporting adding meta data to the underlying
+ * store.
+ */
+public interface SharedS3Backend extends Backend {
+    /**
+     * Adds a metadata record with the specified name
+     *
+     * @param input the record input stream
+     * @param name the name
+     * @throws org.apache.jackrabbit.core.data.DataStoreException
+     */
+    public void addMetadataRecord(final InputStream input, final String name) throws DataStoreException;
+
+    /**
+     * Gets the metadata of the specified name.
+     *
+     * @param name the name of the record
+     * @return the metadata DataRecord
+     */
+    public DataRecord getMetadataRecord(String name);
+
+    /**
+     * Gets all the metadata with a specified prefix.
+     *
+     * @param prefix the prefix of the records to retrieve
+     * @return list of all the metadata DataRecords
+     */
+    public List<DataRecord> getAllMetadataRecords(String prefix);
+
+    /**
+     * Deletes the metadata record with the specified name
+     *
+     * @param name the name of the record
+     * @return boolean to indicate success of deletion
+     */
+    public boolean deleteMetadataRecord(String name);
+
+    /**
+     * Deletes all the metadata records with the specified prefix.
+     *
+     * @param prefix the prefix of the record
+     */
+    public void deleteAllMetadataRecords(String prefix);
+}

Propchange: jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/SharedS3Backend.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: jackrabbit/oak/trunk/oak-core/pom.xml
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/pom.xml?rev=1666862&r1=1666861&r2=1666862&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/pom.xml (original)
+++ jackrabbit/oak/trunk/oak-core/pom.xml Mon Mar 16 04:57:22 2015
@@ -209,7 +209,12 @@
       <artifactId>oak-blob</artifactId>
       <version>${project.version}</version>
     </dependency>
-
+    <dependency>
+      <groupId>org.apache.jackrabbit</groupId>
+      <artifactId>oak-blob-cloud</artifactId>
+      <version>${project.version}</version>
+      <optional>true</optional>
+    </dependency>
     <!-- General utility libraries -->
     <dependency>
       <groupId>com.google.guava</groupId>

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java?rev=1666862&r1=1666861&r2=1666862&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java Mon Mar 16 04:57:22 2015
@@ -485,6 +485,7 @@ public class MarkSweepGarbageCollector i
                 if (!ids.isEmpty()) {
                     blobsCount += ids.size();
                     saveBatchToFile(ids, bufferWriter);
+                    LOG.debug("retrieved {} blobs", blobsCount);
                 }
 
                 // sort the file

Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/SharedS3DataStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/SharedS3DataStore.java?rev=1666862&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/SharedS3DataStore.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/SharedS3DataStore.java Mon Mar 16 04:57:22 2015
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.blob.datastore;
+
+import org.apache.jackrabbit.core.data.Backend;
+import org.apache.jackrabbit.core.data.DataRecord;
+import org.apache.jackrabbit.core.data.DataStoreException;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.S3Backend;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.S3DataStore;
+import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore;
+
+import java.io.InputStream;
+import java.util.List;
+
+/**
+ * {@link org.apache.jackrabbit.oak.plugins.blob.SharedDataStore} implementation for S3
+ */
+public class SharedS3DataStore extends S3DataStore implements SharedDataStore {
+    protected S3Backend backend;
+
+    @Override
+    protected Backend createBackend() {
+        backend = new S3Backend();
+        if(properties != null){
+            backend.setProperties(properties);
+        }
+        return backend;
+    }
+
+    @Override
+    public void addMetadataRecord(InputStream stream, String name) throws DataStoreException {
+        backend.addMetadataRecord(stream, name);
+    }
+
+    @Override
+    public DataRecord getMetadataRecord(String name) {
+        return backend.getMetadataRecord(name);
+    }
+
+    @Override
+    public List<DataRecord> getAllMetadataRecords(String prefix) {
+        return backend.getAllMetadataRecords(prefix);
+    }
+
+    @Override
+    public boolean deleteMetadataRecord(String name) {
+        return backend.deleteMetadataRecord(name);
+    }
+
+    @Override
+    public void deleteAllMetadataRecords(String prefix) {
+        backend.deleteAllMetadataRecords(prefix);
+    }
+
+    @Override
+    public Type getType() {
+        return Type.SHARED;
+    }
+}

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/SharedS3DataStore.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/SharedS3DataStoreService.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/SharedS3DataStoreService.java?rev=1666862&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/SharedS3DataStoreService.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/SharedS3DataStoreService.java Mon Mar 16 04:57:22 2015
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.blob.datastore;
+
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.ConfigurationPolicy;
+import org.apache.jackrabbit.core.data.DataStore;
+import org.osgi.service.component.ComponentContext;
+
+import java.util.Map;
+import java.util.Properties;
+
+@Component(policy = ConfigurationPolicy.REQUIRE, name = SharedS3DataStoreService.NAME)
+public class SharedS3DataStoreService extends AbstractDataStoreService{
+    public static final String NAME = "org.apache.jackrabbit.oak.plugins.blob.datastore.SharedS3DataStore";
+
+    @Override
+    protected DataStore createDataStore(ComponentContext context, Map<String, Object> config) {
+        SharedS3DataStore dataStore = new SharedS3DataStore();
+
+        Properties properties = new Properties();
+        properties.putAll(config);
+
+        dataStore.setProperties(properties);
+        return dataStore;
+    }
+
+    @Override
+    protected String[] getDescription() {
+        return new String[] {"type=SharedS3"};
+    }
+}

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/SharedS3DataStoreService.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreUtils.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreUtils.java?rev=1666862&r1=1666861&r2=1666862&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreUtils.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreUtils.java Mon Mar 16 04:57:22 2015
@@ -17,16 +17,28 @@
 package org.apache.jackrabbit.oak.plugins.blob.datastore;
 
 import java.io.File;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.DeleteObjectsRequest;
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.amazonaws.services.s3.transfer.TransferManager;
 import com.google.common.collect.Maps;
+import org.apache.jackrabbit.core.data.Backend;
 import org.apache.jackrabbit.core.data.DataStore;
 import org.apache.jackrabbit.core.data.FileDataStore;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.S3Backend;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.S3DataStore;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.Utils;
 import org.apache.jackrabbit.oak.commons.PropertiesUtil;
-import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore;
-import org.apache.jackrabbit.oak.plugins.blob.datastore.OakFileDataStore;
-import org.apache.jackrabbit.oak.plugins.document.AbstractMongoConnectionTest;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static org.apache.commons.io.FilenameUtils.concat;
 import static org.junit.Assert.assertEquals;
@@ -41,6 +53,8 @@ import static org.junit.Assert.assertEqu
  * the system property as 'ds.minRecordLength'
  */
 public class DataStoreUtils {
+    private static final Logger log = LoggerFactory.getLogger(DataStoreUtils.class);
+
     public static final String DS_CLASS_NAME = "dataStore";
 
     private static final String DS_PROP_PREFIX = "ds.";
@@ -49,7 +63,7 @@ public class DataStoreUtils {
     /**
      * By default create a default directory. But if overridden will need to be unset
      */
-    private static long time = -1;
+    public static long time = -1;
 
     public static DataStoreBlobStore getBlobStore() throws Exception {
         String className = System.getProperty(DS_CLASS_NAME, OakFileDataStore.class.getName());
@@ -59,6 +73,12 @@ public class DataStoreUtils {
         return new DataStoreBlobStore(ds);
     }
 
+    public static boolean isS3DataStore() {
+        String dsName = System.getProperty(DS_CLASS_NAME);
+        return (dsName != null) && (dsName.equals(S3DataStore.class.getName()) || dsName
+            .equals(SharedS3DataStore.class.getName()));
+    }
+
     private static Map<String, ?> getConfig() {
         Map<String, Object> result = Maps.newHashMap();
         for (Map.Entry<String, ?> e : Maps.fromProperties(System.getProperties()).entrySet()) {
@@ -73,7 +93,7 @@ public class DataStoreUtils {
 
     public static String getHomeDir() {
         return concat(new File(".").getAbsolutePath(), "target/blobstore/" +
-            (time == -1 ? 0 : System.currentTimeMillis()));
+            (time == -1 ? 0 : time));
     }
 
     @Test
@@ -84,4 +104,56 @@ public class DataStoreUtils {
         DataStoreBlobStore dbs = getBlobStore();
         assertEquals(1000, dbs.getDataStore().getMinRecordLength());
     }
+
+    /**
+     * S3 specific cleanup
+     *
+     * @param dataStore the underlying DataStore instance
+     * @param date the date of initialization
+     * @throws Exception
+     */
+    public static void cleanup(DataStore dataStore, Date date) throws Exception {
+        if (dataStore instanceof S3DataStore) {
+            Backend backend = ((S3DataStore) dataStore).getBackend();
+            if (backend instanceof S3Backend) {
+                String bucket = ((S3Backend) backend).getBucket();
+                deleteBucket(bucket, date);
+            }
+        }
+    }
+
+    private static void deleteBucket(String bucket, Date date) throws Exception {
+        log.info("cleaning bucket [" + bucket + "]");
+        Properties props = Utils.readConfig((String) getConfig().get("config"));
+        AmazonS3Client s3service = Utils.openService(props);
+        TransferManager tmx = new TransferManager(s3service);
+        if (s3service.doesBucketExist(bucket)) {
+            for (int i = 0; i < 4; i++) {
+                tmx.abortMultipartUploads(bucket, date);
+                ObjectListing prevObjectListing = s3service.listObjects(bucket);
+                while (prevObjectListing != null ) {
+                    List<DeleteObjectsRequest.KeyVersion>
+                        deleteList = new ArrayList<DeleteObjectsRequest.KeyVersion>();
+                    for (S3ObjectSummary s3ObjSumm : prevObjectListing.getObjectSummaries()) {
+                        deleteList.add(new DeleteObjectsRequest.KeyVersion(
+                            s3ObjSumm.getKey()));
+                    }
+                    if (deleteList.size() > 0) {
+                        DeleteObjectsRequest delObjsReq = new DeleteObjectsRequest(
+                            bucket);
+                        delObjsReq.setKeys(deleteList);
+                        s3service.deleteObjects(delObjsReq);
+                    }
+                    if (!prevObjectListing.isTruncated()) break;
+                    prevObjectListing = s3service.listNextBatchOfObjects(prevObjectListing);
+                }
+            }
+            //s3service.deleteBucket(bucket);
+            log.info("bucket [ " + bucket + "] cleaned");
+        } else {
+            log.info("bucket [" + bucket + "] doesn't exists");
+        }
+        tmx.shutdownNow();
+        s3service.shutdown();
+    }
 }

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java?rev=1666862&r1=1666861&r2=1666862&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java Mon Mar 16 04:57:22 2015
@@ -47,8 +47,6 @@ import org.apache.jackrabbit.oak.spi.sta
 import org.apache.jackrabbit.oak.stats.Clock;
 import org.junit.Test;
 
-import static org.junit.Assert.assertTrue;
-
 /**
  * Tests for MongoMK GC
  */
@@ -73,7 +71,7 @@ public class MongoBlobGCTest extends Abs
             }
         }
         for (int i = 0; i < number; i++) {
-            Blob b = s.createBlob(randomStream(i, 4160));
+            Blob b = s.createBlob(randomStream(i, 16516));
             if (!processed.contains(i)) {
                 Iterator<String> idIter =
                         ((GarbageCollectableBlobStore) s.getBlobStore())
@@ -165,6 +163,7 @@ public class MongoBlobGCTest extends Abs
                 (GarbageCollectableBlobStore) store.getBlobStore(),
                 MoreExecutors.sameThreadExecutor(),
                 "./target", 5, 0, repoId);
+        Thread.sleep(4000);
         gc.collectGarbage(false);
 
         Set<String> existingAfterGC = iterate();

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/SharedBlobStoreGCTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/SharedBlobStoreGCTest.java?rev=1666862&r1=1666861&r2=1666862&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/SharedBlobStoreGCTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/SharedBlobStoreGCTest.java Mon Mar 16 04:57:22 2015
@@ -23,6 +23,7 @@ import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.Date;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -37,10 +38,12 @@ import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.MoreExecutors;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.jackrabbit.core.data.DataStore;
 import org.apache.jackrabbit.oak.api.Blob;
 import org.apache.jackrabbit.oak.plugins.blob.BlobGarbageCollector;
 import org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector;
 import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore;
 import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils.SharedStoreRecordType;
 import org.apache.jackrabbit.oak.plugins.document.VersionGarbageCollector.VersionGCStats;
 import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreUtils;
@@ -55,19 +58,26 @@ import org.apache.jackrabbit.oak.stats.C
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Test for gc in a shared data store among hetrogeneous oak node stores.
  */
 public class SharedBlobStoreGCTest {
+    private static final Logger log = LoggerFactory.getLogger(SharedBlobStoreGCTest.class);
+
     private Cluster cluster1;
     private Cluster cluster2;
     private Clock clock;
 
     @Before
     public void setUp() throws Exception {
+        log.debug("In setUp()");
+
         clock = new Clock.Virtual();
         clock.waitUntil(Revision.getCurrentTimestamp());
+        DataStoreUtils.time = clock.getTime();
 
         BlobStore blobeStore1 = DataStoreUtils.getBlobStore();
         DocumentNodeStore ds1 = new DocumentMK.Builder()
@@ -95,8 +105,11 @@ public class SharedBlobStoreGCTest {
 
         cluster1 = new Cluster(ds1, repoId1, 20);
         cluster1.init();
+        log.debug("Initialized {}", cluster1);
+
         cluster2 = new Cluster(ds2, repoId2, 100);
         cluster2.init();
+        log.debug("Initialized {}", cluster2);
     }
 
     static InputStream randomStream(int seed, int size) {
@@ -108,6 +121,7 @@ public class SharedBlobStoreGCTest {
 
     @Test
     public void testGC() throws Exception {
+        log.debug("Running testGC()");
         // Only run the mark phase on both the clusters
         cluster1.gc.collectGarbage(true);
         cluster2.gc.collectGarbage(true);
@@ -122,21 +136,25 @@ public class SharedBlobStoreGCTest {
     @Test
     // GC should fail
     public void testOnly1ClusterMark() throws Exception {
+        log.debug("Running testOnly1ClusterMark()");
+
         // Only run the mark phase on one cluster
         cluster1.gc.collectGarbage(true);
 
         // Execute the gc with sweep
         cluster1.gc.collectGarbage(false);
 
-        Assert.assertTrue(
-            (cluster1.getInitBlobs().size() + cluster2.getInitBlobs().size()) < cluster1.getExistingBlobIds().size());
         Set<String> existing = cluster1.getExistingBlobIds();
+        log.debug("Existing blobs {}", existing);
+        Assert.assertTrue((cluster1.getInitBlobs().size() + cluster2.getInitBlobs().size()) <= existing.size());
         Assert.assertTrue(existing.containsAll(cluster2.getInitBlobs()));
         Assert.assertTrue(existing.containsAll(cluster1.getInitBlobs()));
     }
 
     @Test
     public void testRepeatedMarkWithSweep() throws Exception {
+        log.debug("Running testRepeatedMarkWithSweep()");
+
         // Only run the mark phase on one cluster
         cluster1.gc.collectGarbage(true);
         cluster2.gc.collectGarbage(true);
@@ -151,14 +169,17 @@ public class SharedBlobStoreGCTest {
     }
 
     @After
-    public void tearDown() throws IOException {
-        FileUtils.cleanDirectory(new File(DataStoreUtils.getHomeDir()));
+    public void tearDown() throws Exception {
+        DataStoreUtils.cleanup(cluster1.getDataStore(), cluster1.getDate());
+        FileUtils.cleanDirectory((new File(DataStoreUtils.getHomeDir())).getParentFile());
+        DataStoreUtils.time = -1;
     }
 
     class Cluster {
         private DocumentNodeStore ds;
         private int seed;
         private BlobGarbageCollector gc;
+        private Date startDate;
 
         private Set<String> initBlobs = new HashSet<String>();
 
@@ -180,7 +201,7 @@ public class SharedBlobStoreGCTest {
                     gc.collectGarbage(markOnly);
                 }
             };
-
+            this.startDate = new Date();
             this.seed = seed;
         }
 
@@ -203,7 +224,7 @@ public class SharedBlobStoreGCTest {
                 }
             }
             for (int i = 0; i < number; i++) {
-                Blob b = ds.createBlob(randomStream(i + seed, 4160));
+                Blob b = ds.createBlob(randomStream(i + seed, 16516));
                 if (!deletes.contains(i)) {
                     Iterator<String> idIter =
                             ((GarbageCollectableBlobStore) ds.getBlobStore())
@@ -228,6 +249,10 @@ public class SharedBlobStoreGCTest {
             VersionGarbageCollector vGC = ds.getVersionGarbageCollector();
             VersionGCStats stats = vGC.gc(0, TimeUnit.MILLISECONDS);
             Assert.assertEquals(deletes.size(), stats.deletedDocGCCount);
+
+            if (DataStoreUtils.isS3DataStore()) {
+                Thread.sleep(1000);
+            }
         }
 
         public Set<String> getExistingBlobIds() throws Exception {
@@ -240,6 +265,14 @@ public class SharedBlobStoreGCTest {
             }
             return existing;
         }
+
+        public DataStore getDataStore() {
+            return ((DataStoreBlobStore) ds.getBlobStore()).getDataStore();
+        }
+
+        public Date getDate() {
+            return startDate;
+        }
     }
 }
 

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/blob/ds/MongoDataStoreBlobGCTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/blob/ds/MongoDataStoreBlobGCTest.java?rev=1666862&r1=1666861&r2=1666862&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/blob/ds/MongoDataStoreBlobGCTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/blob/ds/MongoDataStoreBlobGCTest.java Mon Mar 16 04:57:22 2015
@@ -17,6 +17,7 @@
 package org.apache.jackrabbit.oak.plugins.document.blob.ds;
 
 import java.io.File;
+import java.util.Date;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore;
@@ -34,6 +35,9 @@ import org.junit.BeforeClass;
  * 
  */
 public class MongoDataStoreBlobGCTest extends MongoBlobGCTest {
+    Date startDate;
+    DataStoreBlobStore blobStore;
+
     @BeforeClass
     public static void setUpBeforeClass() throws Exception {
         try {
@@ -46,15 +50,18 @@ public class MongoDataStoreBlobGCTest ex
     @Before
     @Override
     public void setUpConnection() throws Exception {
+        startDate = new Date();
         mongoConnection = MongoUtils.getConnection();
         MongoUtils.dropCollections(mongoConnection.getDB());
+        blobStore = DataStoreUtils.getBlobStore();
         mk = new DocumentMK.Builder().clock(getTestClock()).setMongoDB(mongoConnection.getDB())
-                .setBlobStore(DataStoreUtils.getBlobStore()).open();
+                .setBlobStore(blobStore).open();
     }
 
     @After
     @Override
     public void tearDownConnection() throws Exception {
+        DataStoreUtils.cleanup(blobStore.getDataStore(), startDate);
         FileUtils.deleteDirectory(new File(DataStoreUtils.getHomeDir()));
         mk.dispose();
         // the db might already be closed

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCTest.java?rev=1666862&r1=1666861&r2=1666862&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCTest.java Mon Mar 16 04:57:22 2015
@@ -26,6 +26,7 @@ import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.Date;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -69,6 +70,7 @@ public class SegmentDataStoreBlobGCTest
     SegmentNodeStore nodeStore;
     FileStore store;
     DataStoreBlobStore blobStore;
+    Date startDate;
 
     protected SegmentNodeStore getNodeStore(BlobStore blobStore) throws IOException {
         if (nodeStore == null) {
@@ -94,6 +96,7 @@ public class SegmentDataStoreBlobGCTest
     public HashSet<String> setUp() throws Exception {
         blobStore = DataStoreUtils.getBlobStore();
         nodeStore = getNodeStore(blobStore);
+        startDate = new Date();
 
         HashSet<String> set = new HashSet<String>();
 
@@ -208,10 +211,11 @@ public class SegmentDataStoreBlobGCTest
     }
 
     @After
-    public void close() throws IOException {
+    public void close() throws Exception {
         if (store != null) {
             store.close();
         }
+        DataStoreUtils.cleanup(blobStore.getDataStore(), startDate);
         FileUtils.deleteDirectory(getWorkDir());
         FileUtils.deleteDirectory(new File(DataStoreUtils.getHomeDir()));
     }