You are viewing a plain-text version of this content. The canonical link to it (originally a hyperlink labeled "here") was not preserved in this plain-text copy.
Posted to commits@cassandra.apache.org by yc...@apache.org on 2021/04/06 16:56:09 UTC
[cassandra] branch trunk updated: Fix race between secondary index building and active compactions tracking
This is an automated email from the ASF dual-hosted git repository.
ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new a1285ac Fix race between secondary index building and active compactions tracking
a1285ac is described below
commit a1285ac92ded45ab6e9f6c7c98917daf14a4a320
Author: Yifan Cai <yc...@apache.org>
AuthorDate: Mon Apr 5 10:20:52 2021 -0700
Fix race between secondary index building and active compactions tracking
patch by Yifan Cai; reviewed by Andres de la Peña, Jon Meredith for CASSANDRA-16554
---
CHANGES.txt | 1 +
.../apache/cassandra/db/compaction/Scrubber.java | 28 ++++++++--
.../apache/cassandra/db/compaction/Verifier.java | 29 +++++++++--
.../apache/cassandra/io/sstable/KeyIterator.java | 45 +++++++++++++++--
.../cassandra/io/sstable/ReducingKeyIterator.java | 46 +++++++++--------
.../cassandra/io/util/RandomAccessReader.java | 3 ++
.../db/compaction/ActiveCompactionsTest.java | 59 ++++++++++++++++++++--
7 files changed, 173 insertions(+), 38 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 9fec37d..d5b7134 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0-rc1
+ * Fix race between secondary index building and active compactions tracking (CASSANDRA-16554)
* Migrate dependency handling from maven-ant-tasks to resolver-ant-tasks, removing lib/ directory from version control (CASSANDRA-16391)
* Fix 4.0 node sending a repair prepare message to a 3.x node breaking the connection (CASSANDRA-16542)
* Removed synchronized modifier from StreamSession#onChannelClose to prevent deadlocking on flush (CASSANDRA-15892)
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index 681a833..afbfe3d 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -20,6 +20,9 @@ package org.apache.cassandra.db.compaction;
import java.nio.ByteBuffer;
import java.io.*;
import java.util.*;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
@@ -55,6 +58,7 @@ public class Scrubber implements Closeable
private final boolean checkData;
private final long expectedBloomFilterSize;
+ private final ReadWriteLock fileAccessLock;
private final RandomAccessReader dataFile;
private final RandomAccessReader indexFile;
private final ScrubInfo scrubInfo;
@@ -128,6 +132,7 @@ public class Scrubber implements Closeable
cfs.metadata().params.minIndexInterval,
hasIndexFile ? SSTableReader.getApproximateKeyCount(toScrub) : 0);
+ this.fileAccessLock = new ReentrantReadWriteLock();
// loop through each row, deserializing to check for damage.
// we'll also loop through the index at the same time, using the position from the index to recover if the
// row header (key or data size) is corrupt. (This means our position in the index file will be one row
@@ -140,7 +145,7 @@ public class Scrubber implements Closeable
? RandomAccessReader.open(new File(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX)))
: null;
- this.scrubInfo = new ScrubInfo(dataFile, sstable);
+ this.scrubInfo = new ScrubInfo(dataFile, sstable, fileAccessLock.readLock());
this.currentRowPositionFromIndex = 0;
this.nextRowPositionFromIndex = 0;
@@ -446,8 +451,16 @@ public class Scrubber implements Closeable
public void close()
{
- FileUtils.closeQuietly(dataFile);
- FileUtils.closeQuietly(indexFile);
+ fileAccessLock.writeLock().lock();
+ try
+ {
+ FileUtils.closeQuietly(dataFile);
+ FileUtils.closeQuietly(indexFile);
+ }
+ finally
+ {
+ fileAccessLock.writeLock().unlock();
+ }
}
public CompactionInfo.Holder getScrubInfo()
@@ -460,16 +473,19 @@ public class Scrubber implements Closeable
private final RandomAccessReader dataFile;
private final SSTableReader sstable;
private final UUID scrubCompactionId;
+ private final Lock fileReadLock;
- public ScrubInfo(RandomAccessReader dataFile, SSTableReader sstable)
+ public ScrubInfo(RandomAccessReader dataFile, SSTableReader sstable, Lock fileReadLock)
{
this.dataFile = dataFile;
this.sstable = sstable;
+ this.fileReadLock = fileReadLock;
scrubCompactionId = UUIDGen.getTimeUUID();
}
public CompactionInfo getCompactionInfo()
{
+ fileReadLock.lock();
try
{
return new CompactionInfo(sstable.metadata(),
@@ -483,6 +499,10 @@ public class Scrubber implements Closeable
{
throw new RuntimeException(e);
}
+ finally
+ {
+ fileReadLock.unlock();
+ }
}
public boolean isGlobal()
diff --git a/src/java/org/apache/cassandra/db/compaction/Verifier.java b/src/java/org/apache/cassandra/db/compaction/Verifier.java
index 577f136..5a04235 100644
--- a/src/java/org/apache/cassandra/db/compaction/Verifier.java
+++ b/src/java/org/apache/cassandra/db/compaction/Verifier.java
@@ -59,6 +59,9 @@ import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.*;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.function.LongPredicate;
@@ -69,7 +72,7 @@ public class Verifier implements Closeable
private final CompactionController controller;
-
+ private final ReadWriteLock fileAccessLock;
private final RandomAccessReader dataFile;
private final RandomAccessReader indexFile;
private final VerifyInfo verifyInfo;
@@ -102,11 +105,12 @@ public class Verifier implements Closeable
this.controller = new VerifyController(cfs);
+ this.fileAccessLock = new ReentrantReadWriteLock();
this.dataFile = isOffline
? sstable.openDataReader()
: sstable.openDataReader(CompactionManager.instance.getRateLimiter());
this.indexFile = RandomAccessReader.open(new File(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX)));
- this.verifyInfo = new VerifyInfo(dataFile, sstable);
+ this.verifyInfo = new VerifyInfo(dataFile, sstable, fileAccessLock.readLock());
this.options = options;
this.isOffline = isOffline;
this.tokenLookup = options.tokenLookup;
@@ -445,8 +449,16 @@ public class Verifier implements Closeable
public void close()
{
- FileUtils.closeQuietly(dataFile);
- FileUtils.closeQuietly(indexFile);
+ fileAccessLock.writeLock().lock();
+ try
+ {
+ FileUtils.closeQuietly(dataFile);
+ FileUtils.closeQuietly(indexFile);
+ }
+ finally
+ {
+ fileAccessLock.writeLock().unlock();
+ }
}
private void throwIfFatal(Throwable th)
@@ -491,16 +503,19 @@ public class Verifier implements Closeable
private final RandomAccessReader dataFile;
private final SSTableReader sstable;
private final UUID verificationCompactionId;
+ private final Lock fileReadLock;
- public VerifyInfo(RandomAccessReader dataFile, SSTableReader sstable)
+ public VerifyInfo(RandomAccessReader dataFile, SSTableReader sstable, Lock fileReadLock)
{
this.dataFile = dataFile;
this.sstable = sstable;
+ this.fileReadLock = fileReadLock;
verificationCompactionId = UUIDGen.getTimeUUID();
}
public CompactionInfo getCompactionInfo()
{
+ fileReadLock.lock();
try
{
return new CompactionInfo(sstable.metadata(),
@@ -514,6 +529,10 @@ public class Verifier implements Closeable
{
throw new RuntimeException();
}
+ finally
+ {
+ fileReadLock.unlock();
+ }
}
public boolean isGlobal()
diff --git a/src/java/org/apache/cassandra/io/sstable/KeyIterator.java b/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
index 091e969..1a5792c 100644
--- a/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
@@ -19,6 +19,8 @@ package org.apache.cassandra.io.sstable;
import java.io.File;
import java.io.IOException;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.RowIndexEntry;
@@ -35,7 +37,7 @@ public class KeyIterator extends AbstractIterator<DecoratedKey> implements Close
private final static class In
{
private final File path;
- private RandomAccessReader in;
+ private volatile RandomAccessReader in;
public In(File path)
{
@@ -44,8 +46,16 @@ public class KeyIterator extends AbstractIterator<DecoratedKey> implements Close
private void maybeInit()
{
- if (in == null)
- in = RandomAccessReader.open(path);
+ if (in != null)
+ return;
+
+ synchronized (this)
+ {
+ if (in == null)
+ {
+ in = RandomAccessReader.open(path);
+ }
+ }
}
public DataInputPlus get()
@@ -82,6 +92,7 @@ public class KeyIterator extends AbstractIterator<DecoratedKey> implements Close
private final Descriptor desc;
private final In in;
private final IPartitioner partitioner;
+ private final ReadWriteLock fileAccessLock;
private long keyPosition;
@@ -90,10 +101,12 @@ public class KeyIterator extends AbstractIterator<DecoratedKey> implements Close
this.desc = desc;
in = new In(new File(desc.filenameFor(Component.PRIMARY_INDEX)));
partitioner = metadata.partitioner;
+ fileAccessLock = new ReentrantReadWriteLock();
}
protected DecoratedKey computeNext()
{
+ fileAccessLock.readLock().lock();
try
{
if (in.isEOF())
@@ -108,20 +121,42 @@ public class KeyIterator extends AbstractIterator<DecoratedKey> implements Close
{
throw new RuntimeException(e);
}
+ finally
+ {
+ fileAccessLock.readLock().unlock();
+ }
}
public void close()
{
- in.close();
+ fileAccessLock.writeLock().lock();
+ try
+ {
+ in.close();
+ }
+ finally
+ {
+ fileAccessLock.writeLock().unlock();
+ }
}
public long getBytesRead()
{
- return in.getFilePointer();
+ fileAccessLock.readLock().lock();
+ try
+ {
+ return in.getFilePointer();
+ }
+ finally
+ {
+ fileAccessLock.readLock().unlock();
+ }
}
public long getTotalBytes()
{
+ // length is final in the referenced object.
+ // no need to acquire the lock
return in.length();
}
diff --git a/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java b/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java
index e64d95d..826b91d 100644
--- a/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java
@@ -33,7 +33,7 @@ import org.apache.cassandra.utils.MergeIterator;
public class ReducingKeyIterator implements CloseableIterator<DecoratedKey>
{
private final ArrayList<KeyIterator> iters;
- private IMergeIterator<DecoratedKey,DecoratedKey> mi;
+ private volatile IMergeIterator<DecoratedKey, DecoratedKey> mi;
public ReducingKeyIterator(Collection<SSTableReader> sstables)
{
@@ -44,28 +44,34 @@ public class ReducingKeyIterator implements CloseableIterator<DecoratedKey>
private void maybeInit()
{
- if (mi == null)
+ if (mi != null)
+ return;
+
+ synchronized (this)
{
- mi = MergeIterator.get(iters, DecoratedKey.comparator, new MergeIterator.Reducer<DecoratedKey,DecoratedKey>()
+ if (mi == null)
{
- DecoratedKey reduced = null;
-
- @Override
- public boolean trivialReduceIsTrivial()
- {
- return true;
- }
-
- public void reduce(int idx, DecoratedKey current)
- {
- reduced = current;
- }
-
- protected DecoratedKey getReduced()
+ mi = MergeIterator.get(iters, DecoratedKey.comparator, new MergeIterator.Reducer<DecoratedKey, DecoratedKey>()
{
- return reduced;
- }
- });
+ DecoratedKey reduced = null;
+
+ @Override
+ public boolean trivialReduceIsTrivial()
+ {
+ return true;
+ }
+
+ public void reduce(int idx, DecoratedKey current)
+ {
+ reduced = current;
+ }
+
+ protected DecoratedKey getReduced()
+ {
+ return reduced;
+ }
+ });
+ }
}
}
diff --git a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
index a0ea520..4a164a7 100644
--- a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
@@ -21,11 +21,14 @@ import java.io.File;
import java.io.IOException;
import java.nio.ByteOrder;
+import javax.annotation.concurrent.NotThreadSafe;
+
import com.google.common.primitives.Ints;
import org.apache.cassandra.io.compress.BufferType;
import org.apache.cassandra.io.util.Rebufferer.BufferHolder;
+@NotThreadSafe
public class RandomAccessReader extends RebufferingInputStream implements FileDataInput
{
// The default buffer size when the client doesn't specify it
diff --git a/test/unit/org/apache/cassandra/db/compaction/ActiveCompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/ActiveCompactionsTest.java
index be5e7df..08c76bf 100644
--- a/test/unit/org/apache/cassandra/db/compaction/ActiveCompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/ActiveCompactionsTest.java
@@ -18,15 +18,21 @@
package org.apache.cassandra.db.compaction;
-import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Uninterruptibles;
import org.junit.Test;
import org.apache.cassandra.cache.AutoSavingCache;
@@ -43,6 +49,7 @@ import org.apache.cassandra.io.sstable.IndexSummaryRedistribution;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.service.CacheService;
+import org.apache.cassandra.utils.FBUtilities;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -52,6 +59,50 @@ import static org.junit.Assert.assertTrue;
public class ActiveCompactionsTest extends CQLTester
{
@Test
+ public void testActiveCompactionTrackingRaceWithIndexBuilder() throws Throwable
+ {
+ createTable("CREATE TABLE %s (pk int, ck int, a int, b int, PRIMARY KEY (pk, ck))");
+ String idxName = createIndex("CREATE INDEX on %s(a)");
+ getCurrentColumnFamilyStore().disableAutoCompaction();
+ for (int i = 0; i < 5; i++)
+ {
+ execute("INSERT INTO %s (pk, ck, a, b) VALUES (" + i + ", 2, 3, 4)");
+ getCurrentColumnFamilyStore().forceBlockingFlush();
+ }
+
+ Index idx = getCurrentColumnFamilyStore().indexManager.getIndexByName(idxName);
+ Set<SSTableReader> sstables = getCurrentColumnFamilyStore().getLiveSSTables();
+
+ ExecutorService es = Executors.newFixedThreadPool(2);
+
+ final int loopCount = 5000;
+ for (int ii = 0; ii < loopCount; ii++)
+ {
+ CountDownLatch trigger = new CountDownLatch(1);
+ SecondaryIndexBuilder builder = idx.getBuildTaskSupport().getIndexBuildTask(getCurrentColumnFamilyStore(), Collections.singleton(idx), sstables);
+ Future<?> f1 = es.submit(() -> {
+ Uninterruptibles.awaitUninterruptibly(trigger);
+ try
+ {
+ CompactionManager.instance.submitIndexBuild(builder).get();
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ });
+ Future<?> f2 = es.submit(() -> {
+ Uninterruptibles.awaitUninterruptibly(trigger);
+ CompactionManager.instance.active.getCompactionsForSSTable(null, null);
+ });
+ trigger.countDown();
+ FBUtilities.waitOnFutures(Arrays.asList(f1, f2));
+ }
+ es.shutdown();
+ es.awaitTermination(1, TimeUnit.MINUTES);
+ }
+
+ @Test
public void testSecondaryIndexTracking() throws Throwable
{
createTable("CREATE TABLE %s (pk int, ck int, a int, b int, PRIMARY KEY (pk, ck))");
@@ -59,7 +110,7 @@ public class ActiveCompactionsTest extends CQLTester
getCurrentColumnFamilyStore().disableAutoCompaction();
for (int i = 0; i < 5; i++)
{
- execute("INSERT INTO %s (pk, ck, a, b) VALUES ("+i+", 2, 3, 4)");
+ execute("INSERT INTO %s (pk, ck, a, b) VALUES (" + i + ", 2, 3, 4)");
getCurrentColumnFamilyStore().forceBlockingFlush();
}
@@ -82,7 +133,7 @@ public class ActiveCompactionsTest extends CQLTester
getCurrentColumnFamilyStore().disableAutoCompaction();
for (int i = 0; i < 5; i++)
{
- execute("INSERT INTO %s (pk, ck, a, b) VALUES ("+i+", 2, 3, 4)");
+ execute("INSERT INTO %s (pk, ck, a, b) VALUES (" + i + ", 2, 3, 4)");
getCurrentColumnFamilyStore().forceBlockingFlush();
}
Set<SSTableReader> sstables = getCurrentColumnFamilyStore().getLiveSSTables();
@@ -107,7 +158,7 @@ public class ActiveCompactionsTest extends CQLTester
getCurrentColumnFamilyStore().disableAutoCompaction();
for (int i = 0; i < 5; i++)
{
- execute("INSERT INTO %s (k1, c1, val) VALUES ("+i+", 2, 3)");
+ execute("INSERT INTO %s (k1, c1, val) VALUES (" + i + ", 2, 3)");
getCurrentColumnFamilyStore().forceBlockingFlush();
}
execute(String.format("CREATE MATERIALIZED VIEW %s.view1 AS SELECT k1, c1, val FROM %s.%s WHERE k1 IS NOT NULL AND c1 IS NOT NULL AND val IS NOT NULL PRIMARY KEY (val, k1, c1)", keyspace(), keyspace(), currentTable()));
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org