You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by am...@apache.org on 2015/03/16 05:57:22 UTC
svn commit: r1666862 - in /jackrabbit/oak/trunk:
oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/
oak-core/ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/
oak-core/src/main/java/org/apache/jackrabbit/oak/plugins...
Author: amitj
Date: Mon Mar 16 04:57:22 2015
New Revision: 1666862
URL: http://svn.apache.org/r1666862
Log:
OAK-2494: Shared DataStore GC support for S3DataStore
New SharedS3DataStore implements the methods required to support a shared data store.
Added S3 cleanup to tests
Added optional dependency on oak-blob-cloud to oak-core/pom.xml
New SharedS3DataStoreService to use the SharedS3DataStore
Added:
jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/SharedS3Backend.java (with props)
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/SharedS3DataStore.java (with props)
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/SharedS3DataStoreService.java (with props)
Modified:
jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3Backend.java
jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3DataStore.java
jackrabbit/oak/trunk/oak-core/pom.xml
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreUtils.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/SharedBlobStoreGCTest.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/blob/ds/MongoDataStoreBlobGCTest.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCTest.java
Modified: jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3Backend.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3Backend.java?rev=1666862&r1=1666861&r2=1666862&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3Backend.java (original)
+++ jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3Backend.java Mon Mar 16 04:57:22 2015
@@ -16,6 +16,7 @@
*/
package org.apache.jackrabbit.oak.blob.cloud.aws.s3;
+
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
@@ -31,13 +32,15 @@ import java.util.concurrent.ExecutorServ
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+
+import com.amazonaws.services.s3.model.ListObjectsRequest;
import org.apache.jackrabbit.core.data.AsyncTouchCallback;
import org.apache.jackrabbit.core.data.AsyncTouchResult;
import org.apache.jackrabbit.core.data.AsyncUploadCallback;
import org.apache.jackrabbit.core.data.AsyncUploadResult;
-import org.apache.jackrabbit.core.data.Backend;
import org.apache.jackrabbit.core.data.CachingDataStore;
import org.apache.jackrabbit.core.data.DataIdentifier;
+import org.apache.jackrabbit.core.data.DataRecord;
import org.apache.jackrabbit.core.data.DataStoreException;
import org.apache.jackrabbit.core.data.util.NamedThreadFactory;
import org.slf4j.Logger;
@@ -64,7 +67,7 @@ import com.amazonaws.services.s3.transfe
/**
* A data store backend that stores data on Amazon S3.
*/
-public class S3Backend implements Backend {
+public class S3Backend implements SharedS3Backend {
/**
* Logger instance.
@@ -73,6 +76,8 @@ public class S3Backend implements Backen
private static final String KEY_PREFIX = "dataStore_";
+ private static final String META_KEY_PREFIX = "META/";
+
private AmazonS3Client s3service;
private String bucket;
@@ -400,7 +405,7 @@ public class S3Backend implements Backen
while (true) {
for (S3ObjectSummary s3ObjSumm : prevObjectListing.getObjectSummaries()) {
String id = getIdentifierName(s3ObjSumm.getKey());
- if (id != null) {
+ if (id != null && !id.startsWith(META_KEY_PREFIX)) {
ids.add(new DataIdentifier(id));
}
}
@@ -511,23 +516,21 @@ public class S3Backend implements Backen
while (true) {
List<DeleteObjectsRequest.KeyVersion> deleteList = new ArrayList<DeleteObjectsRequest.KeyVersion>();
for (S3ObjectSummary s3ObjSumm : prevObjectListing.getObjectSummaries()) {
- DataIdentifier identifier = new DataIdentifier(
- getIdentifierName(s3ObjSumm.getKey()));
- long lastModified = s3ObjSumm.getLastModified().getTime();
- LOG.debug("Identifier [{}]'s lastModified = [{}]", identifier, lastModified);
- if (lastModified < min
- && store.confirmDelete(identifier)
- // confirm once more that record's lastModified < min
- // order is important here
- && s3service.getObjectMetadata(bucket,
- s3ObjSumm.getKey()).getLastModified().getTime() < min) {
+ if (!s3ObjSumm.getKey().startsWith(META_KEY_PREFIX)) {
+ DataIdentifier identifier = new DataIdentifier(getIdentifierName(s3ObjSumm.getKey()));
+ long lastModified = s3ObjSumm.getLastModified().getTime();
+ LOG.debug("Identifier [{}]'s lastModified = [{}]", identifier, lastModified);
+ if (lastModified < min && store.confirmDelete(identifier)
+ // confirm once more that record's lastModified < min
+ // order is important here
+ && s3service.getObjectMetadata(bucket, s3ObjSumm.getKey()).getLastModified().getTime() <
+ min) {
- LOG.debug("add id [{}] to delete lists",
- s3ObjSumm.getKey());
- deleteList.add(new DeleteObjectsRequest.KeyVersion(
- s3ObjSumm.getKey()));
- deleteIdSet.add(identifier);
+ LOG.debug("add id [{}] to delete lists", s3ObjSumm.getKey());
+ deleteList.add(new DeleteObjectsRequest.KeyVersion(s3ObjSumm.getKey()));
+ deleteIdSet.add(identifier);
+ }
}
}
if (deleteList.size() > 0) {
@@ -592,6 +595,155 @@ public class S3Backend implements Backen
this.properties = properties;
}
+ public void addMetadataRecord(final InputStream input, final String name) throws DataStoreException {
+ ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+
+ try {
+ Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+
+ Upload upload = tmx.upload(s3ReqDecorator
+ .decorate(new PutObjectRequest(bucket, addMetaKeyPrefix(name), input, new ObjectMetadata())));
+ upload.waitForUploadResult();
+ } catch (InterruptedException e) {
+ LOG.error("Error in uploading", e);
+ throw new DataStoreException("Error in uploading", e);
+ } finally {
+ if (contextClassLoader != null) {
+ Thread.currentThread().setContextClassLoader(contextClassLoader);
+ }
+ }
+ }
+
+ public DataRecord getMetadataRecord(String name) {
+ ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+ try {
+ Thread.currentThread().setContextClassLoader(
+ getClass().getClassLoader());
+ ObjectMetadata meta = s3service.getObjectMetadata(bucket, addMetaKeyPrefix(name));
+ return new S3DataRecord(s3service, bucket, name,
+ meta.getLastModified().getTime(), meta.getContentLength());
+ } finally {
+ if (contextClassLoader != null) {
+ Thread.currentThread().setContextClassLoader(contextClassLoader);
+ }
+ }
+ }
+
+ public List<DataRecord> getAllMetadataRecords(String prefix) {
+ List<DataRecord> metadataList = new ArrayList<DataRecord>();
+ ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+ try {
+ Thread.currentThread().setContextClassLoader(
+ getClass().getClassLoader());
+ ListObjectsRequest listObjectsRequest =
+ new ListObjectsRequest().withBucketName(bucket).withPrefix(addMetaKeyPrefix(prefix));
+ ObjectListing prevObjectListing = s3service.listObjects(listObjectsRequest);
+ for (final S3ObjectSummary s3ObjSumm : prevObjectListing.getObjectSummaries()) {
+ metadataList.add(new S3DataRecord(s3service, bucket, stripMetaKeyPrefix(s3ObjSumm.getKey()),
+ s3ObjSumm.getLastModified().getTime(), s3ObjSumm.getSize()));
+ }
+ } finally {
+ if (contextClassLoader != null) {
+ Thread.currentThread().setContextClassLoader(contextClassLoader);
+ }
+ }
+ return metadataList;
+ }
+
+ public boolean deleteMetadataRecord(String name) {
+ ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+ try {
+ Thread.currentThread().setContextClassLoader(
+ getClass().getClassLoader());
+ s3service.deleteObject(bucket, addMetaKeyPrefix(name));
+ } finally {
+ if (contextClassLoader != null) {
+ Thread.currentThread().setContextClassLoader(contextClassLoader);
+ }
+ }
+ return true;
+ }
+
+ public void deleteAllMetadataRecords(String prefix) {
+ ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+ try {
+ Thread.currentThread().setContextClassLoader(
+ getClass().getClassLoader());
+
+ ListObjectsRequest listObjectsRequest =
+ new ListObjectsRequest().withBucketName(bucket).withPrefix(addMetaKeyPrefix(prefix));
+ ObjectListing metaList = s3service.listObjects(listObjectsRequest);
+ List<DeleteObjectsRequest.KeyVersion> deleteList = new ArrayList<DeleteObjectsRequest.KeyVersion>();
+ for (S3ObjectSummary s3ObjSumm : metaList.getObjectSummaries()) {
+ deleteList.add(new DeleteObjectsRequest.KeyVersion(s3ObjSumm.getKey()));
+ }
+ if (deleteList.size() > 0) {
+ DeleteObjectsRequest delObjsReq = new DeleteObjectsRequest(bucket);
+ delObjsReq.setKeys(deleteList);
+ DeleteObjectsResult dobjs = s3service.deleteObjects(delObjsReq);
+ }
+ } finally {
+ if (contextClassLoader != null) {
+ Thread.currentThread().setContextClassLoader(contextClassLoader);
+ }
+ }
+ }
+
+ private static String addMetaKeyPrefix(String key) {
+ return META_KEY_PREFIX + key;
+ }
+
+ private static String stripMetaKeyPrefix(String name) {
+ if (name.startsWith(META_KEY_PREFIX)) {
+ return name.substring(META_KEY_PREFIX.length());
+ }
+ return name;
+ }
+
+ /**
+ * S3DataRecord which lazily retrieves the input stream of the record.
+ */
+ static class S3DataRecord implements DataRecord {
+ private AmazonS3Client s3service;
+ private DataIdentifier identifier;
+ private long length;
+ private long lastModified;
+ private String bucket;
+
+ public S3DataRecord(AmazonS3Client s3service, String bucket, String key, long lastModified, long length) {
+ this.s3service = s3service;
+ this.identifier = new DataIdentifier(key);
+ this.lastModified = lastModified;
+ this.length = length;
+ this.bucket = bucket;
+ }
+
+ @Override
+ public DataIdentifier getIdentifier() {
+ return identifier;
+ }
+
+ @Override
+ public String getReference() {
+ return identifier.toString();
+ }
+
+ @Override
+ public long getLength() throws DataStoreException {
+ return length;
+ }
+
+ @Override
+ public InputStream getStream() throws DataStoreException {
+ return s3service.getObject(bucket, addMetaKeyPrefix(identifier.toString())).getObjectContent();
+ }
+
+ @Override
+ public long getLastModified() {
+ return lastModified;
+ }
+ }
+
private void write(DataIdentifier identifier, File file,
boolean asyncUpload, AsyncUploadCallback callback)
throws DataStoreException {
@@ -785,6 +937,8 @@ public class S3Backend implements Backen
private static String getIdentifierName(String key) {
if (!key.contains(Utils.DASH)) {
return null;
+ } else if (key.contains(META_KEY_PREFIX)) {
+ return key;
}
return key.substring(0, 4) + key.substring(5);
}
Modified: jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3DataStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3DataStore.java?rev=1666862&r1=1666861&r2=1666862&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3DataStore.java (original)
+++ jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3DataStore.java Mon Mar 16 04:57:22 2015
@@ -25,7 +25,7 @@ import org.apache.jackrabbit.core.data.C
* An Amazon S3 data store.
*/
public class S3DataStore extends CachingDataStore {
- private Properties properties;
+ protected Properties properties;
@Override
protected Backend createBackend() {
Added: jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/SharedS3Backend.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/SharedS3Backend.java?rev=1666862&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/SharedS3Backend.java (added)
+++ jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/SharedS3Backend.java Mon Mar 16 04:57:22 2015
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.blob.cloud.aws.s3;
+
+import org.apache.jackrabbit.core.data.Backend;
+import org.apache.jackrabbit.core.data.DataRecord;
+import org.apache.jackrabbit.core.data.DataStoreException;
+
+import java.io.InputStream;
+import java.util.List;
+
+/**
+ * Extension to the {@link org.apache.jackrabbit.core.data.Backend} that adds support for storing and
+ * retrieving metadata records in the underlying store.
+ */
+public interface SharedS3Backend extends Backend {
+ /**
+ * Adds a metadata record with the specified name
+ *
+ * @param input the record input stream
+ * @param name the name
+ * @throws org.apache.jackrabbit.core.data.DataStoreException
+ */
+ public void addMetadataRecord(final InputStream input, final String name) throws DataStoreException;
+
+ /**
+ * Gets the metadata of the specified name.
+ *
+ * @param name the name of the record
+ * @return the metadata DataRecord
+ */
+ public DataRecord getMetadataRecord(String name);
+
+ /**
+ * Gets all the metadata with a specified prefix.
+ *
+ * @param prefix the prefix of the records to retrieve
+ * @return list of all the metadata DataRecords
+ */
+ public List<DataRecord> getAllMetadataRecords(String prefix);
+
+ /**
+ * Deletes the metadata record with the specified name
+ *
+ * @param name the name of the record
+ * @return boolean to indicate success of deletion
+ */
+ public boolean deleteMetadataRecord(String name);
+
+ /**
+ * Deletes all the metadata records with the specified prefix.
+ *
+ * @param prefix the prefix of the record
+ */
+ public void deleteAllMetadataRecords(String prefix);
+}
Propchange: jackrabbit/oak/trunk/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/SharedS3Backend.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: jackrabbit/oak/trunk/oak-core/pom.xml
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/pom.xml?rev=1666862&r1=1666861&r2=1666862&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/pom.xml (original)
+++ jackrabbit/oak/trunk/oak-core/pom.xml Mon Mar 16 04:57:22 2015
@@ -209,7 +209,12 @@
<artifactId>oak-blob</artifactId>
<version>${project.version}</version>
</dependency>
-
+ <dependency>
+ <groupId>org.apache.jackrabbit</groupId>
+ <artifactId>oak-blob-cloud</artifactId>
+ <version>${project.version}</version>
+ <optional>true</optional>
+ </dependency>
<!-- General utility libraries -->
<dependency>
<groupId>com.google.guava</groupId>
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java?rev=1666862&r1=1666861&r2=1666862&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java Mon Mar 16 04:57:22 2015
@@ -485,6 +485,7 @@ public class MarkSweepGarbageCollector i
if (!ids.isEmpty()) {
blobsCount += ids.size();
saveBatchToFile(ids, bufferWriter);
+ LOG.debug("retrieved {} blobs", blobsCount);
}
// sort the file
Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/SharedS3DataStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/SharedS3DataStore.java?rev=1666862&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/SharedS3DataStore.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/SharedS3DataStore.java Mon Mar 16 04:57:22 2015
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.blob.datastore;
+
+import org.apache.jackrabbit.core.data.Backend;
+import org.apache.jackrabbit.core.data.DataRecord;
+import org.apache.jackrabbit.core.data.DataStoreException;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.S3Backend;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.S3DataStore;
+import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore;
+
+import java.io.InputStream;
+import java.util.List;
+
+/**
+ * {@link org.apache.jackrabbit.oak.plugins.blob.SharedDataStore} implementation for S3
+ */
+public class SharedS3DataStore extends S3DataStore implements SharedDataStore {
+ protected S3Backend backend;
+
+ @Override
+ protected Backend createBackend() {
+ backend = new S3Backend();
+ if(properties != null){
+ backend.setProperties(properties);
+ }
+ return backend;
+ }
+
+ @Override
+ public void addMetadataRecord(InputStream stream, String name) throws DataStoreException {
+ backend.addMetadataRecord(stream, name);
+ }
+
+ @Override
+ public DataRecord getMetadataRecord(String name) {
+ return backend.getMetadataRecord(name);
+ }
+
+ @Override
+ public List<DataRecord> getAllMetadataRecords(String prefix) {
+ return backend.getAllMetadataRecords(prefix);
+ }
+
+ @Override
+ public boolean deleteMetadataRecord(String name) {
+ return backend.deleteMetadataRecord(name);
+ }
+
+ @Override
+ public void deleteAllMetadataRecords(String prefix) {
+ backend.deleteAllMetadataRecords(prefix);
+ }
+
+ @Override
+ public Type getType() {
+ return Type.SHARED;
+ }
+}
Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/SharedS3DataStore.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/SharedS3DataStoreService.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/SharedS3DataStoreService.java?rev=1666862&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/SharedS3DataStoreService.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/SharedS3DataStoreService.java Mon Mar 16 04:57:22 2015
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.blob.datastore;
+
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.ConfigurationPolicy;
+import org.apache.jackrabbit.core.data.DataStore;
+import org.osgi.service.component.ComponentContext;
+
+import java.util.Map;
+import java.util.Properties;
+
+@Component(policy = ConfigurationPolicy.REQUIRE, name = SharedS3DataStoreService.NAME)
+public class SharedS3DataStoreService extends AbstractDataStoreService{
+ public static final String NAME = "org.apache.jackrabbit.oak.plugins.blob.datastore.SharedS3DataStore";
+
+ @Override
+ protected DataStore createDataStore(ComponentContext context, Map<String, Object> config) {
+ SharedS3DataStore dataStore = new SharedS3DataStore();
+
+ Properties properties = new Properties();
+ properties.putAll(config);
+
+ dataStore.setProperties(properties);
+ return dataStore;
+ }
+
+ @Override
+ protected String[] getDescription() {
+ return new String[] {"type=SharedS3"};
+ }
+}
Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/SharedS3DataStoreService.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreUtils.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreUtils.java?rev=1666862&r1=1666861&r2=1666862&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreUtils.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreUtils.java Mon Mar 16 04:57:22 2015
@@ -17,16 +17,28 @@
package org.apache.jackrabbit.oak.plugins.blob.datastore;
import java.io.File;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
import java.util.Map;
+import java.util.Properties;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.DeleteObjectsRequest;
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.amazonaws.services.s3.transfer.TransferManager;
import com.google.common.collect.Maps;
+import org.apache.jackrabbit.core.data.Backend;
import org.apache.jackrabbit.core.data.DataStore;
import org.apache.jackrabbit.core.data.FileDataStore;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.S3Backend;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.S3DataStore;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.Utils;
import org.apache.jackrabbit.oak.commons.PropertiesUtil;
-import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore;
-import org.apache.jackrabbit.oak.plugins.blob.datastore.OakFileDataStore;
-import org.apache.jackrabbit.oak.plugins.document.AbstractMongoConnectionTest;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import static org.apache.commons.io.FilenameUtils.concat;
import static org.junit.Assert.assertEquals;
@@ -41,6 +53,8 @@ import static org.junit.Assert.assertEqu
* the system property as 'ds.minRecordLength'
*/
public class DataStoreUtils {
+ private static final Logger log = LoggerFactory.getLogger(DataStoreUtils.class);
+
public static final String DS_CLASS_NAME = "dataStore";
private static final String DS_PROP_PREFIX = "ds.";
@@ -49,7 +63,7 @@ public class DataStoreUtils {
/**
* By default create a default directory. But if overridden will need to be unset
*/
- private static long time = -1;
+ public static long time = -1;
public static DataStoreBlobStore getBlobStore() throws Exception {
String className = System.getProperty(DS_CLASS_NAME, OakFileDataStore.class.getName());
@@ -59,6 +73,12 @@ public class DataStoreUtils {
return new DataStoreBlobStore(ds);
}
+ public static boolean isS3DataStore() {
+ String dsName = System.getProperty(DS_CLASS_NAME);
+ return (dsName != null) && (dsName.equals(S3DataStore.class.getName()) || dsName
+ .equals(SharedS3DataStore.class.getName()));
+ }
+
private static Map<String, ?> getConfig() {
Map<String, Object> result = Maps.newHashMap();
for (Map.Entry<String, ?> e : Maps.fromProperties(System.getProperties()).entrySet()) {
@@ -73,7 +93,7 @@ public class DataStoreUtils {
public static String getHomeDir() {
return concat(new File(".").getAbsolutePath(), "target/blobstore/" +
- (time == -1 ? 0 : System.currentTimeMillis()));
+ (time == -1 ? 0 : time));
}
@Test
@@ -84,4 +104,56 @@ public class DataStoreUtils {
DataStoreBlobStore dbs = getBlobStore();
assertEquals(1000, dbs.getDataStore().getMinRecordLength());
}
+
+ /**
+ * S3 specific cleanup
+ *
+ * @param dataStore the underlying DataStore instance
+ * @param date the date of initialization
+ * @throws Exception
+ */
+ public static void cleanup(DataStore dataStore, Date date) throws Exception {
+ if (dataStore instanceof S3DataStore) {
+ Backend backend = ((S3DataStore) dataStore).getBackend();
+ if (backend instanceof S3Backend) {
+ String bucket = ((S3Backend) backend).getBucket();
+ deleteBucket(bucket, date);
+ }
+ }
+ }
+
+ private static void deleteBucket(String bucket, Date date) throws Exception {
+ log.info("cleaning bucket [" + bucket + "]");
+ Properties props = Utils.readConfig((String) getConfig().get("config"));
+ AmazonS3Client s3service = Utils.openService(props);
+ TransferManager tmx = new TransferManager(s3service);
+ if (s3service.doesBucketExist(bucket)) {
+ for (int i = 0; i < 4; i++) {
+ tmx.abortMultipartUploads(bucket, date);
+ ObjectListing prevObjectListing = s3service.listObjects(bucket);
+ while (prevObjectListing != null ) {
+ List<DeleteObjectsRequest.KeyVersion>
+ deleteList = new ArrayList<DeleteObjectsRequest.KeyVersion>();
+ for (S3ObjectSummary s3ObjSumm : prevObjectListing.getObjectSummaries()) {
+ deleteList.add(new DeleteObjectsRequest.KeyVersion(
+ s3ObjSumm.getKey()));
+ }
+ if (deleteList.size() > 0) {
+ DeleteObjectsRequest delObjsReq = new DeleteObjectsRequest(
+ bucket);
+ delObjsReq.setKeys(deleteList);
+ s3service.deleteObjects(delObjsReq);
+ }
+ if (!prevObjectListing.isTruncated()) break;
+ prevObjectListing = s3service.listNextBatchOfObjects(prevObjectListing);
+ }
+ }
+ //s3service.deleteBucket(bucket);
+ log.info("bucket [ " + bucket + "] cleaned");
+ } else {
+ log.info("bucket [" + bucket + "] doesn't exists");
+ }
+ tmx.shutdownNow();
+ s3service.shutdown();
+ }
}
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java?rev=1666862&r1=1666861&r2=1666862&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java Mon Mar 16 04:57:22 2015
@@ -47,8 +47,6 @@ import org.apache.jackrabbit.oak.spi.sta
import org.apache.jackrabbit.oak.stats.Clock;
import org.junit.Test;
-import static org.junit.Assert.assertTrue;
-
/**
* Tests for MongoMK GC
*/
@@ -73,7 +71,7 @@ public class MongoBlobGCTest extends Abs
}
}
for (int i = 0; i < number; i++) {
- Blob b = s.createBlob(randomStream(i, 4160));
+ Blob b = s.createBlob(randomStream(i, 16516));
if (!processed.contains(i)) {
Iterator<String> idIter =
((GarbageCollectableBlobStore) s.getBlobStore())
@@ -165,6 +163,7 @@ public class MongoBlobGCTest extends Abs
(GarbageCollectableBlobStore) store.getBlobStore(),
MoreExecutors.sameThreadExecutor(),
"./target", 5, 0, repoId);
+ Thread.sleep(4000);
gc.collectGarbage(false);
Set<String> existingAfterGC = iterate();
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/SharedBlobStoreGCTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/SharedBlobStoreGCTest.java?rev=1666862&r1=1666861&r2=1666862&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/SharedBlobStoreGCTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/SharedBlobStoreGCTest.java Mon Mar 16 04:57:22 2015
@@ -23,6 +23,7 @@ import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
+import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@@ -37,10 +38,12 @@ import com.google.common.collect.Sets;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.commons.io.FileUtils;
+import org.apache.jackrabbit.core.data.DataStore;
import org.apache.jackrabbit.oak.api.Blob;
import org.apache.jackrabbit.oak.plugins.blob.BlobGarbageCollector;
import org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector;
import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore;
import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils.SharedStoreRecordType;
import org.apache.jackrabbit.oak.plugins.document.VersionGarbageCollector.VersionGCStats;
import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreUtils;
@@ -55,19 +58,26 @@ import org.apache.jackrabbit.oak.stats.C
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Test for gc in a shared data store among heterogeneous oak node stores.
*/
public class SharedBlobStoreGCTest {
+ private static final Logger log = LoggerFactory.getLogger(SharedBlobStoreGCTest.class);
+
private Cluster cluster1;
private Cluster cluster2;
private Clock clock;
@Before
public void setUp() throws Exception {
+ log.debug("In setUp()");
+
clock = new Clock.Virtual();
clock.waitUntil(Revision.getCurrentTimestamp());
+ DataStoreUtils.time = clock.getTime();
BlobStore blobeStore1 = DataStoreUtils.getBlobStore();
DocumentNodeStore ds1 = new DocumentMK.Builder()
@@ -95,8 +105,11 @@ public class SharedBlobStoreGCTest {
cluster1 = new Cluster(ds1, repoId1, 20);
cluster1.init();
+ log.debug("Initialized {}", cluster1);
+
cluster2 = new Cluster(ds2, repoId2, 100);
cluster2.init();
+ log.debug("Initialized {}", cluster2);
}
static InputStream randomStream(int seed, int size) {
@@ -108,6 +121,7 @@ public class SharedBlobStoreGCTest {
@Test
public void testGC() throws Exception {
+ log.debug("Running testGC()");
// Only run the mark phase on both the clusters
cluster1.gc.collectGarbage(true);
cluster2.gc.collectGarbage(true);
@@ -122,21 +136,25 @@ public class SharedBlobStoreGCTest {
@Test
// GC should fail
public void testOnly1ClusterMark() throws Exception {
+ log.debug("Running testOnly1ClusterMark()");
+
// Only run the mark phase on one cluster
cluster1.gc.collectGarbage(true);
// Execute the gc with sweep
cluster1.gc.collectGarbage(false);
- Assert.assertTrue(
- (cluster1.getInitBlobs().size() + cluster2.getInitBlobs().size()) < cluster1.getExistingBlobIds().size());
Set<String> existing = cluster1.getExistingBlobIds();
+ log.debug("Existing blobs {}", existing);
+ Assert.assertTrue((cluster1.getInitBlobs().size() + cluster2.getInitBlobs().size()) <= existing.size());
Assert.assertTrue(existing.containsAll(cluster2.getInitBlobs()));
Assert.assertTrue(existing.containsAll(cluster1.getInitBlobs()));
}
@Test
public void testRepeatedMarkWithSweep() throws Exception {
+ log.debug("Running testRepeatedMarkWithSweep()");
+
// Only run the mark phase on one cluster
cluster1.gc.collectGarbage(true);
cluster2.gc.collectGarbage(true);
@@ -151,14 +169,17 @@ public class SharedBlobStoreGCTest {
}
@After
- public void tearDown() throws IOException {
- FileUtils.cleanDirectory(new File(DataStoreUtils.getHomeDir()));
+ public void tearDown() throws Exception {
+ DataStoreUtils.cleanup(cluster1.getDataStore(), cluster1.getDate());
+ FileUtils.cleanDirectory((new File(DataStoreUtils.getHomeDir())).getParentFile());
+ DataStoreUtils.time = -1;
}
class Cluster {
private DocumentNodeStore ds;
private int seed;
private BlobGarbageCollector gc;
+ private Date startDate;
private Set<String> initBlobs = new HashSet<String>();
@@ -180,7 +201,7 @@ public class SharedBlobStoreGCTest {
gc.collectGarbage(markOnly);
}
};
-
+ this.startDate = new Date();
this.seed = seed;
}
@@ -203,7 +224,7 @@ public class SharedBlobStoreGCTest {
}
}
for (int i = 0; i < number; i++) {
- Blob b = ds.createBlob(randomStream(i + seed, 4160));
+ Blob b = ds.createBlob(randomStream(i + seed, 16516));
if (!deletes.contains(i)) {
Iterator<String> idIter =
((GarbageCollectableBlobStore) ds.getBlobStore())
@@ -228,6 +249,10 @@ public class SharedBlobStoreGCTest {
VersionGarbageCollector vGC = ds.getVersionGarbageCollector();
VersionGCStats stats = vGC.gc(0, TimeUnit.MILLISECONDS);
Assert.assertEquals(deletes.size(), stats.deletedDocGCCount);
+
+ if (DataStoreUtils.isS3DataStore()) {
+ Thread.sleep(1000);
+ }
}
public Set<String> getExistingBlobIds() throws Exception {
@@ -240,6 +265,14 @@ public class SharedBlobStoreGCTest {
}
return existing;
}
+
+ public DataStore getDataStore() {
+ return ((DataStoreBlobStore) ds.getBlobStore()).getDataStore();
+ }
+
+ public Date getDate() {
+ return startDate;
+ }
}
}
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/blob/ds/MongoDataStoreBlobGCTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/blob/ds/MongoDataStoreBlobGCTest.java?rev=1666862&r1=1666861&r2=1666862&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/blob/ds/MongoDataStoreBlobGCTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/blob/ds/MongoDataStoreBlobGCTest.java Mon Mar 16 04:57:22 2015
@@ -17,6 +17,7 @@
package org.apache.jackrabbit.oak.plugins.document.blob.ds;
import java.io.File;
+import java.util.Date;
import org.apache.commons.io.FileUtils;
import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore;
@@ -34,6 +35,9 @@ import org.junit.BeforeClass;
*
*/
public class MongoDataStoreBlobGCTest extends MongoBlobGCTest {
+ Date startDate;
+ DataStoreBlobStore blobStore;
+
@BeforeClass
public static void setUpBeforeClass() throws Exception {
try {
@@ -46,15 +50,18 @@ public class MongoDataStoreBlobGCTest ex
@Before
@Override
public void setUpConnection() throws Exception {
+ startDate = new Date();
mongoConnection = MongoUtils.getConnection();
MongoUtils.dropCollections(mongoConnection.getDB());
+ blobStore = DataStoreUtils.getBlobStore();
mk = new DocumentMK.Builder().clock(getTestClock()).setMongoDB(mongoConnection.getDB())
- .setBlobStore(DataStoreUtils.getBlobStore()).open();
+ .setBlobStore(blobStore).open();
}
@After
@Override
public void tearDownConnection() throws Exception {
+ DataStoreUtils.cleanup(blobStore.getDataStore(), startDate);
FileUtils.deleteDirectory(new File(DataStoreUtils.getHomeDir()));
mk.dispose();
// the db might already be closed
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCTest.java?rev=1666862&r1=1666861&r2=1666862&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCTest.java Mon Mar 16 04:57:22 2015
@@ -26,6 +26,7 @@ import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
+import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@@ -69,6 +70,7 @@ public class SegmentDataStoreBlobGCTest
SegmentNodeStore nodeStore;
FileStore store;
DataStoreBlobStore blobStore;
+ Date startDate;
protected SegmentNodeStore getNodeStore(BlobStore blobStore) throws IOException {
if (nodeStore == null) {
@@ -94,6 +96,7 @@ public class SegmentDataStoreBlobGCTest
public HashSet<String> setUp() throws Exception {
blobStore = DataStoreUtils.getBlobStore();
nodeStore = getNodeStore(blobStore);
+ startDate = new Date();
HashSet<String> set = new HashSet<String>();
@@ -208,10 +211,11 @@ public class SegmentDataStoreBlobGCTest
}
@After
- public void close() throws IOException {
+ public void close() throws Exception {
if (store != null) {
store.close();
}
+ DataStoreUtils.cleanup(blobStore.getDataStore(), startDate);
FileUtils.deleteDirectory(getWorkDir());
FileUtils.deleteDirectory(new File(DataStoreUtils.getHomeDir()));
}