You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by am...@apache.org on 2016/10/24 04:50:08 UTC
svn commit: r1766332 -
/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/FileCacheTest.java
Author: amitj
Date: Mon Oct 24 04:50:08 2016
New Revision: 1766332
URL: http://svn.apache.org/viewvc?rev=1766332&view=rev
Log:
OAK-4979: Caching sub-system implementation for DataStore
- Tests for FileCache
Added:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/FileCacheTest.java (with props)
Added: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/FileCacheTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/FileCacheTest.java?rev=1766332&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/FileCacheTest.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/FileCacheTest.java Mon Oct 24 04:50:08 2016
@@ -0,0 +1,458 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.blob;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.io.Closer;
+import com.google.common.io.Files;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
+import org.apache.jackrabbit.oak.commons.StringUtils;
+import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+
+import static org.apache.commons.io.IOUtils.closeQuietly;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * - Tests for {@link FileCache}
+ */
+public class FileCacheTest extends AbstractDataStoreCacheTest {
+ private static final String ID_PREFIX = "12345";
+ private FileCache cache;
+ private File root;
+ private TestCacheLoader loader;
+ private Closer closer;
+
+ @Rule
+ public TemporaryFolder folder = new TemporaryFolder(new File("target"));
+
+ @Rule
+ public TestName testName = new TestName();
+
+ @Before
+ public void setup() throws Exception {
+ root = folder.newFolder();
+ closer = Closer.create();
+ loader = new TestCacheLoader<String, InputStream>(root);
+
+ if (!testName.getMethodName().equals("rebuild")) {
+
+ CountDownLatch beforeLatch = new CountDownLatch(1);
+ CountDownLatch afterLatch = new CountDownLatch(1);
+ CountDownLatch afterExecuteLatch = new CountDownLatch(1);
+
+ TestExecutor executor = new TestExecutor(1, beforeLatch, afterLatch, afterExecuteLatch);
+ beforeLatch.countDown();
+ afterLatch.countDown();
+ cache = new FileCache(0/** MB */, root, loader, executor);
+ Futures.successfulAsList((Iterable<? extends ListenableFuture<?>>) executor.futures).get();
+
+ closer.register(cache);
+ }
+ }
+
+ @After
+ public void tear() {
+ closeQuietly(closer);
+ }
+
+ /**
+ * Load and get from cache.
+ * @throws Exception
+ */
+ @Test
+ public void add() throws Exception {
+ LOG.info("Started add");
+
+ File f = createFile(0, loader, cache, folder);
+ assertCache(0, cache, f);
+ assertCacheStats(cache, 1, 4 * 1024, 1, 1);
+ assertEquals("Memory weight different",
+ getWeight(ID_PREFIX + 0, cache.getIfPresent(ID_PREFIX + 0)),
+ cache.getStats().estimateCurrentMemoryWeight());
+
+ LOG.info("Finished add");
+ }
+
+ /**
+ * Explicitly put in cache.
+ * @throws Exception
+ */
+ @Test
+ public void put() throws Exception {
+ LOG.info("Started put");
+
+ //File f = FileIOUtils.copy(randomStream(0, 4 * 1024));
+ cache.put(ID_PREFIX + 0, copyToFile(randomStream(0, 4 * 1024), folder.newFile()));
+ assertCacheIfPresent(0, cache, copyToFile(randomStream(0, 4 * 1024), folder.newFile()));
+ assertCacheStats(cache, 1, 4 * 1024, 0, 0);
+
+ LOG.info("Finished put");
+ }
+
+ /**
+ * Tests {@link FileCache#getIfPresent(Object)} when no cache.
+ */
+ @Test
+ public void getIfPresentObjectNoCache() {
+ LOG.info("Started getIfPresentObjectNoCache");
+
+ File file = cache.getIfPresent((Object) (ID_PREFIX + 0));
+ assertNull(file);
+ assertCacheStats(cache, 0, 0, 0, 0);
+
+ LOG.info("Finished getIfPresentObjectNoCache");
+ }
+
+ /**
+ * Retrieves same file concurrently.
+ * @throws Exception
+ */
+ @Test
+ public void retrieveSameConcurrent() throws Exception {
+ LOG.info("Started retrieveSameConcurrent");
+
+ File f = createFile(0, loader, cache, folder);
+ ListeningExecutorService executorService =
+ MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));
+ closer.register(new ExecutorCloser(executorService, 5, TimeUnit.MILLISECONDS));
+
+ CountDownLatch thread1Start = new CountDownLatch(1);
+ SettableFuture<File> future1 =
+ retrieveThread(executorService, ID_PREFIX + 0, cache, thread1Start);
+
+ CountDownLatch thread2Start = new CountDownLatch(1);
+ SettableFuture<File> future2 =
+ retrieveThread(executorService, ID_PREFIX + 0, cache, thread2Start);
+
+ thread1Start.countDown();
+ thread2Start.countDown();
+
+ future1.get();
+ future2.get();
+ LOG.info("Async tasks finished");
+
+ assertCacheIfPresent(0, cache, f);
+ assertCacheStats(cache, 1, 4 * 1024, 1, 1);
+
+ LOG.info("Finished retrieveSameConcurrent");
+ }
+
+ /**
+ * Retrieves different files concurrently.
+ * @throws Exception
+ */
+ @Test
+ public void getDifferentConcurrent() throws Exception {
+ LOG.info("Started getDifferentConcurrent");
+
+ File f = createFile(0, loader, cache, folder);
+ File f2 = createFile(1, loader, cache, folder);
+
+ ListeningExecutorService executorService =
+ MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));
+ closer.register(new ExecutorCloser(executorService, 5, TimeUnit.MILLISECONDS));
+
+ CountDownLatch thread1Start = new CountDownLatch(1);
+ SettableFuture<File> future1 =
+ retrieveThread(executorService, ID_PREFIX + 0, cache, thread1Start);
+
+ CountDownLatch thread2Start = new CountDownLatch(1);
+ SettableFuture<File> future2 =
+ retrieveThread(executorService, ID_PREFIX + 1, cache, thread2Start);
+
+ thread1Start.countDown();
+ thread2Start.countDown();
+
+ future1.get();
+ future2.get();
+ LOG.info("Async tasks finished");
+
+ assertCacheIfPresent(0, cache, f);
+ assertCacheIfPresent(1, cache, f2);
+ assertCacheStats(cache, 2, 8 * 1024, 2, 2);
+
+ LOG.info("Finished getDifferentConcurrent");
+ }
+
+ /**
+ * Retrieve and put different files concurrently.
+ * @throws Exception
+ */
+ @Test
+ public void retrievePutConcurrent() throws Exception {
+ LOG.info("Started retrievePutConcurrent");
+
+ //Create load
+ final File f = createFile(0, loader, cache, folder);
+ File f2 = copyToFile(randomStream(1, 4 * 1024), folder.newFile());
+
+ ListeningExecutorService executorService =
+ MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));
+ closer.register(new ExecutorCloser(executorService, 5, TimeUnit.MILLISECONDS));
+
+ CountDownLatch thread1Start = new CountDownLatch(1);
+ SettableFuture<File> future1 =
+ retrieveThread(executorService, ID_PREFIX + 0, cache, thread1Start);
+
+ CountDownLatch thread2Start = new CountDownLatch(1);
+ SettableFuture<Boolean> future2 = putThread(executorService, 1, f2, cache, thread2Start);
+
+ thread1Start.countDown();
+ thread2Start.countDown();
+
+ future1.get();
+ future2.get();
+ LOG.info("Async tasks finished");
+
+ assertCacheIfPresent(0, cache, f);
+ assertCacheIfPresent(1, cache, copyToFile(randomStream(1, 4 * 1024), folder.newFile()));
+ assertCacheStats(cache, 2, 8 * 1024, 1, 1);
+
+ LOG.info("Finished retrievePutConcurrent");
+ }
+
+ /**
+ * evict explicitly.
+ * @throws Exception
+ */
+ @Test
+ public void evictExplicit() throws Exception {
+ LOG.info("Started evictExplicit");
+
+ File f = createFile(0, loader, cache, folder);
+ assertCache(0, cache, f);
+
+ // trigger explicit invalidate
+ cache.invalidate(ID_PREFIX + 0);
+ assertFalse(cache.containsKey(ID_PREFIX + 0));
+ assertCacheStats(cache, 0, 0, 1, 1);
+
+ LOG.info("Finished evictExplicit");
+ }
+
+ /**
+ * evict implicitly.
+ * @throws Exception
+ */
+ @Test
+ public void evictImplicit() throws Exception {
+ LOG.info("Started evictImplicit");
+
+ for (int i = 0; i < 15; i++) {
+ File f = createFile(i, loader, cache, folder);
+ assertCache(i, cache, f);
+ }
+
+ File f = createFile(30, loader, cache, folder);
+ assertCache(30, cache, f);
+ // One of the entries should have been evicted
+ assertTrue(cache.getStats().getElementCount() == 15);
+ assertCacheStats(cache, 15, 60 * 1024, 16, 16);
+
+ LOG.info("Finished evictImplicit");
+ }
+
+ /**
+ * Retrieve and invalidate concurrently.
+ * @throws Exception
+ */
+ @Test
+ public void getInvalidateConcurrent() throws Exception {
+ LOG.info("Started getInvalidateConcurrent");
+
+ //Create load
+ for (int i = 0; i < 15; i++) {
+ if (i != 4) {
+ File f = createFile(i, loader, cache, folder);
+ assertCache(i, cache, f);
+ }
+ }
+ LOG.info("Finished creating load");
+ ListeningExecutorService executorService =
+ MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));
+ closer.register(new ExecutorCloser(executorService, 5, TimeUnit.MILLISECONDS));
+
+ CountDownLatch thread1Start = new CountDownLatch(1);
+ SettableFuture<File> future1 =
+ retrieveThread(executorService, ID_PREFIX + 10, cache, thread1Start);
+ thread1Start.countDown();
+
+ File f = createFile(4, loader, cache, folder);
+ CountDownLatch thread2Start = new CountDownLatch(1);
+ SettableFuture<File> future2 =
+ retrieveThread(executorService, ID_PREFIX + 4, cache, thread2Start);
+
+ thread2Start.countDown();
+
+ File f10 = future1.get();
+ future2.get();
+ LOG.info("Async tasks finished");
+
+ if (f10.exists()) {
+ assertCacheIfPresent(10, cache, f10);
+ }
+ if (f.exists()) {
+ assertCacheIfPresent(4, cache, f);
+ }
+ LOG.info("Finished getInvalidateConcurrent");
+ }
+
+ /**
+ * Trigger build cache on start.
+ * @throws Exception
+ */
+ @Test
+ public void rebuild() throws Exception {
+ LOG.info("Started rebuild");
+
+ root = folder.newFolder();
+ CountDownLatch beforeLatch = new CountDownLatch(1);
+ CountDownLatch afterLatch = new CountDownLatch(1);
+ CountDownLatch afterExecuteLatch = new CountDownLatch(1);
+
+ TestExecutor executor = new TestExecutor(1, beforeLatch, afterLatch, afterExecuteLatch);
+ beforeLatch.countDown();
+ afterLatch.countDown();
+ cache = new FileCache(4 * 1024/* bytes */, root, loader, executor);
+
+ afterExecuteLatch.await();
+ Futures.successfulAsList((Iterable<? extends ListenableFuture<?>>) executor.futures).get();
+ LOG.info("Cache built");
+
+ File f = createFile(0, loader, cache, folder);
+ assertCache(0, cache, f);
+ cache.close();
+
+ beforeLatch = new CountDownLatch(1);
+ afterLatch = new CountDownLatch(1);
+ afterExecuteLatch = new CountDownLatch(1);
+
+ executor = new TestExecutor(1, beforeLatch, afterLatch, afterExecuteLatch);
+ beforeLatch.countDown();
+ afterLatch.countDown();
+ cache = new FileCache(4 * 1024/* bytes */, root, loader, executor);
+ closer.register(cache);
+ afterExecuteLatch.await();
+ Futures.successfulAsList((Iterable<? extends ListenableFuture<?>>) executor.futures).get();
+ LOG.info("Cache rebuilt");
+
+ assertCacheIfPresent(0, cache, f);
+ assertCacheStats(cache, 1, 4 * 1024, 0, 0);
+
+ LOG.info("Finished rebuild");
+ }
+
+ /**------------------------------ Helper methods --------------------------------------------**/
+
+ private static SettableFuture<File> retrieveThread(ListeningExecutorService executor,
+ final String id, final FileCache cache, final CountDownLatch start) {
+ final SettableFuture<File> future = SettableFuture.create();
+ executor.submit(new Runnable() {
+ @Override public void run() {
+ try {
+ LOG.info("Waiting for start retrieve");
+ start.await();
+ LOG.info("Starting retrieve [{}]", id);
+ File cached = cache.get(id);
+ LOG.info("Finished retrieve");
+ future.set(cached);
+ } catch (Exception e) {
+ LOG.info("Exception in get", e);
+ }
+ }
+ });
+ return future;
+ }
+
+
+
+ private static SettableFuture<Boolean> putThread(ListeningExecutorService executor,
+ final int seed, final File f, final FileCache cache, final CountDownLatch start) {
+ final SettableFuture<Boolean> future = SettableFuture.create();
+ executor.submit(new Runnable() {
+ @Override public void run() {
+ try {
+ LOG.info("Waiting for start to put");
+ start.await();
+ LOG.info("Starting put");
+ cache.put(ID_PREFIX + seed, f);
+ LOG.info("Finished put");
+ future.set(true);
+ } catch (Exception e) {
+ LOG.info("Exception in get", e);
+ }
+ }
+ });
+ return future;
+ }
+
+ private static int getWeight(String key, File value) {
+ return StringUtils.estimateMemoryUsage(key) +
+ StringUtils.estimateMemoryUsage(value.getAbsolutePath()) + 48;
+ }
+
+ private static void assertCacheIfPresent(int seed, FileCache cache, File f) throws IOException {
+ File cached = cache.getIfPresent(ID_PREFIX + seed);
+ assertNotNull(cached);
+ assertTrue(Files.equal(f, cached));
+ }
+
+ private static void assertCache(int seed, FileCache cache, File f) throws IOException {
+ File cached = cache.get(ID_PREFIX + seed);
+ assertNotNull(cached);
+ assertTrue(Files.equal(f, cached));
+ }
+
+ private static File createFile(int seed, TestCacheLoader loader, FileCache cache,
+ TemporaryFolder folder) throws Exception {
+ File f = copyToFile(randomStream(0, 4 * 1024),
+ folder.newFile());
+ loader.write(ID_PREFIX + seed, f);
+ assertNull(cache.getIfPresent(ID_PREFIX + seed));
+ return f;
+ }
+
+ private static void assertCacheStats(FileCache cache, long elems, long weight, long loads,
+ long loadSuccesses) {
+ assertEquals(elems, cache.getStats().getElementCount());
+ assertEquals(weight, cache.getStats().estimateCurrentWeight());
+ assertEquals(loads, cache.getStats().getLoadCount());
+ assertEquals(loadSuccesses, cache.getStats().getLoadSuccessCount());
+ }
+}
Propchange: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/FileCacheTest.java
------------------------------------------------------------------------------
svn:eol-style = native