You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2019/10/31 13:39:51 UTC

[cassandra] branch trunk updated (f6c1dea -> f9b46ae)

This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git.


    from f6c1dea  Merge branch 'cassandra-3.11' into trunk
     new b40d79c  Make sure index summary redistributions don't start when compactions are paused
     new 4b547f1  Merge branch 'cassandra-3.0' into cassandra-3.11
     new f9b46ae  Merge branch 'cassandra-3.11' into trunk

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 CHANGES.txt                                        |  1 +
 .../apache/cassandra/cache/AutoSavingCache.java    |  5 ++
 .../org/apache/cassandra/db/ColumnFamilyStore.java | 43 +++++++++++--
 .../cassandra/db/compaction/CompactionInfo.java    |  8 ++-
 .../db/compaction/CompactionIterator.java          |  5 ++
 .../cassandra/db/compaction/CompactionManager.java | 26 ++++++++
 .../cassandra/db/compaction/CompactionTask.java    | 10 +--
 .../apache/cassandra/db/compaction/Scrubber.java   |  5 ++
 .../apache/cassandra/db/compaction/Verifier.java   |  5 ++
 .../apache/cassandra/db/view/ViewBuilderTask.java  |  5 ++
 .../cassandra/index/SecondaryIndexBuilder.java     |  5 ++
 .../cassandra/io/sstable/IndexSummaryManager.java  |  2 +
 .../io/sstable/IndexSummaryRedistribution.java     |  6 ++
 .../db/compaction/CancelCompactionsTest.java       | 72 +++++++++++++++++++++-
 .../db/repair/PendingAntiCompactionTest.java       | 15 +++++
 .../io/sstable/IndexSummaryManagerTest.java        | 46 +++++++++++++-
 16 files changed, 246 insertions(+), 13 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[cassandra] 01/01: Merge branch 'cassandra-3.11' into trunk

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit f9b46aeb3123e3d387837aa8525126b0a450851b
Merge: f6c1dea 4b547f1
Author: Marcus Eriksson <ma...@apache.org>
AuthorDate: Thu Oct 31 14:34:06 2019 +0100

    Merge branch 'cassandra-3.11' into trunk

 CHANGES.txt                                        |  1 +
 .../apache/cassandra/cache/AutoSavingCache.java    |  5 ++
 .../org/apache/cassandra/db/ColumnFamilyStore.java | 43 +++++++++++--
 .../cassandra/db/compaction/CompactionInfo.java    |  8 ++-
 .../db/compaction/CompactionIterator.java          |  5 ++
 .../cassandra/db/compaction/CompactionManager.java | 26 ++++++++
 .../cassandra/db/compaction/CompactionTask.java    | 10 +--
 .../apache/cassandra/db/compaction/Scrubber.java   |  5 ++
 .../apache/cassandra/db/compaction/Verifier.java   |  5 ++
 .../apache/cassandra/db/view/ViewBuilderTask.java  |  5 ++
 .../cassandra/index/SecondaryIndexBuilder.java     |  5 ++
 .../cassandra/io/sstable/IndexSummaryManager.java  |  2 +
 .../io/sstable/IndexSummaryRedistribution.java     |  6 ++
 .../db/compaction/CancelCompactionsTest.java       | 72 +++++++++++++++++++++-
 .../db/repair/PendingAntiCompactionTest.java       | 15 +++++
 .../io/sstable/IndexSummaryManagerTest.java        | 46 +++++++++++++-
 16 files changed, 246 insertions(+), 13 deletions(-)

diff --cc CHANGES.txt
index f7bb517,4240841..201ab60
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -2,400 -5,9 +2,401 @@@
  Merged from 2.2:
   * In-JVM DTest: Set correct internode message version for upgrade test (CASSANDRA-15371)
  
 +4.0-alpha2
 + * Fix SASI non-literal string comparisons (range operators) (CASSANDRA-15169)
 + * Upgrade Guava to 27, and to java-driver 3.6.0 (from 3.4.0-SNAPSHOT) (CASSANDRA-14655)
 + * Extract an AbstractCompactionController to allow for custom implementations (CASSANDRA-15286)
 + * Move chronicle-core version from snapshot to stable, and include carrotsearch in generated pom.xml (CASSANDRA-15321)
 + * Untangle RepairMessage sub-hierarchy of messages, use new messaging (more) correctly (CASSANDRA-15163)
 + * Add `allocate_tokens_for_local_replication_factor` option for token allocation (CASSANDRA-15260)
 + * Add Alibaba Cloud Platform snitch (CASSANDRA-15092)
 +Merged from 3.0:
++ * Make sure index summary redistribution does not start when compactions are paused (CASSANDRA-15265)
 + * Add ability to cap max negotiable protocol version (CASSANDRA-15193)
 + * Gossip tokens on startup if available (CASSANDRA-15335)
 + * Fix resource leak in CompressedSequentialWriter (CASSANDRA-15340)
 + * Fix bad merge that reverted CASSANDRA-14993 (CASSANDRA-15289)
 + * Add support for network topology and query tracing for inJVM dtest (CASSANDRA-15319)
 +
 +
 +4.0-alpha1
 + * Inaccurate exception message with nodetool snapshot (CASSANDRA-15287)
 + * Fix InternodeOutboundMetrics overloaded bytes/count mixup (CASSANDRA-15186)
 + * Enhance & reenable RepairTest with compression=off and compression=on (CASSANDRA-15272)
 + * Improve readability of Table metrics Virtual tables units (CASSANDRA-15194)
 + * Fix error with non-existent table for nodetool tablehistograms (CASSANDRA-14410)
 + * Avoid result truncation in decimal operations (CASSANDRA-15232)
 + * Catch non-IOException in FileUtils.close to make sure that all resources are closed (CASSANDRA-15225)
 + * Align load column in nodetool status output (CASSANDRA-14787)
 + * CassandraNetworkAuthorizer uses cached roles info (CASSANDRA-15089)
 + * Introduce optional timeouts for idle client sessions (CASSANDRA-11097)
 + * Fix AlterTableStatement dropped type validation order (CASSANDRA-15203)
 + * Update Netty dependencies to latest, clean up SocketFactory (CASSANDRA-15195)
 + * Native Transport - Apply noSpamLogger to ConnectionLimitHandler (CASSANDRA-15167)
 + * Reduce heap pressure during compactions (CASSANDRA-14654)
 + * Support building Cassandra with JDK 11 (CASSANDRA-15108)
 + * Use quilt to patch cassandra.in.sh in Debian packaging (CASSANDRA-14710)
 + * Take sstable references before calculating approximate key count (CASSANDRA-14647)
 + * Restore snapshotting of system keyspaces on version change (CASSANDRA-14412)
 + * Fix AbstractBTreePartition locking in java 11 (CASSANDRA-14607)
 + * SimpleClient should pass connection properties as options (CASSANDRA-15056)
 + * Set repaired data tracking flag on range reads if enabled (CASSANDRA-15019)
 + * Calculate pending ranges for BOOTSTRAP_REPLACE correctly (CASSANDRA-14802)
 + * Make TableCQLHelper reuse the single quote pattern (CASSANDRA-15033)
 + * Add Zstd compressor (CASSANDRA-14482)
 + * Fix IR prepare anti-compaction race (CASSANDRA-15027)
 + * Fix SimpleStrategy option validation (CASSANDRA-15007)
 + * Don't try to cancel 2i compactions when starting anticompaction (CASSANDRA-15024)
 + * Avoid NPE in RepairRunnable.recordFailure (CASSANDRA-15025)
 + * SSL Cert Hot Reloading should check for sanity of the new keystore/truststore before loading it (CASSANDRA-14991)
 + * Avoid leaking threads when failing anticompactions and rate limit anticompactions (CASSANDRA-15002)
 + * Validate token() arguments early instead of throwing NPE at execution (CASSANDRA-14989)
 + * Add a new tool to dump audit logs (CASSANDRA-14885)
 + * Fix generating javadoc with Java11 (CASSANDRA-14988)
 + * Only cancel conflicting compactions when starting anticompactions and sub range compactions (CASSANDRA-14935)
 + * Use a stub IndexRegistry for non-daemon use cases (CASSANDRA-14938)
 + * Don't enable client transports when bootstrap is pending (CASSANDRA-14525)
 + * Make antiCompactGroup throw exception on error and anticompaction non cancellable
 +   again (CASSANDRA-14936)
 + * Catch empty/invalid bounds in SelectStatement (CASSANDRA-14849)
 + * Auto-expand replication_factor for NetworkTopologyStrategy (CASSANDRA-14303)
 + * Transient Replication: support EACH_QUORUM (CASSANDRA-14727)
 + * BufferPool: allocating thread for new chunks should acquire directly (CASSANDRA-14832)
 + * Send correct messaging version in internode messaging handshake's third message (CASSANDRA-14896)
 + * Make Read and Write Latency columns consistent for proxyhistograms and tablehistograms (CASSANDRA-11939)
 + * Make protocol checksum type option case insensitive (CASSANDRA-14716)
 + * Forbid re-adding static columns as regular and vice versa (CASSANDRA-14913)
 + * Audit log allows system keyspaces to be audited via configuration options (CASSANDRA-14498)
 + * Lower default chunk_length_in_kb from 64kb to 16kb (CASSANDRA-13241)
 + * Startup checker should wait for count rather than percentage (CASSANDRA-14297)
 + * Fix incorrect sorting of replicas in SimpleStrategy.calculateNaturalReplicas (CASSANDRA-14862)
 + * Partitioned outbound internode TCP connections can occur when nodes restart (CASSANDRA-14358)
 + * Don't write to system_distributed.repair_history, system_traces.sessions, system_traces.events in mixed version 3.X/4.0 clusters (CASSANDRA-14841)
 + * Avoid running query to self through messaging service (CASSANDRA-14807)
 + * Allow using custom script for chronicle queue BinLog archival (CASSANDRA-14373)
 + * Transient->Full range movements mishandle consistency level upgrade (CASSANDRA-14759)
 + * ReplicaCollection follow-up (CASSANDRA-14726)
 + * Transient node receives full data requests (CASSANDRA-14762)
 + * Enable snapshot artifacts publish (CASSANDRA-12704)
 + * Introduce RangesAtEndpoint.unwrap to simplify StreamSession.addTransferRanges (CASSANDRA-14770)
 + * LOCAL_QUORUM may speculate to non-local nodes, resulting in Timeout instead of Unavailable (CASSANDRA-14735)
 + * Avoid creating empty compaction tasks after truncate (CASSANDRA-14780)
 + * Fail incremental repair prepare phase if it encounters sstables from un-finalized sessions (CASSANDRA-14763)
 + * Add a check for receiving digest response from transient node (CASSANDRA-14750)
 + * Fail query on transient replica if coordinator only expects full data (CASSANDRA-14704)
 + * Remove mentions of transient replication from repair path (CASSANDRA-14698)
 + * Fix handleRepairStatusChangedNotification to remove first then add (CASSANDRA-14720)
 + * Allow transient node to serve as a repair coordinator (CASSANDRA-14693)
 + * DecayingEstimatedHistogramReservoir.EstimatedHistogramReservoirSnapshot returns wrong value for size() and incorrectly calculates count (CASSANDRA-14696)
 + * AbstractReplicaCollection equals and hash code should throw due to conflict between order sensitive/insensitive uses (CASSANDRA-14700)
 + * Detect inconsistencies in repaired data on the read path (CASSANDRA-14145)
 + * Add checksumming to the native protocol (CASSANDRA-13304)
 + * Make AuthCache more easily extendable (CASSANDRA-14662)
 + * Extend RolesCache to include detailed role info (CASSANDRA-14497)
 + * Add fqltool compare (CASSANDRA-14619)
 + * Add fqltool replay (CASSANDRA-14618)
 + * Log keyspace in full query log (CASSANDRA-14656)
 + * Transient Replication and Cheap Quorums (CASSANDRA-14404)
 + * Log server-generated timestamp and nowInSeconds used by queries in FQL (CASSANDRA-14675)
 + * Add diagnostic events for read repairs (CASSANDRA-14668)
 + * Use consistent nowInSeconds and timestamps values within a request (CASSANDRA-14671)
 + * Add sampler for query time and expose with nodetool (CASSANDRA-14436)
 + * Clean up Message.Request implementations (CASSANDRA-14677)
 + * Disable old native protocol versions on demand (CASANDRA-14659)
 + * Allow specifying now-in-seconds in native protocol (CASSANDRA-14664)
 + * Improve BTree build performance by avoiding data copy (CASSANDRA-9989)
 + * Make monotonic read / read repair configurable (CASSANDRA-14635)
 + * Refactor CompactionStrategyManager (CASSANDRA-14621)
 + * Flush netty client messages immediately by default (CASSANDRA-13651)
 + * Improve read repair blocking behavior (CASSANDRA-10726)
 + * Add a virtual table to expose settings (CASSANDRA-14573)
 + * Fix up chunk cache handling of metrics (CASSANDRA-14628)
 + * Extend IAuthenticator to accept peer SSL certificates (CASSANDRA-14652)
 + * Incomplete handling of exceptions when decoding incoming messages (CASSANDRA-14574)
 + * Add diagnostic events for user audit logging (CASSANDRA-13668)
 + * Allow retrieving diagnostic events via JMX (CASSANDRA-14435)
 + * Add base classes for diagnostic events (CASSANDRA-13457)
 + * Clear view system metadata when dropping keyspace (CASSANDRA-14646)
 + * Allocate ReentrantLock on-demand in java11 AtomicBTreePartitionerBase (CASSANDRA-14637)
 + * Make all existing virtual tables use LocalPartitioner (CASSANDRA-14640)
 + * Revert 4.0 GC alg back to CMS (CASANDRA-14636)
 + * Remove hardcoded java11 jvm args in idea workspace files (CASSANDRA-14627)
 + * Update netty to 4.1.128 (CASSANDRA-14633)
 + * Add a virtual table to expose thread pools (CASSANDRA-14523)
 + * Add a virtual table to expose caches (CASSANDRA-14538, CASSANDRA-14626)
 + * Fix toDate function for timestamp arguments (CASSANDRA-14502)
 + * Revert running dtests by default in circleci (CASSANDRA-14614)
 + * Stream entire SSTables when possible (CASSANDRA-14556)
 + * Cell reconciliation should not depend on nowInSec (CASSANDRA-14592)
 + * Add experimental support for Java 11 (CASSANDRA-9608)
 + * Make PeriodicCommitLogService.blockWhenSyncLagsNanos configurable (CASSANDRA-14580)
 + * Improve logging in MessageInHandler's constructor (CASSANDRA-14576)
 + * Set broadcast address in internode messaging handshake (CASSANDRA-14579)
 + * Wait for schema agreement prior to building MVs (CASSANDRA-14571)
 + * Make all DDL statements idempotent and not dependent on global state (CASSANDRA-13426)
 + * Bump the hints messaging version to match the current one (CASSANDRA-14536)
 + * OffsetAwareConfigurationLoader doesn't set ssl storage port causing bind errors in CircleCI (CASSANDRA-14546)
 + * Report why native_transport_port fails to bind (CASSANDRA-14544)
 + * Optimize internode messaging protocol (CASSANDRA-14485)
 + * Internode messaging handshake sends wrong messaging version number (CASSANDRA-14540)
 + * Add a virtual table to expose active client connections (CASSANDRA-14458)
 + * Clean up and refactor client metrics (CASSANDRA-14524)
 + * Nodetool import row cache invalidation races with adding sstables to tracker (CASSANDRA-14529)
 + * Fix assertions in LWTs after TableMetadata was made immutable (CASSANDRA-14356)
 + * Abort compactions quicker (CASSANDRA-14397)
 + * Support light-weight transactions in cassandra-stress (CASSANDRA-13529)
 + * Make AsyncOneResponse use the correct timeout (CASSANDRA-14509)
 + * Add option to sanity check tombstones on reads/compactions (CASSANDRA-14467)
 + * Add a virtual table to expose all running sstable tasks (CASSANDRA-14457)
 + * Let nodetool import take a list of directories (CASSANDRA-14442)
 + * Avoid unneeded memory allocations / cpu for disabled log levels (CASSANDRA-14488)
 + * Implement virtual keyspace interface (CASSANDRA-7622)
 + * nodetool import cleanup and improvements (CASSANDRA-14417)
 + * Bump jackson version to >= 2.9.5 (CASSANDRA-14427)
 + * Allow nodetool toppartitions without specifying table (CASSANDRA-14360)
 + * Audit logging for database activity (CASSANDRA-12151)
 + * Clean up build artifacts in docs container (CASSANDRA-14432)
 + * Minor network authz improvements (Cassandra-14413)
 + * Automatic sstable upgrades (CASSANDRA-14197)
 + * Replace deprecated junit.framework.Assert usages with org.junit.Assert (CASSANDRA-14431)
 + * Cassandra-stress throws NPE if insert section isn't specified in user profile (CASSSANDRA-14426)
 + * List clients by protocol versions `nodetool clientstats --by-protocol` (CASSANDRA-14335)
 + * Improve LatencyMetrics performance by reducing write path processing (CASSANDRA-14281)
 + * Add network authz (CASSANDRA-13985)
 + * Use the correct IP/Port for Streaming when localAddress is left unbound (CASSANDRA-14389)
 + * nodetool listsnapshots is missing local system keyspace snapshots (CASSANDRA-14381)
 + * Remove StreamCoordinator.streamExecutor thread pool (CASSANDRA-14402)
 + * Rename nodetool --with-port to --print-port to disambiguate from --port (CASSANDRA-14392)
 + * Client TOPOLOGY_CHANGE messages have wrong port. (CASSANDRA-14398)
 + * Add ability to load new SSTables from a separate directory (CASSANDRA-6719)
 + * Eliminate background repair and probablistic read_repair_chance table options
 +   (CASSANDRA-13910)
 + * Bind to correct local address in 4.0 streaming (CASSANDRA-14362)
 + * Use standard Amazon naming for datacenter and rack in Ec2Snitch (CASSANDRA-7839)
 + * Fix junit failure for SSTableReaderTest (CASSANDRA-14387)
 + * Abstract write path for pluggable storage (CASSANDRA-14118)
 + * nodetool describecluster should be more informative (CASSANDRA-13853)
 + * Compaction performance improvements (CASSANDRA-14261) 
 + * Refactor Pair usage to avoid boxing ints/longs (CASSANDRA-14260)
 + * Add options to nodetool tablestats to sort and limit output (CASSANDRA-13889)
 + * Rename internals to reflect CQL vocabulary (CASSANDRA-14354)
 + * Add support for hybrid MIN(), MAX() speculative retry policies
 +   (CASSANDRA-14293, CASSANDRA-14338, CASSANDRA-14352)
 + * Fix some regressions caused by 14058 (CASSANDRA-14353)
 + * Abstract repair for pluggable storage (CASSANDRA-14116)
 + * Add meaningful toString() impls (CASSANDRA-13653)
 + * Add sstableloader option to accept target keyspace name (CASSANDRA-13884)
 + * Move processing of EchoMessage response to gossip stage (CASSANDRA-13713)
 + * Add coordinator write metric per CF (CASSANDRA-14232)
 + * Correct and clarify SSLFactory.getSslContext method and call sites (CASSANDRA-14314)
 + * Handle static and partition deletion properly on ThrottledUnfilteredIterator (CASSANDRA-14315)
 + * NodeTool clientstats should show SSL Cipher (CASSANDRA-14322)
 + * Add ability to specify driver name and version (CASSANDRA-14275)
 + * Abstract streaming for pluggable storage (CASSANDRA-14115)
 + * Forced incremental repairs should promote sstables if they can (CASSANDRA-14294)
 + * Use Murmur3 for validation compactions (CASSANDRA-14002)
 + * Comma at the end of the seed list is interpretated as localhost (CASSANDRA-14285)
 + * Refactor read executor and response resolver, abstract read repair (CASSANDRA-14058)
 + * Add optional startup delay to wait until peers are ready (CASSANDRA-13993)
 + * Add a few options to nodetool verify (CASSANDRA-14201)
 + * CVE-2017-5929 Security vulnerability and redefine default log rotation policy (CASSANDRA-14183)
 + * Use JVM default SSL validation algorithm instead of custom default (CASSANDRA-13259)
 + * Better document in code InetAddressAndPort usage post 7544, incorporate port into UUIDGen node (CASSANDRA-14226)
 + * Fix sstablemetadata date string for minLocalDeletionTime (CASSANDRA-14132)
 + * Make it possible to change neverPurgeTombstones during runtime (CASSANDRA-14214)
 + * Remove GossipDigestSynVerbHandler#doSort() (CASSANDRA-14174)
 + * Add nodetool clientlist (CASSANDRA-13665)
 + * Revert ProtocolVersion changes from CASSANDRA-7544 (CASSANDRA-14211)
 + * Non-disruptive seed node list reload (CASSANDRA-14190)
 + * Nodetool tablehistograms to print statics for all the tables (CASSANDRA-14185)
 + * Migrate dtests to use pytest and python3 (CASSANDRA-14134)
 + * Allow storage port to be configurable per node (CASSANDRA-7544)
 + * Make sub-range selection for non-frozen collections return null instead of empty (CASSANDRA-14182)
 + * BloomFilter serialization format should not change byte ordering (CASSANDRA-9067)
 + * Remove unused on-heap BloomFilter implementation (CASSANDRA-14152)
 + * Delete temp test files on exit (CASSANDRA-14153)
 + * Make PartitionUpdate and Mutation immutable (CASSANDRA-13867)
 + * Fix CommitLogReplayer exception for CDC data (CASSANDRA-14066)
 + * Fix cassandra-stress startup failure (CASSANDRA-14106)
 + * Remove initialDirectories from CFS (CASSANDRA-13928)
 + * Fix trivial log format error (CASSANDRA-14015)
 + * Allow sstabledump to do a json object per partition (CASSANDRA-13848)
 + * Add option to optimise merkle tree comparison across replicas (CASSANDRA-3200)
 + * Remove unused and deprecated methods from AbstractCompactionStrategy (CASSANDRA-14081)
 + * Fix Distribution.average in cassandra-stress (CASSANDRA-14090)
 + * Support a means of logging all queries as they were invoked (CASSANDRA-13983)
 + * Presize collections (CASSANDRA-13760)
 + * Add GroupCommitLogService (CASSANDRA-13530)
 + * Parallelize initial materialized view build (CASSANDRA-12245)
 + * Fix flaky SecondaryIndexManagerTest.assert[Not]MarkedAsBuilt (CASSANDRA-13965)
 + * Make LWTs send resultset metadata on every request (CASSANDRA-13992)
 + * Fix flaky indexWithFailedInitializationIsNotQueryableAfterPartialRebuild (CASSANDRA-13963)
 + * Introduce leaf-only iterator (CASSANDRA-9988)
 + * Upgrade Guava to 23.3 and Airline to 0.8 (CASSANDRA-13997)
 + * Allow only one concurrent call to StatusLogger (CASSANDRA-12182)
 + * Refactoring to specialised functional interfaces (CASSANDRA-13982)
 + * Speculative retry should allow more friendly params (CASSANDRA-13876)
 + * Throw exception if we send/receive repair messages to incompatible nodes (CASSANDRA-13944)
 + * Replace usages of MessageDigest with Guava's Hasher (CASSANDRA-13291)
 + * Add nodetool cmd to print hinted handoff window (CASSANDRA-13728)
 + * Fix some alerts raised by static analysis (CASSANDRA-13799)
 + * Checksum sstable metadata (CASSANDRA-13321, CASSANDRA-13593)
 + * Add result set metadata to prepared statement MD5 hash calculation (CASSANDRA-10786)
 + * Refactor GcCompactionTest to avoid boxing (CASSANDRA-13941)
 + * Expose recent histograms in JmxHistograms (CASSANDRA-13642)
 + * Fix buffer length comparison when decompressing in netty-based streaming (CASSANDRA-13899)
 + * Properly close StreamCompressionInputStream to release any ByteBuf (CASSANDRA-13906)
 + * Add SERIAL and LOCAL_SERIAL support for cassandra-stress (CASSANDRA-13925)
 + * LCS needlessly checks for L0 STCS candidates multiple times (CASSANDRA-12961)
 + * Correctly close netty channels when a stream session ends (CASSANDRA-13905)
 + * Update lz4 to 1.4.0 (CASSANDRA-13741)
 + * Optimize Paxos prepare and propose stage for local requests (CASSANDRA-13862)
 + * Throttle base partitions during MV repair streaming to prevent OOM (CASSANDRA-13299)
 + * Use compaction threshold for STCS in L0 (CASSANDRA-13861)
 + * Fix problem with min_compress_ratio: 1 and disallow ratio < 1 (CASSANDRA-13703)
 + * Add extra information to SASI timeout exception (CASSANDRA-13677)
 + * Add incremental repair support for --hosts, --force, and subrange repair (CASSANDRA-13818)
 + * Rework CompactionStrategyManager.getScanners synchronization (CASSANDRA-13786)
 + * Add additional unit tests for batch behavior, TTLs, Timestamps (CASSANDRA-13846)
 + * Add keyspace and table name in schema validation exception (CASSANDRA-13845)
 + * Emit metrics whenever we hit tombstone failures and warn thresholds (CASSANDRA-13771)
 + * Make netty EventLoopGroups daemon threads (CASSANDRA-13837)
 + * Race condition when closing stream sessions (CASSANDRA-13852)
 + * NettyFactoryTest is failing in trunk on macOS (CASSANDRA-13831)
 + * Allow changing log levels via nodetool for related classes (CASSANDRA-12696)
 + * Add stress profile yaml with LWT (CASSANDRA-7960)
 + * Reduce memory copies and object creations when acting on ByteBufs (CASSANDRA-13789)
 + * Simplify mx4j configuration (Cassandra-13578)
 + * Fix trigger example on 4.0 (CASSANDRA-13796)
 + * Force minumum timeout value (CASSANDRA-9375)
 + * Use netty for streaming (CASSANDRA-12229)
 + * Use netty for internode messaging (CASSANDRA-8457)
 + * Add bytes repaired/unrepaired to nodetool tablestats (CASSANDRA-13774)
 + * Don't delete incremental repair sessions if they still have sstables (CASSANDRA-13758)
 + * Fix pending repair manager index out of bounds check (CASSANDRA-13769)
 + * Don't use RangeFetchMapCalculator when RF=1 (CASSANDRA-13576)
 + * Don't optimise trivial ranges in RangeFetchMapCalculator (CASSANDRA-13664)
 + * Use an ExecutorService for repair commands instead of new Thread(..).start() (CASSANDRA-13594)
 + * Fix race / ref leak in anticompaction (CASSANDRA-13688)
 + * Expose tasks queue length via JMX (CASSANDRA-12758)
 + * Fix race / ref leak in PendingRepairManager (CASSANDRA-13751)
 + * Enable ppc64le runtime as unsupported architecture (CASSANDRA-13615)
 + * Improve sstablemetadata output (CASSANDRA-11483)
 + * Support for migrating legacy users to roles has been dropped (CASSANDRA-13371)
 + * Introduce error metrics for repair (CASSANDRA-13387)
 + * Refactoring to primitive functional interfaces in AuthCache (CASSANDRA-13732)
 + * Update metrics to 3.1.5 (CASSANDRA-13648)
 + * batch_size_warn_threshold_in_kb can now be set at runtime (CASSANDRA-13699)
 + * Avoid always rebuilding secondary indexes at startup (CASSANDRA-13725)
 + * Upgrade JMH from 1.13 to 1.19 (CASSANDRA-13727)
 + * Upgrade SLF4J from 1.7.7 to 1.7.25 (CASSANDRA-12996)
 + * Default for start_native_transport now true if not set in config (CASSANDRA-13656)
 + * Don't add localhost to the graph when calculating where to stream from (CASSANDRA-13583)
 + * Make CDC availability more deterministic via hard-linking (CASSANDRA-12148)
 + * Allow skipping equality-restricted clustering columns in ORDER BY clause (CASSANDRA-10271)
 + * Use common nowInSec for validation compactions (CASSANDRA-13671)
 + * Improve handling of IR prepare failures (CASSANDRA-13672)
 + * Send IR coordinator messages synchronously (CASSANDRA-13673)
 + * Flush system.repair table before IR finalize promise (CASSANDRA-13660)
 + * Fix column filter creation for wildcard queries (CASSANDRA-13650)
 + * Add 'nodetool getbatchlogreplaythrottle' and 'nodetool setbatchlogreplaythrottle' (CASSANDRA-13614)
 + * fix race condition in PendingRepairManager (CASSANDRA-13659)
 + * Allow noop incremental repair state transitions (CASSANDRA-13658)
 + * Run repair with down replicas (CASSANDRA-10446)
 + * Added started & completed repair metrics (CASSANDRA-13598)
 + * Added started & completed repair metrics (CASSANDRA-13598)
 + * Improve secondary index (re)build failure and concurrency handling (CASSANDRA-10130)
 + * Improve calculation of available disk space for compaction (CASSANDRA-13068)
 + * Change the accessibility of RowCacheSerializer for third party row cache plugins (CASSANDRA-13579)
 + * Allow sub-range repairs for a preview of repaired data (CASSANDRA-13570)
 + * NPE in IR cleanup when columnfamily has no sstables (CASSANDRA-13585)
 + * Fix Randomness of stress values (CASSANDRA-12744)
 + * Allow selecting Map values and Set elements (CASSANDRA-7396)
 + * Fast and garbage-free Streaming Histogram (CASSANDRA-13444)
 + * Update repairTime for keyspaces on completion (CASSANDRA-13539)
 + * Add configurable upper bound for validation executor threads (CASSANDRA-13521)
 + * Bring back maxHintTTL propery (CASSANDRA-12982)
 + * Add testing guidelines (CASSANDRA-13497)
 + * Add more repair metrics (CASSANDRA-13531)
 + * RangeStreamer should be smarter when picking endpoints for streaming (CASSANDRA-4650)
 + * Avoid rewrapping an exception thrown for cache load functions (CASSANDRA-13367)
 + * Log time elapsed for each incremental repair phase (CASSANDRA-13498)
 + * Add multiple table operation support to cassandra-stress (CASSANDRA-8780)
 + * Fix incorrect cqlsh results when selecting same columns multiple times (CASSANDRA-13262)
 + * Fix WriteResponseHandlerTest is sensitive to test execution order (CASSANDRA-13421)
 + * Improve incremental repair logging (CASSANDRA-13468)
 + * Start compaction when incremental repair finishes (CASSANDRA-13454)
 + * Add repair streaming preview (CASSANDRA-13257)
 + * Cleanup isIncremental/repairedAt usage (CASSANDRA-13430)
 + * Change protocol to allow sending key space independent of query string (CASSANDRA-10145)
 + * Make gc_log and gc_warn settable at runtime (CASSANDRA-12661)
 + * Take number of files in L0 in account when estimating remaining compaction tasks (CASSANDRA-13354)
 + * Skip building views during base table streams on range movements (CASSANDRA-13065)
 + * Improve error messages for +/- operations on maps and tuples (CASSANDRA-13197)
 + * Remove deprecated repair JMX APIs (CASSANDRA-11530)
 + * Fix version check to enable streaming keep-alive (CASSANDRA-12929)
 + * Make it possible to monitor an ideal consistency level separate from actual consistency level (CASSANDRA-13289)
 + * Outbound TCP connections ignore internode authenticator (CASSANDRA-13324)
 + * Upgrade junit from 4.6 to 4.12 (CASSANDRA-13360)
 + * Cleanup ParentRepairSession after repairs (CASSANDRA-13359)
 + * Upgrade snappy-java to 1.1.2.6 (CASSANDRA-13336)
 + * Incremental repair not streaming correct sstables (CASSANDRA-13328)
 + * Upgrade the jna version to 4.3.0 (CASSANDRA-13300)
 + * Add the currentTimestamp, currentDate, currentTime and currentTimeUUID functions (CASSANDRA-13132)
 + * Remove config option index_interval (CASSANDRA-10671)
 + * Reduce lock contention for collection types and serializers (CASSANDRA-13271)
 + * Make it possible to override MessagingService.Verb ids (CASSANDRA-13283)
 + * Avoid synchronized on prepareForRepair in ActiveRepairService (CASSANDRA-9292)
 + * Adds the ability to use uncompressed chunks in compressed files (CASSANDRA-10520)
 + * Don't flush sstables when streaming for incremental repair (CASSANDRA-13226)
 + * Remove unused method (CASSANDRA-13227)
 + * Fix minor bugs related to #9143 (CASSANDRA-13217)
 + * Output warning if user increases RF (CASSANDRA-13079)
 + * Remove pre-3.0 streaming compatibility code for 4.0 (CASSANDRA-13081)
 + * Add support for + and - operations on dates (CASSANDRA-11936)
 + * Fix consistency of incrementally repaired data (CASSANDRA-9143)
 + * Increase commitlog version (CASSANDRA-13161)
 + * Make TableMetadata immutable, optimize Schema (CASSANDRA-9425)
 + * Refactor ColumnCondition (CASSANDRA-12981)
 + * Parallelize streaming of different keyspaces (CASSANDRA-4663)
 + * Improved compactions metrics (CASSANDRA-13015)
 + * Speed-up start-up sequence by avoiding un-needed flushes (CASSANDRA-13031)
 + * Use Caffeine (W-TinyLFU) for on-heap caches (CASSANDRA-10855)
 + * Thrift removal (CASSANDRA-11115)
 + * Remove pre-3.0 compatibility code for 4.0 (CASSANDRA-12716)
 + * Add column definition kind to dropped columns in schema (CASSANDRA-12705)
 + * Add (automate) Nodetool Documentation (CASSANDRA-12672)
 + * Update bundled cqlsh python driver to 3.7.0 (CASSANDRA-12736)
 + * Reject invalid replication settings when creating or altering a keyspace (CASSANDRA-12681)
 + * Clean up the SSTableReader#getScanner API wrt removal of RateLimiter (CASSANDRA-12422)
 + * Use new token allocation for non bootstrap case as well (CASSANDRA-13080)
 + * Avoid byte-array copy when key cache is disabled (CASSANDRA-13084)
 + * Require forceful decommission if number of nodes is less than replication factor (CASSANDRA-12510)
 + * Allow IN restrictions on column families with collections (CASSANDRA-12654)
 + * Log message size in trace message in OutboundTcpConnection (CASSANDRA-13028)
 + * Add timeUnit Days for cassandra-stress (CASSANDRA-13029)
 + * Add mutation size and batch metrics (CASSANDRA-12649)
 + * Add method to get size of endpoints to TokenMetadata (CASSANDRA-12999)
 + * Expose time spent waiting in thread pool queue (CASSANDRA-8398)
 + * Conditionally update index built status to avoid unnecessary flushes (CASSANDRA-12969)
 + * cqlsh auto completion: refactor definition of compaction strategy options (CASSANDRA-12946)
 + * Add support for arithmetic operators (CASSANDRA-11935)
 + * Add histogram for delay to deliver hints (CASSANDRA-13234)
 + * Fix cqlsh automatic protocol downgrade regression (CASSANDRA-13307)
 + * Changing `max_hint_window_in_ms` at runtime (CASSANDRA-11720)
 + * Trivial format error in StorageProxy (CASSANDRA-13551)
 + * Nodetool repair can hang forever if we lose the notification for the repair completing/failing (CASSANDRA-13480)
 + * Anticompaction can cause noisy log messages (CASSANDRA-13684)
 + * Switch to client init for sstabledump (CASSANDRA-13683)
 + * CQLSH: Don't pause when capturing data (CASSANDRA-13743)
 + * nodetool clearsnapshot requires --all to clear all snapshots (CASSANDRA-13391)
 + * Correctly count range tombstones in traces and tombstone thresholds (CASSANDRA-8527)
 + * cqlshrc.sample uses incorrect option for time formatting (CASSANDRA-14243)
 + * Multi-version in-JVM dtests (CASSANDRA-14937)
 + * Allow instance class loaders to be garbage collected for inJVM dtest (CASSANDRA-15170)
 +
  
  3.11.5
 - * Fix SASI non-literal string comparisons (range operators) (CASSANDRA-15169)
   * Make sure user defined compaction transactions are always closed (CASSANDRA-15123)
   * Fix cassandra-env.sh to use $CASSANDRA_CONF to find cassandra-jaas.config (CASSANDRA-14305)
   * Fixed nodetool cfstats printing index name twice (CASSANDRA-14903)
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 326c005,fa00e5b..61129bb
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@@ -2194,29 -2259,25 +2195,28 @@@ public class ColumnFamilyStore implemen
          // and so we only run one major compaction at a time
          synchronized (this)
          {
 -            logger.trace("Cancelling in-progress compactions for {}", metadata.cfName);
 +            logger.trace("Cancelling in-progress compactions for {}", metadata.name);
 +            Iterable<ColumnFamilyStore> toInterruptFor = interruptIndexes
 +                                                         ? concatWithIndexes()
 +                                                         : Collections.singleton(this);
  
 -            Iterable<ColumnFamilyStore> selfWithAuxiliaryCfs = interruptViews
 -                                                               ? Iterables.concat(concatWithIndexes(), viewManager.allViewsCfs())
 -                                                               : concatWithIndexes();
 +            toInterruptFor = interruptViews
 +                             ? Iterables.concat(toInterruptFor, viewManager.allViewsCfs())
 +                             : toInterruptFor;
  
-             for (ColumnFamilyStore cfs : toInterruptFor)
-                 cfs.getCompactionStrategyManager().pause();
-             try
+             try (CompactionManager.CompactionPauser pause = CompactionManager.instance.pauseGlobalCompaction();
 -                 CompactionManager.CompactionPauser pausedStrategies = pauseCompactionStrategies(selfWithAuxiliaryCfs))
++                 CompactionManager.CompactionPauser pausedStrategies = pauseCompactionStrategies(toInterruptFor))
              {
                  // interrupt in-progress compactions
 -                CompactionManager.instance.interruptCompactionForCFs(selfWithAuxiliaryCfs, interruptValidation);
 -                CompactionManager.instance.waitForCessation(selfWithAuxiliaryCfs);
 +                CompactionManager.instance.interruptCompactionForCFs(toInterruptFor, sstablesPredicate, interruptValidation);
 +                CompactionManager.instance.waitForCessation(toInterruptFor, sstablesPredicate);
  
                  // doublecheck that we finished, instead of timing out
 -                for (ColumnFamilyStore cfs : selfWithAuxiliaryCfs)
 +                for (ColumnFamilyStore cfs : toInterruptFor)
                  {
 -                    if (!cfs.getTracker().getCompacting().isEmpty())
 +                    if (cfs.getTracker().getCompacting().stream().anyMatch(sstablesPredicate))
                      {
 -                        logger.warn("Unable to cancel in-progress compactions for {}.  Perhaps there is an unusually large row in progress somewhere, or the system is simply overloaded.", metadata.cfName);
 +                        logger.warn("Unable to cancel in-progress compactions for {}.  Perhaps there is an unusually large row in progress somewhere, or the system is simply overloaded.", metadata.name);
                          return null;
                      }
                  }
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
index 09bed74,a6dbd9d..275ce70
--- a/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
@@@ -184,32 -165,15 +184,38 @@@ public final class CompactionInf
              stopRequested = true;
          }
  
+         /**
+          * if this compaction involves several/all tables we can safely check globalCompactionsPaused
+          * in isStopRequested() below
+          */
+         public abstract boolean isGlobal();
+ 
          public boolean isStopRequested()
          {
-             return stopRequested;
+             return stopRequested || (isGlobal() && CompactionManager.instance.isGlobalCompactionPaused());
          }
      }
 +
 +    public enum Unit
 +    {
 +        BYTES("bytes"), RANGES("token range parts"), KEYS("keys");
 +
 +        private final String name;
 +
 +        Unit(String name)
 +        {
 +            this.name = name;
 +        }
 +
 +        @Override
 +        public String toString()
 +        {
 +            return this.name;
 +        }
 +
 +        public static boolean isFileSize(String unit)
 +        {
 +            return BYTES.toString().equals(unit);
 +        }
 +    }
  }
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
index 789d1ee,9aba938..1128108
--- a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
@@@ -120,10 -124,14 +120,15 @@@ public class CompactionIterator extend
                                    type,
                                    bytesRead,
                                    totalBytes,
 -                                  compactionId);
 +                                  compactionId,
 +                                  sstables);
      }
  
+     public boolean isGlobal()
+     {
+         return false;
+     }
+ 
      private void updateCounterFor(int rows)
      {
          assert rows > 0 && rows - 1 < mergeCounters.length;
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index bb1d585,a08d08b..cfca7d9
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@@ -131,15 -118,11 +131,18 @@@ public class CompactionManager implemen
      @VisibleForTesting
      final Multiset<ColumnFamilyStore> compactingCF = ConcurrentHashMultiset.create();
  
 +    public final ActiveCompactions active = new ActiveCompactions();
 +
+     // used to temporarily pause non-strategy managed compactions (like index summary redistribution)
+     private final AtomicInteger globalCompactionPauseCount = new AtomicInteger(0);
+ 
      private final RateLimiter compactionRateLimiter = RateLimiter.create(Double.MAX_VALUE);
  
 +    public CompactionMetrics getMetrics()
 +    {
 +        return metrics;
 +    }
 +
      /**
       * Gets compaction rate limiter.
       * Rate unit is bytes per sec.
@@@ -2161,14 -2136,25 +2164,37 @@@
          }
      }
  
++
 +    public List<CompactionInfo> getSSTableTasks()
 +    {
 +        return active.getCompactions()
 +                     .stream()
 +                     .map(CompactionInfo.Holder::getCompactionInfo)
 +                     .filter(task -> task.getTaskType() != OperationType.COUNTER_CACHE_SAVE
 +                                     && task.getTaskType() != OperationType.KEY_CACHE_SAVE
 +                                     && task.getTaskType() != OperationType.ROW_CACHE_SAVE)
 +                     .collect(Collectors.toList());
 +    }
++
+     /**
+      * Return whether "global" compactions should be paused, used by ColumnFamilyStore#runWithCompactionsDisabled
+      *
+      * a global compaction is one that includes several/all tables, currently only IndexSummaryBuilder
+      */
+     public boolean isGlobalCompactionPaused()
+     {
+         return globalCompactionPauseCount.get() > 0;
+     }
+ 
+     public CompactionPauser pauseGlobalCompaction()
+     {
+         CompactionPauser pauser = globalCompactionPauseCount::decrementAndGet;
+         globalCompactionPauseCount.incrementAndGet();
+         return pauser;
+     }
+ 
+     public interface CompactionPauser extends AutoCloseable
+     {
+         public void close();
+     }
  }
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 685da2e,2efcd11..725f04d
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@@ -184,13 -184,17 +184,15 @@@ public class CompactionTask extends Abs
  
                  long lastBytesScanned = 0;
  
-                 if (!controller.cfs.getCompactionStrategyManager().isActive())
-                     throw new CompactionInterruptedException(ci.getCompactionInfo());
 -                if (collector != null)
 -                    collector.beginCompaction(ci);
--
 +                activeCompactions.beginCompaction(ci);
- 
                  try (CompactionAwareWriter writer = getCompactionAwareWriter(cfs, getDirectories(), transaction, actuallyCompact))
                  {
+                     // Note that we need to re-check this flag after calling beginCompaction above to avoid a window
+                     // where the compaction does not exist in activeCompactions but the CSM gets paused.
+                     // We already have the sstables marked compacting here so CompactionManager#waitForCessation will
+                     // block until the below exception is thrown and the transaction is cancelled.
+                     if (!controller.cfs.getCompactionStrategyManager().isActive())
+                         throw new CompactionInterruptedException(ci.getCompactionInfo());
                      estimatedKeys = writer.estimatedKeys();
                      while (ci.hasNext())
                      {
diff --cc src/java/org/apache/cassandra/db/view/ViewBuilderTask.java
index d041b48,0000000..c84c697
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/view/ViewBuilderTask.java
+++ b/src/java/org/apache/cassandra/db/view/ViewBuilderTask.java
@@@ -1,269 -1,0 +1,274 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.db.view;
 +
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.Iterator;
 +import java.util.UUID;
 +import java.util.concurrent.Callable;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicLong;
 +
 +import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.base.Function;
 +import com.google.common.base.Objects;
 +import com.google.common.collect.ImmutableSet;
 +import com.google.common.collect.Iterators;
 +import com.google.common.collect.PeekingIterator;
 +import com.google.common.util.concurrent.Futures;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.DecoratedKey;
 +import org.apache.cassandra.db.DeletionTime;
 +import org.apache.cassandra.db.Mutation;
 +import org.apache.cassandra.db.ReadExecutionController;
 +import org.apache.cassandra.db.ReadQuery;
 +import org.apache.cassandra.db.SinglePartitionReadCommand;
 +import org.apache.cassandra.db.SystemKeyspace;
 +import org.apache.cassandra.db.compaction.CompactionInfo;
 +import org.apache.cassandra.db.compaction.CompactionInfo.Unit;
 +import org.apache.cassandra.db.compaction.CompactionInterruptedException;
 +import org.apache.cassandra.db.compaction.OperationType;
 +import org.apache.cassandra.db.lifecycle.SSTableSet;
 +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
 +import org.apache.cassandra.db.rows.Rows;
 +import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 +import org.apache.cassandra.db.rows.UnfilteredRowIterators;
 +import org.apache.cassandra.dht.Range;
 +import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.gms.Gossiper;
 +import org.apache.cassandra.io.sstable.ReducingKeyIterator;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.service.StorageProxy;
 +import org.apache.cassandra.utils.FBUtilities;
 +import org.apache.cassandra.utils.UUIDGen;
 +import org.apache.cassandra.utils.concurrent.Refs;
 +
 +public class ViewBuilderTask extends CompactionInfo.Holder implements Callable<Long>
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(ViewBuilderTask.class);
 +
 +    private static final int ROWS_BETWEEN_CHECKPOINTS = 1000;
 +
 +    private final ColumnFamilyStore baseCfs;
 +    private final View view;
 +    private final Range<Token> range;
 +    private final UUID compactionId;
 +    private volatile Token prevToken;
 +    private volatile long keysBuilt = 0;
 +    private volatile boolean isStopped = false;
 +    private volatile boolean isCompactionInterrupted = false;
 +
 +    @VisibleForTesting
 +    public ViewBuilderTask(ColumnFamilyStore baseCfs, View view, Range<Token> range, Token lastToken, long keysBuilt)
 +    {
 +        this.baseCfs = baseCfs;
 +        this.view = view;
 +        this.range = range;
 +        this.compactionId = UUIDGen.getTimeUUID();
 +        this.prevToken = lastToken;
 +        this.keysBuilt = keysBuilt;
 +    }
 +
 +    @SuppressWarnings("resource")
 +    private void buildKey(DecoratedKey key)
 +    {
 +        ReadQuery selectQuery = view.getReadQuery();
 +
 +        if (!selectQuery.selectsKey(key))
 +        {
 +            logger.trace("Skipping {}, view query filters", key);
 +            return;
 +        }
 +
 +        int nowInSec = FBUtilities.nowInSeconds();
 +        SinglePartitionReadCommand command = view.getSelectStatement().internalReadForView(key, nowInSec);
 +
 +        // We're rebuilding everything from what's on disk, so we read everything, consider that as new updates
 +        // and pretend that there is nothing pre-existing.
 +        UnfilteredRowIterator empty = UnfilteredRowIterators.noRowsIterator(baseCfs.metadata(), key, Rows.EMPTY_STATIC_ROW, DeletionTime.LIVE, false);
 +
 +        try (ReadExecutionController orderGroup = command.executionController();
 +             UnfilteredRowIterator data = UnfilteredPartitionIterators.getOnlyElement(command.executeLocally(orderGroup), command))
 +        {
 +            Iterator<Collection<Mutation>> mutations = baseCfs.keyspace.viewManager
 +                                                       .forTable(baseCfs.metadata.id)
 +                                                       .generateViewUpdates(Collections.singleton(view), data, empty, nowInSec, true);
 +
 +            AtomicLong noBase = new AtomicLong(Long.MAX_VALUE);
 +            mutations.forEachRemaining(m -> StorageProxy.mutateMV(key.getKey(), m, true, noBase, System.nanoTime()));
 +        }
 +    }
 +
 +    public Long call()
 +    {
 +        String ksName = baseCfs.metadata.keyspace;
 +
 +        if (prevToken == null)
 +            logger.debug("Starting new view build for range {}", range);
 +        else
 +            logger.debug("Resuming view build for range {} from token {} with {} covered keys", range, prevToken, keysBuilt);
 +
 +        /*
 +         * It's possible for view building to start before MV creation got propagated to other nodes. For this reason
 +         * we should wait for schema to converge before attempting to send any view mutations to other nodes, or else
 +         * face UnknownTableException upon Mutation deserialization on the nodes that haven't processed the schema change.
 +         */
 +        boolean schemaConverged = Gossiper.instance.waitForSchemaAgreement(10, TimeUnit.SECONDS, () -> this.isStopped);
 +        if (!schemaConverged)
 +            logger.warn("Failed to get schema to converge before building view {}.{}", baseCfs.keyspace.getName(), view.name);
 +
 +        Function<org.apache.cassandra.db.lifecycle.View, Iterable<SSTableReader>> function;
 +        function = org.apache.cassandra.db.lifecycle.View.select(SSTableSet.CANONICAL, s -> range.intersects(s.getBounds()));
 +
 +        try (ColumnFamilyStore.RefViewFragment viewFragment = baseCfs.selectAndReference(function);
 +             Refs<SSTableReader> sstables = viewFragment.refs;
 +             ReducingKeyIterator keyIter = new ReducingKeyIterator(sstables))
 +        {
 +            PeekingIterator<DecoratedKey> iter = Iterators.peekingIterator(keyIter);
 +            while (!isStopped && iter.hasNext())
 +            {
 +                DecoratedKey key = iter.next();
 +                Token token = key.getToken();
 +                //skip tokens already built or not present in range
 +                if (range.contains(token) && (prevToken == null || token.compareTo(prevToken) > 0))
 +                {
 +                    buildKey(key);
 +                    ++keysBuilt;
 +                    //build other keys sharing the same token
 +                    while (iter.hasNext() && iter.peek().getToken().equals(token))
 +                    {
 +                        key = iter.next();
 +                        buildKey(key);
 +                        ++keysBuilt;
 +                    }
 +                    if (keysBuilt % ROWS_BETWEEN_CHECKPOINTS == 1)
 +                        SystemKeyspace.updateViewBuildStatus(ksName, view.name, range, token, keysBuilt);
 +                    prevToken = token;
 +                }
 +            }
 +        }
 +
 +        finish();
 +
 +        return keysBuilt;
 +    }
 +
 +    private void finish()
 +    {
 +        String ksName = baseCfs.keyspace.getName();
 +        if (!isStopped)
 +        {
 +            // Save the completed status using the end of the range as last token. This way it will be possible for
 +            // future view build attempts to don't even create a task for this range
 +            SystemKeyspace.updateViewBuildStatus(ksName, view.name, range, range.right, keysBuilt);
 +
 +            logger.debug("Completed build of view({}.{}) for range {} after covering {} keys ", ksName, view.name, range, keysBuilt);
 +        }
 +        else
 +        {
 +            logger.debug("Stopped build for view({}.{}) for range {} after covering {} keys", ksName, view.name, range, keysBuilt);
 +
 +            // If it's stopped due to a compaction interruption we should throw that exception.
 +            // Otherwise we assume that the task has been stopped due to a schema update and we can finish successfully.
 +            if (isCompactionInterrupted)
 +                throw new StoppedException(ksName, view.name, getCompactionInfo());
 +        }
 +    }
 +
 +    @Override
 +    public CompactionInfo getCompactionInfo()
 +    {
 +        // we don't know the sstables at construction of ViewBuilderTask and we could change this to return once we know the
 +        // but since we basically only cancel view builds on truncation where we cancel all compactions anyway, this seems reasonable
 +
 +        // If there's splitter, calculate progress based on last token position
 +        if (range.left.getPartitioner().splitter().isPresent())
 +        {
 +            long progress = prevToken == null ? 0 : Math.round(prevToken.getPartitioner().splitter().get().positionInRange(prevToken, range) * 1000);
 +            return CompactionInfo.withoutSSTables(baseCfs.metadata(), OperationType.VIEW_BUILD, progress, 1000, Unit.RANGES, compactionId);
 +        }
 +
 +        // When there is no splitter, estimate based on number of total keys but
 +        // take the max with keysBuilt + 1 to avoid having more completed than total
 +        long keysTotal = Math.max(keysBuilt + 1, baseCfs.estimatedKeysForRange(range));
 +        return CompactionInfo.withoutSSTables(baseCfs.metadata(), OperationType.VIEW_BUILD, keysBuilt, keysTotal, Unit.KEYS, compactionId);
 +    }
 +
 +    @Override
 +    public void stop()
 +    {
 +        stop(true);
 +    }
 +
++    public boolean isGlobal()
++    {
++        return false;
++    }
++
 +    synchronized void stop(boolean isCompactionInterrupted)
 +    {
 +        isStopped = true;
 +        this.isCompactionInterrupted = isCompactionInterrupted;
 +    }
 +
 +    long keysBuilt()
 +    {
 +        return keysBuilt;
 +    }
 +
 +    /**
 +     * {@link CompactionInterruptedException} with {@link Object#equals(Object)} and {@link Object#hashCode()}
 +     * implementations that consider equals all the exceptions produced by the same view build, independently of their
 +     * token range.
 +     * <p>
 +     * This is used to avoid Guava's {@link Futures#allAsList(Iterable)} log spamming when multiple build tasks fail
 +     * due to compaction interruption.
 +     */
 +    static class StoppedException extends CompactionInterruptedException
 +    {
 +        private final String ksName, viewName;
 +
 +        private StoppedException(String ksName, String viewName, CompactionInfo info)
 +        {
 +            super(info);
 +            this.ksName = ksName;
 +            this.viewName = viewName;
 +        }
 +
 +        @Override
 +        public boolean equals(Object o)
 +        {
 +            if (!(o instanceof StoppedException))
 +                return false;
 +
 +            StoppedException that = (StoppedException) o;
 +            return Objects.equal(this.ksName, that.ksName) && Objects.equal(this.viewName, that.viewName);
 +        }
 +
 +        @Override
 +        public int hashCode()
 +        {
 +            return 31 * ksName.hashCode() + viewName.hashCode();
 +        }
 +    }
 +}
diff --cc src/java/org/apache/cassandra/index/SecondaryIndexBuilder.java
index 9ec8a4e,8276626..73dc334
--- a/src/java/org/apache/cassandra/index/SecondaryIndexBuilder.java
+++ b/src/java/org/apache/cassandra/index/SecondaryIndexBuilder.java
@@@ -25,4 -25,8 +25,9 @@@ import org.apache.cassandra.db.compacti
  public abstract class SecondaryIndexBuilder extends CompactionInfo.Holder
  {
      public abstract void build();
++
+     public boolean isGlobal()
+     {
+         return false;
+     }
  }
diff --cc src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
index 1f4059a,dea1cd6..880b738
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
@@@ -228,13 -224,13 +228,15 @@@ public class IndexSummaryManager implem
  
      public void redistributeSummaries() throws IOException
      {
+         if (CompactionManager.instance.isGlobalCompactionPaused())
+             return;
 -        Pair<List<SSTableReader>, Map<UUID, LifecycleTransaction>> compactingAndNonCompacting = getCompactingAndNonCompactingSSTables();
 +        Pair<Long, Map<TableId, LifecycleTransaction>> redistributionTransactionInfo = getRestributionTransactions();
 +        Map<TableId, LifecycleTransaction> transactions = redistributionTransactionInfo.right;
 +        long nonRedistributingOffHeapSize = redistributionTransactionInfo.left;
          try
          {
 -            redistributeSummaries(new IndexSummaryRedistribution(compactingAndNonCompacting.left,
 -                                                                 compactingAndNonCompacting.right,
 +            redistributeSummaries(new IndexSummaryRedistribution(transactions,
 +                                                                 nonRedistributingOffHeapSize,
                                                                   this.memoryPoolBytes));
          }
          catch (Exception e)
diff --cc src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
index a8fcad1,d80adc0..1300c99
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
@@@ -305,9 -312,14 +306,14 @@@ public class IndexSummaryRedistributio
  
      public CompactionInfo getCompactionInfo()
      {
 -        return new CompactionInfo(OperationType.INDEX_SUMMARY, (memoryPoolBytes - remainingSpace), memoryPoolBytes, Unit.BYTES, compactionId);
 +        return CompactionInfo.withoutSSTables(null, OperationType.INDEX_SUMMARY, (memoryPoolBytes - remainingSpace), memoryPoolBytes, Unit.BYTES, compactionId);
      }
  
+     public boolean isGlobal()
+     {
+         return true;
+     }
+ 
      /** Utility class for sorting sstables by their read rates. */
      private static class ReadRateComparator implements Comparator<SSTableReader>
      {
diff --cc test/unit/org/apache/cassandra/db/compaction/CancelCompactionsTest.java
index 976180c,bcbe92d..60433bb
--- a/test/unit/org/apache/cassandra/db/compaction/CancelCompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CancelCompactionsTest.java
@@@ -18,436 -18,88 +18,506 @@@
  
  package org.apache.cassandra.db.compaction;
  
 +import java.util.ArrayList;
  import java.util.Collections;
 +import java.util.HashSet;
  import java.util.List;
 +import java.util.Set;
 +import java.util.UUID;
  import java.util.concurrent.CountDownLatch;
 +import java.util.concurrent.ExecutionException;
 +import java.util.concurrent.Executors;
 +import java.util.concurrent.Future;
 +import java.util.concurrent.TimeUnit;
 +import java.util.stream.Collectors;
  
 +import com.google.common.collect.ImmutableSet;
- import org.junit.BeforeClass;
+ import com.google.common.util.concurrent.Uninterruptibles;
  import org.junit.Test;
  
 -import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.config.DatabaseDescriptor;
  import org.apache.cassandra.cql3.CQLTester;
 -import org.apache.cassandra.metrics.CompactionMetrics;
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 +import org.apache.cassandra.db.repair.PendingAntiCompaction;
 +import org.apache.cassandra.dht.IPartitioner;
 +import org.apache.cassandra.dht.Murmur3Partitioner;
 +import org.apache.cassandra.dht.Range;
 +import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.index.Index;
 +import org.apache.cassandra.index.StubIndex;
 +import org.apache.cassandra.index.internal.CollatedViewIndexBuilder;
 +import org.apache.cassandra.io.sstable.ISSTableScanner;
 +import org.apache.cassandra.io.sstable.ReducingKeyIterator;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.locator.InetAddressAndPort;
 +import org.apache.cassandra.locator.RangesAtEndpoint;
 +import org.apache.cassandra.locator.Replica;
 +import org.apache.cassandra.schema.MockSchema;
++import org.apache.cassandra.schema.TableMetadata;
 +import org.apache.cassandra.service.ActiveRepairService;
 +import org.apache.cassandra.streaming.PreviewKind;
 +import org.apache.cassandra.utils.FBUtilities;
  
 +import static org.junit.Assert.assertEquals;
 +import static org.junit.Assert.assertFalse;
  import static org.junit.Assert.assertNotNull;
 +import static org.junit.Assert.assertTrue;
+ import static org.junit.Assert.fail;
  
  public class CancelCompactionsTest extends CQLTester
  {
 +    /**
 +     * makes sure we only cancel compactions if the precidate says we have overlapping sstables
 +     */
 +    @Test
 +    public void cancelTest() throws InterruptedException
 +    {
 +        ColumnFamilyStore cfs = MockSchema.newCFS();
 +        List<SSTableReader> sstables = createSSTables(cfs, 10, 0);
 +        Set<SSTableReader> toMarkCompacting = new HashSet<>(sstables.subList(0, 3));
 +
 +        TestCompactionTask tct = new TestCompactionTask(cfs, toMarkCompacting);
 +        try
 +        {
 +            tct.start();
 +
 +            List<CompactionInfo.Holder> activeCompactions = CompactionManager.instance.active.getCompactions();
 +            assertEquals(1, activeCompactions.size());
 +            assertEquals(activeCompactions.get(0).getCompactionInfo().getSSTables(), toMarkCompacting);
 +            // predicate requires the non-compacting sstables, should not cancel the one currently compacting:
 +            cfs.runWithCompactionsDisabled(() -> null, (sstable) -> !toMarkCompacting.contains(sstable), false, false, true);
 +            assertEquals(1, activeCompactions.size());
 +            assertFalse(activeCompactions.get(0).isStopRequested());
 +
 +            // predicate requires the compacting ones - make sure stop is requested and that when we abort that
 +            // compaction we actually run the callable (countdown the latch)
 +            CountDownLatch cdl = new CountDownLatch(1);
 +            Thread t = new Thread(() -> cfs.runWithCompactionsDisabled(() -> { cdl.countDown(); return null; }, toMarkCompacting::contains, false, false, true));
 +            t.start();
 +            while (!activeCompactions.get(0).isStopRequested())
 +                Thread.sleep(100);
 +
 +            // cdl.countDown will not get executed until we have aborted all compactions for the sstables in toMarkCompacting
 +            assertFalse(cdl.await(2, TimeUnit.SECONDS));
 +            tct.abort();
 +            // now the compactions are aborted and we can successfully wait for the latch
 +            t.join();
 +            assertTrue(cdl.await(2, TimeUnit.SECONDS));
 +        }
 +        finally
 +        {
 +            tct.abort();
 +        }
 +    }
 +
 +    /**
 +     * make sure we only cancel relevant compactions when there are multiple ongoing compactions
 +     */
 +    @Test
 +    public void multipleCompactionsCancelTest() throws InterruptedException
 +    {
 +        ColumnFamilyStore cfs = MockSchema.newCFS();
 +        List<SSTableReader> sstables = createSSTables(cfs, 10, 0);
 +
 +        List<TestCompactionTask> tcts = new ArrayList<>();
 +        tcts.add(new TestCompactionTask(cfs, new HashSet<>(sstables.subList(0, 3))));
 +        tcts.add(new TestCompactionTask(cfs, new HashSet<>(sstables.subList(6, 9))));
 +
 +        try
 +        {
 +            tcts.forEach(TestCompactionTask::start);
 +
 +            List<CompactionInfo.Holder> activeCompactions = CompactionManager.instance.active.getCompactions();
 +            assertEquals(2, activeCompactions.size());
 +
 +            Set<Set<SSTableReader>> compactingSSTables = new HashSet<>();
 +            compactingSSTables.add(activeCompactions.get(0).getCompactionInfo().getSSTables());
 +            compactingSSTables.add(activeCompactions.get(1).getCompactionInfo().getSSTables());
 +            Set<Set<SSTableReader>> expectedSSTables = new HashSet<>();
 +            expectedSSTables.add(new HashSet<>(sstables.subList(0, 3)));
 +            expectedSSTables.add(new HashSet<>(sstables.subList(6, 9)));
 +            assertEquals(compactingSSTables, expectedSSTables);
 +
 +            cfs.runWithCompactionsDisabled(() -> null, (sstable) -> false, false, false, true);
 +            assertEquals(2, activeCompactions.size());
 +            assertTrue(activeCompactions.stream().noneMatch(CompactionInfo.Holder::isStopRequested));
 +
 +            CountDownLatch cdl = new CountDownLatch(1);
 +            // start a compaction which only needs the sstables where first token is > 50 - these are the sstables compacted by tcts.get(1)
 +            Thread t = new Thread(() -> cfs.runWithCompactionsDisabled(() -> { cdl.countDown(); return null; }, (sstable) -> first(sstable) > 50, false, false, true));
 +            t.start();
 +            activeCompactions = CompactionManager.instance.active.getCompactions();
 +            assertEquals(2, activeCompactions.size());
 +            Thread.sleep(500);
 +            for (CompactionInfo.Holder holder : activeCompactions)
 +            {
 +                if (holder.getCompactionInfo().getSSTables().containsAll(sstables.subList(6, 9)))
 +                    assertTrue(holder.isStopRequested());
 +                else
 +                    assertFalse(holder.isStopRequested());
 +            }
 +            tcts.get(1).abort();
 +            assertEquals(1, CompactionManager.instance.active.getCompactions().size());
 +            cdl.await();
 +            t.join();
 +        }
 +        finally
 +        {
 +            tcts.forEach(TestCompactionTask::abort);
 +        }
 +    }
 +
 +    /**
 +     * Makes sure sub range compaction now only cancels the relevant compactions, not all of them
 +     */
 +    @Test
 +    public void testSubrangeCompaction() throws InterruptedException
 +    {
 +        ColumnFamilyStore cfs = MockSchema.newCFS();
 +        List<SSTableReader> sstables = createSSTables(cfs, 10, 0);
 +
 +        List<TestCompactionTask> tcts = new ArrayList<>();
 +        tcts.add(new TestCompactionTask(cfs, new HashSet<>(sstables.subList(0, 2))));
 +        tcts.add(new TestCompactionTask(cfs, new HashSet<>(sstables.subList(3, 4))));
 +        tcts.add(new TestCompactionTask(cfs, new HashSet<>(sstables.subList(5, 7))));
 +        tcts.add(new TestCompactionTask(cfs, new HashSet<>(sstables.subList(8, 9))));
 +        try
 +        {
 +            tcts.forEach(TestCompactionTask::start);
 +
 +            List<CompactionInfo.Holder> activeCompactions = CompactionManager.instance.active.getCompactions();
 +            assertEquals(4, activeCompactions.size());
 +            Range<Token> range = new Range<>(token(0), token(49));
 +            Thread t = new Thread(() -> {
 +                try
 +                {
 +                    cfs.forceCompactionForTokenRange(Collections.singleton(range));
 +                }
 +                catch (Throwable e)
 +                {
 +                    throw new RuntimeException(e);
 +                }
 +            });
 +
 +            t.start();
 +
 +            Thread.sleep(500);
 +            assertEquals(4, CompactionManager.instance.active.getCompactions().size());
 +            List<TestCompactionTask> toAbort = new ArrayList<>();
 +            for (CompactionInfo.Holder holder : CompactionManager.instance.active.getCompactions())
 +            {
 +                if (holder.getCompactionInfo().getSSTables().stream().anyMatch(sstable -> sstable.intersects(Collections.singleton(range))))
 +                {
 +                    assertTrue(holder.isStopRequested());
 +                    for (TestCompactionTask tct : tcts)
 +                        if (tct.sstables.equals(holder.getCompactionInfo().getSSTables()))
 +                            toAbort.add(tct);
 +                }
 +                else
 +                    assertFalse(holder.isStopRequested());
 +            }
 +            assertEquals(2, toAbort.size());
 +            toAbort.forEach(TestCompactionTask::abort);
 +            t.join();
 +
 +        }
 +        finally
 +        {
 +            tcts.forEach(TestCompactionTask::abort);
 +        }
 +    }
 +
 +    @Test
 +    public void testAnticompaction() throws InterruptedException, ExecutionException
 +    {
 +        ColumnFamilyStore cfs = MockSchema.newCFS();
 +        List<SSTableReader> sstables = createSSTables(cfs, 10, 0);
 +        List<SSTableReader> alreadyRepairedSSTables = createSSTables(cfs, 10, 10);
 +        for (SSTableReader sstable : alreadyRepairedSSTables)
 +            AbstractPendingRepairTest.mutateRepaired(sstable, System.currentTimeMillis());
 +        assertEquals(20, cfs.getLiveSSTables().size());
 +        List<TestCompactionTask> tcts = new ArrayList<>();
 +        tcts.add(new TestCompactionTask(cfs, new HashSet<>(sstables.subList(0, 2))));
 +        tcts.add(new TestCompactionTask(cfs, new HashSet<>(sstables.subList(3, 4))));
 +        tcts.add(new TestCompactionTask(cfs, new HashSet<>(sstables.subList(5, 7))));
 +        tcts.add(new TestCompactionTask(cfs, new HashSet<>(sstables.subList(8, 9))));
 +
 +        List<TestCompactionTask> nonAffectedTcts = new ArrayList<>();
 +        nonAffectedTcts.add(new TestCompactionTask(cfs, new HashSet<>(alreadyRepairedSSTables)));
 +
 +        try
 +        {
 +            tcts.forEach(TestCompactionTask::start);
 +            nonAffectedTcts.forEach(TestCompactionTask::start);
 +            List<CompactionInfo.Holder> activeCompactions = CompactionManager.instance.active.getCompactions();
 +            assertEquals(5, activeCompactions.size());
 +            // make sure that sstables are fully contained so that the metadata gets mutated
 +            Range<Token> range = new Range<>(token(-1), token(49));
 +
 +            UUID prsid = UUID.randomUUID();
 +            ActiveRepairService.instance.registerParentRepairSession(prsid, InetAddressAndPort.getLocalHost(), Collections.singletonList(cfs), Collections.singleton(range), true, 1, true, PreviewKind.NONE);
 +
 +            InetAddressAndPort local = FBUtilities.getBroadcastAddressAndPort();
 +            RangesAtEndpoint rae = RangesAtEndpoint.builder(local).add(new Replica(local, range, true)).build();
 +
 +            PendingAntiCompaction pac = new PendingAntiCompaction(prsid, Collections.singleton(cfs), rae, Executors.newSingleThreadExecutor(), () -> false);
 +            Future<?> fut = pac.run();
 +            Thread.sleep(600);
 +            List<TestCompactionTask> toAbort = new ArrayList<>();
 +            for (CompactionInfo.Holder holder : CompactionManager.instance.active.getCompactions())
 +            {
 +                if (holder.getCompactionInfo().getSSTables().stream().anyMatch(sstable -> sstable.intersects(Collections.singleton(range)) && !sstable.isRepaired() && !sstable.isPendingRepair()))
 +                {
 +                    assertTrue(holder.isStopRequested());
 +                    for (TestCompactionTask tct : tcts)
 +                        if (tct.sstables.equals(holder.getCompactionInfo().getSSTables()))
 +                            toAbort.add(tct);
 +                }
 +                else
 +                    assertFalse(holder.isStopRequested());
 +            }
 +            assertEquals(2, toAbort.size());
 +            toAbort.forEach(TestCompactionTask::abort);
 +            fut.get();
 +            for (SSTableReader sstable : sstables)
 +                assertTrue(!sstable.intersects(Collections.singleton(range)) || sstable.isPendingRepair());
 +        }
 +        finally
 +        {
 +            tcts.forEach(TestCompactionTask::abort);
 +            nonAffectedTcts.forEach(TestCompactionTask::abort);
 +        }
 +    }
 +
 +    /**
 +     * Make sure index rebuilds get cancelled
 +     */
 +    @Test
 +    public void testIndexRebuild() throws ExecutionException, InterruptedException
 +    {
 +        ColumnFamilyStore cfs = MockSchema.newCFS();
 +        List<SSTableReader> sstables = createSSTables(cfs, 5, 0);
 +        Index idx = new StubIndex(cfs, null);
 +        CountDownLatch indexBuildStarted = new CountDownLatch(1);
 +        CountDownLatch indexBuildRunning = new CountDownLatch(1);
 +        CountDownLatch compactionsStopped = new CountDownLatch(1);
 +        ReducingKeyIterator reducingKeyIterator = new ReducingKeyIterator(sstables)
 +        {
 +            @Override
 +            public boolean hasNext()
 +            {
 +                indexBuildStarted.countDown();
 +                try
 +                {
 +                    indexBuildRunning.await();
 +                }
 +                catch (InterruptedException e)
 +                {
 +                    throw new RuntimeException();
 +                }
 +                return false;
 +            }
 +        };
 +        Future<?> f = CompactionManager.instance.submitIndexBuild(new CollatedViewIndexBuilder(cfs, Collections.singleton(idx), reducingKeyIterator, ImmutableSet.copyOf(sstables)));
 +        // wait for hasNext to get called
 +        indexBuildStarted.await();
 +        assertEquals(1, CompactionManager.instance.active.getCompactions().size());
 +        boolean foundCompaction = false;
 +        for (CompactionInfo.Holder holder : CompactionManager.instance.active.getCompactions())
 +        {
 +            if (holder.getCompactionInfo().getSSTables().equals(new HashSet<>(sstables)))
 +            {
 +                assertFalse(holder.isStopRequested());
 +                foundCompaction = true;
 +            }
 +        }
 +        assertTrue(foundCompaction);
 +        cfs.runWithCompactionsDisabled(() -> {compactionsStopped.countDown(); return null;}, (sstable) -> true, false, false, true);
 +        // wait for the runWithCompactionsDisabled callable
 +        compactionsStopped.await();
 +        assertEquals(1, CompactionManager.instance.active.getCompactions().size());
 +        foundCompaction = false;
 +        for (CompactionInfo.Holder holder : CompactionManager.instance.active.getCompactions())
 +        {
 +            if (holder.getCompactionInfo().getSSTables().equals(new HashSet<>(sstables)))
 +            {
 +                assertTrue(holder.isStopRequested());
 +                foundCompaction = true;
 +            }
 +        }
 +        assertTrue(foundCompaction);
 +        // signal that the index build should be finished
 +        indexBuildRunning.countDown();
 +        f.get();
 +        assertTrue(CompactionManager.instance.active.getCompactions().isEmpty());
 +    }
 +
 +    long first(SSTableReader sstable)
 +    {
 +        return (long)sstable.first.getToken().getTokenValue();
 +    }
 +
 +    Token token(long t)
 +    {
 +        return new Murmur3Partitioner.LongToken(t);
 +    }
 +
 +    private List<SSTableReader> createSSTables(ColumnFamilyStore cfs, int count, int startGeneration)
 +    {
 +        List<SSTableReader> sstables = new ArrayList<>();
 +        for (int i = 0; i < count; i++)
 +        {
 +            long first = i * 10;
 +            long last  = (i + 1) * 10 - 1;
 +            sstables.add(MockSchema.sstable(startGeneration + i, 0, true, first, last, cfs));
 +        }
 +        cfs.disableAutoCompaction();
 +        cfs.addSSTables(sstables);
 +        return sstables;
 +    }
 +
 +    private static class TestCompactionTask
 +    {
 +        private ColumnFamilyStore cfs;
 +        private final Set<SSTableReader> sstables;
 +        private LifecycleTransaction txn;
 +        private CompactionController controller;
 +        private CompactionIterator ci;
 +        private List<ISSTableScanner> scanners;
 +
 +        public TestCompactionTask(ColumnFamilyStore cfs, Set<SSTableReader> sstables)
 +        {
 +            this.cfs = cfs;
 +            this.sstables = sstables;
 +        }
 +
 +        public void start()
 +        {
 +            scanners = sstables.stream().map(SSTableReader::getScanner).collect(Collectors.toList());
 +            txn = cfs.getTracker().tryModify(sstables, OperationType.COMPACTION);
 +            assertNotNull(txn);
 +            controller = new CompactionController(cfs, sstables, Integer.MIN_VALUE);
 +            ci = new CompactionIterator(txn.opType(), scanners, controller, FBUtilities.nowInSeconds(), UUID.randomUUID());
 +            CompactionManager.instance.active.beginCompaction(ci);
 +        }
 +
 +        public void abort()
 +        {
 +            if (controller != null)
 +                controller.close();
 +            if (ci != null)
 +                ci.close();
 +            if (txn != null)
 +                txn.abort();
 +            if (scanners != null)
 +                scanners.forEach(ISSTableScanner::close);
 +            CompactionManager.instance.active.finishCompaction(ci);
 +
 +        }
 +    }
 +
 +    @Test
 +    public void test2iCancellation() throws Throwable
 +    {
 +        createTable("create table %s (id int primary key, something int)");
 +        createIndex("create index on %s(something)");
 +        getCurrentColumnFamilyStore().disableAutoCompaction();
 +        for (int i = 0; i < 10; i++)
 +            execute("insert into %s (id, something) values (?, ?)", i, i);
 +        flush();
 +        ColumnFamilyStore idx = getCurrentColumnFamilyStore().indexManager.getAllIndexColumnFamilyStores().iterator().next();
 +        Set<SSTableReader> sstables = new HashSet<>();
 +        try (LifecycleTransaction txn = idx.getTracker().tryModify(idx.getLiveSSTables(), OperationType.COMPACTION))
 +        {
 +            getCurrentColumnFamilyStore().runWithCompactionsDisabled(() -> true, (sstable) -> { sstables.add(sstable); return true;}, false, false, false);
 +        }
 +        // the predicate only gets compacting sstables, and we are only compacting the 2i sstables - with interruptIndexes = false we should see no sstables here
 +        assertTrue(sstables.isEmpty());
 +    }
 +
 +    @Test
 +    public void testSubrangeCompactionWith2i() throws Throwable
 +    {
 +        createTable("create table %s (id int primary key, something int)");
 +        createIndex("create index on %s(something)");
 +        getCurrentColumnFamilyStore().disableAutoCompaction();
 +        for (int i = 0; i < 10; i++)
 +            execute("insert into %s (id, something) values (?, ?)", i, i);
 +        flush();
 +        ColumnFamilyStore idx = getCurrentColumnFamilyStore().indexManager.getAllIndexColumnFamilyStores().iterator().next();
 +        try (LifecycleTransaction txn = idx.getTracker().tryModify(idx.getLiveSSTables(), OperationType.COMPACTION))
 +        {
 +            IPartitioner partitioner = getCurrentColumnFamilyStore().getPartitioner();
 +            getCurrentColumnFamilyStore().forceCompactionForTokenRange(Collections.singleton(new Range<>(partitioner.getMinimumToken(), partitioner.getMaximumToken())));
 +        }
 +    }
++
+     @Test
+     public void testStandardCompactionTaskCancellation() throws Throwable
+     {
+         createTable("create table %s (id int primary key, something int)");
+         getCurrentColumnFamilyStore().disableAutoCompaction();
+ 
+         for (int i = 0; i < 10; i++)
+         {
+             execute("insert into %s (id, something) values (?,?)", i, i);
+             getCurrentColumnFamilyStore().forceBlockingFlush();
+         }
+         AbstractCompactionTask ct = null;
+ 
+         for (List<AbstractCompactionStrategy> css : getCurrentColumnFamilyStore().getCompactionStrategyManager().getStrategies())
+         {
+             for (AbstractCompactionStrategy cs : css)
+             {
+                 ct = cs.getNextBackgroundTask(0);
+                 if (ct != null)
+                     break;
+             }
 -            if (ct != null)
 -                break;
++            if (ct != null) break;
+         }
+         assertNotNull(ct);
+ 
+         CountDownLatch waitForBeginCompaction = new CountDownLatch(1);
+         CountDownLatch waitForStart = new CountDownLatch(1);
 -        Iterable<CFMetaData> metadatas = Collections.singleton(getCurrentColumnFamilyStore().metadata);
++        Iterable<TableMetadata> metadatas = Collections.singleton(getCurrentColumnFamilyStore().metadata());
+         /*
+         Here we ask strategies to pause & interrupt compactions right before calling beginCompaction in CompactionTask
+         The code running in the separate thread below mimics CFS#runWithCompactionsDisabled but we only allow
+         the real beginCompaction to be called after pausing & interrupting.
+          */
+         Thread t = new Thread(() -> {
+             Uninterruptibles.awaitUninterruptibly(waitForBeginCompaction);
+             getCurrentColumnFamilyStore().getCompactionStrategyManager().pause();
 -            CompactionManager.instance.interruptCompactionFor(metadatas, false);
++            CompactionManager.instance.interruptCompactionFor(metadatas, (s) -> true, false);
+             waitForStart.countDown();
 -            CompactionManager.instance.waitForCessation(Collections.singleton(getCurrentColumnFamilyStore()));
++            CompactionManager.instance.waitForCessation(Collections.singleton(getCurrentColumnFamilyStore()), (s) -> true);
+             getCurrentColumnFamilyStore().getCompactionStrategyManager().resume();
+         });
+         t.start();
+ 
+         try
+         {
 -            ct.execute(new CompactionMetrics()
++            ct.execute(new ActiveCompactions()
+             {
+                 @Override
+                 public void beginCompaction(CompactionInfo.Holder ci)
+                 {
+                     waitForBeginCompaction.countDown();
+                     Uninterruptibles.awaitUninterruptibly(waitForStart);
+                     super.beginCompaction(ci);
+                 }
+             });
+             fail("execute should throw CompactionInterruptedException");
+         }
+         catch (CompactionInterruptedException cie)
+         {
+             // expected
+         }
+         finally
+         {
+             ct.transaction.abort();
+             t.join();
+         }
+     }
  }
diff --cc test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java
index 6bef001,0000000..1c5c245
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java
@@@ -1,747 -1,0 +1,762 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.db.repair;
 +
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.HashSet;
 +import java.util.List;
 +import java.util.Set;
 +import java.util.UUID;
 +import java.util.concurrent.CountDownLatch;
 +import java.util.concurrent.ExecutionException;
 +import java.util.concurrent.ExecutorService;
 +import java.util.concurrent.Executors;
 +import java.util.concurrent.Future;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.TimeoutException;
 +import java.util.stream.Collectors;
 +
 +import javax.annotation.Nullable;
 +
 +import com.google.common.collect.Iterables;
 +import com.google.common.collect.Lists;
 +import com.google.common.util.concurrent.FutureCallback;
 +import com.google.common.util.concurrent.Futures;
 +import com.google.common.util.concurrent.ListenableFuture;
 +import com.google.common.util.concurrent.ListenableFutureTask;
 +import com.google.common.util.concurrent.ListeningExecutorService;
 +import com.google.common.util.concurrent.MoreExecutors;
 +import org.junit.Assert;
 +import org.junit.Test;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.cql3.QueryProcessor;
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.compaction.AbstractPendingRepairTest;
 +import org.apache.cassandra.db.compaction.CompactionController;
 +import org.apache.cassandra.db.compaction.CompactionInfo;
 +import org.apache.cassandra.db.compaction.CompactionInterruptedException;
 +import org.apache.cassandra.db.compaction.CompactionIterator;
 +import org.apache.cassandra.db.compaction.CompactionManager;
 +import org.apache.cassandra.db.compaction.OperationType;
 +import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 +import org.apache.cassandra.dht.ByteOrderedPartitioner;
 +import org.apache.cassandra.dht.Murmur3Partitioner;
 +import org.apache.cassandra.dht.Range;
 +import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.io.sstable.ISSTableScanner;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.locator.InetAddressAndPort;
 +import org.apache.cassandra.locator.RangesAtEndpoint;
 +import org.apache.cassandra.locator.Replica;
 +import org.apache.cassandra.repair.consistent.LocalSessionAccessor;
 +import org.apache.cassandra.schema.MockSchema;
 +import org.apache.cassandra.schema.Schema;
 +import org.apache.cassandra.schema.TableId;
 +import org.apache.cassandra.service.ActiveRepairService;
 +import org.apache.cassandra.streaming.PreviewKind;
 +import org.apache.cassandra.utils.ByteBufferUtil;
 +import org.apache.cassandra.utils.UUIDGen;
 +import org.apache.cassandra.utils.WrappedRunnable;
 +import org.apache.cassandra.utils.concurrent.Transactional;
 +
 +import static org.junit.Assert.assertEquals;
 +import static org.junit.Assert.assertFalse;
 +import static org.junit.Assert.assertNotNull;
 +import static org.junit.Assert.assertNull;
 +import static org.junit.Assert.assertTrue;
 +import static org.junit.Assert.fail;
 +
 +public class PendingAntiCompactionTest extends AbstractPendingAntiCompactionTest
 +{
 +    static final Logger logger = LoggerFactory.getLogger(PendingAntiCompactionTest.class);
 +
 +    private static class InstrumentedAcquisitionCallback extends PendingAntiCompaction.AcquisitionCallback
 +    {
 +        public InstrumentedAcquisitionCallback(UUID parentRepairSession, RangesAtEndpoint ranges)
 +        {
 +            super(parentRepairSession, ranges, () -> false);
 +        }
 +
 +        Set<TableId> submittedCompactions = new HashSet<>();
 +
 +        ListenableFuture<?> submitPendingAntiCompaction(PendingAntiCompaction.AcquireResult result)
 +        {
 +            submittedCompactions.add(result.cfs.metadata.id);
 +            result.abort();  // prevent ref leak complaints
 +            return ListenableFutureTask.create(() -> {}, null);
 +        }
 +    }
 +
 +    /**
 +     * verify the pending anti compaction happy path
 +     */
 +    @Test
 +    public void successCase() throws Exception
 +    {
 +        Assert.assertSame(ByteOrderedPartitioner.class, DatabaseDescriptor.getPartitioner().getClass());
 +        cfs.disableAutoCompaction();
 +
 +        // create 2 sstables, one that will be split, and another that will be moved
 +        for (int i = 0; i < 8; i++)
 +        {
 +            QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES (?, ?)", ks, tbl), i, i);
 +        }
 +        cfs.forceBlockingFlush();
 +        for (int i = 8; i < 12; i++)
 +        {
 +            QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES (?, ?)", ks, tbl), i, i);
 +        }
 +        cfs.forceBlockingFlush();
 +        assertEquals(2, cfs.getLiveSSTables().size());
 +
 +        Token left = ByteOrderedPartitioner.instance.getToken(ByteBufferUtil.bytes((int) 6));
 +        Token right = ByteOrderedPartitioner.instance.getToken(ByteBufferUtil.bytes((int) 16));
 +        List<ColumnFamilyStore> tables = Lists.newArrayList(cfs);
 +        Collection<Range<Token>> ranges = Collections.singleton(new Range<>(left, right));
 +
 +        // create a session so the anti compaction can fine it
 +        UUID sessionID = UUIDGen.getTimeUUID();
 +        ActiveRepairService.instance.registerParentRepairSession(sessionID, InetAddressAndPort.getLocalHost(), tables, ranges, true, 1, true, PreviewKind.NONE);
 +
 +        PendingAntiCompaction pac;
 +        ExecutorService executor = Executors.newSingleThreadExecutor();
 +        try
 +        {
 +            pac = new PendingAntiCompaction(sessionID, tables, atEndpoint(ranges, NO_RANGES), executor, () -> false);
 +            pac.run().get();
 +        }
 +        finally
 +        {
 +            executor.shutdown();
 +        }
 +
 +        assertEquals(3, cfs.getLiveSSTables().size());
 +        int pendingRepair = 0;
 +        for (SSTableReader sstable : cfs.getLiveSSTables())
 +        {
 +            if (sstable.isPendingRepair())
 +                pendingRepair++;
 +        }
 +        assertEquals(2, pendingRepair);
 +    }
 +
 +    @Test
 +    public void acquisitionSuccess() throws Exception
 +    {
 +        cfs.disableAutoCompaction();
 +        makeSSTables(6);
 +        List<SSTableReader> sstables = new ArrayList<>(cfs.getLiveSSTables());
 +        List<SSTableReader> expected = sstables.subList(0, 3);
 +        Collection<Range<Token>> ranges = new HashSet<>();
 +        for (SSTableReader sstable : expected)
 +        {
 +            ranges.add(new Range<>(sstable.first.getToken(), sstable.last.getToken()));
 +        }
 +
 +        PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, ranges, UUIDGen.getTimeUUID(), 0, 0);
 +
 +        logger.info("SSTables: {}", sstables);
 +        logger.info("Expected: {}", expected);
 +        PendingAntiCompaction.AcquireResult result = acquisitionCallable.call();
 +        assertNotNull(result);
 +        logger.info("Originals: {}", result.txn.originals());
 +        assertEquals(3, result.txn.originals().size());
 +        for (SSTableReader sstable : expected)
 +        {
 +            logger.info("Checking {}", sstable);
 +            assertTrue(result.txn.originals().contains(sstable));
 +        }
 +
 +        assertEquals(Transactional.AbstractTransactional.State.IN_PROGRESS, result.txn.state());
 +        result.abort();
 +    }
 +
 +    @Test
 +    public void repairedSSTablesAreNotAcquired() throws Exception
 +    {
 +        cfs.disableAutoCompaction();
 +        makeSSTables(2);
 +
 +        List<SSTableReader> sstables = new ArrayList<>(cfs.getLiveSSTables());
 +        assertEquals(2, sstables.size());
 +        SSTableReader repaired = sstables.get(0);
 +        SSTableReader unrepaired = sstables.get(1);
 +        assertTrue(repaired.intersects(FULL_RANGE));
 +        assertTrue(unrepaired.intersects(FULL_RANGE));
 +
 +        repaired.descriptor.getMetadataSerializer().mutateRepairMetadata(repaired.descriptor, 1, null, false);
 +        repaired.reloadSSTableMetadata();
 +
 +        PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID(), 0, 0);
 +        PendingAntiCompaction.AcquireResult result = acquisitionCallable.call();
 +        assertNotNull(result);
 +
 +        logger.info("Originals: {}", result.txn.originals());
 +        assertEquals(1, result.txn.originals().size());
 +        assertTrue(result.txn.originals().contains(unrepaired));
 +        result.abort(); // release sstable refs
 +    }
 +
 +    @Test
 +    public void finalizedPendingRepairSSTablesAreNotAcquired() throws Exception
 +    {
 +        cfs.disableAutoCompaction();
 +        makeSSTables(2);
 +
 +        List<SSTableReader> sstables = new ArrayList<>(cfs.getLiveSSTables());
 +        assertEquals(2, sstables.size());
 +        SSTableReader repaired = sstables.get(0);
 +        SSTableReader unrepaired = sstables.get(1);
 +        assertTrue(repaired.intersects(FULL_RANGE));
 +        assertTrue(unrepaired.intersects(FULL_RANGE));
 +
 +        UUID sessionId = prepareSession();
 +        LocalSessionAccessor.finalizeUnsafe(sessionId);
 +        repaired.descriptor.getMetadataSerializer().mutateRepairMetadata(repaired.descriptor, 0, sessionId, false);
 +        repaired.reloadSSTableMetadata();
 +        assertTrue(repaired.isPendingRepair());
 +
 +        PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID(), 0, 0);
 +        PendingAntiCompaction.AcquireResult result = acquisitionCallable.call();
 +        assertNotNull(result);
 +
 +        logger.info("Originals: {}", result.txn.originals());
 +        assertEquals(1, result.txn.originals().size());
 +        assertTrue(result.txn.originals().contains(unrepaired));
 +        result.abort();  // releases sstable refs
 +    }
 +
 +    @Test
 +    public void conflictingSessionAcquisitionFailure() throws Exception
 +    {
 +        cfs.disableAutoCompaction();
 +        makeSSTables(2);
 +
 +        List<SSTableReader> sstables = new ArrayList<>(cfs.getLiveSSTables());
 +        assertEquals(2, sstables.size());
 +        SSTableReader repaired = sstables.get(0);
 +        SSTableReader unrepaired = sstables.get(1);
 +        assertTrue(repaired.intersects(FULL_RANGE));
 +        assertTrue(unrepaired.intersects(FULL_RANGE));
 +
 +        UUID sessionId = prepareSession();
 +        repaired.descriptor.getMetadataSerializer().mutateRepairMetadata(repaired.descriptor, 0, sessionId, false);
 +        repaired.reloadSSTableMetadata();
 +        assertTrue(repaired.isPendingRepair());
 +
 +        PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID(), 0, 0);
 +        PendingAntiCompaction.AcquireResult result = acquisitionCallable.call();
 +        assertNull(result);
 +    }
 +
 +    @Test
 +    public void pendingRepairNoSSTablesExist() throws Exception
 +    {
 +        cfs.disableAutoCompaction();
 +
 +        assertEquals(0, cfs.getLiveSSTables().size());
 +
 +        PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID(), 0, 0);
 +        PendingAntiCompaction.AcquireResult result = acquisitionCallable.call();
 +        assertNotNull(result);
 +
 +        result.abort();  // There's nothing to release, but we should exit cleanly
 +    }
 +
 +    /**
 +     * anti compaction task should be submitted if everything is ok
 +     */
 +    @Test
 +    public void callbackSuccess() throws Exception
 +    {
 +        cfs.disableAutoCompaction();
 +        makeSSTables(2);
 +
 +        PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID(), 0, 0);
 +        PendingAntiCompaction.AcquireResult result = acquisitionCallable.call();
 +        assertNotNull(result);
 +
 +        InstrumentedAcquisitionCallback cb = new InstrumentedAcquisitionCallback(UUIDGen.getTimeUUID(), atEndpoint(FULL_RANGE, NO_RANGES));
 +        assertTrue(cb.submittedCompactions.isEmpty());
 +        cb.apply(Lists.newArrayList(result));
 +
 +        assertEquals(1, cb.submittedCompactions.size());
 +        assertTrue(cb.submittedCompactions.contains(cfm.id));
 +    }
 +
 +    /**
 +     * If one of the supplied AcquireResults is null, either an Exception was thrown, or
 +     * we couldn't get a transaction for the sstables. In either case we need to cancel the repair, and release
 +     * any sstables acquired for other tables
 +     */
 +    @Test
 +    public void callbackNullResult() throws Exception
 +    {
 +        cfs.disableAutoCompaction();
 +        makeSSTables(2);
 +
 +        PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID(), 0, 0);
 +        PendingAntiCompaction.AcquireResult result = acquisitionCallable.call();
 +        assertNotNull(result);
 +        assertEquals(Transactional.AbstractTransactional.State.IN_PROGRESS, result.txn.state());
 +
 +        InstrumentedAcquisitionCallback cb = new InstrumentedAcquisitionCallback(UUIDGen.getTimeUUID(), atEndpoint(FULL_RANGE, Collections.emptyList()));
 +        assertTrue(cb.submittedCompactions.isEmpty());
 +        cb.apply(Lists.newArrayList(result, null));
 +
 +        assertTrue(cb.submittedCompactions.isEmpty());
 +        assertEquals(Transactional.AbstractTransactional.State.ABORTED, result.txn.state());
 +    }
 +
 +    /**
 +     * If an AcquireResult has a null txn, there were no sstables to acquire references
 +     * for, so no anti compaction should have been submitted.
 +     */
 +    @Test
 +    public void callbackNullTxn() throws Exception
 +    {
 +        cfs.disableAutoCompaction();
 +        makeSSTables(2);
 +
 +        PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID(), 0, 0);
 +        PendingAntiCompaction.AcquireResult result = acquisitionCallable.call();
 +        assertNotNull(result);
 +
 +        ColumnFamilyStore cfs2 = Schema.instance.getColumnFamilyStoreInstance(Schema.instance.getTableMetadata("system", "peers").id);
 +        PendingAntiCompaction.AcquireResult fakeResult = new PendingAntiCompaction.AcquireResult(cfs2, null, null);
 +
 +        InstrumentedAcquisitionCallback cb = new InstrumentedAcquisitionCallback(UUIDGen.getTimeUUID(), atEndpoint(FULL_RANGE, NO_RANGES));
 +        assertTrue(cb.submittedCompactions.isEmpty());
 +        cb.apply(Lists.newArrayList(result, fakeResult));
 +
 +        assertEquals(1, cb.submittedCompactions.size());
 +        assertTrue(cb.submittedCompactions.contains(cfm.id));
 +        assertFalse(cb.submittedCompactions.contains(cfs2.metadata.id));
 +    }
 +
 +
 +    @Test
 +    public void singleAnticompaction() throws Exception
 +    {
 +        cfs.disableAutoCompaction();
 +        makeSSTables(2);
 +
 +        PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID(), 0, 0);
 +        PendingAntiCompaction.AcquireResult result = acquisitionCallable.call();
 +        UUID sessionID = UUIDGen.getTimeUUID();
 +        ActiveRepairService.instance.registerParentRepairSession(sessionID,
 +                                                                 InetAddressAndPort.getByName("127.0.0.1"),
 +                                                                 Lists.newArrayList(cfs),
 +                                                                 FULL_RANGE,
 +                                                                 true,0,
 +                                                                 true,
 +                                                                 PreviewKind.NONE);
 +        CompactionManager.instance.performAnticompaction(result.cfs, atEndpoint(FULL_RANGE, NO_RANGES), result.refs, result.txn, sessionID, () -> false);
 +
 +    }
 +
 +    @Test (expected = CompactionInterruptedException.class)
 +    public void cancelledAntiCompaction() throws Exception
 +    {
 +        cfs.disableAutoCompaction();
 +        makeSSTables(1);
 +
 +        PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID(), 0, 0);
 +        PendingAntiCompaction.AcquireResult result = acquisitionCallable.call();
 +        UUID sessionID = UUIDGen.getTimeUUID();
 +        ActiveRepairService.instance.registerParentRepairSession(sessionID,
 +                                                                 InetAddressAndPort.getByName("127.0.0.1"),
 +                                                                 Lists.newArrayList(cfs),
 +                                                                 FULL_RANGE,
 +                                                                 true,0,
 +                                                                 true,
 +                                                                 PreviewKind.NONE);
 +
 +        // attempt to anti-compact the sstable in half
 +        SSTableReader sstable = Iterables.getOnlyElement(cfs.getLiveSSTables());
 +        Token left = cfs.getPartitioner().midpoint(sstable.first.getToken(), sstable.last.getToken());
 +        Token right = sstable.last.getToken();
 +        CompactionManager.instance.performAnticompaction(result.cfs,
 +                                                         atEndpoint(Collections.singleton(new Range<>(left, right)), NO_RANGES),
 +                                                         result.refs, result.txn, sessionID, () -> true);
 +    }
 +
 +    /**
 +     * Makes sure that PendingAntiCompaction fails when anticompaction throws exception
 +     */
 +    @Test
 +    public void antiCompactionException()
 +    {
 +        cfs.disableAutoCompaction();
 +        makeSSTables(2);
 +        UUID prsid = UUID.randomUUID();
 +        ListeningExecutorService es = MoreExecutors.listeningDecorator(MoreExecutors.newDirectExecutorService());
 +        PendingAntiCompaction pac = new PendingAntiCompaction(prsid, Collections.singleton(cfs), atEndpoint(FULL_RANGE, NO_RANGES), es, () -> false) {
 +            @Override
 +            protected AcquisitionCallback getAcquisitionCallback(UUID prsId, RangesAtEndpoint tokenRanges)
 +            {
 +                return new AcquisitionCallback(prsid, tokenRanges, () -> false)
 +                {
 +                    @Override
 +                    ListenableFuture<?> submitPendingAntiCompaction(AcquireResult result)
 +                    {
 +                        Runnable r = new WrappedRunnable()
 +                        {
 +                            protected void runMayThrow()
 +                            {
 +                                throw new CompactionInterruptedException(null);
 +                            }
 +                        };
 +                        return es.submit(r);
 +                    }
 +                };
 +            }
 +        };
 +        ListenableFuture<?> fut = pac.run();
 +        try
 +        {
 +            fut.get();
 +            fail("Should throw exception");
 +        }
 +        catch(Throwable t)
 +        {
 +        }
 +    }
 +
 +    @Test
 +    public void testBlockedAcquisition() throws ExecutionException, InterruptedException, TimeoutException
 +    {
 +        cfs.disableAutoCompaction();
 +        ExecutorService es = Executors.newFixedThreadPool(1);
 +
 +        makeSSTables(2);
 +        UUID prsid = UUID.randomUUID();
 +        Set<SSTableReader> sstables = cfs.getLiveSSTables();
 +        List<ISSTableScanner> scanners = sstables.stream().map(SSTableReader::getScanner).collect(Collectors.toList());
 +        try
 +        {
 +            try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION);
 +                 CompactionController controller = new CompactionController(cfs, sstables, 0);
 +                 CompactionIterator ci = CompactionManager.getAntiCompactionIterator(scanners, controller, 0, UUID.randomUUID(), CompactionManager.instance.active, () -> false))
 +            {
 +                // `ci` is our imaginary ongoing anticompaction which makes no progress until after 30s
 +                // now we try to start a new AC, which will try to cancel all ongoing compactions
 +
 +                CompactionManager.instance.active.beginCompaction(ci);
 +                PendingAntiCompaction pac = new PendingAntiCompaction(prsid, Collections.singleton(cfs), atEndpoint(FULL_RANGE, NO_RANGES), 0, 0, es, () -> false);
 +                ListenableFuture fut = pac.run();
 +                try
 +                {
 +                    fut.get(30, TimeUnit.SECONDS);
 +                    fail("the future should throw exception since we try to start a new anticompaction when one is already running");
 +                }
 +                catch (ExecutionException e)
 +                {
 +                    assertTrue(e.getCause() instanceof PendingAntiCompaction.SSTableAcquisitionException);
 +                }
 +
 +                assertEquals(1, getCompactionsFor(cfs).size());
 +                for (CompactionInfo.Holder holder : getCompactionsFor(cfs))
 +                    assertFalse(holder.isStopRequested());
 +            }
 +        }
 +        finally
 +        {
 +            es.shutdown();
 +            ISSTableScanner.closeAllAndPropagate(scanners, null);
 +        }
 +    }
 +
 +    private List<CompactionInfo.Holder> getCompactionsFor(ColumnFamilyStore cfs)
 +    {
 +        List<CompactionInfo.Holder> compactions = new ArrayList<>();
 +        for (CompactionInfo.Holder holder : CompactionManager.instance.active.getCompactions())
 +        {
 +            if (holder.getCompactionInfo().getTableMetadata().equals(cfs.metadata()))
 +                compactions.add(holder);
 +        }
 +        return compactions;
 +    }
 +
 +    @Test
 +    public void testUnblockedAcquisition() throws ExecutionException, InterruptedException
 +    {
 +        cfs.disableAutoCompaction();
 +        ExecutorService es = Executors.newFixedThreadPool(1);
 +        makeSSTables(2);
 +        UUID prsid = prepareSession();
 +        Set<SSTableReader> sstables = cfs.getLiveSSTables();
 +        List<ISSTableScanner> scanners = sstables.stream().map(SSTableReader::getScanner).collect(Collectors.toList());
 +        try
 +        {
 +            try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION);
 +                 CompactionController controller = new CompactionController(cfs, sstables, 0);
 +                 CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, scanners, controller, 0, UUID.randomUUID()))
 +            {
 +                // `ci` is our imaginary ongoing anticompaction which makes no progress until after 5s
 +                // now we try to start a new AC, which will try to cancel all ongoing compactions
 +
 +                CompactionManager.instance.active.beginCompaction(ci);
 +                PendingAntiCompaction pac = new PendingAntiCompaction(prsid, Collections.singleton(cfs), atEndpoint(FULL_RANGE, NO_RANGES), es, () -> false);
 +                ListenableFuture fut = pac.run();
 +                try
 +                {
 +                    fut.get(5, TimeUnit.SECONDS);
 +                }
 +                catch (TimeoutException e)
 +                {
 +                    // expected, we wait 1 minute for compactions to get cancelled in runWithCompactionsDisabled, but we are not iterating
 +                    // CompactionIterator so the compaction is not actually cancelled
 +                }
 +                try
 +                {
 +                    assertTrue(ci.hasNext());
 +                    ci.next();
 +                    fail("CompactionIterator should be abortable");
 +                }
 +                catch (CompactionInterruptedException e)
 +                {
 +                    CompactionManager.instance.active.finishCompaction(ci);
 +                    txn.abort();
 +                    // expected
 +                }
 +                CountDownLatch cdl = new CountDownLatch(1);
 +                Futures.addCallback(fut, new FutureCallback<Object>()
 +                {
 +                    public void onSuccess(@Nullable Object o)
 +                    {
 +                        cdl.countDown();
 +                    }
 +
 +                    public void onFailure(Throwable throwable)
 +                    {
 +                    }
 +                }, MoreExecutors.directExecutor());
 +                assertTrue(cdl.await(1, TimeUnit.MINUTES));
 +            }
 +        }
 +        finally
 +        {
 +            es.shutdown();
 +        }
 +    }
 +
 +    @Test
 +    public void testSSTablePredicateOngoingAntiCompaction()
 +    {
 +        ColumnFamilyStore cfs = MockSchema.newCFS();
 +        cfs.disableAutoCompaction();
 +        List<SSTableReader> sstables = new ArrayList<>();
 +        List<SSTableReader> repairedSSTables = new ArrayList<>();
 +        List<SSTableReader> pendingSSTables = new ArrayList<>();
 +        for (int i = 1; i <= 10; i++)
 +        {
 +            SSTableReader sstable = MockSchema.sstable(i, i * 10, i * 10 + 9, cfs);
 +            sstables.add(sstable);
 +        }
 +        for (int i = 1; i <= 10; i++)
 +        {
 +            SSTableReader sstable = MockSchema.sstable(i + 10, i * 10, i * 10 + 9, cfs);
 +            AbstractPendingRepairTest.mutateRepaired(sstable, System.currentTimeMillis());
 +            repairedSSTables.add(sstable);
 +        }
 +        for (int i = 1; i <= 10; i++)
 +        {
 +            SSTableReader sstable = MockSchema.sstable(i + 20, i * 10, i * 10 + 9, cfs);
 +            AbstractPendingRepairTest.mutateRepaired(sstable, UUID.randomUUID(), false);
 +            pendingSSTables.add(sstable);
 +        }
 +
 +        cfs.addSSTables(sstables);
 +        cfs.addSSTables(repairedSSTables);
 +
 +        // if we are compacting the non-repaired non-pending sstables, we should get an error
 +        tryPredicate(cfs, sstables, null, true);
 +        // make sure we don't try to grab pending or repaired sstables;
 +        tryPredicate(cfs, repairedSSTables, sstables, false);
 +        tryPredicate(cfs, pendingSSTables, sstables, false);
 +    }
 +
 +    private void tryPredicate(ColumnFamilyStore cfs, List<SSTableReader> compacting, List<SSTableReader> expectedLive, boolean shouldFail)
 +    {
 +        CompactionInfo.Holder holder = new CompactionInfo.Holder()
 +        {
 +            public CompactionInfo getCompactionInfo()
 +            {
 +                return new CompactionInfo(cfs.metadata(), OperationType.ANTICOMPACTION, 0, 1000, UUID.randomUUID(), compacting);
 +            }
++
++            public boolean isGlobal()
++            {
++                return false;
++            }
 +        };
 +        CompactionManager.instance.active.beginCompaction(holder);
 +        try
 +        {
 +            PendingAntiCompaction.AntiCompactionPredicate predicate =
 +            new PendingAntiCompaction.AntiCompactionPredicate(Collections.singleton(new Range<>(new Murmur3Partitioner.LongToken(0), new Murmur3Partitioner.LongToken(100))),
 +                                                              UUID.randomUUID());
 +            Set<SSTableReader> live = cfs.getLiveSSTables().stream().filter(predicate).collect(Collectors.toSet());
 +            if (shouldFail)
 +                fail("should fail - we try to grab already anticompacting sstables for anticompaction");
 +            assertEquals(live, new HashSet<>(expectedLive));
 +        }
 +        catch (PendingAntiCompaction.SSTableAcquisitionException e)
 +        {
 +            if (!shouldFail)
 +                fail("We should not fail filtering sstables");
 +        }
 +        finally
 +        {
 +            CompactionManager.instance.active.finishCompaction(holder);
 +        }
 +    }
 +
 +    @Test
 +    public void testRetries() throws InterruptedException, ExecutionException
 +    {
 +        ColumnFamilyStore cfs = MockSchema.newCFS();
 +        cfs.addSSTable(MockSchema.sstable(1, true, cfs));
 +        CountDownLatch cdl = new CountDownLatch(5);
 +        ExecutorService es = Executors.newFixedThreadPool(1);
 +        CompactionInfo.Holder holder = new CompactionInfo.Holder()
 +        {
 +            public CompactionInfo getCompactionInfo()
 +            {
 +                return new CompactionInfo(cfs.metadata(), OperationType.ANTICOMPACTION, 0, 0, UUID.randomUUID(), cfs.getLiveSSTables());
 +            }
++
++            public boolean isGlobal()
++            {
++                return false;
++            }
 +        };
 +        try
 +        {
 +            PendingAntiCompaction.AntiCompactionPredicate acp = new PendingAntiCompaction.AntiCompactionPredicate(FULL_RANGE, UUID.randomUUID())
 +            {
 +                @Override
 +                public boolean apply(SSTableReader sstable)
 +                {
 +                    cdl.countDown();
 +                    if (cdl.getCount() > 0)
 +                        throw new PendingAntiCompaction.SSTableAcquisitionException("blah");
 +                    return true;
 +                }
 +            };
 +            CompactionManager.instance.active.beginCompaction(holder);
 +            PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, UUID.randomUUID(), 10, 1, acp);
 +            Future f = es.submit(acquisitionCallable);
 +            cdl.await();
 +            assertNotNull(f.get());
 +        }
 +        finally
 +        {
 +            es.shutdown();
 +            CompactionManager.instance.active.finishCompaction(holder);
 +        }
 +    }
 +
 +    @Test
 +    public void testRetriesTimeout() throws InterruptedException, ExecutionException
 +    {
 +        ColumnFamilyStore cfs = MockSchema.newCFS();
 +        cfs.addSSTable(MockSchema.sstable(1, true, cfs));
 +        ExecutorService es = Executors.newFixedThreadPool(1);
 +        CompactionInfo.Holder holder = new CompactionInfo.Holder()
 +        {
 +            public CompactionInfo getCompactionInfo()
 +            {
 +                return new CompactionInfo(cfs.metadata(), OperationType.ANTICOMPACTION, 0, 0, UUID.randomUUID(), cfs.getLiveSSTables());
 +            }
++
++            public boolean isGlobal()
++            {
++                return false;
++            }
 +        };
 +        try
 +        {
 +            PendingAntiCompaction.AntiCompactionPredicate acp = new PendingAntiCompaction.AntiCompactionPredicate(FULL_RANGE, UUID.randomUUID())
 +            {
 +                @Override
 +                public boolean apply(SSTableReader sstable)
 +                {
 +                    throw new PendingAntiCompaction.SSTableAcquisitionException("blah");
 +                }
 +            };
 +            CompactionManager.instance.active.beginCompaction(holder);
 +            PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, UUID.randomUUID(), 2, 1000, acp);
 +            Future fut = es.submit(acquisitionCallable);
 +            assertNull(fut.get());
 +        }
 +        finally
 +        {
 +            es.shutdown();
 +            CompactionManager.instance.active.finishCompaction(holder);
 +        }
 +    }
 +
 +    @Test
 +    public void testWith2i() throws ExecutionException, InterruptedException
 +    {
 +        cfs2.disableAutoCompaction();
 +        makeSSTables(2, cfs2, 100);
 +        ColumnFamilyStore idx = cfs2.indexManager.getAllIndexColumnFamilyStores().iterator().next();
 +        ExecutorService es = Executors.newFixedThreadPool(1);
 +        try
 +        {
 +            UUID prsid = prepareSession();
 +            for (SSTableReader sstable : cfs2.getLiveSSTables())
 +                assertFalse(sstable.isPendingRepair());
 +
 +            // mark the sstables pending, with a 2i compaction going, which should be untouched;
 +            try (LifecycleTransaction txn = idx.getTracker().tryModify(idx.getLiveSSTables(), OperationType.COMPACTION))
 +            {
 +                PendingAntiCompaction pac = new PendingAntiCompaction(prsid, Collections.singleton(cfs2), atEndpoint(FULL_RANGE, NO_RANGES), es, () -> false);
 +                pac.run().get();
 +            }
 +            // and make sure it succeeded;
 +            for (SSTableReader sstable : cfs2.getLiveSSTables())
 +                assertTrue(sstable.isPendingRepair());
 +        }
 +        finally
 +        {
 +            es.shutdown();
 +        }
 +    }
 +
 +    private static RangesAtEndpoint atEndpoint(Collection<Range<Token>> full, Collection<Range<Token>> trans)
 +    {
 +        RangesAtEndpoint.Builder builder = RangesAtEndpoint.builder(local);
 +        for (Range<Token> range : full)
 +            builder.add(new Replica(local, range, true));
 +
 +        for (Range<Token> range : trans)
 +            builder.add(new Replica(local, range, false));
 +
 +        return builder.build();
 +    }
 +}
diff --cc test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
index 68ee3e1,b33ead2..d383e88
--- a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
@@@ -61,6 -57,7 +61,7 @@@ import org.apache.cassandra.utils.ByteB
  
  import static com.google.common.collect.ImmutableMap.of;
  import static java.util.Arrays.asList;
 -import static org.apache.cassandra.db.compaction.AntiCompactionTest.assertOnDiskState;
++import static org.apache.cassandra.Util.assertOnDiskState;
  import static org.apache.cassandra.io.sstable.Downsampling.BASE_SAMPLING_LEVEL;
  import static org.apache.cassandra.io.sstable.IndexSummaryRedistribution.DOWNSAMPLE_THESHOLD;
  import static org.apache.cassandra.io.sstable.IndexSummaryRedistribution.UPSAMPLE_THRESHOLD;
@@@ -68,7 -65,9 +69,8 @@@ import static org.junit.Assert.assertEq
  import static org.junit.Assert.assertNotNull;
  import static org.junit.Assert.assertNull;
  import static org.junit.Assert.assertTrue;
+ import static org.junit.Assert.fail;
  
 -
  @RunWith(OrderedJUnit4ClassRunner.class)
  public class IndexSummaryManagerTest
  {
@@@ -640,69 -633,91 +642,111 @@@
          final AtomicReference<CompactionInterruptedException> exception = new AtomicReference<>();
          // barrier to control when redistribution runs
          final CountDownLatch barrier = new CountDownLatch(1);
 +        CompactionInfo.Holder ongoingCompaction = new CompactionInfo.Holder()
 +        {
 +            public CompactionInfo getCompactionInfo()
 +            {
 +                return new CompactionInfo(cfs.metadata(), OperationType.UNKNOWN, 0, 0, UUID.randomUUID(), compacting);
 +            }
+ 
 -        Thread t = NamedThreadFactory.createThread(new Runnable()
++            public boolean isGlobal()
++            {
++                return false;
++            }
 +        };
 +        try (LifecycleTransaction ignored = cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN))
          {
 -            public void run()
 +            CompactionManager.instance.active.beginCompaction(ongoingCompaction);
 +
 +            Thread t = NamedThreadFactory.createThread(new Runnable()
              {
 -                try
 +                public void run()
                  {
 -                    // Don't leave enough space for even the minimal index summaries
 -                    try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN))
 +                    try
 +                    {
 +                        // Don't leave enough space for even the minimal index summaries
 +                        try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN))
 +                        {
 +                            IndexSummaryManager.redistributeSummaries(new ObservableRedistribution(of(cfs.metadata.id, txn),
 +                                                                                                   0,
 +                                                                                                   singleSummaryOffHeapSpace,
 +                                                                                                   barrier));
 +                        }
 +                    }
 +                    catch (CompactionInterruptedException ex)
 +                    {
 +                        exception.set(ex);
 +                    }
 +                    catch (IOException ignored)
                      {
 -                        IndexSummaryManager.redistributeSummaries(new ObservableRedistribution(Collections.EMPTY_LIST,
 -                                                                                               of(cfs.metadata.cfId, txn),
 -                                                                                               singleSummaryOffHeapSpace,
 -                                                                                               barrier));
                      }
                  }
 -                catch (CompactionInterruptedException ex)
 -                {
 -                    exception.set(ex);
 -                }
 -                catch (IOException ignored)
 -                {
 -                }
 -            }
 -        });
 -        t.start();
 -        while (CompactionManager.instance.getActiveCompactions() == 0 && t.isAlive())
 -            Thread.sleep(1);
 -        // to ensure that the stop condition check in IndexSummaryRedistribution::redistributeSummaries
 -        // is made *after* the halt request is made to the CompactionManager, don't allow the redistribution
 -        // to proceed until stopCompaction has been called.
 -        cancelFunction.accept(cfs);
 -        // allows the redistribution to proceed
 -        barrier.countDown();
 -        t.join();
 +            });
 +
 +            t.start();
 +            while (CompactionManager.instance.getActiveCompactions() < 2 && t.isAlive())
 +                Thread.sleep(1);
 +            // to ensure that the stop condition check in IndexSummaryRedistribution::redistributeSummaries
 +            // is made *after* the halt request is made to the CompactionManager, don't allow the redistribution
 +            // to proceed until stopCompaction has been called.
 +            cancelFunction.accept(cfs);
 +            // allows the redistribution to proceed
 +            barrier.countDown();
 +            t.join();
 +        }
 +        finally
 +        {
 +            CompactionManager.instance.active.finishCompaction(ongoingCompaction);
 +        }
  
          assertNotNull("Expected compaction interrupted exception", exception.get());
 -        assertTrue("Expected no active compactions", CompactionMetrics.getCompactions().isEmpty());
 +        assertTrue("Expected no active compactions", CompactionManager.instance.active.getCompactions().isEmpty());
  
 -        Set<SSTableReader> beforeRedistributionSSTables = new HashSet<>(sstables);
 +        Set<SSTableReader> beforeRedistributionSSTables = new HashSet<>(allSSTables);
          Set<SSTableReader> afterCancelSSTables = new HashSet<>(cfs.getLiveSSTables());
          Set<SSTableReader> disjoint = Sets.symmetricDifference(beforeRedistributionSSTables, afterCancelSSTables);
          assertTrue(String.format("Mismatched files before and after cancelling redistribution: %s",
                                   Joiner.on(",").join(disjoint)),
                     disjoint.isEmpty());
-         Util.assertOnDiskState(cfs, 8);
 -
 -        assertOnDiskState(cfs, numSSTables);
++        assertOnDiskState(cfs, 8);
+         validateData(cfs, numRows);
+     }
+ 
+     @Test
+     public void testPauseIndexSummaryManager() throws Exception
+     {
+         String ksname = KEYSPACE1;
+         String cfname = CF_STANDARDLOWiINTERVAL; // index interval of 8, no key caching
+         Keyspace keyspace = Keyspace.open(ksname);
+         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname);
+         int numSSTables = 4;
+         int numRows = 256;
+         createSSTables(ksname, cfname, numSSTables, numRows);
+ 
+         List<SSTableReader> sstables = new ArrayList<>(cfs.getLiveSSTables());
+         for (SSTableReader sstable : sstables)
+             sstable.overrideReadMeter(new RestorableMeter(100.0, 100.0));
+ 
+         long singleSummaryOffHeapSpace = sstables.get(0).getIndexSummaryOffHeapSize();
+ 
+         // everything should get cut in half
+         assert sstables.size() == numSSTables;
+         try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN))
+         {
+             try (AutoCloseable toresume = CompactionManager.instance.pauseGlobalCompaction())
+             {
 -                sstables = redistributeSummaries(Collections.emptyList(), of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * (numSSTables / 2)));
++                sstables = redistributeSummaries(Collections.emptyList(), of(cfs.metadata().id, txn), (singleSummaryOffHeapSpace * (numSSTables / 2)));
+                 fail("The redistribution should fail - we got paused before adding to active compactions, but after marking compacting");
+             }
+         }
+         catch (CompactionInterruptedException e)
+         {
+             // expected
+         }
+         for (SSTableReader sstable : sstables)
+             assertEquals(BASE_SAMPLING_LEVEL, sstable.getIndexSummarySamplingLevel());
          validateData(cfs, numRows);
+         assertOnDiskState(cfs, numSSTables);
      }
  
      private static List<SSTableReader> redistributeSummaries(List<SSTableReader> compacting,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org