You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2017/02/15 19:00:12 UTC

[08/52] [abbrv] lucene-solr:jira/solr-9858: SOLR-10116: add concurrency test to BlockCacheTest

SOLR-10116: add concurrency test to BlockCacheTest


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/0b3710a5
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/0b3710a5
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/0b3710a5

Branch: refs/heads/jira/solr-9858
Commit: 0b3710a532d7adf87cd02f59865363ea1116e79b
Parents: 85141a2
Author: yonik <yo...@apache.org>
Authored: Fri Feb 10 12:08:09 2017 -0500
Committer: yonik <yo...@apache.org>
Committed: Fri Feb 10 12:08:09 2017 -0500

----------------------------------------------------------------------
 .../solr/store/blockcache/BlockCacheTest.java   | 135 +++++++++++++++++++
 1 file changed, 135 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0b3710a5/solr/core/src/test/org/apache/solr/store/blockcache/BlockCacheTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/store/blockcache/BlockCacheTest.java b/solr/core/src/test/org/apache/solr/store/blockcache/BlockCacheTest.java
index 8e2edfe..0a733bc 100644
--- a/solr/core/src/test/org/apache/solr/store/blockcache/BlockCacheTest.java
+++ b/solr/core/src/test/org/apache/solr/store/blockcache/BlockCacheTest.java
@@ -18,6 +18,8 @@ package org.apache.solr.store.blockcache;
 
 import java.util.Arrays;
 import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.lucene.util.LuceneTestCase;
@@ -106,4 +108,137 @@ public class BlockCacheTest extends LuceneTestCase {
     random.nextBytes(buf);
     return buf;
   }
+
+  // given a position, return the appropriate byte.
+  // always returns the same thing so we don't actually have to store the bytes redundantly to check them.
+  private static byte getByte(long pos) {
+    // knuth multiplicative hash method, then take top 8 bits
+    return (byte) ((((int)pos) * (int)(2654435761L)) >> 24);
+
+    // just the lower bits of the block number, to aid in debugging...
+    // return (byte)(pos>>10);
+  }
+
+  @Test
+  @AwaitsFix(bugUrl = "https://issues.apache.org/jira/browse/SOLR-10121")
+  public void testBlockCacheConcurrent() throws Exception {
+    Random rnd = random();
+
+    /***
+    final int blocksInTest = 256;
+    final int blockSize = 1024;
+    final int slabSize = blockSize * 128;
+    final long totalMemory = 2 * slabSize;
+    ***/
+
+    final int blocksInTest = 16384;  // pick something that won't fit in memory, but is small enough to cause a medium hit rate.  16MB of blocks is double the total memory size of the cache.
+    final int blockSize = 1024;
+    final int slabSize = blockSize * 4096;
+    final long totalMemory = 2 * slabSize;  // should give us 2 slabs (8MB)
+
+    final int nThreads=2;
+    final int nReads=1000000;
+    final int readsPerThread=nReads/nThreads;
+    final int readLastBlockOdds=10; // odds (1 in N) of the next block operation being on the same block as the previous operation... helps flush concurrency issues
+
+    final BlockCache blockCache = new BlockCache(new Metrics(), true, totalMemory, slabSize, blockSize);
+
+    final AtomicBoolean failed = new AtomicBoolean(false);
+    final AtomicLong hitsInCache = new AtomicLong();
+    final AtomicLong missesInCache = new AtomicLong();
+    final AtomicLong storeFails = new AtomicLong();
+    final AtomicLong lastBlock = new AtomicLong();
+
+    final int file = 0;
+
+
+    Thread[] threads = new Thread[nThreads];
+    for (int i=0; i<threads.length; i++) {
+      final int threadnum = i;
+      final long seed = rnd.nextLong();
+
+      threads[i] = new Thread() {
+        Random r;
+        BlockCacheKey blockCacheKey = new BlockCacheKey();
+        byte[] buffer = new byte[blockSize];
+
+        @Override
+        public void run() {
+          try {
+            r = new Random(seed);
+            blockCacheKey = new BlockCacheKey();
+            blockCacheKey.setFile(file);
+            blockCacheKey.setPath("/foo.txt");
+
+            test(readsPerThread);
+
+          } catch (Throwable e) {
+            failed.set(true);
+            e.printStackTrace();
+          }
+        }
+
+        public void test(int iter) {
+          for (int i=0; i<iter; i++) {
+            test();
+          }
+        }
+
+        public void test() {
+          long block = r.nextInt(blocksInTest);
+          if (r.nextInt(readLastBlockOdds) == 0) block = lastBlock.get();  // some percent of the time, try to read the last block another thread was just reading/writing
+          lastBlock.set(block);
+
+
+          int blockOffset = r.nextInt(blockSize);
+          long globalOffset = block * blockSize + blockOffset;
+          int len = r.nextInt(blockSize - blockOffset) + 1;  // TODO: bias toward smaller reads?
+
+          blockCacheKey.setBlock(block);
+
+          if (blockCache.fetch(blockCacheKey, buffer, blockOffset, 0, len)) {
+            hitsInCache.incrementAndGet();
+            // validate returned bytes
+            for (int i = 0; i < len; i++) {
+              long globalPos = globalOffset + i;
+              if (buffer[i] != getByte(globalPos)) {
+                System.out.println("ERROR: read was " + "block=" + block + " blockOffset=" + blockOffset + " len=" + len + " globalPos=" + globalPos + " localReadOffset=" + i + " got=" + buffer[i] + " expected=" + getByte(globalPos));
+                failed.set(true);
+              }
+            }
+          } else {
+            missesInCache.incrementAndGet();
+
+            // OK, we should "get" the data and then cache the block
+            for (int i = 0; i < blockSize; i++) {
+              buffer[i] = getByte(block * blockSize + i);
+            }
+            boolean cached = blockCache.store(blockCacheKey, 0, buffer, 0, blockSize);
+            if (!cached) {
+              storeFails.incrementAndGet();
+            }
+          }
+
+        }
+
+      };
+    }
+
+
+    for (Thread thread : threads) {
+      thread.start();
+    }
+
+    for (Thread thread : threads) {
+      thread.join();
+    }
+
+    System.out.println("# of Elements = " + blockCache.getSize());
+    System.out.println("Cache Hits = " + hitsInCache.get());
+    System.out.println("Cache Misses = " + missesInCache.get());
+    System.out.println("Cache Store Fails = " + storeFails.get());
+
+    assertFalse( failed.get() );
+  }
+
 }