You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@lucene.apache.org by yo...@apache.org on 2017/02/15 14:57:04 UTC
lucene-solr:master: SOLR-10121: fix race conditions in BlockCache.fetch and BlockCache.store
Repository: lucene-solr
Updated Branches:
refs/heads/master b6f49dc1f -> b71a667d7
SOLR-10121: fix race conditions in BlockCache.fetch and BlockCache.store
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/b71a667d
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/b71a667d
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/b71a667d
Branch: refs/heads/master
Commit: b71a667d74dfabeaad9584372bded80b0c609add
Parents: b6f49dc
Author: yonik <yo...@apache.org>
Authored: Wed Feb 15 09:56:50 2017 -0500
Committer: yonik <yo...@apache.org>
Committed: Wed Feb 15 09:56:50 2017 -0500
----------------------------------------------------------------------
solr/CHANGES.txt | 4 ++
.../solr/store/blockcache/BlockCache.java | 39 +++++++-----
.../solr/store/blockcache/BlockCacheTest.java | 63 +++++++-------------
3 files changed, 48 insertions(+), 58 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b71a667d/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 2ea65fd..518c52c 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -159,6 +159,10 @@ Bug Fixes
* SOLR-10104: BlockDirectoryCache release hooks do not work with multiple directories. (Mike Drob, Mark Miller)
+* SOLR-10121: Fix race conditions in HDFS BlockCache that can contribute to corruption in high
+ concurrency situations. (yonik)
+
+
Optimizations
----------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b71a667d/solr/core/src/java/org/apache/solr/store/blockcache/BlockCache.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/store/blockcache/BlockCache.java b/solr/core/src/java/org/apache/solr/store/blockcache/BlockCache.java
index f00ca1d..20d2721 100644
--- a/solr/core/src/java/org/apache/solr/store/blockcache/BlockCache.java
+++ b/solr/core/src/java/org/apache/solr/store/blockcache/BlockCache.java
@@ -94,7 +94,10 @@ public class BlockCache {
}
int bankId = location.getBankId();
int block = location.getBlock();
+
+ // mark the block removed before we release the lock to allow it to be reused
location.setRemoved(true);
+
locks[bankId].clear(block);
lockCounters[bankId].decrementAndGet();
for (OnRelease onRelease : onReleases) {
@@ -105,7 +108,8 @@ public class BlockCache {
}
/**
- * This is only best-effort... it's possible for false to be returned.
+ * This is only best-effort... it's possible for false to be returned, meaning the block was not able to be cached.
+ * NOTE: blocks may not currently be updated (false will be returned if the block is already cached)
* The blockCacheKey is cloned before it is inserted into the map, so it may be reused by clients if desired.
*
* @param blockCacheKey the key for the block
@@ -123,9 +127,7 @@ public class BlockCache {
+ blockOffset + "]");
}
BlockCacheLocation location = cache.getIfPresent(blockCacheKey);
- boolean newLocation = false;
if (location == null) {
- newLocation = true;
location = new BlockCacheLocation();
if (!findEmptyLocation(location)) {
// YCS: it looks like when the cache is full (a normal scenario), then two concurrent writes will result in one of them failing
@@ -133,11 +135,12 @@ public class BlockCache {
// TODO: simplest fix would be to leave more than one block empty
return false;
}
- }
-
- // YCS: I think this means that the block existed, but it is in the process of being
- // concurrently removed. This flag is set in the releaseLocation eviction listener.
- if (location.isRemoved()) {
+ } else {
+ // If we allocated a new block, then it has never been published and is thus never in danger of being concurrently removed.
+ // On the other hand, if this is an existing block we are updating, it may concurrently be removed and reused for another
+ // purpose (and then our write may overwrite that). This can happen even if clients never try to update existing blocks,
+ // since two clients can try to cache the same block concurrently. Because of this, the ability to update an existing
+ // block has been removed for the time being (see SOLR-10121).
return false;
}
@@ -146,10 +149,10 @@ public class BlockCache {
ByteBuffer bank = getBank(bankId);
bank.position(bankOffset + blockOffset);
bank.put(data, offset, length);
- if (newLocation) {
- cache.put(blockCacheKey.clone(), location);
- metrics.blockCacheSize.incrementAndGet();
- }
+
+ // make sure all modifications to the block have been completed before we publish it.
+ cache.put(blockCacheKey.clone(), location);
+ metrics.blockCacheSize.incrementAndGet();
return true;
}
@@ -167,16 +170,20 @@ public class BlockCache {
if (location == null) {
return false;
}
- if (location.isRemoved()) {
- // location is in the process of being removed and the block may have already been reused by this point.
- return false;
- }
+
int bankId = location.getBankId();
int bankOffset = location.getBlock() * blockSize;
location.touch();
ByteBuffer bank = getBank(bankId);
bank.position(bankOffset + blockOffset);
bank.get(buffer, off, length);
+
+ if (location.isRemoved()) {
+ // must check *after* the read is done since the bank may have been reused for another block
+ // before or during the read.
+ return false;
+ }
+
return true;
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b71a667d/solr/core/src/test/org/apache/solr/store/blockcache/BlockCacheTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/store/blockcache/BlockCacheTest.java b/solr/core/src/test/org/apache/solr/store/blockcache/BlockCacheTest.java
index 795518d..0389f43 100644
--- a/solr/core/src/test/org/apache/solr/store/blockcache/BlockCacheTest.java
+++ b/solr/core/src/test/org/apache/solr/store/blockcache/BlockCacheTest.java
@@ -25,6 +25,7 @@ import org.apache.lucene.util.LuceneTestCase;
import org.junit.Test;
public class BlockCacheTest extends LuceneTestCase {
+
@Test
public void testBlockCache() {
int blocksInTest = 2000000;
@@ -60,8 +61,9 @@ public class BlockCacheTest extends LuceneTestCase {
byte[] testData = testData(random, blockSize, newData);
long t1 = System.nanoTime();
- blockCache.store(blockCacheKey, 0, testData, 0, blockSize);
+ boolean success = blockCache.store(blockCacheKey, 0, testData, 0, blockSize);
storeTime += (System.nanoTime() - t1);
+ if (!success) continue; // for now, updating existing blocks is not supported... see SOLR-10121
long t3 = System.nanoTime();
if (blockCache.fetch(blockCacheKey, buffer)) {
@@ -76,33 +78,6 @@ public class BlockCacheTest extends LuceneTestCase {
System.out.println("# of Elements = " + blockCache.getSize());
}
- /**
- * Verify checking of buffer size limits against the cached block size.
- */
- @Test
- public void testLongBuffer() {
- Random random = random();
- int blockSize = BlockCache._32K;
- int slabSize = blockSize * 1024;
- long totalMemory = 2 * slabSize;
-
- BlockCache blockCache = new BlockCache(new Metrics(), true, totalMemory, slabSize);
- BlockCacheKey blockCacheKey = new BlockCacheKey();
- blockCacheKey.setBlock(0);
- blockCacheKey.setFile(0);
- blockCacheKey.setPath("/");
- byte[] newData = new byte[blockSize*3];
- byte[] testData = testData(random, blockSize, newData);
-
- assertTrue(blockCache.store(blockCacheKey, 0, testData, 0, blockSize));
- assertTrue(blockCache.store(blockCacheKey, 0, testData, blockSize, blockSize));
- assertTrue(blockCache.store(blockCacheKey, 0, testData, blockSize*2, blockSize));
-
- assertTrue(blockCache.store(blockCacheKey, 1, testData, 0, blockSize - 1));
- assertTrue(blockCache.store(blockCacheKey, 1, testData, blockSize, blockSize - 1));
- assertTrue(blockCache.store(blockCacheKey, 1, testData, blockSize*2, blockSize - 1));
- }
-
private static byte[] testData(Random random, int size, byte[] buf) {
random.nextBytes(buf);
return buf;
@@ -123,22 +98,23 @@ public class BlockCacheTest extends LuceneTestCase {
public void testBlockCacheConcurrent() throws Exception {
Random rnd = random();
+ final int blocksInTest = 400; // pick something bigger than 256, since that would lead to a slab size of 64 blocks and the bitset locks would consist of a single word.
+ final int blockSize = 64;
+ final int slabSize = blocksInTest * blockSize / 4;
+ final long totalMemory = 2 * slabSize; // 2 slabs of memory, so only half of what is needed for all blocks
+
/***
- final int blocksInTest = 256;
- final int blockSize = 1024;
- final int slabSize = blockSize * 128;
- final long totalMemory = 2 * slabSize;
- ***/
-
- final int blocksInTest = 16384; // pick something that won't fit in memory, but is small enough to cause a medium hit rate. 16MB of blocks is double the total memory size of the cache.
- final int blockSize = 1024;
- final int slabSize = blockSize * 4096;
- final long totalMemory = 2 * slabSize; // should give us 2 slabs (8MB)
-
- final int nThreads=2;
+ final int blocksInTest = 16384; // pick something bigger than 256, since that would lead to a slab size of 64 blocks and the bitset locks would consist of a single word.
+ final int blockSize = 1024;
+ final int slabSize = blocksInTest * blockSize / 4;
+ final long totalMemory = 2 * slabSize; // 2 slabs of memory, so only half of what is needed for all blocks
+ ***/
+
+ final int nThreads=64;
final int nReads=1000000;
final int readsPerThread=nReads/nThreads;
final int readLastBlockOdds=10; // odds (1 in N) of the next block operation being on the same block as the previous operation... helps flush concurrency issues
+ final int showErrors=50; // show first 50 validation failures
final BlockCache blockCache = new BlockCache(new Metrics(), true, totalMemory, slabSize, blockSize);
@@ -147,6 +123,7 @@ public class BlockCacheTest extends LuceneTestCase {
final AtomicLong missesInCache = new AtomicLong();
final AtomicLong storeFails = new AtomicLong();
final AtomicLong lastBlock = new AtomicLong();
+ final AtomicLong validateFails = new AtomicLong(0);
final int file = 0;
@@ -158,7 +135,7 @@ public class BlockCacheTest extends LuceneTestCase {
threads[i] = new Thread() {
Random r;
- BlockCacheKey blockCacheKey = new BlockCacheKey();
+ BlockCacheKey blockCacheKey;
byte[] buffer = new byte[blockSize];
@Override
@@ -201,8 +178,9 @@ public class BlockCacheTest extends LuceneTestCase {
for (int i = 0; i < len; i++) {
long globalPos = globalOffset + i;
if (buffer[i] != getByte(globalPos)) {
- System.out.println("ERROR: read was " + "block=" + block + " blockOffset=" + blockOffset + " len=" + len + " globalPos=" + globalPos + " localReadOffset=" + i + " got=" + buffer[i] + " expected=" + getByte(globalPos));
failed.set(true);
+ if (validateFails.incrementAndGet() <= showErrors) System.out.println("ERROR: read was " + "block=" + block + " blockOffset=" + blockOffset + " len=" + len + " globalPos=" + globalPos + " localReadOffset=" + i + " got=" + buffer[i] + " expected=" + getByte(globalPos));
+ break;
}
}
} else {
@@ -236,6 +214,7 @@ public class BlockCacheTest extends LuceneTestCase {
System.out.println("Cache Hits = " + hitsInCache.get());
System.out.println("Cache Misses = " + missesInCache.get());
System.out.println("Cache Store Fails = " + storeFails.get());
+ System.out.println("Blocks with Errors = " + validateFails.get());
assertFalse( failed.get() );
}