You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2016/06/23 09:29:17 UTC

[7/9] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0

Merge branch 'cassandra-2.2' into cassandra-3.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5ee6e7bc
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5ee6e7bc
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5ee6e7bc

Branch: refs/heads/cassandra-3.0
Commit: 5ee6e7bc12a22744db462106abe1372a72b07a41
Parents: 458b36b 225232a
Author: Marcus Eriksson <ma...@apache.org>
Authored: Thu Jun 23 11:26:43 2016 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Thu Jun 23 11:26:43 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../db/compaction/CompactionManager.java        |  32 ++--
 .../repair/RepairMessageVerbHandler.java        |  31 ++--
 .../cassandra/service/ActiveRepairService.java  | 175 +++++++++++++++++--
 .../service/ActiveRepairServiceTest.java        | 125 +++++++++++--
 5 files changed, 305 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ee6e7bc/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index ee2f6d3,b366d21..f7e854d
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -6,35 -2,9 +6,36 @@@ Merged from 2.2
   * Allow nodetool info to run with readonly JMX access (CASSANDRA-11755)
   * Validate bloom_filter_fp_chance against lowest supported
     value when the table is created (CASSANDRA-11920)
 - * RandomAccessReader: call isEOF() only when rebuffering, not for every read operation (CASSANDRA-12013)
   * Don't send erroneous NEW_NODE notifications on restart (CASSANDRA-11038)
   * StorageService shutdown hook should use a volatile variable (CASSANDRA-11984)
 +Merged from 2.1:
++ * Avoid marking too many sstables as repaired (CASSANDRA-11696)
 + * Prevent select statements with clustering key > 64k (CASSANDRA-11882)
 + * Fix clock skew corrupting other nodes with paxos (CASSANDRA-11991)
 + * Remove distinction between non-existing static columns and existing but null in LWTs (CASSANDRA-9842)
 + * Cache local ranges when calculating repair neighbors (CASSANDRA-11934)
 + * Allow LWT operation on static column with only partition keys (CASSANDRA-10532)
 + * Create interval tree over canonical sstables to avoid missing sstables during streaming (CASSANDRA-11886)
 + * cqlsh COPY FROM: shutdown parent cluster after forking, to avoid corrupting SSL connections (CASSANDRA-11749)
 +
 +
 +3.0.7
 + * Fix legacy serialization of Thrift-generated non-compound range tombstones
 +   when communicating with 2.x nodes (CASSANDRA-11930)
 + * Fix Directories instantiations where CFS.initialDirectories should be used (CASSANDRA-11849)
 + * Avoid referencing DatabaseDescriptor in AbstractType (CASSANDRA-11912)
 + * Fix sstables not being protected from removal during index build (CASSANDRA-11905)
 + * cqlsh: Suppress stack trace from Read/WriteFailures (CASSANDRA-11032)
 + * Remove unneeded code to repair index summaries that have
 +   been improperly down-sampled (CASSANDRA-11127)
 + * Avoid WriteTimeoutExceptions during commit log replay due to materialized
 +   view lock contention (CASSANDRA-11891)
 + * Prevent OOM failures on SSTable corruption, improve tests for corruption detection (CASSANDRA-9530)
 + * Use CFS.initialDirectories when clearing snapshots (CASSANDRA-11705)
 + * Allow compaction strategies to disable early open (CASSANDRA-11754)
 + * Refactor Materialized View code (CASSANDRA-11475)
 + * Update Java Driver (CASSANDRA-11615)
 +Merged from 2.2:
   * Persist local metadata earlier in startup sequence (CASSANDRA-11742)
   * Run CommitLog tests with different compression settings (CASSANDRA-9039)
   * cqlsh: fix tab completion for case-sensitive identifiers (CASSANDRA-11664)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ee6e7bc/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index bf7bd81,cf82498..99e0fd5
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@@ -1059,10 -1068,16 +1059,17 @@@ public class CompactionManager implemen
          try
          {
  
-             String snapshotName = validator.desc.sessionId.toString();
              int gcBefore;
 +            int nowInSec = FBUtilities.nowInSeconds();
+             UUID parentRepairSessionId = validator.desc.parentSessionId;
+             String snapshotName;
+             boolean isGlobalSnapshotValidation = cfs.snapshotExists(parentRepairSessionId.toString());
+             if (isGlobalSnapshotValidation)
+                 snapshotName = parentRepairSessionId.toString();
+             else
+                 snapshotName = validator.desc.sessionId.toString();
              boolean isSnapshotValidation = cfs.snapshotExists(snapshotName);
+ 
              if (isSnapshotValidation)
              {
                  // If there is a snapshot created for the session then read from there.
@@@ -1083,40 -1098,53 +1090,42 @@@
                  StorageService.instance.forceKeyspaceFlush(cfs.keyspace.getName(), cfs.name);
                  sstables = getSSTablesToValidate(cfs, validator);
                  if (sstables == null)
--                    return; // this means the parent repair session was removed - the repair session failed on another node and we removed it
++                    return; // this means the parent repair session was removed - the repair session failed on another node and we removed i
                  if (validator.gcBefore > 0)
                      gcBefore = validator.gcBefore;
                  else
 -                    gcBefore = getDefaultGcBefore(cfs);
 +                    gcBefore = getDefaultGcBefore(cfs, nowInSec);
              }
  
 -            // Create Merkle tree suitable to hold estimated partitions for given range.
 -            // We blindly assume that partition is evenly distributed on all sstables for now.
 -            long numPartitions = 0;
 -            for (SSTableReader sstable : sstables)
 -            {
 -                numPartitions += sstable.estimatedKeysForRanges(singleton(validator.desc.range));
 -            }
 +            // Create Merkle trees suitable to hold estimated partitions for the given ranges.
 +            // We blindly assume that a partition is evenly distributed on all sstables for now.
              // determine tree depth from number of partitions, but cap at 20 to prevent large tree.
 -            int depth = numPartitions > 0 ? (int) Math.min(Math.floor(Math.log(numPartitions)), 20) : 0;
 -            MerkleTree tree = new MerkleTree(cfs.partitioner, validator.desc.range, MerkleTree.RECOMMENDED_DEPTH, (int) Math.pow(2, depth));
 -
 +            MerkleTrees tree = createMerkleTrees(sstables, validator.desc.ranges, cfs);
              long start = System.nanoTime();
 -            try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables, validator.desc.range))
 +            try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategyManager().getScanners(sstables, validator.desc.ranges);
 +                 ValidationCompactionController controller = new ValidationCompactionController(cfs, gcBefore);
 +                 CompactionIterator ci = new ValidationCompactionIterator(scanners.scanners, controller, nowInSec, metrics))
              {
 -                CompactionIterable ci = new ValidationCompactionIterable(cfs, scanners.scanners, gcBefore);
 -                Iterator<AbstractCompactedRow> iter = ci.iterator();
 -                metrics.beginCompaction(ci);
 -                try
 +                // validate the CF as we iterate over it
 +                validator.prepare(cfs, tree);
 +                while (ci.hasNext())
                  {
 -                    // validate the CF as we iterate over it
 -                    validator.prepare(cfs, tree);
 -                    while (iter.hasNext())
 +                    if (ci.isStopRequested())
 +                        throw new CompactionInterruptedException(ci.getCompactionInfo());
 +                    try (UnfilteredRowIterator partition = ci.next())
                      {
 -                        if (ci.isStopRequested())
 -                            throw new CompactionInterruptedException(ci.getCompactionInfo());
 -                        AbstractCompactedRow row = iter.next();
 -                        validator.add(row);
 +                        validator.add(partition);
                      }
 -                    validator.complete();
                  }
 -                finally
 +                validator.complete();
 +            }
 +            finally
 +            {
-                 if (isSnapshotValidation)
++                if (isSnapshotValidation && !isGlobalSnapshotValidation)
                  {
+                     // we can only clear the snapshot if we are not doing a global snapshot validation (we then clear it once anticompaction
+                     // is done).
 -                    if (isSnapshotValidation && !isGlobalSnapshotValidation)
 -                    {
 -                        cfs.clearSnapshot(snapshotName);
 -                    }
 -
 -                    metrics.finishCompaction(ci);
 +                    cfs.clearSnapshot(snapshotName);
                  }
              }
  
@@@ -1174,7 -1175,13 +1183,11 @@@
          if (prs == null)
              return null;
          Set<SSTableReader> sstablesToValidate = new HashSet<>();
 -
+         if (prs.isGlobal)
+             prs.markSSTablesRepairing(cfs.metadata.cfId, validator.desc.parentSessionId);
 -
+         // note that we always grab all existing sstables for this - if we were to just grab the ones that
+         // were marked as repairing, we would miss any ranges that were compacted away and this would cause us to overstream
 -        try (ColumnFamilyStore.RefViewFragment sstableCandidates = cfs.selectAndReference(prs.isIncremental ? ColumnFamilyStore.UNREPAIRED_SSTABLES : ColumnFamilyStore.CANONICAL_SSTABLES))
 +        try (ColumnFamilyStore.RefViewFragment sstableCandidates = cfs.selectAndReference(View.select(SSTableSet.CANONICAL, (s) -> !prs.isIncremental || !s.isRepaired())))
          {
              for (SSTableReader sstable : sstableCandidates.sstables)
              {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ee6e7bc/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
index c536b13,1701e9a..edcb4f9
--- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
+++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
@@@ -90,27 -99,23 +90,22 @@@ public class RepairMessageVerbHandler i
                                                                       desc.keyspace, desc.columnFamily), message.from, id);
                          return;
                      }
-                     final Collection<Range<Token>> repairingRange = desc.ranges;
-                     Set<SSTableReader> snapshottedSSSTables = cfs.snapshot(desc.sessionId.toString(), new Predicate<SSTableReader>()
+                     ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId);
+                     if (prs.isGlobal)
                      {
-                         public boolean apply(SSTableReader sstable)
-                         {
-                             return sstable != null &&
-                                    !sstable.metadata.isIndex() && // exclude SSTables from 2i
-                                    new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(repairingRange);
-                         }
-                     }, true); //ephemeral snapshot, if repair fails, it will be cleaned next startup
-                     if (ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId).isGlobal)
+                         prs.maybeSnapshot(cfs.metadata.cfId, desc.parentSessionId);
+                     }
+                     else
                      {
-                         Set<SSTableReader> currentlyRepairing = ActiveRepairService.instance.currentlyRepairing(cfs.metadata.cfId, desc.parentSessionId);
-                         if (!Sets.intersection(currentlyRepairing, snapshottedSSSTables).isEmpty())
 -                        final Range<Token> repairingRange = desc.range;
+                         cfs.snapshot(desc.sessionId.toString(), new Predicate<SSTableReader>()
                          {
-                             // clear snapshot that we just created
-                             cfs.clearSnapshot(desc.sessionId.toString());
-                             logErrorAndSendFailureResponse("Cannot start multiple repair sessions over the same sstables", message.from, id);
-                             return;
-                         }
-                         ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId).addSSTables(cfs.metadata.cfId, snapshottedSSSTables);
+                             public boolean apply(SSTableReader sstable)
+                             {
+                                 return sstable != null &&
 -                                       !(sstable.partitioner instanceof LocalPartitioner) && // exclude SSTables from 2i
 -                                       new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(Collections.singleton(repairingRange));
++                                       !sstable.metadata.isIndex() && // exclude SSTables from 2i
++                                       new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(desc.ranges);
+                             }
+                         }, true); //ephemeral snapshot, if repair fails, it will be cleaned next startup
                      }
                      logger.debug("Enqueuing response to snapshot request {} to {}", desc.sessionId, message.from);
                      MessagingService.instance().sendReply(new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE), id, message.from);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ee6e7bc/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/ActiveRepairService.java
index 6b1fd83,e111155..27c2424
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@@ -36,9 -37,12 +37,11 @@@ import org.slf4j.Logger
  import org.slf4j.LoggerFactory;
  
  import org.apache.cassandra.config.DatabaseDescriptor;
 -import org.apache.cassandra.db.ColumnFamily;
  import org.apache.cassandra.db.ColumnFamilyStore;
--import org.apache.cassandra.db.SystemKeyspace;
  import org.apache.cassandra.db.compaction.CompactionManager;
++import org.apache.cassandra.db.lifecycle.SSTableSet;
++import org.apache.cassandra.db.lifecycle.View;
+ import org.apache.cassandra.dht.Bounds;
 -import org.apache.cassandra.dht.LocalPartitioner;
  import org.apache.cassandra.dht.Range;
  import org.apache.cassandra.dht.Token;
  import org.apache.cassandra.gms.ApplicationState;
@@@ -49,7 -53,7 +52,6 @@@ import org.apache.cassandra.gms.IFailur
  import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
  import org.apache.cassandra.gms.IFailureDetectionEventListener;
  import org.apache.cassandra.gms.VersionedValue;
--import org.apache.cassandra.io.sstable.Component;
  import org.apache.cassandra.io.sstable.format.SSTableReader;
  import org.apache.cassandra.locator.TokenMetadata;
  import org.apache.cassandra.net.IAsyncCallbackWithFailure;
@@@ -460,8 -488,12 +487,12 @@@ public class ActiveRepairService implem
          public final boolean isGlobal;
          public final long repairedAt;
          public final InetAddress coordinator;
+         /**
+          * Indicates whether we have marked sstables as repairing. Can only be done once per table per ParentRepairSession
+          */
+         private final Set<UUID> marked = new HashSet<>();
  
 -        public ParentRepairSession(InetAddress coordinator, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental, boolean isGlobal, long repairedAt)
 +        public ParentRepairSession(InetAddress coordinator, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental, long repairedAt, boolean isGlobal)
          {
              this.coordinator = coordinator;
              for (ColumnFamilyStore cfs : columnFamilyStores)
@@@ -471,15 -503,51 +502,51 @@@
              }
              this.ranges = ranges;
              this.repairedAt = repairedAt;
 -            this.isGlobal = isGlobal;
              this.isIncremental = isIncremental;
 +            this.isGlobal = isGlobal;
          }
  
+         /**
+          * Mark sstables repairing - either all sstables or only the unrepaired ones depending on
+          *
+          * whether this is an incremental or full repair
+          *
+          * @param cfId the column family
+          * @param parentSessionId the parent repair session id, used to make sure we don't start multiple repairs over the same sstables
+          */
+         public synchronized void markSSTablesRepairing(UUID cfId, UUID parentSessionId)
+         {
+             if (!marked.contains(cfId))
+             {
 -                List<SSTableReader> sstables = columnFamilyStores.get(cfId).select(isIncremental ? ColumnFamilyStore.UNREPAIRED_SSTABLES : ColumnFamilyStore.CANONICAL_SSTABLES).sstables;
++                List<SSTableReader> sstables = columnFamilyStores.get(cfId).select(View.select(SSTableSet.CANONICAL, (s) -> !isIncremental || !s.isRepaired())).sstables;
+                 Set<SSTableReader> currentlyRepairing = ActiveRepairService.instance.currentlyRepairing(cfId, parentSessionId);
+                 if (!Sets.intersection(currentlyRepairing, Sets.newHashSet(sstables)).isEmpty())
+                 {
+                     logger.error("Cannot start multiple repair sessions over the same sstables");
+                     throw new RuntimeException("Cannot start multiple repair sessions over the same sstables");
+                 }
+                 addSSTables(cfId, sstables);
+                 marked.add(cfId);
+             }
+         }
+ 
+         /**
+          * Get the still active sstables we should run anticompaction on
+          *
+          * note that validation and streaming do not call this method - they have to work on the actual active sstables on the node, we only call this
+          * to know which sstables are still there that were there when we started the repair
+          *
+          * @param cfId
+          * @param parentSessionId for checking if there exists a snapshot for this repair
+          * @return
+          */
          @SuppressWarnings("resource")
-         public synchronized Refs<SSTableReader> getActiveRepairedSSTableRefs(UUID cfId)
+         public synchronized Refs<SSTableReader> getActiveRepairedSSTableRefsForAntiCompaction(UUID cfId, UUID parentSessionId)
          {
+             assert marked.contains(cfId);
+             boolean isSnapshotRepair = columnFamilyStores.get(cfId).snapshotExists(parentSessionId.toString());
              ImmutableMap.Builder<SSTableReader, Ref<SSTableReader>> references = ImmutableMap.builder();
-             for (SSTableReader sstable : getActiveSSTables(cfId))
+             for (SSTableReader sstable : isSnapshotRepair ? getSSTablesForSnapshotRepair(cfId, parentSessionId) : getActiveSSTables(cfId))
              {
                  Ref<SSTableReader> ref = sstable.tryRef();
                  if (ref == null)
@@@ -490,12 -558,97 +557,97 @@@
              return new Refs<>(references.build());
          }
  
+         /**
+          * If we are running a snapshot repair we need to find the 'real' sstables when we start anticompaction
+          *
+          * We use the generation of the sstables as identifiers instead of the file name to avoid having to parse out the
+          * actual filename.
+          *
+          * @param cfId
+          * @param parentSessionId
+          * @return
+          */
+         private Set<SSTableReader> getSSTablesForSnapshotRepair(UUID cfId, UUID parentSessionId)
+         {
+             Set<SSTableReader> activeSSTables = new HashSet<>();
+             ColumnFamilyStore cfs = columnFamilyStores.get(cfId);
+ 
+             Set<Integer> snapshotGenerations = new HashSet<>();
+             try (Refs<SSTableReader> snapshottedSSTables = cfs.getSnapshotSSTableReader(parentSessionId.toString()))
+             {
+                 for (SSTableReader sstable : snapshottedSSTables)
+                 {
+                     snapshotGenerations.add(sstable.descriptor.generation);
+                 }
+             }
+             catch (IOException e)
+             {
+                 throw new RuntimeException(e);
+             }
 -            for (SSTableReader sstable : cfs.select(ColumnFamilyStore.CANONICAL_SSTABLES).sstables)
++            for (SSTableReader sstable : cfs.getSSTables(SSTableSet.CANONICAL))
+                 if (snapshotGenerations.contains(sstable.descriptor.generation))
+                     activeSSTables.add(sstable);
+             return activeSSTables;
+         }
+ 
+         public synchronized void maybeSnapshot(UUID cfId, UUID parentSessionId)
+         {
+             String snapshotName = parentSessionId.toString();
+             if (!columnFamilyStores.get(cfId).snapshotExists(snapshotName))
+             {
+                 Set<SSTableReader> snapshottedSSTables = columnFamilyStores.get(cfId).snapshot(snapshotName, new Predicate<SSTableReader>()
+                 {
+                     public boolean apply(SSTableReader sstable)
+                     {
+                         return sstable != null &&
+                                (!isIncremental || !sstable.isRepaired()) &&
 -                               !(sstable.partitioner instanceof LocalPartitioner) && // exclude SSTables from 2i
++                               !(sstable.metadata.isIndex()) && // exclude SSTables from 2i
+                                new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(ranges);
+                     }
+                 }, true);
+ 
+                 if (isAlreadyRepairing(cfId, parentSessionId, snapshottedSSTables))
+                 {
+                     columnFamilyStores.get(cfId).clearSnapshot(parentSessionId.toString());
+                     logger.error("Cannot start multiple repair sessions over the same sstables");
+                     throw new RuntimeException("Cannot start multiple repair sessions over the same sstables");
+                 }
+                 addSSTables(cfId, snapshottedSSTables);
+                 marked.add(cfId);
+             }
+         }
+ 
+ 
+         /**
+          * Compares other repairing sstables *generation* to the ones we just snapshotted
+          *
+          * we compare generations since the sstables have different paths due to snapshot names
+          *
+          * @param cfId id of the column family store
+          * @param parentSessionId parent repair session
+          * @param sstables the newly snapshotted sstables
+          * @return
+          */
+         private boolean isAlreadyRepairing(UUID cfId, UUID parentSessionId, Collection<SSTableReader> sstables)
+         {
+             Set<SSTableReader> currentlyRepairing = ActiveRepairService.instance.currentlyRepairing(cfId, parentSessionId);
+             Set<Integer> currentlyRepairingGenerations = new HashSet<>();
+             Set<Integer> newRepairingGenerations = new HashSet<>();
+             for (SSTableReader sstable : currentlyRepairing)
+                 currentlyRepairingGenerations.add(sstable.descriptor.generation);
+             for (SSTableReader sstable : sstables)
+                 newRepairingGenerations.add(sstable.descriptor.generation);
+ 
+             return !Sets.intersection(currentlyRepairingGenerations, newRepairingGenerations).isEmpty();
+         }
+ 
          private Set<SSTableReader> getActiveSSTables(UUID cfId)
          {
              Set<String> repairedSSTables = sstableMap.get(cfId);
              Set<SSTableReader> activeSSTables = new HashSet<>();
              Set<String> activeSSTableNames = new HashSet<>();
-             for (SSTableReader sstable : columnFamilyStores.get(cfId).getSSTables())
+             ColumnFamilyStore cfs = columnFamilyStores.get(cfId);
 -            for (SSTableReader sstable : cfs.select(ColumnFamilyStore.CANONICAL_SSTABLES).sstables)
++            for (SSTableReader sstable : cfs.getSSTables(SSTableSet.CANONICAL))
              {
                  if (repairedSSTables.contains(sstable.getFilename()))
                  {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ee6e7bc/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
index da067fd,03a25c6..adcd684
--- a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
+++ b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
@@@ -20,18 -20,23 +20,21 @@@ package org.apache.cassandra.service
  
  import java.net.InetAddress;
  import java.util.*;
+ import java.util.concurrent.ExecutionException;
  
 -import com.google.common.base.Predicate;
  import com.google.common.collect.Sets;
  import org.junit.Before;
  import org.junit.BeforeClass;
  import org.junit.Test;
  
  import org.apache.cassandra.SchemaLoader;
 -import org.apache.cassandra.Util;
  import org.apache.cassandra.config.DatabaseDescriptor;
 -import org.apache.cassandra.config.KSMetaData;
  import org.apache.cassandra.db.ColumnFamilyStore;
 -import org.apache.cassandra.db.DecoratedKey;
  import org.apache.cassandra.db.Keyspace;
 -import org.apache.cassandra.db.Mutation;
 +import org.apache.cassandra.db.RowUpdateBuilder;
  import org.apache.cassandra.db.compaction.OperationType;
++import org.apache.cassandra.db.lifecycle.SSTableSet;
++import org.apache.cassandra.db.lifecycle.View;
  import org.apache.cassandra.dht.Range;
  import org.apache.cassandra.dht.Token;
  import org.apache.cassandra.exceptions.ConfigurationException;
@@@ -225,17 -233,15 +229,15 @@@ public class ActiveRepairServiceTes
      public void testGetActiveRepairedSSTableRefs()
      {
          ColumnFamilyStore store = prepareColumnFamilyStore();
 -        Set<SSTableReader> original = store.getUnrepairedSSTables();
 +        Set<SSTableReader> original = store.getLiveSSTables();
  
          UUID prsId = UUID.randomUUID();
 -        ActiveRepairService.instance.registerParentRepairSession(prsId, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), null, true, false);
 +        ActiveRepairService.instance.registerParentRepairSession(prsId, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), null, true, 0, false);
          ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(prsId);
- 
-         //add all sstables to parent repair session
-         prs.addSSTables(store.metadata.cfId, original);
+         prs.markSSTablesRepairing(store.metadata.cfId, prsId);
  
          //retrieve all sstable references from parent repair sessions
-         Refs<SSTableReader> refs = prs.getActiveRepairedSSTableRefs(store.metadata.cfId);
+         Refs<SSTableReader> refs = prs.getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.cfId, prsId);
          Set<SSTableReader> retrieved = Sets.newHashSet(refs.iterator());
          assertEquals(original, retrieved);
          refs.release();
@@@ -261,20 -267,117 +263,117 @@@
          refs.release();
      }
  
+     @Test
+     public void testAddingMoreSSTables()
+     {
+         ColumnFamilyStore store = prepareColumnFamilyStore();
 -        Set<SSTableReader> original = store.getUnrepairedSSTables();
++        Set<SSTableReader> original = Sets.newHashSet(store.select(View.select(SSTableSet.CANONICAL, (s) -> !s.isRepaired())).sstables);
+         UUID prsId = UUID.randomUUID();
 -        ActiveRepairService.instance.registerParentRepairSession(prsId, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), null, true, true);
++        ActiveRepairService.instance.registerParentRepairSession(prsId, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), null, true, System.currentTimeMillis(), true);
+         ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(prsId);
+         prs.markSSTablesRepairing(store.metadata.cfId, prsId);
+         try (Refs<SSTableReader> refs = prs.getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.cfId, prsId))
+         {
+             Set<SSTableReader> retrieved = Sets.newHashSet(refs.iterator());
+             assertEquals(original, retrieved);
+         }
+         createSSTables(store, 2);
+         boolean exception = false;
+         try
+         {
+             UUID newPrsId = UUID.randomUUID();
 -            ActiveRepairService.instance.registerParentRepairSession(newPrsId, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), null, true, true);
++            ActiveRepairService.instance.registerParentRepairSession(newPrsId, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), null, true, System.currentTimeMillis(), true);
+             ActiveRepairService.instance.getParentRepairSession(newPrsId).markSSTablesRepairing(store.metadata.cfId, newPrsId);
+         }
+         catch (Throwable t)
+         {
+             exception = true;
+         }
+         assertTrue(exception);
+ 
+         try (Refs<SSTableReader> refs = prs.getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.cfId, prsId))
+         {
+             Set<SSTableReader> retrieved = Sets.newHashSet(refs.iterator());
+             assertEquals(original, retrieved);
+         }
+     }
+ 
+     @Test
+     public void testSnapshotAddSSTables() throws ExecutionException, InterruptedException
+     {
+         ColumnFamilyStore store = prepareColumnFamilyStore();
+         UUID prsId = UUID.randomUUID();
 -        Set<SSTableReader> original = store.getUnrepairedSSTables();
 -        ActiveRepairService.instance.registerParentRepairSession(prsId, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), Collections.singleton(new Range<>(store.partitioner.getMinimumToken(), store.partitioner.getMinimumToken())), true, true);
++        Set<SSTableReader> original = Sets.newHashSet(store.select(View.select(SSTableSet.CANONICAL, (s) -> !s.isRepaired())).sstables);
++        ActiveRepairService.instance.registerParentRepairSession(prsId, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), Collections.singleton(new Range<>(store.getPartitioner().getMinimumToken(), store.getPartitioner().getMinimumToken())), true, System.currentTimeMillis(), true);
+         ActiveRepairService.instance.getParentRepairSession(prsId).maybeSnapshot(store.metadata.cfId, prsId);
+ 
+         UUID prsId2 = UUID.randomUUID();
 -        ActiveRepairService.instance.registerParentRepairSession(prsId2, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), Collections.singleton(new Range<>(store.partitioner.getMinimumToken(), store.partitioner.getMinimumToken())), true, true);
++        ActiveRepairService.instance.registerParentRepairSession(prsId2, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), Collections.singleton(new Range<>(store.getPartitioner().getMinimumToken(), store.getPartitioner().getMinimumToken())), true, System.currentTimeMillis(), true);
+         createSSTables(store, 2);
+         ActiveRepairService.instance.getParentRepairSession(prsId).maybeSnapshot(store.metadata.cfId, prsId);
+         try (Refs<SSTableReader> refs = ActiveRepairService.instance.getParentRepairSession(prsId).getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.cfId, prsId))
+         {
+             assertEquals(original, Sets.newHashSet(refs.iterator()));
+         }
+         store.forceMajorCompaction();
+         // after a major compaction the original sstables will be gone and we will have no sstables to anticompact:
+         try (Refs<SSTableReader> refs = ActiveRepairService.instance.getParentRepairSession(prsId).getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.cfId, prsId))
+         {
+             assertEquals(0, refs.size());
+         }
+     }
+ 
+     @Test
+     public void testSnapshotMultipleRepairs()
+     {
+         ColumnFamilyStore store = prepareColumnFamilyStore();
 -        Set<SSTableReader> original = store.getUnrepairedSSTables();
++        Set<SSTableReader> original = Sets.newHashSet(store.select(View.select(SSTableSet.CANONICAL, (s) -> !s.isRepaired())).sstables);
+         UUID prsId = UUID.randomUUID();
 -        ActiveRepairService.instance.registerParentRepairSession(prsId, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), Collections.singleton(new Range<>(store.partitioner.getMinimumToken(), store.partitioner.getMinimumToken())), true, true);
++        ActiveRepairService.instance.registerParentRepairSession(prsId, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), Collections.singleton(new Range<>(store.getPartitioner().getMinimumToken(), store.getPartitioner().getMinimumToken())), true, System.currentTimeMillis(), true);
+         ActiveRepairService.instance.getParentRepairSession(prsId).maybeSnapshot(store.metadata.cfId, prsId);
+ 
+         UUID prsId2 = UUID.randomUUID();
 -        ActiveRepairService.instance.registerParentRepairSession(prsId2, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), Collections.singleton(new Range<>(store.partitioner.getMinimumToken(), store.partitioner.getMinimumToken())), true, true);
++        ActiveRepairService.instance.registerParentRepairSession(prsId2, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), Collections.singleton(new Range<>(store.getPartitioner().getMinimumToken(), store.getPartitioner().getMinimumToken())), true, System.currentTimeMillis(), true);
+         boolean exception = false;
+         try
+         {
+             ActiveRepairService.instance.getParentRepairSession(prsId2).maybeSnapshot(store.metadata.cfId, prsId2);
+         }
+         catch (Throwable t)
+         {
+             exception = true;
+         }
+         assertTrue(exception);
+         try (Refs<SSTableReader> refs = ActiveRepairService.instance.getParentRepairSession(prsId).getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.cfId, prsId))
+         {
+             assertEquals(original, Sets.newHashSet(refs.iterator()));
+         }
+     }
+ 
      private ColumnFamilyStore prepareColumnFamilyStore()
      {
          Keyspace keyspace = Keyspace.open(KEYSPACE5);
          ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF_STANDARD1);
+         store.truncateBlocking();
          store.disableAutoCompaction();
-         for (int i = 0; i < 10; i++)
+         createSSTables(store, 10);
+         return store;
+     }
+ 
+     private void createSSTables(ColumnFamilyStore cfs, int count)
+     {
+         long timestamp = System.currentTimeMillis();
+         for (int i = 0; i < count; i++)
          {
-             new RowUpdateBuilder(store.metadata, System.currentTimeMillis(), Integer.toString(i))
-             .clustering("c")
-             .add("val", "val")
-             .build()
-             .applyUnsafe();
 -            DecoratedKey key = Util.dk(Integer.toString(i));
 -            Mutation rm = new Mutation(KEYSPACE5, key.getKey());
+             for (int j = 0; j < 10; j++)
 -                rm.add("Standard1", Util.cellname(Integer.toString(j)),
 -                       ByteBufferUtil.EMPTY_BYTE_BUFFER,
 -                       timestamp,
 -                       0);
 -            rm.apply();
++            {
++                new RowUpdateBuilder(cfs.metadata, timestamp, Integer.toString(j))
++                .clustering("c")
++                .add("val", "val")
++                .build()
++                .applyUnsafe();
++            }
+             cfs.forceBlockingFlush();
          }
-         store.forceBlockingFlush();
-         return store;
      }
  }