You are viewing a plain text version of this content. The canonical link for it (originally a hyperlink on the word "here") was not preserved in this plain-text copy.
Posted to commits@cassandra.apache.org by yc...@apache.org on 2021/04/06 16:56:09 UTC

[cassandra] branch trunk updated: Fix race between secondary index building and active compactions tracking

This is an automated email from the ASF dual-hosted git repository.

ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a1285ac  Fix race between secondary index building and active compactions tracking
a1285ac is described below

commit a1285ac92ded45ab6e9f6c7c98917daf14a4a320
Author: Yifan Cai <yc...@apache.org>
AuthorDate: Mon Apr 5 10:20:52 2021 -0700

    Fix race between secondary index building and active compactions tracking
    
    patch by Yifan Cai; reviewed by Andres de la Peña, Jon Meredith for CASSANDRA-16554
---
 CHANGES.txt                                        |  1 +
 .../apache/cassandra/db/compaction/Scrubber.java   | 28 ++++++++--
 .../apache/cassandra/db/compaction/Verifier.java   | 29 +++++++++--
 .../apache/cassandra/io/sstable/KeyIterator.java   | 45 +++++++++++++++--
 .../cassandra/io/sstable/ReducingKeyIterator.java  | 46 +++++++++--------
 .../cassandra/io/util/RandomAccessReader.java      |  3 ++
 .../db/compaction/ActiveCompactionsTest.java       | 59 ++++++++++++++++++++--
 7 files changed, 173 insertions(+), 38 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 9fec37d..d5b7134 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0-rc1
+ * Fix race between secondary index building and active compactions tracking (CASSANDRA-16554)
  * Migrate dependency handling from maven-ant-tasks to resolver-ant-tasks, removing lib/ directory from version control (CASSANDRA-16391)
  * Fix 4.0 node sending a repair prepare message to a 3.x node breaking the connection (CASSANDRA-16542)
  * Removed synchronized modifier from StreamSession#onChannelClose to prevent deadlocking on flush (CASSANDRA-15892)
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index 681a833..afbfe3d 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -20,6 +20,9 @@ package org.apache.cassandra.db.compaction;
 import java.nio.ByteBuffer;
 import java.io.*;
 import java.util.*;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Throwables;
@@ -55,6 +58,7 @@ public class Scrubber implements Closeable
     private final boolean checkData;
     private final long expectedBloomFilterSize;
 
+    private final ReadWriteLock fileAccessLock;
     private final RandomAccessReader dataFile;
     private final RandomAccessReader indexFile;
     private final ScrubInfo scrubInfo;
@@ -128,6 +132,7 @@ public class Scrubber implements Closeable
             cfs.metadata().params.minIndexInterval,
             hasIndexFile ? SSTableReader.getApproximateKeyCount(toScrub) : 0);
 
+        this.fileAccessLock = new ReentrantReadWriteLock();
         // loop through each row, deserializing to check for damage.
         // we'll also loop through the index at the same time, using the position from the index to recover if the
         // row header (key or data size) is corrupt. (This means our position in the index file will be one row
@@ -140,7 +145,7 @@ public class Scrubber implements Closeable
                 ? RandomAccessReader.open(new File(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX)))
                 : null;
 
-        this.scrubInfo = new ScrubInfo(dataFile, sstable);
+        this.scrubInfo = new ScrubInfo(dataFile, sstable, fileAccessLock.readLock());
 
         this.currentRowPositionFromIndex = 0;
         this.nextRowPositionFromIndex = 0;
@@ -446,8 +451,16 @@ public class Scrubber implements Closeable
 
     public void close()
     {
-        FileUtils.closeQuietly(dataFile);
-        FileUtils.closeQuietly(indexFile);
+        fileAccessLock.writeLock().lock();
+        try
+        {
+            FileUtils.closeQuietly(dataFile);
+            FileUtils.closeQuietly(indexFile);
+        }
+        finally
+        {
+            fileAccessLock.writeLock().unlock();
+        }
     }
 
     public CompactionInfo.Holder getScrubInfo()
@@ -460,16 +473,19 @@ public class Scrubber implements Closeable
         private final RandomAccessReader dataFile;
         private final SSTableReader sstable;
         private final UUID scrubCompactionId;
+        private final Lock fileReadLock;
 
-        public ScrubInfo(RandomAccessReader dataFile, SSTableReader sstable)
+        public ScrubInfo(RandomAccessReader dataFile, SSTableReader sstable, Lock fileReadLock)
         {
             this.dataFile = dataFile;
             this.sstable = sstable;
+            this.fileReadLock = fileReadLock;
             scrubCompactionId = UUIDGen.getTimeUUID();
         }
 
         public CompactionInfo getCompactionInfo()
         {
+            fileReadLock.lock();
             try
             {
                 return new CompactionInfo(sstable.metadata(),
@@ -483,6 +499,10 @@ public class Scrubber implements Closeable
             {
                 throw new RuntimeException(e);
             }
+            finally
+            {
+                fileReadLock.unlock();
+            }
         }
 
         public boolean isGlobal()
diff --git a/src/java/org/apache/cassandra/db/compaction/Verifier.java b/src/java/org/apache/cassandra/db/compaction/Verifier.java
index 577f136..5a04235 100644
--- a/src/java/org/apache/cassandra/db/compaction/Verifier.java
+++ b/src/java/org/apache/cassandra/db/compaction/Verifier.java
@@ -59,6 +59,9 @@ import java.nio.ByteBuffer;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.*;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Function;
 import java.util.function.LongPredicate;
 
@@ -69,7 +72,7 @@ public class Verifier implements Closeable
 
     private final CompactionController controller;
 
-
+    private final ReadWriteLock fileAccessLock;
     private final RandomAccessReader dataFile;
     private final RandomAccessReader indexFile;
     private final VerifyInfo verifyInfo;
@@ -102,11 +105,12 @@ public class Verifier implements Closeable
 
         this.controller = new VerifyController(cfs);
 
+        this.fileAccessLock = new ReentrantReadWriteLock();
         this.dataFile = isOffline
                         ? sstable.openDataReader()
                         : sstable.openDataReader(CompactionManager.instance.getRateLimiter());
         this.indexFile = RandomAccessReader.open(new File(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX)));
-        this.verifyInfo = new VerifyInfo(dataFile, sstable);
+        this.verifyInfo = new VerifyInfo(dataFile, sstable, fileAccessLock.readLock());
         this.options = options;
         this.isOffline = isOffline;
         this.tokenLookup = options.tokenLookup;
@@ -445,8 +449,16 @@ public class Verifier implements Closeable
 
     public void close()
     {
-        FileUtils.closeQuietly(dataFile);
-        FileUtils.closeQuietly(indexFile);
+        fileAccessLock.writeLock().lock();
+        try
+        {
+            FileUtils.closeQuietly(dataFile);
+            FileUtils.closeQuietly(indexFile);
+        }
+        finally
+        {
+            fileAccessLock.writeLock().unlock();
+        }
     }
 
     private void throwIfFatal(Throwable th)
@@ -491,16 +503,19 @@ public class Verifier implements Closeable
         private final RandomAccessReader dataFile;
         private final SSTableReader sstable;
         private final UUID verificationCompactionId;
+        private final Lock fileReadLock;
 
-        public VerifyInfo(RandomAccessReader dataFile, SSTableReader sstable)
+        public VerifyInfo(RandomAccessReader dataFile, SSTableReader sstable, Lock fileReadLock)
         {
             this.dataFile = dataFile;
             this.sstable = sstable;
+            this.fileReadLock = fileReadLock;
             verificationCompactionId = UUIDGen.getTimeUUID();
         }
 
         public CompactionInfo getCompactionInfo()
         {
+            fileReadLock.lock();
             try
             {
                 return new CompactionInfo(sstable.metadata(),
@@ -514,6 +529,10 @@ public class Verifier implements Closeable
             {
                 throw new RuntimeException();
             }
+            finally
+            {
+                fileReadLock.unlock();
+            }
         }
 
         public boolean isGlobal()
diff --git a/src/java/org/apache/cassandra/io/sstable/KeyIterator.java b/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
index 091e969..1a5792c 100644
--- a/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
@@ -19,6 +19,8 @@ package org.apache.cassandra.io.sstable;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.RowIndexEntry;
@@ -35,7 +37,7 @@ public class KeyIterator extends AbstractIterator<DecoratedKey> implements Close
     private final static class In
     {
         private final File path;
-        private RandomAccessReader in;
+        private volatile RandomAccessReader in;
 
         public In(File path)
         {
@@ -44,8 +46,16 @@ public class KeyIterator extends AbstractIterator<DecoratedKey> implements Close
 
         private void maybeInit()
         {
-            if (in == null)
-                in = RandomAccessReader.open(path);
+            if (in != null)
+                return;
+
+            synchronized (this)
+            {
+                if (in == null)
+                {
+                    in = RandomAccessReader.open(path);
+                }
+            }
         }
 
         public DataInputPlus get()
@@ -82,6 +92,7 @@ public class KeyIterator extends AbstractIterator<DecoratedKey> implements Close
     private final Descriptor desc;
     private final In in;
     private final IPartitioner partitioner;
+    private final ReadWriteLock fileAccessLock;
 
     private long keyPosition;
 
@@ -90,10 +101,12 @@ public class KeyIterator extends AbstractIterator<DecoratedKey> implements Close
         this.desc = desc;
         in = new In(new File(desc.filenameFor(Component.PRIMARY_INDEX)));
         partitioner = metadata.partitioner;
+        fileAccessLock = new ReentrantReadWriteLock();
     }
 
     protected DecoratedKey computeNext()
     {
+        fileAccessLock.readLock().lock();
         try
         {
             if (in.isEOF())
@@ -108,20 +121,42 @@ public class KeyIterator extends AbstractIterator<DecoratedKey> implements Close
         {
             throw new RuntimeException(e);
         }
+        finally
+        {
+            fileAccessLock.readLock().unlock();
+        }
     }
 
     public void close()
     {
-        in.close();
+        fileAccessLock.writeLock().lock();
+        try
+        {
+            in.close();
+        }
+        finally
+        {
+            fileAccessLock.writeLock().unlock();
+        }
     }
 
     public long getBytesRead()
     {
-        return in.getFilePointer();
+        fileAccessLock.readLock().lock();
+        try
+        {
+            return in.getFilePointer();
+        }
+        finally
+        {
+            fileAccessLock.readLock().unlock();
+        }
     }
 
     public long getTotalBytes()
     {
+        // length is final in the referenced object.
+        // no need to acquire the lock
         return in.length();
     }
 
diff --git a/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java b/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java
index e64d95d..826b91d 100644
--- a/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java
@@ -33,7 +33,7 @@ import org.apache.cassandra.utils.MergeIterator;
 public class ReducingKeyIterator implements CloseableIterator<DecoratedKey>
 {
     private final ArrayList<KeyIterator> iters;
-    private IMergeIterator<DecoratedKey,DecoratedKey> mi;
+    private volatile IMergeIterator<DecoratedKey, DecoratedKey> mi;
 
     public ReducingKeyIterator(Collection<SSTableReader> sstables)
     {
@@ -44,28 +44,34 @@ public class ReducingKeyIterator implements CloseableIterator<DecoratedKey>
 
     private void maybeInit()
     {
-        if (mi == null)
+        if (mi != null)
+            return;
+
+        synchronized (this)
         {
-            mi = MergeIterator.get(iters, DecoratedKey.comparator, new MergeIterator.Reducer<DecoratedKey,DecoratedKey>()
+            if (mi == null)
             {
-                DecoratedKey reduced = null;
-
-                @Override
-                public boolean trivialReduceIsTrivial()
-                {
-                    return true;
-                }
-
-                public void reduce(int idx, DecoratedKey current)
-                {
-                    reduced = current;
-                }
-
-                protected DecoratedKey getReduced()
+                mi = MergeIterator.get(iters, DecoratedKey.comparator, new MergeIterator.Reducer<DecoratedKey, DecoratedKey>()
                 {
-                    return reduced;
-                }
-            });
+                    DecoratedKey reduced = null;
+
+                    @Override
+                    public boolean trivialReduceIsTrivial()
+                    {
+                        return true;
+                    }
+
+                    public void reduce(int idx, DecoratedKey current)
+                    {
+                        reduced = current;
+                    }
+
+                    protected DecoratedKey getReduced()
+                    {
+                        return reduced;
+                    }
+                });
+            }
         }
     }
 
diff --git a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
index a0ea520..4a164a7 100644
--- a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
@@ -21,11 +21,14 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.ByteOrder;
 
+import javax.annotation.concurrent.NotThreadSafe;
+
 import com.google.common.primitives.Ints;
 
 import org.apache.cassandra.io.compress.BufferType;
 import org.apache.cassandra.io.util.Rebufferer.BufferHolder;
 
+@NotThreadSafe
 public class RandomAccessReader extends RebufferingInputStream implements FileDataInput
 {
     // The default buffer size when the client doesn't specify it
diff --git a/test/unit/org/apache/cassandra/db/compaction/ActiveCompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/ActiveCompactionsTest.java
index be5e7df..08c76bf 100644
--- a/test/unit/org/apache/cassandra/db/compaction/ActiveCompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/ActiveCompactionsTest.java
@@ -18,15 +18,21 @@
 
 package org.apache.cassandra.db.compaction;
 
-import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Uninterruptibles;
 import org.junit.Test;
 
 import org.apache.cassandra.cache.AutoSavingCache;
@@ -43,6 +49,7 @@ import org.apache.cassandra.io.sstable.IndexSummaryRedistribution;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.service.CacheService;
+import org.apache.cassandra.utils.FBUtilities;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -52,6 +59,50 @@ import static org.junit.Assert.assertTrue;
 public class ActiveCompactionsTest extends CQLTester
 {
     @Test
+    public void testActiveCompactionTrackingRaceWithIndexBuilder() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int, ck int, a int, b int, PRIMARY KEY (pk, ck))");
+        String idxName = createIndex("CREATE INDEX on %s(a)");
+        getCurrentColumnFamilyStore().disableAutoCompaction();
+        for (int i = 0; i < 5; i++)
+        {
+            execute("INSERT INTO %s (pk, ck, a, b) VALUES (" + i + ", 2, 3, 4)");
+            getCurrentColumnFamilyStore().forceBlockingFlush();
+        }
+
+        Index idx = getCurrentColumnFamilyStore().indexManager.getIndexByName(idxName);
+        Set<SSTableReader> sstables = getCurrentColumnFamilyStore().getLiveSSTables();
+
+        ExecutorService es = Executors.newFixedThreadPool(2);
+
+        final int loopCount = 5000;
+        for (int ii = 0; ii < loopCount; ii++)
+        {
+            CountDownLatch trigger = new CountDownLatch(1);
+            SecondaryIndexBuilder builder = idx.getBuildTaskSupport().getIndexBuildTask(getCurrentColumnFamilyStore(), Collections.singleton(idx), sstables);
+            Future<?> f1 = es.submit(() -> {
+                Uninterruptibles.awaitUninterruptibly(trigger);
+                try
+                {
+                    CompactionManager.instance.submitIndexBuild(builder).get();
+                }
+                catch (Exception e)
+                {
+                    throw new RuntimeException(e);
+                }
+            });
+            Future<?> f2 = es.submit(() -> {
+                Uninterruptibles.awaitUninterruptibly(trigger);
+                CompactionManager.instance.active.getCompactionsForSSTable(null, null);
+            });
+            trigger.countDown();
+            FBUtilities.waitOnFutures(Arrays.asList(f1, f2));
+        }
+        es.shutdown();
+        es.awaitTermination(1, TimeUnit.MINUTES);
+    }
+
+    @Test
     public void testSecondaryIndexTracking() throws Throwable
     {
         createTable("CREATE TABLE %s (pk int, ck int, a int, b int, PRIMARY KEY (pk, ck))");
@@ -59,7 +110,7 @@ public class ActiveCompactionsTest extends CQLTester
         getCurrentColumnFamilyStore().disableAutoCompaction();
         for (int i = 0; i < 5; i++)
         {
-            execute("INSERT INTO %s (pk, ck, a, b) VALUES ("+i+", 2, 3, 4)");
+            execute("INSERT INTO %s (pk, ck, a, b) VALUES (" + i + ", 2, 3, 4)");
             getCurrentColumnFamilyStore().forceBlockingFlush();
         }
 
@@ -82,7 +133,7 @@ public class ActiveCompactionsTest extends CQLTester
         getCurrentColumnFamilyStore().disableAutoCompaction();
         for (int i = 0; i < 5; i++)
         {
-            execute("INSERT INTO %s (pk, ck, a, b) VALUES ("+i+", 2, 3, 4)");
+            execute("INSERT INTO %s (pk, ck, a, b) VALUES (" + i + ", 2, 3, 4)");
             getCurrentColumnFamilyStore().forceBlockingFlush();
         }
         Set<SSTableReader> sstables = getCurrentColumnFamilyStore().getLiveSSTables();
@@ -107,7 +158,7 @@ public class ActiveCompactionsTest extends CQLTester
         getCurrentColumnFamilyStore().disableAutoCompaction();
         for (int i = 0; i < 5; i++)
         {
-            execute("INSERT INTO %s (k1, c1, val) VALUES ("+i+", 2, 3)");
+            execute("INSERT INTO %s (k1, c1, val) VALUES (" + i + ", 2, 3)");
             getCurrentColumnFamilyStore().forceBlockingFlush();
         }
         execute(String.format("CREATE MATERIALIZED VIEW %s.view1 AS SELECT k1, c1, val FROM %s.%s WHERE k1 IS NOT NULL AND c1 IS NOT NULL AND val IS NOT NULL PRIMARY KEY (val, k1, c1)", keyspace(), keyspace(), currentTable()));

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org