You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by am...@apache.org on 2017/10/09 06:00:43 UTC
svn commit: r1811530 -
/jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/BlobGCTest.java
Author: amitj
Date: Mon Oct 9 06:00:43 2017
New Revision: 1811530
URL: http://svn.apache.org/viewvc?rev=1811530&view=rev
Log:
OAK-6798 - Basic Blob GC test for in-memory NodeStore and BlobStore
- Added a BlobGCTest with a in-memory test extension to MemoryNodeStore and an in-memory test BlobStore
Added:
jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/BlobGCTest.java (with props)
Added: jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/BlobGCTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/BlobGCTest.java?rev=1811530&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/BlobGCTest.java (added)
+++ jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/BlobGCTest.java Mon Oct 9 06:00:43 2017
@@ -0,0 +1,592 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.blob;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.security.DigestOutputStream;
+import java.security.MessageDigest;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.annotation.CheckForNull;
+import javax.annotation.Nonnull;
+import javax.management.openmbean.TabularData;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.output.NullOutputStream;
+import org.apache.jackrabbit.core.data.DataIdentifier;
+import org.apache.jackrabbit.core.data.DataRecord;
+import org.apache.jackrabbit.core.data.DataStoreException;
+import org.apache.jackrabbit.oak.api.Blob;
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.api.jmx.CheckpointMBean;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils;
+import org.apache.jackrabbit.oak.plugins.memory.ArrayBasedBlob;
+import org.apache.jackrabbit.oak.plugins.memory.MemoryNodeStore;
+import org.apache.jackrabbit.oak.spi.blob.BlobOptions;
+import org.apache.jackrabbit.oak.spi.blob.BlobStore;
+import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore;
+import org.apache.jackrabbit.oak.spi.cluster.ClusterRepositoryInfo;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
+import org.apache.jackrabbit.oak.spi.whiteboard.DefaultWhiteboard;
+import org.apache.jackrabbit.oak.spi.whiteboard.Registration;
+import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
+import org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils;
+import org.apache.jackrabbit.oak.stats.Clock;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.commons.codec.binary.Hex.encodeHexString;
+import static org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils.SharedStoreRecordType.REPOSITORY;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Generic class for BlobGC tests which uses custom MemoryNodeStore as well as a memory NodeStore.
+ */
+public class BlobGCTest {
+ protected static final Logger log = LoggerFactory.getLogger(BlobGCTest.class);
+
+ @Rule
+ public TemporaryFolder folder = new TemporaryFolder(new File("target"));
+
+ protected GarbageCollectableBlobStore blobStore;
+ protected NodeStore nodeStore;
+ protected Whiteboard wb;
+ protected long startReferenceTime;
+
+ protected BlobReferenceRetriever referenceRetriever;
+ protected CheckpointMBean checkpointMBean;
+
+ protected Clock clock;
+
    @Before
    public void before() {
        // Wire up the in-memory fixture: a virtual clock, the time-lapsed blob
        // store and a MemoryNodeStore extension that writes blobs to it.
        clock = getClock();
        blobStore = new TimeLapsedBlobStore();
        nodeStore = new MemoryBlobStoreNodeStore(blobStore);
        // add whiteboard; capture the properties of every registration so that
        // tests can inspect what the GC registered
        final AtomicReference<Map<?, ?>> props = new AtomicReference<Map<?, ?>>();
        wb = new DefaultWhiteboard(){
            @Override
            public <T> Registration register(Class<T> type, T service, Map<?, ?> properties) {
                props.set(properties);
                return super.register(type, service, properties);
            }
        };
        referenceRetriever = ((MemoryBlobStoreNodeStore) nodeStore).getBlobReferenceRetriever();
        // baseline timestamp against which blob age is measured in the tests
        startReferenceTime = ((TimeLapsedBlobStore) blobStore).startTime;
    }
+
    /**
     * Clock used to timestamp blobs; defaults to a virtual clock so tests can
     * advance time deterministically. Subclasses may override.
     */
    protected Clock getClock() {
        return new Clock.Virtual();
    }
+
+ @Test
+ public void gc() throws Exception {
+ log.info("Staring gc()");
+
+ BlobStoreState state = setUp(10, 5, 100);
+
+ log.info("{} blobs added : {}", state.blobsAdded.size(), state.blobsAdded);
+ log.info("{} blobs remaining : {}", state.blobsPresent.size(), state.blobsPresent);
+
+ Set<String> existingAfterGC = gcInternal(0);
+ assertTrue(Sets.symmetricDifference(state.blobsPresent, existingAfterGC).isEmpty());
+ }
+
+ @Test
+ public void noGc() throws Exception {
+ log.info("Staring noGc()");
+ startReferenceTime = clock.getTime();
+
+ BlobStoreState state = setUp(10, 5, 100);
+ long afterSetupTime = clock.getTime();
+
+ log.info("{} blobs added : {}", state.blobsAdded.size(), state.blobsAdded);
+ log.info("{} blobs remaining : {}", state.blobsPresent.size(), state.blobsPresent);
+
+ Set<String> existingAfterGC = gcInternal(afterSetupTime - startReferenceTime + 2);
+ assertTrue(Sets.symmetricDifference(state.blobsAdded, existingAfterGC).isEmpty());
+ }
+
+ protected Set<String> gcInternal(long maxBlobGcInSecs) throws Exception {
+ ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
+ MarkSweepGarbageCollector gc = initGC(maxBlobGcInSecs, executor);
+ gc.collectGarbage(false);
+
+ assertEquals(0, executor.getTaskCount());
+ Set<String> existingAfterGC = iterate();
+ log.info("{} blobs existing after gc : {}", existingAfterGC.size(), existingAfterGC);
+ return existingAfterGC;
+ }
+
    /**
     * Creates a collector whose working directory is a fresh temporary folder.
     */
    private MarkSweepGarbageCollector initGC(long blobGcMaxAgeInSecs, ThreadPoolExecutor executor)
        throws Exception {
        return initGC(blobGcMaxAgeInSecs, executor, folder.newFolder().getAbsolutePath());
    }
+
    /**
     * Creates the mark-sweep collector. When the blob store is shared, a
     * repository id is registered as a metadata record first so the collector
     * can attribute references to this repository.
     *
     * @param blobGcMaxAgeInSecs minimum age a blob must have to be collected
     * @param executor executor handed to the collector
     * @param root working directory for the collector's temporary files
     */
    private MarkSweepGarbageCollector initGC(long blobGcMaxAgeInSecs, ThreadPoolExecutor executor,
        String root) throws Exception {
        String repoId = null;
        if (SharedDataStoreUtils.isShared(blobStore)) {
            // register this repository with the shared data store
            repoId = ClusterRepositoryInfo.getOrCreateId(nodeStore);
            ((SharedDataStore) blobStore).addMetadataRecord(
                new ByteArrayInputStream(new byte[0]),
                REPOSITORY.getNameFromId(repoId));
        }

        // 2048 is the batch size for candidate ids processed per sweep pass
        MarkSweepGarbageCollector gc =
            new MarkSweepGarbageCollector(referenceRetriever,
                blobStore, executor,
                root, 2048, blobGcMaxAgeInSecs, repoId, wb);
        return gc;
    }
+
+ protected Set<String> iterate() throws Exception {
+ Iterator<String> cur = blobStore.getAllChunkIds(0);
+
+ Set<String> existing = Sets.newHashSet();
+ while (cur.hasNext()) {
+ existing.add(cur.next());
+ }
+ return existing;
+ }
+
    /**
     * Seeds the repository: creates {@code count} nodes each holding a blob of
     * {@code blobSize} bytes, then deletes up to {@code deletions} random nodes
     * so their blobs become garbage, and advances the clock so that garbage is
     * old enough for collection.
     *
     * @param count number of blobs/nodes to create
     * @param deletions upper bound on deleted nodes (random draws may collide,
     *            so fewer may actually be deleted)
     * @param blobSize size of each blob in bytes
     * @return the ids of all created chunks and of those still referenced
     */
    public BlobStoreState setUp(
        int count,
        int deletions,
        int blobSize) throws Exception {

        preSetup();

        NodeBuilder a = nodeStore.getRoot().builder();
        /* Create and delete nodes with blobs stored in DS*/
        int maxDeleted = deletions;
        int numBlobs = count;
        // choose distinct node indexes to delete later; duplicates are skipped,
        // so the list may end up shorter than maxDeleted
        List<Integer> toBeDeleted = Lists.newArrayList();
        Random rand = new Random();
        for (int i = 0; i < maxDeleted; i++) {
            int n = rand.nextInt(numBlobs);
            if (!toBeDeleted.contains(n)) {
                toBeDeleted.add(n);
            }
        }

        BlobStoreState state = new BlobStoreState();
        for (int i = 0; i < numBlobs; i++) {
            Blob b = nodeStore.createBlob(randomStream(i, blobSize));
            Iterator<String> idIter = blobStore.resolveChunks(b.getContentIdentity());
            while (idIter.hasNext()) {
                String chunk = idIter.next();
                state.blobsAdded.add(chunk);
                // chunks of nodes that survive the deletion stay referenced
                if (!toBeDeleted.contains(i)) {
                    state.blobsPresent.add(chunk);
                }
            }
            a.child("c" + i).setProperty("x", b);
        }

        nodeStore.merge(a, EmptyHook.INSTANCE, CommitInfo.EMPTY);
        log.info("Created blobs : {}", state.blobsAdded.size());

        for (int id : toBeDeleted) {
            delete("c" + id, nodeStore);
        }
        log.info("Deleted nodes : {}", toBeDeleted.size());

        // Sleep a little to make eligible for cleanup
        clock.waitUntil(5);

        postSetup(state);

        return state;
    }
+
+ protected Set<String> createBlobs(int count, int size) throws Exception {
+ HashSet<String> blobSet = new HashSet<String>();
+ for (int i = 0; i < count; i++) {
+ String id = blobStore.writeBlob(randomStream(100 + i, size));
+ Iterator<String> idIter = blobStore.resolveChunks(id);
+ while (idIter.hasNext()) {
+ String chunk = idIter.next();
+ blobSet.add(chunk);
+ }
+ }
+ log.info("{} Additional created {}", blobSet.size(), blobSet);
+ return blobSet;
+ }
+
    // Hook for subclasses to run before the repository is seeded.
    protected void preSetup() {}
+
    // Hook run after seeding: publishes the still-referenced blob ids to the
    // node store so its BlobReferenceRetriever reports them during mark.
    protected void postSetup(BlobStoreState state) {
        ((MemoryBlobStoreNodeStore) nodeStore).setReferencedBlobs(state.blobsPresent);
    }
+
+ protected void delete(String nodeId, NodeStore nodeStore) throws CommitFailedException {
+ NodeBuilder builder = nodeStore.getRoot().builder();
+ builder.child(nodeId).remove();
+
+ nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+ }
+
+ static InputStream randomStream(int seed, int size) {
+ Random r = new Random(seed);
+ byte[] data = new byte[size];
+ r.nextBytes(data);
+ return new ByteArrayInputStream(data);
+ }
+
    /**
     * Represents state of the blobs after setup
     */
    class BlobStoreState {
        // every chunk id written during setup
        Set<String> blobsAdded = Sets.newHashSet();
        // chunk ids still referenced by surviving nodes (expected GC survivors)
        Set<String> blobsPresent = Sets.newHashSet();
    }
+
    /**
     * MemoryNodeStore extension which created blobs in the in-memory blob store
     */
    static class MemoryBlobStoreNodeStore extends MemoryNodeStore {
        private final BlobStore blobStore;
        // set by the test (postSetup) to the chunk ids the mark phase reports
        Set<String> referencedBlobs;

        public MemoryBlobStoreNodeStore(BlobStore blobStore) {
            this.blobStore = blobStore;
        }

        public void setReferencedBlobs(Set<String> referencedBlobs) {
            this.referencedBlobs = referencedBlobs;
        }

        /**
         * Writes the stream to the backing blob store and wraps the id.
         * NOTE(review): returns null if the write fails (error only logged).
         */
        @Override
        public ArrayBasedBlob createBlob(InputStream in) {
            try {
                String id = blobStore.writeBlob(in);
                return new TestBlob(id, blobStore);
            } catch(Exception e) {
                log.error("Error in createBlobs", e);
            }
            return null;
        }

        // Retriever reporting exactly the ids in referencedBlobs to the collector.
        public BlobReferenceRetriever getBlobReferenceRetriever() {
            return collector -> {
                for(String id : referencedBlobs) {
                    collector.addReference(id, null);
                }
            };
        }

        /** Blob whose payload lives in the blob store; identity is the store id. */
        static class TestBlob extends ArrayBasedBlob {
            private String id;           // blob store id (content identity)
            private BlobStore blobStore; // backing store for stream/length

            public TestBlob(String id, BlobStore blobStore) {
                super(new byte[0]);
                this.id = id;
                this.blobStore = blobStore;
            }

            @Override
            public String getContentIdentity() {
                return id;
            }
            // NOTE(review): returns null on read failure despite @Nonnull —
            // callers may NPE; confirm whether this is acceptable for the test.
            @Nonnull
            @Override
            public InputStream getNewStream() {
                try {
                    return blobStore.getInputStream(id);
                } catch (IOException e) {
                    log.error("Error in getNewStream", e);
                }
                return null;
            }

            // Length per the blob store; 0 if the lookup fails.
            @Override
            public long length() {
                try {
                    return blobStore.getBlobLength(id);
                } catch (IOException e) {
                    log.error("Error in length", e);
                }
                return 0;
            }
        }
    }
+
+ /**
+ * Test in memory DS to store the contents with an increasing time
+ */
+ class TimeLapsedBlobStore implements GarbageCollectableBlobStore, SharedDataStore {
+ private final long startTime;
+ Map<String, DataRecord> store;
+ Map<String, DataRecord> metadata;
+
        public TimeLapsedBlobStore() {
            this(System.currentTimeMillis());
        }

        public TimeLapsedBlobStore(long startTime) {
            // NOTE(review): the startTime parameter is ignored; the field is
            // always initialized from the test's virtual clock — confirm
            // whether honoring the parameter was intended.
            this.startTime = clock.getTime();
            store = Maps.newHashMap();
            metadata = Maps.newHashMap();
        }
+
        // Returns all chunk ids currently stored.
        // NOTE(review): maxLastModifiedTime is ignored here, unlike in
        // countDeleteChunks — verify the GC never relies on it for this call.
        @Override public Iterator<String> getAllChunkIds(long maxLastModifiedTime) throws Exception {
            return store.keySet().iterator();
        }

        // True only when every requested chunk was actually deleted.
        @Override public boolean deleteChunks(List<String> chunkIds, long maxLastModifiedTime) throws Exception {
            return (chunkIds.size() == countDeleteChunks(chunkIds, maxLastModifiedTime));
        }
+
+ @Override public long countDeleteChunks(List<String> chunkIds, long maxLastModifiedTime) throws Exception {
+ int count = 0;
+
+ for(String id : chunkIds) {
+ log.info("maxLastModifiedTime {}", maxLastModifiedTime);
+ log.info("store.get(id).getLastModified() {}", store.get(id).getLastModified());
+ if (maxLastModifiedTime <= 0 || store.get(id).getLastModified() < maxLastModifiedTime) {
+ store.remove(id);
+ count++;
+ }
+ }
+ return count;
+ }
+
        // This store does not chunk blobs: an id resolves to itself.
        @Override public Iterator<String> resolveChunks(String blobId) throws IOException {
            return Iterators.singletonIterator(blobId);
        }

        @Override public String writeBlob(InputStream in) throws IOException {
            return writeBlob(in, new BlobOptions());
        }
+
        // Stores the stream under its content hash, stamped with the virtual
        // clock time so tests can control the apparent age of each blob.
        @Override public String writeBlob(InputStream in, BlobOptions options) throws IOException {
            try {
                byte[] data = IOUtils.toByteArray(in);
                String id = getIdForInputStream(new ByteArrayInputStream(data));
                TestRecord rec = new TestRecord(id, data, clock.getTime());
                store.put(id, rec);
                log.info("Blob created {} with timestamp {}", rec.id, rec.lastModified);
                return id;
            } catch (Exception e) {
                // wrap checked digest/store failures in the declared IOException
                throw new IOException(e);
            }
        }
+
+ private String getIdForInputStream(final InputStream in)
+ throws Exception {
+ MessageDigest digest = MessageDigest.getInstance("SHA-256");
+ OutputStream output = new DigestOutputStream(new NullOutputStream(), digest);
+ try {
+ IOUtils.copyLarge(in, output);
+ } finally {
+ IOUtils.closeQuietly(output);
+ IOUtils.closeQuietly(in);
+ }
+ return encodeHexString(digest.digest());
+ }
+
        // Length of the stored payload.
        // NOTE(review): NPEs for unknown ids (store.get returns null) — callers
        // must pass ids previously returned by writeBlob.
        @Override public long getBlobLength(String blobId) throws IOException {
            return ((TestRecord) store.get(blobId)).data.length;
        }
+
+ @Override public InputStream getInputStream(String blobId) throws IOException {
+ try {
+ return store.get(blobId).getStream();
+ } catch (DataStoreException e) {
+ e.printStackTrace();
+ }
+ return null;
+ }
+
        // References and blob ids are identical in this store (no HMAC/secret).
        @CheckForNull @Override public String getBlobId(@Nonnull String reference) {
            return reference;
        }

        @CheckForNull @Override public String getReference(@Nonnull String blobId) {
            return blobId;
        }
+
+ @Override public void addMetadataRecord(InputStream stream, String name) throws DataStoreException {
+ try {
+ byte[] data = IOUtils.toByteArray(stream);
+ TestRecord rec = new TestRecord(name, data, clock.getTime());
+ metadata.put(name, rec);
+ log.info("Metadata created {} with timestamp {}", rec.id, rec.lastModified);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override public void addMetadataRecord(File f, String name) throws DataStoreException {
+ FileInputStream fstream = null;
+ try {
+ fstream = new FileInputStream(f);
+ addMetadataRecord(fstream, name);
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ IOUtils.closeQuietly(fstream);
+ }
+ }
+
        // Returns the metadata record registered under the name, or null.
        @Override public DataRecord getMetadataRecord(String name) {
            return metadata.get(name);
        }
+
+ @Override public List<DataRecord> getAllMetadataRecords(String prefix) {
+ List<DataRecord> recs = Lists.newArrayList();
+ Iterator<Map.Entry<String, DataRecord>> iter = metadata.entrySet().iterator();
+ while (iter.hasNext()) {
+ Map.Entry<String, DataRecord> entry = iter.next();
+ if (entry.getKey().startsWith(prefix)) {
+ recs.add(entry.getValue());
+ }
+ }
+ return recs;
+ }
+
+ @Override public boolean deleteMetadataRecord(String name) {
+ metadata.remove(name);
+ if (!metadata.containsKey(name)) {
+ return true;
+ }
+ return false;
+ }
+
+ @Override public void deleteAllMetadataRecords(String prefix) {
+ List<String> recs = Lists.newArrayList();
+ Iterator<Map.Entry<String, DataRecord>> iter = metadata.entrySet().iterator();
+ while (iter.hasNext()) {
+ Map.Entry<String, DataRecord> entry = iter.next();
+ if (entry.getKey().startsWith(prefix)) {
+ recs.add(entry.getKey());
+ }
+ }
+
+ for(String key: recs) {
+ metadata.remove(key);
+ }
+ }
+
        // All stored data records (metadata records are kept separately).
        @Override public Iterator<DataRecord> getAllRecords() throws DataStoreException {
            return store.values().iterator();
        }

        // Lookup by identifier; null when absent.
        @Override public DataRecord getRecordForId(DataIdentifier id) throws DataStoreException {
            return store.get(id.toString());
        }

        // Advertise shared semantics so the GC exercises the shared-store path.
        @Override public Type getType() {
            return Type.SHARED;
        }
+
        /**
         * In-memory DataRecord holding a payload and a fixed last-modified
         * timestamp taken from the virtual clock at creation time.
         */
        class TestRecord implements DataRecord {
            String id;          // content hash, or the metadata record name
            byte[] data;        // raw payload
            long lastModified;  // creation time per the virtual clock

            public TestRecord(String id, byte[] data, long lastModified) {
                this.id = id;
                this.data = data;
                this.lastModified = lastModified;
            }

            @Override public DataIdentifier getIdentifier() {
                return new DataIdentifier(id);
            }

            @Override public String getReference() {
                return id;
            }

            @Override public long getLength() throws DataStoreException {
                return data.length;
            }

            @Override public InputStream getStream() throws DataStoreException {
                return new ByteArrayInputStream(data);
            }

            @Override public long getLastModified() {
                return lastModified;
            }
        }
+
        /** No-op **/
        // Random-access reads are not needed by these tests.
        @Override public int readBlob(String blobId, long pos, byte[] buff, int off, int length) throws IOException {
            throw new UnsupportedOperationException("readBlob not supported");
        }

        @Override public void setBlockSize(int x) {
            // no-op: block size is irrelevant for this in-memory store
        }
+
+ @Override public String writeBlob(String tempFileName) throws IOException {
+ throw new UnsupportedOperationException("getBlockSizeMin not supported");
+ }
+
        // GC mark/sweep on the store itself is not exercised by these tests.
        @Override public int sweep() throws IOException {
            throw new UnsupportedOperationException("sweep not supported");
        }

        @Override public void startMark() throws IOException {
            // no-op: mark-phase state is not tracked
        }

        @Override public void clearInUse() {
            // no-op: in-use tracking is not implemented
        }

        @Override public void clearCache() {
            // no-op: nothing is cached
        }

        @Override public long getBlockSizeMin() {
            throw new UnsupportedOperationException("getBlockSizeMin not supported");
        }
+ }
+}
Propchange: jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/BlobGCTest.java
------------------------------------------------------------------------------
svn:eol-style = native