You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by am...@apache.org on 2016/10/24 04:49:59 UTC

svn commit: r1766331 - /jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCacheTest.java

Author: amitj
Date: Mon Oct 24 04:49:59 2016
New Revision: 1766331

URL: http://svn.apache.org/viewvc?rev=1766331&view=rev
Log:
OAK-4979: Caching sub-system implementation for DataStore

- Tests for CompositeDataStoreCache

Added:
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCacheTest.java   (with props)

Added: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCacheTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCacheTest.java?rev=1766331&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCacheTest.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCacheTest.java Mon Oct 24 04:49:59 2016
@@ -0,0 +1,521 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.blob;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.io.Closer;
+import com.google.common.io.Files;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
+import org.apache.commons.io.FileUtils;
+import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser;
+import org.apache.jackrabbit.oak.stats.DefaultStatisticsProvider;
+import org.apache.jackrabbit.oak.stats.StatisticsProvider;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link CompositeDataStoreCache}.
+ */
+public class CompositeDataStoreCacheTest extends AbstractDataStoreCacheTest {
+    private static final Logger LOG = LoggerFactory.getLogger(UploadStagingCacheTest.class);
+    private static final String ID_PREFIX = "12345";
+
+    @Rule
+    public TemporaryFolder folder = new TemporaryFolder(new File("target"));
+
+    @Rule
+    public ExpectedException expectedEx = ExpectedException.none();
+
+    private CompositeDataStoreCache cache;
+    private final Closer closer = Closer.create();
+    private File root;
+    private TestStagingUploader uploader;
+    private TestCacheLoader loader;
+
+    private CountDownLatch taskLatch;
+    private CountDownLatch callbackLatch;
+    private CountDownLatch afterExecuteLatch;
+    private TestExecutor executor;
+    private StatisticsProvider statsProvider;
+    private ScheduledExecutorService scheduledExecutor;
+
+    @Before
+    public void setup() throws IOException {
+        root = folder.newFolder();
+        loader = new TestCacheLoader<String, InputStream>(root);
+        uploader = new TestStagingUploader(root);
+
+        // create executor
+        taskLatch = new CountDownLatch(1);
+        callbackLatch = new CountDownLatch(1);
+        afterExecuteLatch = new CountDownLatch(1);
+        executor = new TestExecutor(1, taskLatch, callbackLatch, afterExecuteLatch);
+
+        // stats
+        ScheduledExecutorService statsExecutor = Executors.newSingleThreadScheduledExecutor();
+        closer.register(new ExecutorCloser(statsExecutor, 500, TimeUnit.MILLISECONDS));
+        statsProvider = new DefaultStatisticsProvider(statsExecutor);
+
+        scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
+        closer.register(new ExecutorCloser(scheduledExecutor, 500, TimeUnit.MILLISECONDS));
+
+        //cache instance
+        cache = new CompositeDataStoreCache(root.getAbsolutePath(),
+            80 * 1024 /* bytes */, 10, 1/*threads*/, loader,
+            uploader, statsProvider, executor, scheduledExecutor, 3000);
+        closer.register(cache);
+    }
+
+    @After
+    public void tear() throws IOException {
+        closer.close();
+    }
+
+    /**
+     * {@link CompositeDataStoreCache#getIfPresent(String)} when no cache.
+     */
+    @Test
+    public void getIfPresentNoCache() {
+        File file = cache.getIfPresent(ID_PREFIX + 0);
+        assertNull(file);
+        assertCacheStats(cache.getStagingCacheStats(), 0, 0, 0, 0);
+    }
+
+    /**
+     * {@link CompositeDataStoreCache#get(String)} when no cache.
+     * @throws IOException
+     */
+    @Test
+    public void getNoCache() throws IOException {
+        expectedEx.expect(IOException.class);
+        cache.get(ID_PREFIX + 0);
+    }
+
+    /**
+     * {@link CompositeDataStoreCache#getIfPresent(Object)} when no cache.
+     */
+    @Test
+    public void getIfPresentObjectNoCache() {
+        File file = cache.getIfPresent((Object) (ID_PREFIX + 0));
+        assertNull(file);
+        assertCacheStats(cache.getStagingCacheStats(), 0, 0, 0, 0);
+        assertCacheStats(cache.getDownloadCache().getStats(), 0, 0, 0, 0);
+    }
+
+    /**
+     * Add to staging
+     */
+    @Test
+    public void add() throws Exception {
+        File f = copyToFile(randomStream(0, 4 * 1024), folder.newFile());
+        boolean accepted = cache.stage(ID_PREFIX + 0, f);
+        assertTrue(accepted);
+
+        //start
+        taskLatch.countDown();
+        /** might be redundant **/
+        callbackLatch.countDown();
+
+        waitFinish();
+
+        File file = cache.getIfPresent(ID_PREFIX + 0);
+        assertNotNull(f);
+        assertFile(file, 0, folder);
+        assertCacheStats(cache.getStagingCacheStats(), 0, 0, 1, 1);
+    }
+
+    /**
+     * Add to staging when cache full.
+     */
+    @Test
+    public void addCacheFull() throws IOException {
+        cache = new CompositeDataStoreCache(root.getAbsolutePath(),
+            40 * 1024 /* bytes */, 10 /* staging % */,
+            1/*threads*/, loader, uploader, statsProvider, executor, scheduledExecutor, 3000);
+        closer.register(cache);
+
+        File f = copyToFile(randomStream(0, 4 * 1024), folder.newFile());
+        boolean accepted = cache.stage(ID_PREFIX + 0, f);
+        assertTrue(accepted);
+
+        File f2 = copyToFile(randomStream(1, 4 * 1024), folder.newFile());
+        accepted = cache.stage(ID_PREFIX + 1, f2);
+        assertFalse(accepted);
+
+        //start the original upload
+        taskLatch.countDown();
+        /** might be redundant **/
+        callbackLatch.countDown();
+
+        waitFinish();
+
+        File file = cache.getIfPresent(ID_PREFIX + 0);
+        assertNotNull(f);
+        assertFile(file, 0, folder);
+        assertCacheStats(cache.getStagingCacheStats(), 0, 0, 1, 2);
+    }
+
+    /**
+     * Test {@link CompositeDataStoreCache#getIfPresent(String)} when file staged
+     * and then download cache when uploaded.
+     * @throws IOException
+     */
+    @Test
+    public void getIfPresentStaged() throws IOException {
+        get(false);
+    }
+
+    /**
+     * Test {@link CompositeDataStoreCache#get(String)} when file staged and then
+     * download cache when uploaded.
+     * @throws IOException
+     */
+    @Test
+    public void getStaged() throws IOException {
+        get(true);
+    }
+
+    private void get(boolean get) throws IOException {
+        File f = copyToFile(randomStream(0, 4 * 1024), folder.newFile());
+        boolean accepted = cache.stage(ID_PREFIX + 0, f);
+        assertTrue(accepted);
+
+        // hit the staging cache as not uploaded
+        File file;
+        if (get) {
+            file = cache.get(ID_PREFIX + 0);
+        } else {
+            file = cache.getIfPresent(ID_PREFIX + 0);
+        }
+        assertNotNull(file);
+        assertFile(file, 0, folder);
+        assertCacheStats(cache.getStagingCacheStats(), 1, 4 * 1024, 1, 1);
+
+        //start the original upload
+        taskLatch.countDown();
+        /** might be redundant **/
+        callbackLatch.countDown();
+
+        waitFinish();
+
+        // Not should hit the download cache
+        if (get) {
+            file = cache.get(ID_PREFIX + 0);
+        } else {
+            file = cache.getIfPresent(ID_PREFIX + 0);
+        }
+        assertNotNull(f);
+        assertFile(file, 0, folder);
+        assertCacheStats(cache.getStagingCacheStats(), 0, 0, 1, 1);
+        assertCacheStats(cache.getCacheStats(), 1, 4 * 1024, 1, 1);
+    }
+
+    /**
+     * Load and get from the download cache.
+     * @throws Exception
+     */
+    @Test
+    public void getLoad() throws Exception {
+        File f = copyToFile(randomStream(0, 4 * 1024), folder.newFile());
+        loader.write(ID_PREFIX + 0, f);
+
+        // Not present yet
+        File cached = cache.getIfPresent(ID_PREFIX + 0);
+        assertNull(cached);
+
+        // present after loading
+        cached = cache.get(ID_PREFIX + 0);
+        assertNotNull(cached);
+        assertTrue(Files.equal(f, cached));
+
+        assertCacheStats(cache.getStagingCacheStats(), 0, 0, 0, 0);
+        assertEquals(2, cache.getStagingCacheStats().getLoadCount());
+        assertEquals(0, cache.getStagingCacheStats().getLoadSuccessCount());
+
+        assertCacheStats(cache.getCacheStats(), 1, 4 * 1024, 0, 2);
+        assertEquals(1, cache.getCacheStats().getLoadCount());
+        assertEquals(1, cache.getCacheStats().getLoadSuccessCount());
+    }
+
+    /**
+     * Invalidate cache entry.
+     * @throws Exception
+     */
+    @Test
+    public void invalidate() throws Exception {
+        File f = copyToFile(randomStream(0, 4 * 1024), folder.newFile());
+        loader.write(ID_PREFIX + 0, f);
+
+        // present after loading
+        File cached = cache.get(ID_PREFIX + 0);
+        assertNotNull(cached);
+        assertTrue(Files.equal(f, cached));
+
+        cache.invalidate(ID_PREFIX + 0);
+
+        // Not present now
+        cached = cache.getIfPresent(ID_PREFIX + 0);
+        assertNull(cached);
+
+        assertCacheStats(cache.getStagingCacheStats(), 0, 0, 0, 0);
+        assertEquals(2, cache.getStagingCacheStats().getLoadCount());
+        assertEquals(0, cache.getStagingCacheStats().getLoadSuccessCount());
+
+        assertCacheStats(cache.getCacheStats(), 0, 0, 0, 2);
+        assertEquals(1, cache.getCacheStats().getLoadCount());
+        assertEquals(1, cache.getCacheStats().getLoadSuccessCount());
+
+        /** Check eviction count */
+        assertEquals(0, cache.getCacheStats().getEvictionCount());
+    }
+
+    /**
+     * Concurrently retrieves 2 different files from cache.
+     * @throws Exception
+     */
+    @Test
+    public void concurrentGetCached() throws Exception {
+        // Add 2 files to backend
+        // Concurrently get both
+        ListeningExecutorService executorService =
+            MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));
+        closer.register(new ExecutorCloser(executorService, 5, TimeUnit.MILLISECONDS));
+
+        File f = copyToFile(randomStream(0, 4 * 1024), folder.newFile());
+        loader.write(ID_PREFIX + 0, f);
+
+        File f2 = copyToFile(randomStream(1, 4 * 1024), folder.newFile());
+        loader.write(ID_PREFIX + 1, f2);
+
+        CountDownLatch thread1Start = new CountDownLatch(1);
+        SettableFuture<File> future1 =
+            retrieveThread(executorService, ID_PREFIX + 0, cache, thread1Start);
+
+        CountDownLatch thread2Start = new CountDownLatch(1);
+        SettableFuture<File> future2 =
+            retrieveThread(executorService, ID_PREFIX + 1, cache, thread2Start);
+
+        thread1Start.countDown();
+        thread2Start.countDown();
+
+        File cached = future1.get();
+        File cached2 = future2.get();
+        LOG.info("Async tasks finished");
+
+        assertTrue(Files.equal(f, cached));
+        assertTrue(Files.equal(f2, cached2));
+
+        assertCacheStats(cache.getStagingCacheStats(), 0, 0, 0, 0);
+        assertEquals(2, cache.getStagingCacheStats().getLoadCount());
+        assertEquals(0, cache.getStagingCacheStats().getLoadSuccessCount());
+
+        assertCacheStats(cache.getCacheStats(), 2, 8 * 1024, 0, 4);
+        assertEquals(2, cache.getCacheStats().getLoadCount());
+        assertEquals(2, cache.getCacheStats().getLoadSuccessCount());
+    }
+
+    /**
+     * Concurrently retrieves 2 different files from cache.
+     * One is staged and other in the download cache.
+     * @throws Exception
+     */
+    @Test
+    public void concurrentGetFromStagedAndCached() throws Exception {
+        // Add 1 to backend
+        // Add 2 to upload area
+        // Stop upload execution
+        // Concurrently get 1 & 2
+        // continue upload execution
+        ListeningExecutorService executorService =
+            MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));
+        closer.register(new ExecutorCloser(executorService, 5, TimeUnit.MILLISECONDS));
+
+        // Add file to backend
+        File f2 = copyToFile(randomStream(1, 4 * 1024), folder.newFile());
+        loader.write(ID_PREFIX + 1, f2);
+
+        // stage for upload
+        File f = copyToFile(randomStream(0, 4 * 1024), folder.newFile());
+        boolean accepted = cache.stage(ID_PREFIX + 0, f);
+        assertTrue(accepted);
+
+        // Would hit the staging cache
+        CountDownLatch thread1Start = new CountDownLatch(1);
+        SettableFuture<File> future1 =
+            retrieveThread(executorService, ID_PREFIX + 0, cache, thread1Start);
+
+        // Would hit the download cache and load
+        CountDownLatch thread2Start = new CountDownLatch(1);
+        SettableFuture<File> future2 =
+            retrieveThread(executorService, ID_PREFIX + 1, cache, thread2Start);
+
+        thread1Start.countDown();
+        thread2Start.countDown();
+
+        File cached = future1.get();
+        File cached2 = future2.get();
+        LOG.info("Async tasks finished");
+
+        assertFile(cached, 0, folder);
+        assertTrue(Files.equal(f2, cached2));
+
+        //start the original upload
+        taskLatch.countDown();
+        /** might be redundant **/
+        callbackLatch.countDown();
+
+        waitFinish();
+
+        assertCacheStats(cache.getStagingCacheStats(), 0, 0, 1, 1);
+        assertEquals(2, cache.getStagingCacheStats().getLoadCount());
+        assertEquals(1, cache.getStagingCacheStats().getLoadSuccessCount());
+
+        assertCacheStats(cache.getCacheStats(), 2, 8 * 1024, 0, 2);
+        assertEquals(1, cache.getCacheStats().getLoadCount());
+        assertEquals(1, cache.getCacheStats().getLoadSuccessCount());
+    }
+
+    /**
+     * Concurrently stage and get a file and then upload.
+     * Use the file retrieve to read contents.
+     * @throws Exception
+     */
+    @Test
+    public void concurrentAddGet() throws Exception {
+        // Add to the upload area
+        // stop upload execution
+        // Same as above but concurrently
+        // Get
+        // Continue upload execution
+        ListeningExecutorService executorService =
+            MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));
+        closer.register(new ExecutorCloser(executorService, 5, TimeUnit.MILLISECONDS));
+
+        // stage for upload
+        File f = copyToFile(randomStream(0, 4 * 1024), folder.newFile());
+        boolean accepted = cache.stage(ID_PREFIX + 0, f);
+        assertTrue(accepted);
+
+        // Would hit the staging cache
+        CountDownLatch thread1Start = new CountDownLatch(1);
+        SettableFuture<File> future1 =
+            retrieveThread(executorService, ID_PREFIX + 0, cache, thread1Start);
+        // Get a handle to the file and open stream
+        File fileOnUpload = cache.getIfPresent(ID_PREFIX + 0);
+        assertNotNull(fileOnUpload);
+        final FileInputStream fStream = Files.newInputStreamSupplier(fileOnUpload).getInput();
+
+        thread1Start.countDown();
+
+        //start the original upload
+        taskLatch.countDown();
+        /** might be redundant **/
+        callbackLatch.countDown();
+
+        future1.get();
+        waitFinish();
+        LOG.info("Async tasks finished");
+
+        // File was returned from async cache but now deleted
+        // assertFalse(fileOnUpload.exists());
+
+        File gold = copyToFile(randomStream(0, 4 * 1024), folder.newFile());
+        File fromUploadStream = copyToFile(fStream, folder.newFile());
+        assertTrue(Files.equal(gold, fromUploadStream));
+
+        assertCacheStats(cache.getStagingCacheStats(), 0, 0, 1, 1);
+        assertEquals(2, cache.getStagingCacheStats().getLoadCount());
+
+        assertEquals(0, cache.getCacheStats().getLoadCount());
+        assertEquals(0, cache.getCacheStats().getLoadSuccessCount());
+    }
+
+    /**--------------------------- Helper Methods -----------------------------------------------**/
+
+    private static SettableFuture<File> retrieveThread(ListeningExecutorService executor,
+        final String id, final CompositeDataStoreCache cache, final CountDownLatch start) {
+        final SettableFuture<File> future = SettableFuture.create();
+        executor.submit(new Runnable() {
+            @Override public void run() {
+                try {
+                    LOG.info("Waiting for start retrieve");
+                    start.await();
+                    LOG.info("Starting retrieve [{}]", id);
+                    File cached = cache.get(id);
+                    LOG.info("Finished retrieve");
+                    future.set(cached);
+                } catch (Exception e) {
+                    LOG.info("Exception in get", e);
+                    future.setException(e);
+                }
+            }
+        });
+        return future;
+    }
+
+    private void waitFinish() {
+        try {
+            // wait for upload finish
+            afterExecuteLatch.await();
+            // Force execute removal from staging cache
+            ScheduledFuture<?> scheduledFuture = scheduledExecutor
+                .schedule(cache.getStagingCache().new RemoveJob(), 0, TimeUnit.MILLISECONDS);
+            scheduledFuture.get();
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    private static void assertCacheStats(DataStoreCacheStatsMBean cache, long elems, long weight,
+        long hits, long count) {
+        assertEquals("elements don't match", elems, cache.getElementCount());
+        assertEquals("weight doesn't match", weight, cache.estimateCurrentWeight());
+        assertEquals("hits count don't match", hits, cache.getHitCount());
+        assertEquals("requests count don't match", count, cache.getRequestCount());
+    }
+
+    private void assertFile(File f, int seed, TemporaryFolder folder) throws IOException {
+        assertTrue(f.exists());
+        File temp = copyToFile(randomStream(seed, 4 * 1024), folder.newFile());
+        assertTrue("Uploaded file content differs", FileUtils.contentEquals(temp, f));
+    }
+}

Propchange: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCacheTest.java
------------------------------------------------------------------------------
    svn:eol-style = native