Posted to commits@lucene.apache.org by yo...@apache.org on 2017/02/15 15:19:17 UTC

lucene-solr:branch_6x: SOLR-10121: fix race conditions in BlockCache.fetch and BlockCache.store

Repository: lucene-solr
Updated Branches:
  refs/heads/branch_6x 835c96ba9 -> 65e2d2add


SOLR-10121: fix race conditions in BlockCache.fetch and BlockCache.store


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/65e2d2ad
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/65e2d2ad
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/65e2d2ad

Branch: refs/heads/branch_6x
Commit: 65e2d2add68a557b1e628039c328f9346df282f9
Parents: 835c96b
Author: yonik <yo...@apache.org>
Authored: Wed Feb 15 09:56:50 2017 -0500
Committer: yonik <yo...@apache.org>
Committed: Wed Feb 15 10:19:11 2017 -0500

----------------------------------------------------------------------
 solr/CHANGES.txt                                |  4 ++
 .../solr/store/blockcache/BlockCache.java       | 39 +++++++-----
 .../solr/store/blockcache/BlockCacheTest.java   | 63 +++++++-------------
 3 files changed, 48 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/65e2d2ad/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 3730574..a12ca4c 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -109,6 +109,10 @@ Bug Fixes
 
 * SOLR-10104: BlockDirectoryCache release hooks do not work with multiple directories. (Mike Drob, Mark Miller)
 
+* SOLR-10121: Fix race conditions in HDFS BlockCache that can contribute to corruption in high
+  concurrency situations. (yonik)
+
+
 Other Changes
 ----------------------
 * SOLR-9980: Expose configVersion in core admin status (Jessica Cheng Mallet via Tomás Fernández Löbbe)

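For context, the corruption mode this entry refers to: under the old code, store() could
update a block's bytes in place while the eviction listener was concurrently removing that
location and handing its slot in the bank to a different block. A single-threaded
re-enactment of that interleaving (illustrative names only, not the real Solr classes):

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;

    public class RaceSketch {
      static final int BLOCK = 4;

      public static void main(String[] args) {
        byte[] bank = new byte[2 * BLOCK];            // one bank holding two blocks
        Map<String, Integer> cache = new HashMap<>(); // key -> offset into the bank
        cache.put("A", 0);                            // block "A" lives at offset 0

        // Thread 1 looks up "A", intending to rewrite it in place...
        int stale = cache.get("A");

        // ...but before it writes, eviction removes "A" and the allocator
        // hands offset 0 to a brand-new block "B", whose owner writes its data:
        cache.remove("A");
        cache.put("B", 0);
        Arrays.fill(bank, 0, BLOCK, (byte) 'B');

        // Thread 1 now finishes its stale in-place update, corrupting "B":
        Arrays.fill(bank, stale, stale + BLOCK, (byte) 'A');

        System.out.println(bank[0] == 'B');           // false: "B" now holds "A"'s bytes
      }
    }
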
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/65e2d2ad/solr/core/src/java/org/apache/solr/store/blockcache/BlockCache.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/store/blockcache/BlockCache.java b/solr/core/src/java/org/apache/solr/store/blockcache/BlockCache.java
index f00ca1d..20d2721 100644
--- a/solr/core/src/java/org/apache/solr/store/blockcache/BlockCache.java
+++ b/solr/core/src/java/org/apache/solr/store/blockcache/BlockCache.java
@@ -94,7 +94,10 @@ public class BlockCache {
     }
     int bankId = location.getBankId();
     int block = location.getBlock();
+
+    // mark the block removed before we release the lock to allow it to be reused
     location.setRemoved(true);
+
     locks[bankId].clear(block);
     lockCounters[bankId].decrementAndGet();
     for (OnRelease onRelease : onReleases) {
@@ -105,7 +108,8 @@ public class BlockCache {
   }
 
   /**
-   * This is only best-effort... it's possible for false to be returned.
+   * This is only best-effort... it's possible for false to be returned, meaning the block was not able to be cached.
+   * NOTE: blocks may not currently be updated (false will be returned if the block is already cached)
    * The blockCacheKey is cloned before it is inserted into the map, so it may be reused by clients if desired.
    *
    * @param blockCacheKey the key for the block
@@ -123,9 +127,7 @@ public class BlockCache {
           + blockOffset + "]");
     }
     BlockCacheLocation location = cache.getIfPresent(blockCacheKey);
-    boolean newLocation = false;
     if (location == null) {
-      newLocation = true;
       location = new BlockCacheLocation();
       if (!findEmptyLocation(location)) {
         // YCS: it looks like when the cache is full (a normal scenario), then two concurrent writes will result in one of them failing
@@ -133,11 +135,12 @@ public class BlockCache {
         // TODO: simplest fix would be to leave more than one block empty
         return false;
       }
-    }
-
-    // YCS: I think this means that the block existed, but it is in the process of being
-    // concurrently removed.  This flag is set in the releaseLocation eviction listener.
-    if (location.isRemoved()) {
+    } else {
+      // If we allocated a new block, then it has never been published and is thus never in danger of being concurrently removed.
+      // On the other hand, if this is an existing block we are updating, it may concurrently be removed and reused for another
+      // purpose (and then our write may overwrite that).  This can happen even if clients never try to update existing blocks,
+      // since two clients can try to cache the same block concurrently.  Because of this, the ability to update an existing
+      // block has been removed for the time being (see SOLR-10121).
       return false;
     }
 
@@ -146,10 +149,10 @@ public class BlockCache {
     ByteBuffer bank = getBank(bankId);
     bank.position(bankOffset + blockOffset);
     bank.put(data, offset, length);
-    if (newLocation) {
-      cache.put(blockCacheKey.clone(), location);
-      metrics.blockCacheSize.incrementAndGet();
-    }
+
+    // make sure all modifications to the block have been completed before we publish it.
+    cache.put(blockCacheKey.clone(), location);
+    metrics.blockCacheSize.incrementAndGet();
     return true;
   }
 
@@ -167,16 +170,20 @@ public class BlockCache {
     if (location == null) {
       return false;
     }
-    if (location.isRemoved()) {
-      // location is in the process of being removed and the block may have already been reused by this point.
-      return false;
-    }
+
     int bankId = location.getBankId();
     int bankOffset = location.getBlock() * blockSize;
     location.touch();
     ByteBuffer bank = getBank(bankId);
     bank.position(bankOffset + blockOffset);
     bank.get(buffer, off, length);
+
+    if (location.isRemoved()) {
+      // must check *after* the read is done since the bank may have been reused for another block
+      // before or during the read.
+      return false;
+    }
+
     return true;
   }
   

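Taken together, the store/fetch changes amount to an ordering discipline: a location is
published to the map only after its bytes are fully written, and a reader validates
isRemoved() only *after* copying the bytes out, because the underlying block can be evicted
and reused while the read is in flight. A minimal self-contained sketch of that discipline
(assumed names, not the actual BlockCache, which also manages per-bank bitset locks and
metrics):

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.concurrent.atomic.AtomicInteger;

    class SketchCache {
      static final int BLOCK = 64;
      private final byte[] bank = new byte[1024 * BLOCK];
      private final ConcurrentHashMap<String, Loc> map = new ConcurrentHashMap<>();
      // Toy bump allocator; the real cache reuses evicted blocks via bitset locks.
      private final AtomicInteger next = new AtomicInteger();

      static final class Loc {
        final int offset;
        final AtomicBoolean removed = new AtomicBoolean(false); // set by the eviction listener
        Loc(int offset) { this.offset = offset; }
      }

      /** Best-effort store: refuses to update an existing entry (see SOLR-10121). */
      boolean store(String key, byte[] data) {
        if (map.containsKey(key)) {
          return false;            // an existing block may be racing with eviction; don't touch it
        }
        Loc fresh = allocate();
        if (fresh == null) {
          return false;            // cache full: best-effort failure
        }
        // The location is still private to this thread, so the write cannot race.
        System.arraycopy(data, 0, bank, fresh.offset, BLOCK);
        map.put(key, fresh);       // publish only after the bytes are in place
        return true;
      }

      /** Optimistic read: copy first, then validate, in the style of a seqlock. */
      boolean fetch(String key, byte[] out) {
        Loc loc = map.get(key);
        if (loc == null) {
          return false;
        }
        System.arraycopy(bank, loc.offset, out, 0, BLOCK);
        // Must check *after* the copy: the block may have been reused for
        // another entry before or during the read, making `out` garbage.
        return !loc.removed.get();
      }

      private Loc allocate() {
        int i = next.getAndIncrement();
        return (i + 1) * BLOCK <= bank.length ? new Loc(i * BLOCK) : null;
      }
    }
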
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/65e2d2ad/solr/core/src/test/org/apache/solr/store/blockcache/BlockCacheTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/store/blockcache/BlockCacheTest.java b/solr/core/src/test/org/apache/solr/store/blockcache/BlockCacheTest.java
index 795518d..0389f43 100644
--- a/solr/core/src/test/org/apache/solr/store/blockcache/BlockCacheTest.java
+++ b/solr/core/src/test/org/apache/solr/store/blockcache/BlockCacheTest.java
@@ -25,6 +25,7 @@ import org.apache.lucene.util.LuceneTestCase;
 import org.junit.Test;
 
 public class BlockCacheTest extends LuceneTestCase {
+
   @Test
   public void testBlockCache() {
     int blocksInTest = 2000000;
@@ -60,8 +61,9 @@ public class BlockCacheTest extends LuceneTestCase {
 
       byte[] testData = testData(random, blockSize, newData);
       long t1 = System.nanoTime();
-      blockCache.store(blockCacheKey, 0, testData, 0, blockSize);
+      boolean success = blockCache.store(blockCacheKey, 0, testData, 0, blockSize);
       storeTime += (System.nanoTime() - t1);
+      if (!success) continue;  // for now, updating existing blocks is not supported... see SOLR-10121
 
       long t3 = System.nanoTime();
       if (blockCache.fetch(blockCacheKey, buffer)) {
@@ -76,33 +78,6 @@ public class BlockCacheTest extends LuceneTestCase {
     System.out.println("# of Elements = " + blockCache.getSize());
   }
 
-  /**
-   * Verify checking of buffer size limits against the cached block size.
-   */
-  @Test
-  public void testLongBuffer() {
-    Random random = random();
-    int blockSize = BlockCache._32K;
-    int slabSize = blockSize * 1024;
-    long totalMemory = 2 * slabSize;
-
-    BlockCache blockCache = new BlockCache(new Metrics(), true, totalMemory, slabSize);
-    BlockCacheKey blockCacheKey = new BlockCacheKey();
-    blockCacheKey.setBlock(0);
-    blockCacheKey.setFile(0);
-    blockCacheKey.setPath("/");
-    byte[] newData = new byte[blockSize*3];
-    byte[] testData = testData(random, blockSize, newData);
-
-    assertTrue(blockCache.store(blockCacheKey, 0, testData, 0, blockSize));
-    assertTrue(blockCache.store(blockCacheKey, 0, testData, blockSize, blockSize));
-    assertTrue(blockCache.store(blockCacheKey, 0, testData, blockSize*2, blockSize));
-
-    assertTrue(blockCache.store(blockCacheKey, 1, testData, 0, blockSize - 1));
-    assertTrue(blockCache.store(blockCacheKey, 1, testData, blockSize, blockSize - 1));
-    assertTrue(blockCache.store(blockCacheKey, 1, testData, blockSize*2, blockSize - 1));
-  }
-
   private static byte[] testData(Random random, int size, byte[] buf) {
     random.nextBytes(buf);
     return buf;
@@ -123,22 +98,23 @@ public class BlockCacheTest extends LuceneTestCase {
   public void testBlockCacheConcurrent() throws Exception {
     Random rnd = random();
 
+    final int blocksInTest = 400;  // pick something bigger than 256, since that would lead to a slab size of 64 blocks and the bitset locks would consist of a single word.
+    final int blockSize = 64;
+    final int slabSize = blocksInTest * blockSize / 4;
+    final long totalMemory = 2 * slabSize;  // 2 slabs of memory, so only half of what is needed for all blocks
+
     /***
-    final int blocksInTest = 256;
-    final int blockSize = 1024;
-    final int slabSize = blockSize * 128;
-    final long totalMemory = 2 * slabSize;
-    ***/
-
-    final int blocksInTest = 16384;  // pick something that won't fit in memory, but is small enough to cause a medium hit rate.  16MB of blocks is double the total memory size of the cache.
-    final int blockSize = 1024;
-    final int slabSize = blockSize * 4096;
-    final long totalMemory = 2 * slabSize;  // should give us 2 slabs (8MB)
-
-    final int nThreads=2;
+     final int blocksInTest = 16384;  // pick something bigger than 256, since that would lead to a slab size of 64 blocks and the bitset locks would consist of a single word.
+     final int blockSize = 1024;
+     final int slabSize = blocksInTest * blockSize / 4;
+     final long totalMemory = 2 * slabSize;  // 2 slabs of memory, so only half of what is needed for all blocks
+     ***/
+
+    final int nThreads=64;
     final int nReads=1000000;
     final int readsPerThread=nReads/nThreads;
     final int readLastBlockOdds=10; // odds (1 in N) of the next block operation being on the same block as the previous operation... helps flush concurrency issues
+    final int showErrors=50; // show first 50 validation failures
 
     final BlockCache blockCache = new BlockCache(new Metrics(), true, totalMemory, slabSize, blockSize);
 
@@ -147,6 +123,7 @@ public class BlockCacheTest extends LuceneTestCase {
     final AtomicLong missesInCache = new AtomicLong();
     final AtomicLong storeFails = new AtomicLong();
     final AtomicLong lastBlock = new AtomicLong();
+    final AtomicLong validateFails = new AtomicLong(0);
 
     final int file = 0;
 
@@ -158,7 +135,7 @@ public class BlockCacheTest extends LuceneTestCase {
 
       threads[i] = new Thread() {
         Random r;
-        BlockCacheKey blockCacheKey = new BlockCacheKey();
+        BlockCacheKey blockCacheKey;
         byte[] buffer = new byte[blockSize];
 
         @Override
@@ -201,8 +178,9 @@ public class BlockCacheTest extends LuceneTestCase {
             for (int i = 0; i < len; i++) {
               long globalPos = globalOffset + i;
               if (buffer[i] != getByte(globalPos)) {
-                System.out.println("ERROR: read was " + "block=" + block + " blockOffset=" + blockOffset + " len=" + len + " globalPos=" + globalPos + " localReadOffset=" + i + " got=" + buffer[i] + " expected=" + getByte(globalPos));
                 failed.set(true);
+                if (validateFails.incrementAndGet() <= showErrors) System.out.println("ERROR: read was " + "block=" + block + " blockOffset=" + blockOffset + " len=" + len + " globalPos=" + globalPos + " localReadOffset=" + i + " got=" + buffer[i] + " expected=" + getByte(globalPos));
+                break;
               }
             }
           } else {
@@ -236,6 +214,7 @@ public class BlockCacheTest extends LuceneTestCase {
     System.out.println("Cache Hits = " + hitsInCache.get());
     System.out.println("Cache Misses = " + missesInCache.get());
     System.out.println("Cache Store Fails = " + storeFails.get());
+    System.out.println("Blocks with Errors = " + validateFails.get());
 
     assertFalse( failed.get() );
   }
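
Since store() is best-effort and no longer updates existing entries, callers must treat a
false return as nothing more than a cache miss, exactly as the updated test does with
"if (!success) continue;". A hypothetical caller pattern (readFromSource, blockSize, and
blockCache are assumed fields of an enclosing class; only the fetch/store calls match the
real signatures):

    byte[] readBlock(BlockCacheKey key) {
      byte[] buf = new byte[blockSize];
      if (blockCache.fetch(key, buf)) {
        return buf;                             // served from cache
      }
      readFromSource(key, buf);                 // miss: read the backing store
      // Best-effort insert: false just means the block is already cached,
      // mid-eviction, or the cache is full. Correctness never depends on it.
      blockCache.store(key, 0, buf, 0, blockSize);
      return buf;
    }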