Posted to commits@cassandra.apache.org by ma...@apache.org on 2022/01/17 08:20:43 UTC

[cassandra] branch trunk updated (76f8333 -> df1a2d4)

This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git.


    from 76f8333  Use abolute path when checking file stores
     new 98e798f  Don't block gossip when clearing repair snapshots
     new df1a2d4  Merge branch 'cassandra-4.0' into trunk

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 CHANGES.txt                                        |   1 +
 .../cassandra/repair/RepairMessageVerbHandler.java |   1 +
 .../schema/SystemDistributedKeyspace.java          |   2 +-
 .../cassandra/service/ActiveRepairService.java     |  37 ++++-
 .../distributed/test/ClearSnapshotTest.java        | 170 +++++++++++++++++++++
 5 files changed, 206 insertions(+), 5 deletions(-)
 create mode 100644 test/distributed/org/apache/cassandra/distributed/test/ClearSnapshotTest.java

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[cassandra] 01/01: Merge branch 'cassandra-4.0' into trunk

Posted by ma...@apache.org.

marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit df1a2d4c3db4ff016d03d5403ac68778a71d5759
Merge: 76f8333 98e798f
Author: Marcus Eriksson <ma...@apache.org>
AuthorDate: Mon Jan 17 09:13:32 2022 +0100

    Merge branch 'cassandra-4.0' into trunk

 CHANGES.txt                                        |   1 +
 .../cassandra/repair/RepairMessageVerbHandler.java |   1 +
 .../schema/SystemDistributedKeyspace.java          |   2 +-
 .../cassandra/service/ActiveRepairService.java     |  37 ++++-
 .../distributed/test/ClearSnapshotTest.java        | 170 +++++++++++++++++++++
 5 files changed, 206 insertions(+), 5 deletions(-)

diff --cc CHANGES.txt
index c3d1e81,0896d56..e5a1a8b
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,82 -1,5 +1,83 @@@
 -4.0.2
 +4.1
 + * Prewarm role and credential caches to avoid timeouts at startup (CASSANDRA-16958)
 + * Make capacity/validity/updateinterval/activeupdate for Auth Caches configurable via nodetool (CASSANDRA-17063)
 + * Added startup check for read_ahead_kb setting (CASSANDRA-16436)
 + * Avoid unecessary array allocations and initializations when performing query checks (CASSANDRA-17209)
 + * Add guardrail for list operations that require read before write (CASSANDRA-17154)
 + * Migrate thresholds for number of keyspaces and tables to guardrails (CASSANDRA-17195)
 + * Remove self-reference in SSTableTidier (CASSANDRA-17205)
 + * Add guardrail for query page size (CASSANDRA-17189)
 + * Allow column_index_size_in_kb to be configurable through nodetool (CASSANDRA-17121)
 + * Emit a metric for number of local read and write calls
 + * Add non-blocking mode for CDC writes (CASSANDRA-17001)
 + * Add guardrails framework (CASSANDRA-17147)
 + * Harden resource management on SSTable components to prevent future leaks (CASSANDRA-17174)
 + * Make nodes more resilient to local unrelated files during startup (CASSANDRA-17082)
 + * repair prepare message would produce a wrong error message if network timeout happened rather than reply wait timeout (CASSANDRA-16992)
 + * Log queries that fail on timeout or unavailable errors up to once per minute by default (CASSANDRA-17159)
 + * Refactor normal/preview/IR repair to standardize repair cleanup and error handling of failed RepairJobs (CASSANDRA-17069)
 + * Log missing peers in StartupClusterConnectivityChecker (CASSANDRA-17130)
 + * Introduce separate rate limiting settings for entire SSTable streaming (CASSANDRA-17065)
 + * Implement Virtual Tables for Auth Caches (CASSANDRA-16914)
 + * Actively update auth cache in the background (CASSANDRA-16957)
 + * Add unix time conversion functions (CASSANDRA-17029)
 + * JVMStabilityInspector.forceHeapSpaceOomMaybe should handle all non-heap OOMs rather than only supporting direct only (CASSANDRA-17128)
 + * Forbid other Future implementations with checkstyle (CASSANDRA-17055)
 + * commit log was switched from non-daemon to daemon threads, which causes the JVM to exit in some case as no non-daemon threads are active (CASSANDRA-17085)
 + * Add a Denylist to block reads and writes on specific partition keys (CASSANDRA-12106)
 + * v4+ protocol did not clean up client warnings, which caused leaking the state (CASSANDRA-17054)
 + * Remove duplicate toCQLString in ReadCommand (CASSANDRA-17023)
 + * Ensure hint window is persistent across restarts of a node (CASSANDRA-14309)
 + * Allow to GRANT or REVOKE multiple permissions in a single statement (CASSANDRA-17030)
 + * Allow to grant permission for all tables in a keyspace (CASSANDRA-17027)
 + * Log time spent writing keys during compaction (CASSANDRA-17037)
 + * Make nodetool compactionstats and sstable_tasks consistent (CASSANDRA-16976)
 + * Add metrics and logging around index summary redistribution (CASSANDRA-17036)
 + * Add configuration options for minimum allowable replication factor and default replication factor (CASSANDRA-14557)
 + * Expose information about stored hints via a nodetool command and a virtual table (CASSANDRA-14795)
 + * Add broadcast_rpc_address to system.local (CASSANDRA-11181)
 + * Add support for type casting in WHERE clause components and in the values of INSERT/UPDATE statements (CASSANDRA-14337)
 + * add credentials file support to CQLSH (CASSANDRA-16983)
 + * Skip remaining bytes in the Envelope buffer when a ProtocolException is thrown to avoid double decoding (CASSANDRA-17026)
 + * Allow reverse iteration of resources during permissions checking (CASSANDRA-17016)
 + * Add feature to verify correct ownership of attached locations on disk at startup (CASSANDRA-16879)
 + * Make SSLContext creation pluggable/extensible (CASSANDRA-16666)
 + * Add soft/hard limits to local reads to protect against reading too much data in a single query (CASSANDRA-16896)
 + * Avoid token cache invalidation for removing a non-member node (CASSANDRA-15290)
 + * Allow configuration of consistency levels on auth operations (CASSANDRA-12988)
 + * Add number of sstables in a compaction to compactionstats output (CASSANDRA-16844)
 + * Upgrade Caffeine to 2.9.2 (CASSANDRA-15153)
 + * Allow DELETE and TRUNCATE to work on Virtual Tables if the implementation allows it (CASSANDRA-16806)
 + * Include SASI components to snapshots (CASSANDRA-15134)
 + * Fix missed wait latencies in the output of `nodetool tpstats -F` (CASSANDRA-16938)
 + * Reduce native transport max frame size to 16MB (CASSANDRA-16886)
 + * Add support for filtering using IN restrictions (CASSANDRA-14344)
 + * Provide a nodetool command to invalidate auth caches (CASSANDRA-16404)
 + * Catch read repair timeout exceptions and add metric (CASSANDRA-16880)
 + * Exclude Jackson 1.x transitive dependency of hadoop* provided dependencies (CASSANDRA-16854)
 + * Add client warnings and abort to tombstone and coordinator reads which go past a low/high watermark (CASSANDRA-16850)
 + * Add TTL support to nodetool snapshots (CASSANDRA-16789)
 + * Allow CommitLogSegmentReader to optionally skip sync marker CRC checks (CASSANDRA-16842)
 + * allow blocking IPs from updating metrics about traffic (CASSANDRA-16859)
 + * Request-Based Native Transport Rate-Limiting (CASSANDRA-16663)
 + * Implement nodetool getauditlog command (CASSANDRA-16725)
 + * Clean up repair code (CASSANDRA-13720)
 + * Background schedule to clean up orphaned hints files (CASSANDRA-16815)
 + * Modify SecondaryIndexManager#indexPartition() to retrieve only columns for which indexes are actually being built (CASSANDRA-16776)
 + * Batch the token metadata update to improve the speed (CASSANDRA-15291)
 + * Reduce the log level on "expected" repair exceptions (CASSANDRA-16775)
 + * Make JMXTimer expose attributes using consistent time unit (CASSANDRA-16760)
 + * Remove check on gossip status from DynamicEndpointSnitch::updateScores (CASSANDRA-11671)
 + * Fix AbstractReadQuery::toCQLString not returning valid CQL (CASSANDRA-16510)
 + * Log when compacting many tombstones (CASSANDRA-16780)
 + * Display bytes per level in tablestats for LCS tables (CASSANDRA-16799)
 + * Add isolated flush timer to CommitLogMetrics and ensure writes correspond to single WaitingOnCommit data points (CASSANDRA-16701)
 + * Add a system property to set hostId if not yet initialized (CASSANDRA-14582)
 + * GossiperTest.testHasVersion3Nodes didn't take into account trunk version changes, fixed to rely on latest version (CASSANDRA-16651)
 + * Update JNA library to 5.9.0 and snappy-java to version 1.1.8.4 (CASSANDRA-17040)
 +
 +Merged from 4.0:
+  * Don't block gossip when clearing repair snapshots (CASSANDRA-17168)
   * Deduplicate warnings for deprecated parameters (changed names) (CASSANDRA-17160)
   * Update ant-junit to version 1.10.12 (CASSANDRA-17218)
   * Add droppable tombstone metrics to nodetool tablestats (CASSANDRA-16308)
diff --cc src/java/org/apache/cassandra/schema/SystemDistributedKeyspace.java
index 9f17578,0000000..6206fca
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/schema/SystemDistributedKeyspace.java
+++ b/src/java/org/apache/cassandra/schema/SystemDistributedKeyspace.java
@@@ -1,405 -1,0 +1,405 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.schema;
 +
 +import java.io.PrintWriter;
 +import java.io.StringWriter;
 +import java.nio.ByteBuffer;
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Set;
 +import java.util.UUID;
 +import java.util.concurrent.TimeUnit;
 +
 +import com.google.common.base.Joiner;
 +import com.google.common.collect.Lists;
 +import com.google.common.collect.ImmutableMap;
 +import com.google.common.collect.Sets;
 +
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.config.CassandraRelevantProperties;
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.cql3.QueryProcessor;
 +import org.apache.cassandra.cql3.UntypedResultSet;
 +import org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
 +import org.apache.cassandra.db.ConsistencyLevel;
 +import org.apache.cassandra.db.Keyspace;
 +import org.apache.cassandra.dht.Range;
 +import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.gms.Gossiper;
 +import org.apache.cassandra.locator.InetAddressAndPort;
 +import org.apache.cassandra.repair.CommonRange;
 +import org.apache.cassandra.repair.messages.RepairOption;
 +import org.apache.cassandra.utils.FBUtilities;
 +
 +import static java.lang.String.format;
 +
 +import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
 +
 +public final class SystemDistributedKeyspace
 +{
 +    private SystemDistributedKeyspace()
 +    {
 +    }
 +
 +    public static final String NAME = "system_distributed";
 +
 +    private static final int DEFAULT_RF = CassandraRelevantProperties.SYSTEM_DISTRIBUTED_DEFAULT_RF.getInt();
 +    private static final Logger logger = LoggerFactory.getLogger(SystemDistributedKeyspace.class);
 +
 +    /**
 +     * Generation is used as a timestamp for automatic table creation on startup.
 +     * If you make any changes to the tables below, make sure to increment the
 +     * generation and document your change here.
 +     *
 +     * gen 0: original definition in 2.2
 +     * gen 1: (pre-)add options column to parent_repair_history in 3.0, 3.11
 +     * gen 2: (pre-)add coordinator_port and participants_v2 columns to repair_history in 3.0, 3.11, 4.0
 +     * gen 3: gc_grace_seconds raised from 0 to 10 days in CASSANDRA-12954 in 3.11.0
 +     * gen 4: compression chunk length reduced to 16KiB, memtable_flush_period_in_ms now unset on all tables in 4.0
 +     * gen 5: add ttl and TWCS to repair_history tables
 +     * gen 6: add denylist table
 +     */
 +    public static final long GENERATION = 6;
 +
 +    public static final String REPAIR_HISTORY = "repair_history";
 +
 +    public static final String PARENT_REPAIR_HISTORY = "parent_repair_history";
 +
 +    public static final String VIEW_BUILD_STATUS = "view_build_status";
 +
 +    public static final String PARTITION_DENYLIST_TABLE = "partition_denylist";
 +
 +    private static final TableMetadata RepairHistory =
 +        parse(REPAIR_HISTORY,
 +                "Repair history",
 +                "CREATE TABLE %s ("
 +                     + "keyspace_name text,"
 +                     + "columnfamily_name text,"
 +                     + "id timeuuid,"
 +                     + "parent_id timeuuid,"
 +                     + "range_begin text,"
 +                     + "range_end text,"
 +                     + "coordinator inet,"
 +                     + "coordinator_port int,"
 +                     + "participants set<inet>,"
 +                     + "participants_v2 set<text>,"
 +                     + "exception_message text,"
 +                     + "exception_stacktrace text,"
 +                     + "status text,"
 +                     + "started_at timestamp,"
 +                     + "finished_at timestamp,"
 +                     + "PRIMARY KEY ((keyspace_name, columnfamily_name), id))")
 +        .defaultTimeToLive((int) TimeUnit.DAYS.toSeconds(30))
 +        .compaction(CompactionParams.twcs(ImmutableMap.of("compaction_window_unit","DAYS",
 +                                                          "compaction_window_size","1")))
 +        .build();
 +
 +    private static final TableMetadata ParentRepairHistory =
 +        parse(PARENT_REPAIR_HISTORY,
 +                "Repair history",
 +                "CREATE TABLE %s ("
 +                     + "parent_id timeuuid,"
 +                     + "keyspace_name text,"
 +                     + "columnfamily_names set<text>,"
 +                     + "started_at timestamp,"
 +                     + "finished_at timestamp,"
 +                     + "exception_message text,"
 +                     + "exception_stacktrace text,"
 +                     + "requested_ranges set<text>,"
 +                     + "successful_ranges set<text>,"
 +                     + "options map<text, text>,"
 +                     + "PRIMARY KEY (parent_id))")
 +        .defaultTimeToLive((int) TimeUnit.DAYS.toSeconds(30))
 +        .compaction(CompactionParams.twcs(ImmutableMap.of("compaction_window_unit","DAYS",
 +                                                          "compaction_window_size","1")))
 +        .build();
 +
 +    private static final TableMetadata ViewBuildStatus =
 +        parse(VIEW_BUILD_STATUS,
 +            "Materialized View build status",
 +            "CREATE TABLE %s ("
 +                     + "keyspace_name text,"
 +                     + "view_name text,"
 +                     + "host_id uuid,"
 +                     + "status text,"
 +                     + "PRIMARY KEY ((keyspace_name, view_name), host_id))").build();
 +
 +    public static final TableMetadata PartitionDenylistTable =
 +    parse(PARTITION_DENYLIST_TABLE,
 +          "Partition keys which have been denied access",
 +          "CREATE TABLE %s ("
 +          + "ks_name text,"
 +          + "table_name text,"
 +          + "key blob,"
 +          + "PRIMARY KEY ((ks_name, table_name), key))")
 +    .build();
 +
 +    private static TableMetadata.Builder parse(String table, String description, String cql)
 +    {
 +        return CreateTableStatement.parse(format(cql, table), SchemaConstants.DISTRIBUTED_KEYSPACE_NAME)
 +                                   .id(TableId.forSystemTable(SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, table))
 +                                   .comment(description);
 +    }
 +
 +    public static KeyspaceMetadata metadata()
 +    {
 +        return KeyspaceMetadata.create(SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, KeyspaceParams.simple(Math.max(DEFAULT_RF, DatabaseDescriptor.getDefaultKeyspaceRF())), Tables.of(RepairHistory, ParentRepairHistory, ViewBuildStatus, PartitionDenylistTable));
 +    }
 +
 +    public static void startParentRepair(UUID parent_id, String keyspaceName, String[] cfnames, RepairOption options)
 +    {
 +        Collection<Range<Token>> ranges = options.getRanges();
 +        String query = "INSERT INTO %s.%s (parent_id, keyspace_name, columnfamily_names, requested_ranges, started_at,          options)"+
 +                                 " VALUES (%s,        '%s',          { '%s' },           { '%s' },          toTimestamp(now()), { %s })";
 +        String fmtQry = format(query,
 +                                      SchemaConstants.DISTRIBUTED_KEYSPACE_NAME,
 +                                      PARENT_REPAIR_HISTORY,
 +                                      parent_id.toString(),
 +                                      keyspaceName,
 +                                      Joiner.on("','").join(cfnames),
 +                                      Joiner.on("','").join(ranges),
 +                                      toCQLMap(options.asMap(), RepairOption.RANGES_KEY, RepairOption.COLUMNFAMILIES_KEY));
 +        processSilent(fmtQry);
 +    }
 +
 +    private static String toCQLMap(Map<String, String> options, String ... ignore)
 +    {
 +        Set<String> toIgnore = Sets.newHashSet(ignore);
 +        StringBuilder map = new StringBuilder();
 +        boolean first = true;
 +        for (Map.Entry<String, String> entry : options.entrySet())
 +        {
 +            if (!toIgnore.contains(entry.getKey()))
 +            {
 +                if (!first)
 +                    map.append(',');
 +                first = false;
 +                map.append(format("'%s': '%s'", entry.getKey(), entry.getValue()));
 +            }
 +        }
 +        return map.toString();
 +    }
 +
 +    public static void failParentRepair(UUID parent_id, Throwable t)
 +    {
 +        String query = "UPDATE %s.%s SET finished_at = toTimestamp(now()), exception_message=?, exception_stacktrace=? WHERE parent_id=%s";
 +
 +        StringWriter sw = new StringWriter();
 +        PrintWriter pw = new PrintWriter(sw);
 +        t.printStackTrace(pw);
 +        String fmtQuery = format(query, SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, PARENT_REPAIR_HISTORY, parent_id.toString());
 +        String message = t.getMessage();
 +        processSilent(fmtQuery, message != null ? message : "", sw.toString());
 +    }
 +
 +    public static void successfulParentRepair(UUID parent_id, Collection<Range<Token>> successfulRanges)
 +    {
 +        String query = "UPDATE %s.%s SET finished_at = toTimestamp(now()), successful_ranges = {'%s'} WHERE parent_id=%s";
 +        String fmtQuery = format(query, SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, PARENT_REPAIR_HISTORY, Joiner.on("','").join(successfulRanges), parent_id.toString());
 +        processSilent(fmtQuery);
 +    }
 +
 +    public static void startRepairs(UUID id, UUID parent_id, String keyspaceName, String[] cfnames, CommonRange commonRange)
 +    {
 +        // Don't record repair history if an upgrade is in progress as version 3 nodes generates errors
 +        // due to schema differences
 +        boolean includeNewColumns = !Gossiper.instance.hasMajorVersion3Nodes();
 +
 +        InetAddressAndPort coordinator = FBUtilities.getBroadcastAddressAndPort();
 +        Set<String> participants = Sets.newHashSet();
 +        Set<String> participants_v2 = Sets.newHashSet();
 +
 +        for (InetAddressAndPort endpoint : commonRange.endpoints)
 +        {
 +            participants.add(endpoint.getHostAddress(false));
 +            participants_v2.add(endpoint.getHostAddressAndPort());
 +        }
 +
 +        String query =
 +                "INSERT INTO %s.%s (keyspace_name, columnfamily_name, id, parent_id, range_begin, range_end, coordinator, coordinator_port, participants, participants_v2, status, started_at) " +
 +                        "VALUES (   '%s',          '%s',              %s, %s,        '%s',        '%s',      '%s',        %d,               { '%s' },     { '%s' },        '%s',   toTimestamp(now()))";
 +        String queryWithoutNewColumns =
 +                "INSERT INTO %s.%s (keyspace_name, columnfamily_name, id, parent_id, range_begin, range_end, coordinator, participants, status, started_at) " +
 +                        "VALUES (   '%s',          '%s',              %s, %s,        '%s',        '%s',      '%s',               { '%s' },        '%s',   toTimestamp(now()))";
 +
 +        for (String cfname : cfnames)
 +        {
 +            for (Range<Token> range : commonRange.ranges)
 +            {
 +                String fmtQry;
 +                if (includeNewColumns)
 +                {
 +                    fmtQry = format(query, SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, REPAIR_HISTORY,
 +                                    keyspaceName,
 +                                    cfname,
 +                                    id.toString(),
 +                                    parent_id.toString(),
 +                                    range.left.toString(),
 +                                    range.right.toString(),
 +                                    coordinator.getHostAddress(false),
 +                                    coordinator.getPort(),
 +                                    Joiner.on("', '").join(participants),
 +                                    Joiner.on("', '").join(participants_v2),
 +                                    RepairState.STARTED.toString());
 +                }
 +                else
 +                {
 +                    fmtQry = format(queryWithoutNewColumns, SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, REPAIR_HISTORY,
 +                                    keyspaceName,
 +                                    cfname,
 +                                    id.toString(),
 +                                    parent_id.toString(),
 +                                    range.left.toString(),
 +                                    range.right.toString(),
 +                                    coordinator.getHostAddress(false),
 +                                    Joiner.on("', '").join(participants),
 +                                    RepairState.STARTED.toString());
 +                }
 +                processSilent(fmtQry);
 +            }
 +        }
 +    }
 +
 +    public static void failRepairs(UUID id, String keyspaceName, String[] cfnames, Throwable t)
 +    {
 +        for (String cfname : cfnames)
 +            failedRepairJob(id, keyspaceName, cfname, t);
 +    }
 +
 +    public static void successfulRepairJob(UUID id, String keyspaceName, String cfname)
 +    {
 +        String query = "UPDATE %s.%s SET status = '%s', finished_at = toTimestamp(now()) WHERE keyspace_name = '%s' AND columnfamily_name = '%s' AND id = %s";
 +        String fmtQuery = format(query, SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, REPAIR_HISTORY,
 +                                        RepairState.SUCCESS.toString(),
 +                                        keyspaceName,
 +                                        cfname,
 +                                        id.toString());
 +        processSilent(fmtQuery);
 +    }
 +
 +    public static void failedRepairJob(UUID id, String keyspaceName, String cfname, Throwable t)
 +    {
 +        String query = "UPDATE %s.%s SET status = '%s', finished_at = toTimestamp(now()), exception_message=?, exception_stacktrace=? WHERE keyspace_name = '%s' AND columnfamily_name = '%s' AND id = %s";
 +        StringWriter sw = new StringWriter();
 +        PrintWriter pw = new PrintWriter(sw);
 +        t.printStackTrace(pw);
 +        String fmtQry = format(query, SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, REPAIR_HISTORY,
 +                                      RepairState.FAILED.toString(),
 +                                      keyspaceName,
 +                                      cfname,
 +                                      id.toString());
 +        String message = t.getMessage();
 +        if (message == null)
 +            message = t.getClass().getName();
 +        processSilent(fmtQry, message, sw.toString());
 +    }
 +
 +    public static void startViewBuild(String keyspace, String view, UUID hostId)
 +    {
 +        String query = "INSERT INTO %s.%s (keyspace_name, view_name, host_id, status) VALUES (?, ?, ?, ?)";
 +        QueryProcessor.process(format(query, SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, VIEW_BUILD_STATUS),
 +                               ConsistencyLevel.ONE,
 +                               Lists.newArrayList(bytes(keyspace),
 +                                                  bytes(view),
 +                                                  bytes(hostId),
 +                                                  bytes(BuildStatus.STARTED.toString())));
 +    }
 +
 +    public static void successfulViewBuild(String keyspace, String view, UUID hostId)
 +    {
 +        String query = "UPDATE %s.%s SET status = ? WHERE keyspace_name = ? AND view_name = ? AND host_id = ?";
 +        QueryProcessor.process(format(query, SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, VIEW_BUILD_STATUS),
 +                               ConsistencyLevel.ONE,
 +                               Lists.newArrayList(bytes(BuildStatus.SUCCESS.toString()),
 +                                                  bytes(keyspace),
 +                                                  bytes(view),
 +                                                  bytes(hostId)));
 +    }
 +
 +    public static Map<UUID, String> viewStatus(String keyspace, String view)
 +    {
 +        String query = "SELECT host_id, status FROM %s.%s WHERE keyspace_name = ? AND view_name = ?";
 +        UntypedResultSet results;
 +        try
 +        {
 +            results = QueryProcessor.execute(format(query, SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, VIEW_BUILD_STATUS),
 +                                             ConsistencyLevel.ONE,
 +                                             keyspace,
 +                                             view);
 +        }
 +        catch (Exception e)
 +        {
 +            return Collections.emptyMap();
 +        }
 +
 +
 +        Map<UUID, String> status = new HashMap<>();
 +        for (UntypedResultSet.Row row : results)
 +        {
 +            status.put(row.getUUID("host_id"), row.getString("status"));
 +        }
 +        return status;
 +    }
 +
 +    public static void setViewRemoved(String keyspaceName, String viewName)
 +    {
 +        String buildReq = "DELETE FROM %s.%s WHERE keyspace_name = ? AND view_name = ?";
 +        QueryProcessor.executeInternal(format(buildReq, SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, VIEW_BUILD_STATUS), keyspaceName, viewName);
 +        forceBlockingFlush(VIEW_BUILD_STATUS);
 +    }
 +
 +    private static void processSilent(String fmtQry, String... values)
 +    {
 +        try
 +        {
 +            List<ByteBuffer> valueList = new ArrayList<>(values.length);
 +            for (String v : values)
 +            {
 +                valueList.add(bytes(v));
 +            }
-             QueryProcessor.process(fmtQry, ConsistencyLevel.ONE, valueList);
++            QueryProcessor.process(fmtQry, ConsistencyLevel.ANY, valueList);
 +        }
 +        catch (Throwable t)
 +        {
 +            logger.error("Error executing query "+fmtQry, t);
 +        }
 +    }
 +
 +    public static void forceBlockingFlush(String table)
 +    {
 +        if (!DatabaseDescriptor.isUnsafeSystem())
 +            FBUtilities.waitOnFuture(Keyspace.open(SchemaConstants.DISTRIBUTED_KEYSPACE_NAME).getColumnFamilyStore(table).forceFlush());
 +    }
 +
 +    private enum RepairState
 +    {
 +        STARTED, SUCCESS, FAILED
 +    }
 +
 +    private enum BuildStatus
 +    {
 +        UNKNOWN, STARTED, SUCCESS
 +    }
 +}
diff --cc src/java/org/apache/cassandra/service/ActiveRepairService.java
index 7d0a290,f2e8b6e..cc72430
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@@ -20,14 -20,11 +20,15 @@@ package org.apache.cassandra.service
  import java.io.IOException;
  import java.net.UnknownHostException;
  import java.util.*;
 -import java.util.concurrent.*;
 +import java.util.concurrent.ConcurrentHashMap;
 +import java.util.concurrent.ConcurrentMap;
 +import java.util.concurrent.ThreadPoolExecutor;
 +import java.util.concurrent.TimeUnit;
  import java.util.concurrent.atomic.AtomicBoolean;
  import javax.management.openmbean.CompositeData;
 +import java.util.concurrent.atomic.AtomicInteger;
  import java.util.function.Predicate;
+ import java.util.stream.Collectors;
  
  import com.google.common.annotations.VisibleForTesting;
  import com.google.common.base.Preconditions;
@@@ -36,10 -33,11 +37,9 @@@ import com.google.common.cache.CacheBui
  import com.google.common.collect.ImmutableSet;
  import com.google.common.collect.Iterables;
  import com.google.common.collect.Multimap;
 -import com.google.common.util.concurrent.AbstractFuture;
 -import com.google.common.util.concurrent.ListeningExecutorService;
--import com.google.common.util.concurrent.MoreExecutors;
  
 -import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 +import org.apache.cassandra.concurrent.ExecutorPlus;
 +import org.apache.cassandra.config.Config;
  import org.apache.cassandra.db.compaction.CompactionManager;
  import org.apache.cassandra.locator.EndpointsByRange;
  import org.apache.cassandra.locator.EndpointsForRange;
@@@ -99,16 -95,7 +99,17 @@@ import org.apache.cassandra.utils.concu
  
  import static com.google.common.collect.Iterables.concat;
  import static com.google.common.collect.Iterables.transform;
 +import static java.util.Collections.synchronizedSet;
 +import static java.util.concurrent.TimeUnit.MILLISECONDS;
 +import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
 +import static org.apache.cassandra.config.Config.RepairCommandPoolFullStrategy.reject;
 +import static org.apache.cassandra.config.DatabaseDescriptor.*;
 +import static org.apache.cassandra.net.Message.out;
  import static org.apache.cassandra.net.Verb.PREPARE_MSG;
 +import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis;
 +import static org.apache.cassandra.utils.Simulate.With.MONITORS;
++import static org.apache.cassandra.utils.Clock.Global.nanoTime;
 +import static org.apache.cassandra.utils.concurrent.CountDownLatch.newCountDownLatch;
  
  /**
   * ActiveRepairService is the starting point for manual "active" repairs.
@@@ -189,6 -198,11 +190,10 @@@ public class ActiveRepairService implem
      private final Gossiper gossiper;
      private final Cache<Integer, Pair<ParentRepairStatus, List<String>>> repairStatusByCmd;
  
 -    private final DebuggableThreadPoolExecutor clearSnapshotExecutor = DebuggableThreadPoolExecutor.createWithMaximumPoolSize("RepairClearSnapshot",
 -                                                                                                                              1,
 -                                                                                                                              1,
 -                                                                                                                              TimeUnit.HOURS);
++    private final ExecutorPlus clearSnapshotExecutor = executorFactory().configurePooled("RepairClearSnapshot", 1)
++                                                                        .withKeepAlive(1, TimeUnit.HOURS)
++                                                                        .build();
+ 
      public ActiveRepairService(IFailureDetector failureDetector, Gossiper gossiper)
      {
          this.failureDetector = failureDetector;
@@@ -698,10 -706,22 +703,22 @@@
          ParentRepairSession session = parentRepairSessions.remove(parentSessionId);
          if (session == null)
              return null;
-         for (ColumnFamilyStore cfs : session.columnFamilyStores.values())
+ 
+         if (session.hasSnapshots)
          {
-             if (cfs.snapshotExists(snapshotName))
-                 cfs.clearSnapshot(snapshotName);
+             clearSnapshotExecutor.submit(() -> {
+                 logger.info("[repair #{}] Clearing snapshots for {}", parentSessionId,
+                             session.columnFamilyStores.values()
+                                                       .stream()
+                                                       .map(cfs -> cfs.metadata().toString()).collect(Collectors.joining(", ")));
 -                long startNanos = System.nanoTime();
++                long startNanos = nanoTime();
+                 for (ColumnFamilyStore cfs : session.columnFamilyStores.values())
+                 {
+                     if (cfs.snapshotExists(snapshotName))
+                         cfs.clearSnapshot(snapshotName);
+                 }
 -                logger.info("[repair #{}] Cleared snapshots in {}ms", parentSessionId, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
++                logger.info("[repair #{}] Cleared snapshots in {}ms", parentSessionId, TimeUnit.NANOSECONDS.toMillis(nanoTime() - startNanos));
+             });
          }
          return session;
      }
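
The ActiveRepairService hunk above moves snapshot clearing off the calling thread and onto a dedicated single-threaded executor, so that removing a parent repair session returns without waiting for disk work. The following is a minimal standalone sketch of that pattern, not the Cassandra code: ClearSnapshotSketch, removeSession and cleared are hypothetical names, a plain java.util.concurrent executor stands in for ExecutorPlus, and appending to an in-memory list stands in for cfs.clearSnapshot(snapshotName).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class ClearSnapshotSketch
{
    // One worker thread: cleanups run one at a time and never on the caller's thread.
    // (Assumption: a plain JDK executor replaces Cassandra's ExecutorPlus here.)
    private final ExecutorService clearSnapshotExecutor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "RepairClearSnapshot");
        t.setDaemon(true);
        return t;
    });

    // Hypothetical stand-in for on-disk snapshots: records what has been "cleared".
    private final List<String> cleared = Collections.synchronizedList(new ArrayList<>());

    // Returns immediately; the cleanup itself runs on the executor thread.
    public Future<?> removeSession(String sessionId, List<String> tables, boolean hasSnapshots)
    {
        if (!hasSnapshots)
            return CompletableFuture.completedFuture(null); // nothing to clean up

        return clearSnapshotExecutor.submit(() -> {
            long startNanos = System.nanoTime();
            for (String table : tables)
                cleared.add(sessionId + "/" + table); // placeholder for the slow disk work
            long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
            System.out.println("[repair #" + sessionId + "] Cleared snapshots in " + elapsedMs + "ms");
        });
    }

    // Snapshot copy of the entries cleared so far.
    public List<String> cleared()
    {
        return new ArrayList<>(cleared);
    }

    public void shutdown()
    {
        clearSnapshotExecutor.shutdown();
    }

    public static void main(String[] args) throws Exception
    {
        ClearSnapshotSketch sketch = new ClearSnapshotSketch();
        List<String> tables = new ArrayList<>();
        tables.add("ks.t1");
        tables.add("ks.t2");

        Future<?> pending = sketch.removeSession("s1", tables, true);
        // The caller is free to continue here; this sketch waits only so it can print the result.
        pending.get(5, TimeUnit.SECONDS);
        System.out.println(sketch.cleared());
        sketch.shutdown();
    }
}
```

In this sketch the caller gets a Future back only so the example can wait and inspect the result; the committed change discards the value returned by submit and still returns the removed session synchronously.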
