You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by bd...@apache.org on 2020/08/25 22:29:15 UTC
[cassandra] branch trunk updated: Add addition incremental repair
visibility to nodetool repair_admin
This is an automated email from the ASF dual-hosted git repository.
bdeggleston pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new c343175 Add addition incremental repair visibility to nodetool repair_admin
c343175 is described below
commit c34317526fc6dbe559beb36cf44e24278656bdf2
Author: Blake Eggleston <bd...@gmail.com>
AuthorDate: Tue Aug 4 16:38:33 2020 -0700
Add addition incremental repair visibility to nodetool repair_admin
Patch by Blake Eggleston; Reviewed by Marcus Eriksson for CASSANDRA-14939
---
CHANGES.txt | 1 +
.../org/apache/cassandra/db/ColumnFamilyStore.java | 48 +++
.../db/compaction/CompactionStrategyManager.java | 25 ++
.../db/compaction/PendingRepairManager.java | 70 ++++-
src/java/org/apache/cassandra/dht/Range.java | 17 ++
.../repair/consistent/ConsistentSession.java | 5 +
.../cassandra/repair/consistent/LocalSession.java | 6 +-
.../cassandra/repair/consistent/LocalSessions.java | 145 ++++++++-
.../cassandra/repair/consistent/RepairedState.java | 339 +++++++++++++++++++++
.../repair/consistent/admin/CleanupSummary.java | 143 +++++++++
.../repair/consistent/admin/PendingStat.java | 143 +++++++++
.../repair/consistent/admin/PendingStats.java | 104 +++++++
.../repair/consistent/admin/RepairStats.java | 187 ++++++++++++
.../repair/consistent/admin/SchemaArgsParser.java | 117 +++++++
.../cassandra/repair/messages/RepairOption.java | 48 +--
.../cassandra/service/ActiveRepairService.java | 81 ++++-
.../service/ActiveRepairServiceMBean.java | 7 +-
src/java/org/apache/cassandra/tools/NodeTool.java | 10 +-
.../cassandra/tools/nodetool/RepairAdmin.java | 322 ++++++++++++++-----
test/unit/org/apache/cassandra/dht/RangeTest.java | 17 ++
.../cassandra/repair/AbstractRepairTest.java | 5 +
.../repair/consistent/LocalSessionTest.java | 22 +-
.../repair/consistent/PendingRepairStatTest.java | 184 +++++++++++
.../repair/consistent/RepairStateTest.java | 168 ++++++++++
.../consistent/admin/SchemaArgsParserTest.java | 91 ++++++
25 files changed, 2198 insertions(+), 107 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index b8ace5b..123efdf 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0-beta2
+ * Add addition incremental repair visibility to nodetool repair_admin (CASSANDRA-14939)
* Always access system properties and environment variables via the new CassandraRelevantProperties and CassandraRelevantEnv classes (CASSANDRA-15876)
* Remove deprecated HintedHandOffManager (CASSANDRA-15939)
* Prevent repair from overrunning compaction (CASSANDRA-15817)
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 77a8cdd..824005b 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -75,6 +75,8 @@ import org.apache.cassandra.metrics.Sampler.Sample;
import org.apache.cassandra.metrics.Sampler.SamplerType;
import org.apache.cassandra.metrics.TableMetrics;
import org.apache.cassandra.repair.TableRepairManager;
+import org.apache.cassandra.repair.consistent.admin.CleanupSummary;
+import org.apache.cassandra.repair.consistent.admin.PendingStat;
import org.apache.cassandra.schema.*;
import org.apache.cassandra.schema.CompactionParams.TombstoneOption;
import org.apache.cassandra.service.CacheService;
@@ -1555,6 +1557,52 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
return data.getUncompacting();
}
+    /**
+     * Summarizes the live sstables that are still marked with a pending repair session.
+     *
+     * @return a map from pending repair session id to the stats built from the live sstables
+     *         marked with that session; sstables without a pending repair id are ignored
+     */
+    public Map<UUID, PendingStat> getPendingRepairStats()
+    {
+        Map<UUID, PendingStat.Builder> builders = new HashMap<>();
+        for (SSTableReader sstable : getLiveSSTables())
+        {
+            UUID session = sstable.getPendingRepair();
+            if (session == null)
+                continue;
+
+            // one map lookup instead of containsKey + put + get
+            builders.computeIfAbsent(session, id -> new PendingStat.Builder()).addSSTable(sstable);
+        }
+
+        Map<UUID, PendingStat> stats = new HashMap<>();
+        builders.forEach((session, builder) -> stats.put(session, builder.build()));
+        return stats;
+    }
+
+    /**
+     * Promotes (or demotes) data attached to incremental repair sessions that have either
+     * completed successfully or failed, by delegating to the compaction strategy manager.
+     *
+     * @param sessions ids of the sessions whose data should be released
+     * @param force if true, the release is run through runWithCompactionsDisabled, using a predicate
+     *              that matches sstables whose pending repair id is one of the given sessions
+     * @return the cleanup summary produced by the compaction strategy manager
+     */
+    public CleanupSummary releaseRepairData(Collection<UUID> sessions, boolean force)
+    {
+        if (!force)
+            return compactionStrategyManager.releaseRepairData(sessions);
+
+        Predicate<SSTableReader> belongsToSession = sstable -> {
+            UUID pendingRepair = sstable.getPendingRepair();
+            return pendingRepair != null && sessions.contains(pendingRepair);
+        };
+        return runWithCompactionsDisabled(() -> compactionStrategyManager.releaseRepairData(sessions),
+                                          belongsToSession, false, true, true);
+    }
+
public boolean isFilterFullyCoveredBy(ClusteringIndexFilter filter,
DataLimits limits,
CachedPartition cached,
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index 708faff..3a05e50 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -39,6 +39,8 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.primitives.Longs;
+
+import org.apache.cassandra.db.compaction.PendingRepairManager.CleanupTask;
import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -69,6 +71,7 @@ import org.apache.cassandra.notifications.SSTableDeletingNotification;
import org.apache.cassandra.notifications.SSTableListChangedNotification;
import org.apache.cassandra.notifications.SSTableMetadataChanged;
import org.apache.cassandra.notifications.SSTableRepairStatusChanged;
+import org.apache.cassandra.repair.consistent.admin.CleanupSummary;
import org.apache.cassandra.schema.CompactionParams;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.ActiveRepairService;
@@ -1223,4 +1226,26 @@ public class CompactionStrategyManager implements INotificationConsumer
if (isTransient != sstable.isTransient())
throw new IllegalStateException(String.format("Failed setting isTransient to %b on %s (isTransient is %b)", isTransient, sstable, sstable.isTransient()));
}
+
+    /**
+     * Collects a cleanup task from every pending and transient repair manager for the given
+     * sessions while holding the read lock, then runs the tasks once the lock has been
+     * released and merges their results into a single summary.
+     */
+    public CleanupSummary releaseRepairData(Collection<UUID> sessions)
+    {
+        List<CleanupTask> collected = new ArrayList<>();
+        readLock.lock();
+        try
+        {
+            for (PendingRepairManager manager : Iterables.concat(pendingRepairs.getManagers(), transientRepairs.getManagers()))
+            {
+                CleanupTask managerTask = manager.releaseSessionData(sessions);
+                collected.add(managerTask);
+            }
+        }
+        finally
+        {
+            readLock.unlock();
+        }
+
+        // start from an empty summary and fold each task's result into it
+        CleanupSummary merged = new CleanupSummary(cfs, Collections.emptySet(), Collections.emptySet());
+        for (CleanupTask cleanup : collected)
+        {
+            merged = CleanupSummary.add(merged, cleanup.cleanup());
+        }
+        return merged;
+    }
}
diff --git a/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java b/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java
index 764a4dc..aefa40b 100644
--- a/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java
@@ -46,6 +46,7 @@ import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.ISSTableScanner;
import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.repair.consistent.admin.CleanupSummary;
import org.apache.cassandra.schema.CompactionParams;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.utils.Pair;
@@ -263,12 +264,79 @@ class PendingRepairManager
@SuppressWarnings("resource")
private RepairFinishedCompactionTask getRepairFinishedCompactionTask(UUID sessionID)
{
- Set<SSTableReader> sstables = get(sessionID).getSSTables();
+ Preconditions.checkState(canCleanup(sessionID));
+ AbstractCompactionStrategy compactionStrategy = get(sessionID);
+ if (compactionStrategy == null)
+ return null;
+ Set<SSTableReader> sstables = compactionStrategy.getSSTables();
long repairedAt = ActiveRepairService.instance.consistent.local.getFinalSessionRepairedAt(sessionID);
LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.COMPACTION);
return txn == null ? null : new RepairFinishedCompactionTask(cfs, txn, sessionID, repairedAt);
}
+    /**
+     * A batch of repair-finished compaction tasks, paired with the session each one belongs to.
+     * The task of a pair is null when no task could be created for that session.
+     */
+    public static class CleanupTask
+    {
+        private final ColumnFamilyStore cfs;
+        private final List<Pair<UUID, RepairFinishedCompactionTask>> tasks;
+
+        public CleanupTask(ColumnFamilyStore cfs, List<Pair<UUID, RepairFinishedCompactionTask>> tasks)
+        {
+            this.cfs = cfs;
+            this.tasks = tasks;
+        }
+
+        /**
+         * Runs every task and records, per session, whether it succeeded. A task that throws has
+         * its transaction aborted and is logged; sessions with a null task are reported as
+         * unsuccessful.
+         *
+         * @return a summary of the successful and unsuccessful session ids
+         */
+        public CleanupSummary cleanup()
+        {
+            Set<UUID> successful = new HashSet<>();
+            Set<UUID> unsuccessful = new HashSet<>();
+            for (Pair<UUID, RepairFinishedCompactionTask> pair : tasks)
+            {
+                UUID session = pair.left;
+                RepairFinishedCompactionTask task = pair.right;
+
+                if (task != null)
+                {
+                    try
+                    {
+                        task.run();
+                        successful.add(session);
+                    }
+                    catch (Throwable t)
+                    {
+                        t = task.transaction.abort(t);
+                        logger.error("Failed cleaning up " + session, t);
+                        unsuccessful.add(session);
+                    }
+                }
+                else
+                {
+                    unsuccessful.add(session);
+                }
+            }
+            return new CleanupSummary(cfs, successful, unsuccessful);
+        }
+
+        /**
+         * Aborts the transaction of every task in this batch.
+         *
+         * @param accumulate throwable passed to, and replaced by the result of, each abort call
+         * @return the throwable returned by the last abort call, or the argument if there was none
+         */
+        public Throwable abort(Throwable accumulate)
+        {
+            for (Pair<UUID, RepairFinishedCompactionTask> pair : tasks)
+            {
+                // a null task has no transaction to abort
+                if (pair.right != null)
+                    accumulate = pair.right.transaction.abort(accumulate);
+            }
+            return accumulate;
+        }
+    }
+
+    /**
+     * Builds a CleanupTask holding one repair-finished compaction task for each of the given
+     * sessions this manager has data for; sessions without data are left out.
+     */
+    public CleanupTask releaseSessionData(Collection<UUID> sessionIDs)
+    {
+        List<Pair<UUID, RepairFinishedCompactionTask>> sessionTasks = new ArrayList<>(sessionIDs.size());
+        for (UUID sessionID : sessionIDs)
+        {
+            if (!hasDataForSession(sessionID))
+                continue;
+
+            RepairFinishedCompactionTask task = getRepairFinishedCompactionTask(sessionID);
+            sessionTasks.add(Pair.create(sessionID, task));
+        }
+        return new CleanupTask(cfs, sessionTasks);
+    }
+
synchronized int getNumPendingRepairFinishedTasks()
{
int count = 0;
diff --git a/src/java/org/apache/cassandra/dht/Range.java b/src/java/org/apache/cassandra/dht/Range.java
index 80c9ef1..5b2f3d9 100644
--- a/src/java/org/apache/cassandra/dht/Range.java
+++ b/src/java/org/apache/cassandra/dht/Range.java
@@ -21,6 +21,7 @@ import java.io.Serializable;
import java.util.*;
import java.util.function.Predicate;
+import com.google.common.collect.Iterables;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.cassandra.db.PartitionPosition;
@@ -139,6 +140,11 @@ public class Range<T extends RingPosition<T>> extends AbstractBounds<T> implemen
return contains(that.left) || (!that.left.equals(that.right) && intersects(new Range<T>(that.left, that.right)));
}
+    /**
+     * @return true if at least one range in {@code l} intersects the ranges in {@code r}
+     */
+    public static boolean intersects(Iterable<Range<Token>> l, Iterable<Range<Token>> r)
+    {
+        for (Range<Token> candidate : l)
+        {
+            if (candidate.intersects(r))
+                return true;
+        }
+        return false;
+    }
+
@SafeVarargs
public static <T extends RingPosition<T>> Set<Range<T>> rangeSet(Range<T> ... ranges)
{
@@ -335,6 +341,17 @@ public class Range<T extends RingPosition<T>> extends AbstractBounds<T> implemen
return result;
}
+
+    /**
+     * Applies subtractAll with the {@code subtract} collection to each of the given ranges and
+     * gathers all of the resulting ranges into one set.
+     */
+    public static <T extends RingPosition<T>> Set<Range<T>> subtract(Collection<Range<T>> ranges, Collection<Range<T>> subtract)
+    {
+        Set<Range<T>> remaining = new HashSet<>();
+        ranges.forEach(range -> remaining.addAll(range.subtractAll(subtract)));
+        return remaining;
+    }
+
/**
* Calculate set of the difference ranges of given two ranges
* (as current (A, B] and rhs is (C, D])
diff --git a/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java b/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java
index d9ac927..e4d8ff0 100644
--- a/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java
+++ b/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java
@@ -216,6 +216,11 @@ public abstract class ConsistentSession
this.state = state;
}
+    /**
+     * @return true if any of this session's ranges intersects the given ranges
+     */
+    public boolean intersects(Iterable<Range<Token>> otherRanges)
+    {
+        for (Range<Token> sessionRange : ranges)
+        {
+            if (sessionRange.intersects(otherRanges))
+                return true;
+        }
+        return false;
+    }
+
public boolean equals(Object o)
{
if (this == o) return true;
diff --git a/src/java/org/apache/cassandra/repair/consistent/LocalSession.java b/src/java/org/apache/cassandra/repair/consistent/LocalSession.java
index e116a43..a6f81d7 100644
--- a/src/java/org/apache/cassandra/repair/consistent/LocalSession.java
+++ b/src/java/org/apache/cassandra/repair/consistent/LocalSession.java
@@ -98,14 +98,16 @@ public class LocalSession extends ConsistentSession
private int startedAt;
private int lastUpdate;
- public void withStartedAt(int startedAt)
+ public Builder withStartedAt(int startedAt)
{
this.startedAt = startedAt;
+ return this;
}
- public void withLastUpdate(int lastUpdate)
+ public Builder withLastUpdate(int lastUpdate)
{
this.lastUpdate = lastUpdate;
+ return this;
}
void validate()
diff --git a/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java b/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
index a35c50a..55fe2f0 100644
--- a/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
+++ b/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
@@ -41,10 +41,12 @@ import javax.annotation.Nullable;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
@@ -59,6 +61,9 @@ import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.repair.KeyspaceRepairManager;
+import org.apache.cassandra.repair.consistent.admin.CleanupSummary;
+import org.apache.cassandra.repair.consistent.admin.PendingStat;
+import org.apache.cassandra.repair.consistent.admin.PendingStats;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.locator.InetAddressAndPort;
@@ -148,6 +153,7 @@ public class LocalSessions
private final String table = SystemKeyspace.REPAIRS;
private boolean started = false;
private volatile ImmutableMap<UUID, LocalSession> sessions = ImmutableMap.of();
+ private volatile ImmutableMap<TableId, RepairedState> repairedStates = ImmutableMap.of();
@VisibleForTesting
int getNumSessions()
@@ -173,16 +179,137 @@ public class LocalSessions
return StorageService.instance.isInitialized();
}
- public List<Map<String, String>> sessionInfo(boolean all)
+ public List<Map<String, String>> sessionInfo(boolean all, Set<Range<Token>> ranges)
{
Iterable<LocalSession> currentSessions = sessions.values();
+
if (!all)
- {
currentSessions = Iterables.filter(currentSessions, s -> !s.isCompleted());
- }
+
+ if (!ranges.isEmpty())
+ currentSessions = Iterables.filter(currentSessions, s -> s.intersects(ranges));
+
return Lists.newArrayList(Iterables.transform(currentSessions, LocalSessionInfo::sessionToMap));
}
+ /**
+ * Returns the RepairedState tracked for the given table, creating and registering an empty
+ * one the first time the table is requested.
+ *
+ * repairedStates is a volatile immutable map: it is checked without locking first, and a
+ * missing entry is added by publishing a new copy of the map from inside a synchronized block.
+ */
+ private RepairedState getRepairedState(TableId tid)
+ {
+ if (!repairedStates.containsKey(tid))
+ {
+ synchronized (this)
+ {
+ // check again under the lock, the entry may have been added since the first check
+ if (!repairedStates.containsKey(tid))
+ {
+ repairedStates = ImmutableMap.<TableId, RepairedState>builder()
+ .putAll(repairedStates)
+ .put(tid, new RepairedState())
+ .build();
+ }
+ }
+ }
+ return Verify.verifyNotNull(repairedStates.get(tid));
+ }
+
+    /**
+     * Records the ranges and repairedAt time of a FINALIZED session in the repaired state of
+     * every table the session covers. Sessions in any other state are ignored.
+     */
+    private void maybeUpdateRepairedState(LocalSession session)
+    {
+        // a finalized session with repairedAt set to 0 was a forced repair,
+        // and must not update the repaired state
+        boolean forced = session.repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE;
+        if (session.getState() != FINALIZED || forced)
+            return;
+
+        for (TableId tid : session.tableIds)
+            getRepairedState(tid).add(session.ranges, session.repairedAt);
+    }
+
+    /**
+     * Checks whether, for every table of the session, the tracked repaired state reports a
+     * repairedAt for the session's ranges that is strictly newer than the session's own,
+     * i.e. the session has since been re-repaired by a more recent one.
+     */
+    private boolean isSuperseded(LocalSession session)
+    {
+        for (TableId tid : session.tableIds)
+        {
+            RepairedState tableState = repairedStates.get(tid);
+            boolean newerRepair = tableState != null
+                                  && tableState.minRepairedAt(session.ranges) > session.repairedAt;
+            if (!newerRepair)
+                return false;
+        }
+
+        return true;
+    }
+
+    /**
+     * @return the repaired stats of the given ranges for the table, or RepairedState.Stats.EMPTY
+     *         if no repaired state is tracked for the table
+     */
+    public RepairedState.Stats getRepairedStats(TableId tid, Collection<Range<Token>> ranges)
+    {
+        RepairedState tableState = repairedStates.get(tid);
+        return tableState == null ? RepairedState.Stats.EMPTY
+                                  : tableState.getRepairedStats(ranges);
+    }
+
+    /**
+     * Groups the table's pending repair stats by the state of the owning session (finalized,
+     * failed, or anything else counted as pending), considering only sessions that intersect
+     * the given ranges.
+     *
+     * @throws IllegalArgumentException if no ColumnFamilyStore exists for the table id
+     */
+    public PendingStats getPendingStats(TableId tid, Collection<Range<Token>> ranges)
+    {
+        ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(tid);
+        Preconditions.checkArgument(cfs != null);
+
+        PendingStat.Builder pending = new PendingStat.Builder();
+        PendingStat.Builder finalized = new PendingStat.Builder();
+        PendingStat.Builder failed = new PendingStat.Builder();
+
+        for (Map.Entry<UUID, PendingStat> entry : cfs.getPendingRepairStats().entrySet())
+        {
+            UUID sessionID = entry.getKey();
+            PendingStat stat = entry.getValue();
+            Verify.verify(sessionID.equals(Iterables.getOnlyElement(stat.sessions)));
+
+            LocalSession session = sessions.get(sessionID);
+            Verify.verifyNotNull(session);
+
+            boolean inRequestedRanges = Iterables.any(ranges, r -> r.intersects(session.ranges));
+            if (!inRequestedRanges)
+                continue;
+
+            // pick the bucket first, then add the stat once
+            PendingStat.Builder bucket;
+            switch (session.getState())
+            {
+                case FINALIZED:
+                    bucket = finalized;
+                    break;
+                case FAILED:
+                    bucket = failed;
+                    break;
+                default:
+                    bucket = pending;
+            }
+            bucket.addStat(stat);
+        }
+
+        return new PendingStats(cfs.keyspace.getName(), cfs.name, pending.build(), finalized.build(), failed.build());
+    }
+
+    /**
+     * Releases the repair data of every completed session that includes the given table and
+     * intersects the given ranges.
+     *
+     * @param tid id of the table to clean up
+     * @param ranges only sessions intersecting these ranges are considered
+     * @param force passed through to ColumnFamilyStore#releaseRepairData
+     * @return the cleanup summary returned by the table's ColumnFamilyStore
+     * @throws IllegalArgumentException if no ColumnFamilyStore exists for the table id
+     */
+    public CleanupSummary cleanup(TableId tid, Collection<Range<Token>> ranges, boolean force)
+    {
+        // reject an unknown table up front, as getPendingStats does, rather than
+        // failing with a NullPointerException on the release call
+        ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(tid);
+        Preconditions.checkArgument(cfs != null, "No table found for id %s", tid);
+
+        Iterable<LocalSession> candidates = Iterables.filter(sessions.values(),
+                                                             ls -> ls.isCompleted()
+                                                                   && ls.tableIds.contains(tid)
+                                                                   && Range.intersects(ls.ranges, ranges));
+
+        Set<UUID> sessionIds = Sets.newHashSet(Iterables.transform(candidates, s -> s.sessionID));
+
+        return cfs.releaseRepairData(sessionIds, force);
+    }
+
/**
* hook for operators to cancel sessions, cancelling from a non-coordinator is an error, unless
* force is set to true. Messages are sent out to other participants, but we don't wait for a response
@@ -219,6 +346,7 @@ public class LocalSessions
try
{
LocalSession session = load(row);
+ maybeUpdateRepairedState(session);
loadedSessions.put(session.sessionID, session);
}
catch (IllegalArgumentException | NullPointerException e)
@@ -275,7 +403,14 @@ public class LocalSessions
}
else if (shouldDelete(session, now))
{
- if (!sessionHasData(session))
+ if (session.getState() == FINALIZED && !isSuperseded(session))
+ {
+ // if we delete a non-superseded session, some ranges will be mis-reported as
+ // not having been repaired in repair_admin after a restart
+ logger.info("Skipping delete of FINALIZED LocalSession {} because it has " +
+ "not been superseded by a more recent session", session.sessionID);
+ }
+ else if (!sessionHasData(session))
{
logger.info("Auto deleting repair session {}", session);
deleteSession(session.sessionID);
@@ -370,6 +505,8 @@ public class LocalSessions
session.participants.stream().map(participant -> participant.toString()).collect(Collectors.toSet()),
serializeRanges(session.ranges),
tableIdToUuid(session.tableIds));
+
+ maybeUpdateRepairedState(session);
}
private static int dateToSeconds(Date d)
diff --git a/src/java/org/apache/cassandra/repair/consistent/RepairedState.java b/src/java/org/apache/cassandra/repair/consistent/RepairedState.java
new file mode 100644
index 0000000..ac0e7cb
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/consistent/RepairedState.java
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair.consistent;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
+import com.google.common.collect.Sets;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.utils.UUIDGen;
+
+import static org.apache.cassandra.service.ActiveRepairService.UNREPAIRED_SSTABLE;
+
+/**
+ * Tracks the repaired state of token ranges per table, and is effectively an
+ * in memory representation of the on-disk local incremental repair state.
+ *
+ * The main purpose of this class is to provide metrics via nodetool repair_admin. To make sure those metrics
+ * are accurate, it also determines when a completed IR session can be deleted, which is explained in a bit
+ * more detail in LocalSessions#cleanup, by the call to isSuperseded.
+ */
+public class RepairedState
+{
+    /**
+     * A normalized list of ranges that share a single repairedAt time.
+     */
+    static class Level
+    {
+        final List<Range<Token>> ranges;
+        final long repairedAt;
+
+        // highest repairedAt first; reversed() instead of negating the key, which overflows for Long.MIN_VALUE
+        private static final Comparator<Level> timeComparator = Comparator.comparingLong((Level l) -> l.repairedAt).reversed();
+
+        Level(Collection<Range<Token>> ranges, long repairedAt)
+        {
+            this.ranges = Range.normalize(ranges);
+            this.repairedAt = repairedAt;
+        }
+
+        /**
+         * @return this level if there is nothing to subtract, null if the subtraction leaves no
+         *         ranges, otherwise a new level holding the remaining ranges at the same repairedAt
+         */
+        Level subtract(Collection<Range<Token>> ranges)
+        {
+            if (ranges.isEmpty())
+                return this;
+
+            Set<Range<Token>> difference = Range.subtract(this.ranges, ranges);
+            if (difference.isEmpty())
+                return null;
+
+            return new Level(difference, repairedAt);
+        }
+
+        @Override
+        public boolean equals(Object o)
+        {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            Level level = (Level) o;
+            return repairedAt == level.repairedAt &&
+                   Objects.equals(ranges, level.ranges);
+        }
+
+        @Override
+        public int hashCode()
+        {
+            return Objects.hash(ranges, repairedAt);
+        }
+
+        @Override
+        public String toString()
+        {
+            return "Level{" +
+                   "ranges=" + ranges +
+                   ", repairedAt=" + repairedAt +
+                   '}';
+        }
+    }
+
+    /**
+     * A single range together with its repairedAt time.
+     */
+    public static class Section
+    {
+        public final Range<Token> range;
+        public final long repairedAt;
+        // orders sections by the left token of their range
+        private static final Comparator<Section> tokenComparator = (l, r) -> l.range.left.compareTo(r.range.left);
+
+        Section(Range<Token> range, long repairedAt)
+        {
+            this.range = range;
+            this.repairedAt = repairedAt;
+        }
+
+        /**
+         * @return a section for the given subrange with this section's repairedAt
+         * @throws IllegalArgumentException if this section's range does not contain the subrange
+         */
+        Section makeSubsection(Range<Token> subrange)
+        {
+            Preconditions.checkArgument(range.contains(subrange));
+            return new Section(subrange, repairedAt);
+        }
+
+        @Override
+        public boolean equals(Object o)
+        {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            Section section = (Section) o;
+            return repairedAt == section.repairedAt &&
+                   Objects.equals(range, section.range);
+        }
+
+        @Override
+        public int hashCode()
+        {
+            return Objects.hash(range, repairedAt);
+        }
+
+        @Override
+        public String toString()
+        {
+            return "Section{" +
+                   "range=" + range +
+                   ", repairedAt=" + repairedAt +
+                   '}';
+        }
+    }
+
+    /**
+     * The lowest and highest repairedAt found for a set of ranges, plus the sections they came from.
+     */
+    public static class Stats
+    {
+        public static final Stats EMPTY = new Stats(UNREPAIRED_SSTABLE, UNREPAIRED_SSTABLE, Collections.emptyList());
+
+        public final long minRepaired;
+        public final long maxRepaired;
+        public final List<Section> sections;
+
+        public Stats(long minRepaired, long maxRepaired, List<Section> sections)
+        {
+            this.minRepaired = minRepaired;
+            this.maxRepaired = maxRepaired;
+            this.sections = sections;
+        }
+    }
+
+    /**
+     * Immutable snapshot of the levels, the ranges they cover, and the sections derived from them.
+     */
+    static class State
+    {
+        final ImmutableList<Level> levels;
+        final ImmutableList<Range<Token>> covered;
+        final ImmutableList<Section> sections;
+
+        State(List<Level> levels, List<Range<Token>> covered, List<Section> sections)
+        {
+            this.levels = ImmutableList.copyOf(levels);
+            this.covered = ImmutableList.copyOf(covered);
+            this.sections = ImmutableList.copyOf(sections);
+        }
+
+        // sections is not compared: add() always builds it from levels
+        @Override
+        public boolean equals(Object o)
+        {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            State state = (State) o;
+            return Objects.equals(levels, state.levels) &&
+                   Objects.equals(covered, state.covered);
+        }
+
+        @Override
+        public int hashCode()
+        {
+            return Objects.hash(levels, covered);
+        }
+
+        @Override
+        public String toString()
+        {
+            return "State{" +
+                   "levels=" + levels +
+                   ", covered=" + covered +
+                   '}';
+        }
+    }
+
+    private volatile State state = new State(Collections.emptyList(), Collections.emptyList(), Collections.emptyList());
+
+    State state()
+    {
+        return state;
+    }
+
+    /**
+     * Flattens the levels into one section per range, sorted by the left token of each range.
+     */
+    private static List<Section> levelsToSections(List<Level> levels)
+    {
+        List<Section> sections = new ArrayList<>();
+        for (Level level : levels)
+        {
+            for (Range<Token> range : level.ranges)
+            {
+                sections.add(new Section(range, level.repairedAt));
+            }
+        }
+        sections.sort(Section.tokenComparator);
+        return sections;
+    }
+
+    /**
+     * Adds a new level for the given ranges and repairedAt, then rebuilds the state so that
+     * each range is only kept in the level with the highest repairedAt that covers it.
+     */
+    public synchronized void add(Collection<Range<Token>> ranges, long repairedAt)
+    {
+        Level newLevel = new Level(ranges, repairedAt);
+
+        State lastState = state;
+
+        List<Level> tmp = new ArrayList<>(lastState.levels.size() + 1);
+        tmp.addAll(lastState.levels);
+        tmp.add(newLevel);
+        tmp.sort(Level.timeComparator);
+
+        List<Level> levels = new ArrayList<>(lastState.levels.size() + 1);
+        List<Range<Token>> covered = new ArrayList<>();
+
+        // walk from highest to lowest repairedAt, keeping only what earlier levels have not covered
+        for (Level level : tmp)
+        {
+            Level subtracted = level.subtract(covered);
+            if (subtracted == null)
+                continue;
+
+            levels.add(subtracted);
+
+            covered.addAll(subtracted.ranges);
+            covered = Range.normalize(covered);
+        }
+
+        state = new State(levels, covered, levelsToSections(levels));
+    }
+
+    /**
+     * @return the lowest repairedAt of the sections intersecting the given ranges, or
+     *         UNREPAIRED_SSTABLE if part of the given ranges is not covered by any section
+     */
+    public long minRepairedAt(Collection<Range<Token>> ranges)
+    {
+        State current = state;
+
+        Set<Range<Token>> remainingRanges = new HashSet<>(ranges);
+        long minTime = Long.MAX_VALUE;
+        for (Section section : current.sections)
+        {
+            if (section.range.intersects(remainingRanges))
+            {
+                minTime = Math.min(minTime, section.repairedAt);
+                remainingRanges = Range.subtract(remainingRanges, Collections.singleton(section.range));
+            }
+
+            if (remainingRanges.isEmpty())
+                break;
+        }
+        // if there are still ranges we don't have data for, part of the requested ranges is unrepaired
+        return remainingRanges.isEmpty() ? minTime : UNREPAIRED_SSTABLE;
+    }
+
+    /**
+     * Splits the given ranges along the given sections. Each part found in a section is returned
+     * as a subsection with that section's repairedAt; whatever is left is returned with
+     * UNREPAIRED_SSTABLE. The result is sorted by the left token of each range.
+     */
+    static List<Section> getRepairedStats(List<Section> sections, Collection<Range<Token>> ranges)
+    {
+        if (ranges.isEmpty())
+            return Collections.emptyList();
+
+        Set<Range<Token>> remaining = Sets.newHashSet(Range.normalize(ranges));
+        List<Section> results = new ArrayList<>();
+
+        for (Section section : sections)
+        {
+            if (remaining.isEmpty())
+                break;
+
+            Set<Range<Token>> sectionRanges = Range.rangeSet(section.range);
+            for (Range<Token> range : remaining)
+            {
+                if (sectionRanges.isEmpty())
+                    break;
+
+                Set<Range<Token>> intersection = new HashSet<>();
+                sectionRanges.forEach(r -> intersection.addAll(r.intersectionWith(range)));
+
+                if (intersection.isEmpty())
+                    continue;
+
+                intersection.forEach(r -> results.add(section.makeSubsection(r)));
+                sectionRanges = Range.subtract(sectionRanges, intersection);
+            }
+
+            remaining = Range.subtract(remaining, Collections.singleton(section.range));
+        }
+
+        remaining.forEach(r -> results.add(new Section(r, UNREPAIRED_SSTABLE)));
+        results.sort(Section.tokenComparator);
+
+        return results;
+    }
+
+    /**
+     * @return the min and max repairedAt over the sections computed for the given ranges, or
+     *         Stats.EMPTY if there are no such sections
+     */
+    public Stats getRepairedStats(Collection<Range<Token>> ranges)
+    {
+        List<Section> sections = getRepairedStats(state.sections, ranges);
+
+        if (sections.isEmpty())
+            return Stats.EMPTY;
+
+        long minTime = Long.MAX_VALUE;
+        long maxTime = Long.MIN_VALUE;
+
+        for (Section section : sections)
+        {
+            minTime = Math.min(minTime, section.repairedAt);
+            maxTime = Math.max(maxTime, section.repairedAt);
+        }
+
+        return new Stats(minTime, maxTime, sections);
+    }
+}
diff --git a/src/java/org/apache/cassandra/repair/consistent/admin/CleanupSummary.java b/src/java/org/apache/cassandra/repair/consistent/admin/CleanupSummary.java
new file mode 100644
index 0000000..f984411
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/consistent/admin/CleanupSummary.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair.consistent.admin;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import javax.management.openmbean.ArrayType;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.CompositeDataSupport;
+import javax.management.openmbean.CompositeType;
+import javax.management.openmbean.OpenDataException;
+import javax.management.openmbean.OpenType;
+import javax.management.openmbean.SimpleType;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Sets;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+
+/**
+ * Per-table result of cleaning up pending repair data: the set of ids for which cleanup succeeded
+ * and the set for which it did not (presumably repair session ids -- confirm against
+ * LocalSessions.cleanup). Instances convert to and from JMX {@link CompositeData} so they can be
+ * returned from an MBean operation and decoded on the client.
+ */
+public class CleanupSummary
+{
+    private static final String[] COMPOSITE_NAMES = new String[] { "keyspace", "table", "successful", "unsuccessful" };
+    private static final OpenType<?>[] COMPOSITE_TYPES;
+    private static final CompositeType COMPOSITE_TYPE;
+
+    static
+    {
+        try
+        {
+            COMPOSITE_TYPES = new OpenType[] { SimpleType.STRING,
+                                               SimpleType.STRING,
+                                               ArrayType.getArrayType(SimpleType.STRING),
+                                               ArrayType.getArrayType(SimpleType.STRING) };
+            // the open type is named after this class so it cannot be mistaken for another admin type
+            COMPOSITE_TYPE = new CompositeType(CleanupSummary.class.getName(), CleanupSummary.class.getSimpleName(),
+                                               COMPOSITE_NAMES, COMPOSITE_NAMES, COMPOSITE_TYPES);
+        }
+        catch (OpenDataException e)
+        {
+            throw Throwables.propagate(e);
+        }
+    }
+
+    public final String keyspace;
+    public final String table;
+
+    public final Set<UUID> successful;
+    public final Set<UUID> unsuccessful;
+
+    /**
+     * @param keyspace keyspace name
+     * @param table table name
+     * @param successful ids that were cleaned up; stored as given, not copied
+     * @param unsuccessful ids that were not cleaned up; stored as given, not copied
+     */
+    public CleanupSummary(String keyspace, String table, Set<UUID> successful, Set<UUID> unsuccessful)
+    {
+        this.keyspace = keyspace;
+        this.table = table;
+        this.successful = successful;
+        this.unsuccessful = unsuccessful;
+    }
+
+    /** Convenience constructor taking the keyspace and table names from the given store. */
+    public CleanupSummary(ColumnFamilyStore cfs, Set<UUID> successful, Set<UUID> unsuccessful)
+    {
+        this(cfs.keyspace.getName(), cfs.name, successful, unsuccessful);
+    }
+
+    /**
+     * Merges two summaries for the same table. An id is reported as successful only if it is
+     * successful in at least one operand and unsuccessful in neither.
+     *
+     * @throws IllegalArgumentException if the operands refer to different keyspaces or tables
+     */
+    public static CleanupSummary add(CleanupSummary l, CleanupSummary r)
+    {
+        Preconditions.checkArgument(l.keyspace.equals(r.keyspace));
+        Preconditions.checkArgument(l.table.equals(r.table));
+
+        Set<UUID> unsuccessful = new HashSet<>(l.unsuccessful);
+        unsuccessful.addAll(r.unsuccessful);
+
+        Set<UUID> successful = new HashSet<>(l.successful);
+        successful.addAll(r.successful);
+        // a failure on either side overrides a success on the other
+        successful.removeAll(unsuccessful);
+
+        return new CleanupSummary(l.keyspace, l.table, successful, unsuccessful);
+    }
+
+    /** Renders each id with {@link UUID#toString()}, in the set's iteration order. */
+    private static String[] uuids2Strings(Set<UUID> uuids)
+    {
+        String[] strings = new String[uuids.size()];
+        int idx = 0;
+        for (UUID uuid : uuids)
+            strings[idx++] = uuid.toString();
+        return strings;
+    }
+
+    /** Inverse of {@link #uuids2Strings(Set)}. */
+    private static Set<UUID> strings2Uuids(String[] strings)
+    {
+        Set<UUID> uuids = Sets.newHashSetWithExpectedSize(strings.length);
+        for (String string : strings)
+            uuids.add(UUID.fromString(string));
+
+        return uuids;
+    }
+
+    /** Encodes this summary as JMX open data; ids are transported as strings. */
+    public CompositeData toComposite()
+    {
+        Map<String, Object> values = new HashMap<>();
+        values.put(COMPOSITE_NAMES[0], keyspace);
+        values.put(COMPOSITE_NAMES[1], table);
+        values.put(COMPOSITE_NAMES[2], uuids2Strings(successful));
+        values.put(COMPOSITE_NAMES[3], uuids2Strings(unsuccessful));
+        try
+        {
+            return new CompositeDataSupport(COMPOSITE_TYPE, values);
+        }
+        catch (OpenDataException e)
+        {
+            throw Throwables.propagate(e);
+        }
+    }
+
+    /**
+     * Decodes a summary produced by {@link #toComposite()}.
+     *
+     * @throws IllegalArgumentException if the data is not of this class's composite type
+     */
+    public static CleanupSummary fromComposite(CompositeData cd)
+    {
+        Preconditions.checkArgument(cd.getCompositeType().equals(COMPOSITE_TYPE));
+        Object[] values = cd.getAll(COMPOSITE_NAMES);
+        return new CleanupSummary((String) values[0],
+                                  (String) values[1],
+                                  strings2Uuids((String[]) values[2]),
+                                  strings2Uuids((String[]) values[3]));
+    }
+}
diff --git a/src/java/org/apache/cassandra/repair/consistent/admin/PendingStat.java b/src/java/org/apache/cassandra/repair/consistent/admin/PendingStat.java
new file mode 100644
index 0000000..0c4424e
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/consistent/admin/PendingStat.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair.consistent.admin;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import javax.management.openmbean.ArrayType;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.CompositeDataSupport;
+import javax.management.openmbean.CompositeType;
+import javax.management.openmbean.OpenDataException;
+import javax.management.openmbean.OpenType;
+import javax.management.openmbean.SimpleType;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Iterables;
+
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.FileUtils;
+
+/**
+ * Size, sstable count and session ids of a group of sstables that are pending repair.
+ * Convertible to and from JMX {@link CompositeData}; session ids travel as strings.
+ */
+public class PendingStat
+{
+    private static final String[] COMPOSITE_NAMES = new String[] {"dataSize", "numSSTables", "sessions"};
+    private static final OpenType<?>[] COMPOSITE_TYPES;
+    public static final CompositeType COMPOSITE_TYPE;
+
+    static
+    {
+        try
+        {
+            COMPOSITE_TYPES = new OpenType[] { SimpleType.LONG, SimpleType.INTEGER, ArrayType.getArrayType(SimpleType.STRING) };
+            COMPOSITE_TYPE = new CompositeType(PendingStat.class.getName(),
+                                               PendingStat.class.getSimpleName(),
+                                               COMPOSITE_NAMES, COMPOSITE_NAMES, COMPOSITE_TYPES);
+        }
+        catch (OpenDataException e)
+        {
+            throw Throwables.propagate(e);
+        }
+    }
+
+    public final long dataSize;
+    public final int numSSTables;
+    public final Set<UUID> sessions;
+
+    /**
+     * @param dataSize total size of the sstables
+     * @param numSSTables number of sstables
+     * @param sessions ids of the sessions the sstables belong to; exposed through an unmodifiable view
+     */
+    public PendingStat(long dataSize, int numSSTables, Set<UUID> sessions)
+    {
+        this.dataSize = dataSize;
+        this.numSSTables = numSSTables;
+        this.sessions = Collections.unmodifiableSet(sessions);
+    }
+
+    /** Human readable one-line rendering of size, sstable count and session count. */
+    public String sizeString()
+    {
+        String readableSize = FileUtils.stringifyFileSize(dataSize);
+        return String.format("%s (%s sstables / %s sessions)", readableSize, numSSTables, sessions.size());
+    }
+
+    /** Encodes this stat as JMX open data. */
+    public CompositeData toComposite()
+    {
+        String[] ids = sessions.stream().map(UUID::toString).toArray(String[]::new);
+
+        Map<String, Object> items = new HashMap<>();
+        items.put(COMPOSITE_NAMES[0], dataSize);
+        items.put(COMPOSITE_NAMES[1], numSSTables);
+        items.put(COMPOSITE_NAMES[2], ids);
+
+        try
+        {
+            return new CompositeDataSupport(COMPOSITE_TYPE, items);
+        }
+        catch (OpenDataException e)
+        {
+            throw Throwables.propagate(e);
+        }
+    }
+
+    /**
+     * Decodes a stat produced by {@link #toComposite()}.
+     *
+     * @throws IllegalArgumentException if the data is not of this class's composite type
+     */
+    public static PendingStat fromComposite(CompositeData cd)
+    {
+        Preconditions.checkArgument(cd.getCompositeType().equals(COMPOSITE_TYPE));
+        Object[] items = cd.getAll(COMPOSITE_NAMES);
+
+        String[] ids = (String[]) items[2];
+        Set<UUID> parsed = new HashSet<>();
+        for (String id : ids)
+            parsed.add(UUID.fromString(id));
+
+        return new PendingStat((long) items[0], (int) items[1], parsed);
+    }
+
+    /** Mutable accumulator for building a {@link PendingStat} from sstables and/or other stats. */
+    public static class Builder
+    {
+        public long dataSize = 0;
+        public int numSSTables = 0;
+        public Set<UUID> sessions = new HashSet<>();
+
+        /** Adds the sstable to the totals; sstables with no pending repair id are ignored. */
+        public Builder addSSTable(SSTableReader sstable)
+        {
+            UUID pendingRepair = sstable.getPendingRepair();
+            if (pendingRepair != null)
+            {
+                dataSize += sstable.onDiskLength();
+                sessions.add(pendingRepair);
+                numSSTables++;
+            }
+            return this;
+        }
+
+        /** Folds another stat's size, count and session ids into this builder. */
+        public Builder addStat(PendingStat stat)
+        {
+            dataSize += stat.dataSize;
+            numSSTables += stat.numSSTables;
+            sessions.addAll(stat.sessions);
+            return this;
+        }
+
+        /** Creates a stat from the current totals. */
+        public PendingStat build()
+        {
+            return new PendingStat(dataSize, numSSTables, sessions);
+        }
+    }
+}
diff --git a/src/java/org/apache/cassandra/repair/consistent/admin/PendingStats.java b/src/java/org/apache/cassandra/repair/consistent/admin/PendingStats.java
new file mode 100644
index 0000000..9dbe0df
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/consistent/admin/PendingStats.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair.consistent.admin;
+
+import java.util.HashMap;
+import java.util.Map;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.CompositeDataSupport;
+import javax.management.openmbean.CompositeType;
+import javax.management.openmbean.OpenDataException;
+import javax.management.openmbean.OpenType;
+import javax.management.openmbean.SimpleType;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+
+/**
+ * Pending repair statistics for one table, broken down into pending, finalized and failed, plus
+ * their combined total. Convertible to and from JMX {@link CompositeData}.
+ */
+public class PendingStats
+{
+
+    private static final String[] COMPOSITE_NAMES = new String[] { "keyspace", "table", "total", "pending", "finalized", "failed" };
+    private static final OpenType<?>[] COMPOSITE_TYPES;
+    private static final CompositeType COMPOSITE_TYPE;
+
+    static
+    {
+        try
+        {
+            // exactly one type per entry in COMPOSITE_NAMES: CompositeType rejects arrays of differing length
+            COMPOSITE_TYPES = new OpenType[] { SimpleType.STRING,
+                                               SimpleType.STRING,
+                                               PendingStat.COMPOSITE_TYPE,
+                                               PendingStat.COMPOSITE_TYPE,
+                                               PendingStat.COMPOSITE_TYPE,
+                                               PendingStat.COMPOSITE_TYPE };
+            COMPOSITE_TYPE = new CompositeType(PendingStats.class.getName(), PendingStats.class.getSimpleName(),
+                                               COMPOSITE_NAMES, COMPOSITE_NAMES, COMPOSITE_TYPES);
+        }
+        catch (OpenDataException e)
+        {
+            throw Throwables.propagate(e);
+        }
+    }
+
+    public final String keyspace;
+    public final String table;
+    public final PendingStat total;
+    public final PendingStat pending;
+    public final PendingStat finalized;
+    public final PendingStat failed;
+
+    /**
+     * @param keyspace keyspace name
+     * @param table table name
+     * @param pending stats for the pending group
+     * @param finalized stats for the finalized group
+     * @param failed stats for the failed group
+     */
+    public PendingStats(String keyspace, String table, PendingStat pending, PendingStat finalized, PendingStat failed)
+    {
+        this.keyspace = keyspace;
+        this.table = table;
+
+        // total is always derived from the three groups, never supplied by the caller
+        this.total = new PendingStat.Builder().addStat(pending).addStat(finalized).addStat(failed).build();
+        this.pending = pending;
+        this.finalized = finalized;
+        this.failed = failed;
+    }
+
+    /** Encodes these stats as JMX open data, one item per entry in COMPOSITE_NAMES. */
+    public CompositeData toComposite()
+    {
+        Map<String, Object> values = new HashMap<>();
+        values.put(COMPOSITE_NAMES[0], keyspace);
+        values.put(COMPOSITE_NAMES[1], table);
+        values.put(COMPOSITE_NAMES[2], total.toComposite());
+        values.put(COMPOSITE_NAMES[3], pending.toComposite());
+        values.put(COMPOSITE_NAMES[4], finalized.toComposite());
+        values.put(COMPOSITE_NAMES[5], failed.toComposite());
+        try
+        {
+            return new CompositeDataSupport(COMPOSITE_TYPE, values);
+        }
+        catch (OpenDataException e)
+        {
+            throw Throwables.propagate(e);
+        }
+    }
+
+    /**
+     * Decodes stats produced by {@link #toComposite()}.
+     *
+     * @throws IllegalArgumentException if the data is not of this class's composite type
+     */
+    public static PendingStats fromComposite(CompositeData cd)
+    {
+        Preconditions.checkArgument(cd.getCompositeType().equals(COMPOSITE_TYPE));
+        Object[] values = cd.getAll(COMPOSITE_NAMES);
+        // values[2] ("total") is skipped on purpose: the constructor recomputes it from the three groups
+        return new PendingStats((String) values[0],
+                                (String) values[1],
+                                PendingStat.fromComposite((CompositeData) values[3]),
+                                PendingStat.fromComposite((CompositeData) values[4]),
+                                PendingStat.fromComposite((CompositeData) values[5]));
+    }
+}
diff --git a/src/java/org/apache/cassandra/repair/consistent/admin/RepairStats.java b/src/java/org/apache/cassandra/repair/consistent/admin/RepairStats.java
new file mode 100644
index 0000000..bbb4778
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/consistent/admin/RepairStats.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair.consistent.admin;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.management.openmbean.*;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+
+import org.apache.cassandra.repair.consistent.RepairedState;
+
+/**
+ * Repaired-time statistics for one table: the minimum and maximum repaired time plus the
+ * individual token range sections they were computed from. Convertible to and from JMX
+ * {@link CompositeData}.
+ */
+public class RepairStats
+{
+    /** A token range, given by the string forms of its bounds, and its repaired time. */
+    public static class Section
+    {
+
+        private static final String[] COMPOSITE_NAMES = new String[] { "start", "end", "repairedAt" };
+        private static final OpenType<?>[] COMPOSITE_TYPES;
+        private static final CompositeType COMPOSITE_TYPE;
+
+        static
+        {
+            try
+            {
+                COMPOSITE_TYPES = new OpenType[] { SimpleType.STRING, SimpleType.STRING, SimpleType.LONG };
+                COMPOSITE_TYPE = new CompositeType(Section.class.getName(), "Section", COMPOSITE_NAMES, COMPOSITE_NAMES, COMPOSITE_TYPES);
+            }
+            catch (OpenDataException e)
+            {
+                throw Throwables.propagate(e);
+            }
+        }
+
+        public final String start;
+        public final String end;
+        public final long time;
+
+        public Section(String start, String end, long time)
+        {
+            this.start = start;
+            this.end = end;
+            this.time = time;
+        }
+
+        /** Encodes this section as JMX open data; {@code time} is stored under "repairedAt". */
+        private CompositeData toComposite()
+        {
+            Map<String, Object> items = new HashMap<>();
+            items.put(COMPOSITE_NAMES[0], start);
+            items.put(COMPOSITE_NAMES[1], end);
+            items.put(COMPOSITE_NAMES[2], time);
+
+            try
+            {
+                return new CompositeDataSupport(COMPOSITE_TYPE, items);
+            }
+            catch (OpenDataException e)
+            {
+                throw Throwables.propagate(e);
+            }
+        }
+
+        /** Decodes a section produced by {@link #toComposite()}. */
+        private static Section fromComposite(CompositeData cd)
+        {
+            Preconditions.checkArgument(cd.getCompositeType().equals(COMPOSITE_TYPE));
+            Object[] items = cd.getAll(COMPOSITE_NAMES);
+            return new Section((String) items[0], (String) items[1], (long) items[2]);
+        }
+
+        @Override
+        public String toString()
+        {
+            return String.format("(%s,%s]=%s", start, end, time);
+        }
+    }
+
+    private static final String[] COMPOSITE_NAMES = new String[] { "keyspace", "table", "minRepaired", "maxRepaired", "sections" };
+    private static final OpenType<?>[] COMPOSITE_TYPES;
+    private static final CompositeType COMPOSITE_TYPE;
+
+    static
+    {
+        try
+        {
+            COMPOSITE_TYPES = new OpenType[] { SimpleType.STRING, SimpleType.STRING,
+                                               SimpleType.LONG, SimpleType.LONG,
+                                               ArrayType.getArrayType(Section.COMPOSITE_TYPE)};
+            COMPOSITE_TYPE = new CompositeType(RepairStats.class.getName(), RepairStats.class.getSimpleName(),
+                                               COMPOSITE_NAMES, COMPOSITE_NAMES, COMPOSITE_TYPES);
+        }
+        catch (OpenDataException e)
+        {
+            throw Throwables.propagate(e);
+        }
+    }
+
+    public final String keyspace;
+    public final String table;
+    public final long minRepaired;
+    public final long maxRepaired;
+    public final List<Section> sections;
+
+    private RepairStats(String keyspace, String table, long minRepaired, long maxRepaired, List<Section> sections)
+    {
+        this.keyspace = keyspace;
+        this.table = table;
+        this.minRepaired = minRepaired;
+        this.maxRepaired = maxRepaired;
+        this.sections = sections;
+    }
+
+    /** Converts internal sections to their string-based form, preserving order. */
+    public static List<Section> convertSections(List<RepairedState.Section> from)
+    {
+        List<Section> converted = new ArrayList<>(from.size());
+        for (RepairedState.Section source : from)
+        {
+            String left = source.range.left.toString();
+            String right = source.range.right.toString();
+            converted.add(new Section(left, right, source.repairedAt));
+        }
+        return converted;
+    }
+
+    /** Creates the stats for a table from the given internal repaired-state stats. */
+    public static RepairStats fromRepairState(String keyspace, String table, RepairedState.Stats stats)
+    {
+        List<Section> converted = convertSections(stats.sections);
+        return new RepairStats(keyspace, table, stats.minRepaired, stats.maxRepaired, converted);
+    }
+
+    /** Encodes these stats, including every section, as JMX open data. */
+    public CompositeData toComposite()
+    {
+        Map<String, Object> items = new HashMap<>();
+        items.put(COMPOSITE_NAMES[0], keyspace);
+        items.put(COMPOSITE_NAMES[1], table);
+        items.put(COMPOSITE_NAMES[2], minRepaired);
+        items.put(COMPOSITE_NAMES[3], maxRepaired);
+
+        CompositeData[] encodedSections = sections.stream()
+                                                  .map(Section::toComposite)
+                                                  .toArray(CompositeData[]::new);
+        items.put(COMPOSITE_NAMES[4], encodedSections);
+
+        try
+        {
+            return new CompositeDataSupport(COMPOSITE_TYPE, items);
+        }
+        catch (OpenDataException e)
+        {
+            throw Throwables.propagate(e);
+        }
+    }
+
+    /**
+     * Decodes stats produced by {@link #toComposite()}.
+     *
+     * @throws IllegalArgumentException if the data is not of this class's composite type
+     */
+    public static RepairStats fromComposite(CompositeData cd)
+    {
+        Preconditions.checkArgument(cd.getCompositeType().equals(COMPOSITE_TYPE));
+        Object[] items = cd.getAll(COMPOSITE_NAMES);
+
+        String keyspace = (String) items[0];
+        String table = (String) items[1];
+        long minRepaired = (long) items[2];
+        long maxRepaired = (long) items[3];
+
+        CompositeData[] encodedSections = (CompositeData[]) items[4];
+        List<Section> decoded = new ArrayList<>(encodedSections.length);
+        for (CompositeData encoded : encodedSections)
+            decoded.add(Section.fromComposite(encoded));
+
+        return new RepairStats(keyspace, table, minRepaired, maxRepaired, decoded);
+    }
+}
diff --git a/src/java/org/apache/cassandra/repair/consistent/admin/SchemaArgsParser.java b/src/java/org/apache/cassandra/repair/consistent/admin/SchemaArgsParser.java
new file mode 100644
index 0000000..67c0244
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/consistent/admin/SchemaArgsParser.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair.consistent.admin;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.utils.AbstractIterator;
+
+public class SchemaArgsParser implements Iterable<ColumnFamilyStore>
+{
+
+ private final List<String> schemaArgs;
+
+ private SchemaArgsParser(List<String> schemaArgs)
+ {
+ this.schemaArgs = schemaArgs;
+ }
+
+ private static class TableIterator extends AbstractIterator<ColumnFamilyStore>
+ {
+ private final Iterator<ColumnFamilyStore> tables;
+
+ public TableIterator(String ksName, List<String> tableNames)
+ {
+ Preconditions.checkArgument(Schema.instance.getKeyspaceMetadata(ksName) != null);
+ Keyspace keyspace = Keyspace.open(ksName);
+
+ if (tableNames.isEmpty())
+ {
+ tables = keyspace.getColumnFamilyStores().iterator();
+ }
+ else
+ {
+ tables = Lists.newArrayList(Iterables.transform(tableNames, tn -> keyspace.getColumnFamilyStore(tn))).iterator();
+ }
+ }
+
+ @Override
+ protected ColumnFamilyStore computeNext()
+ {
+ return tables.hasNext() ? tables.next() : endOfData();
+ }
+ }
+
+ @Override
+ public Iterator<ColumnFamilyStore> iterator()
+ {
+ if (schemaArgs.isEmpty())
+ {
+ // iterate over everything
+ Iterator<String> ksNames = Schema.instance.getNonLocalStrategyKeyspaces().iterator();
+
+ return new AbstractIterator<ColumnFamilyStore>()
+ {
+ TableIterator current = null;
+ protected ColumnFamilyStore computeNext()
+ {
+ for (;;)
+ {
+ if (current != null && current.hasNext())
+ {
+ return current.next();
+ }
+
+ if (ksNames.hasNext())
+ {
+ current = new TableIterator(ksNames.next(), Collections.emptyList());
+ continue;
+ }
+
+ return endOfData();
+ }
+ }
+ };
+
+ }
+ else
+ {
+ return new TableIterator(schemaArgs.get(0), schemaArgs.subList(1, schemaArgs.size()));
+ }
+ }
+
+ public static Iterable<ColumnFamilyStore> parse(List<String> schemaArgs)
+ {
+ return new SchemaArgsParser(schemaArgs);
+ }
+
+ public static Iterable<ColumnFamilyStore> parse(String... schemaArgs)
+ {
+ return parse(Lists.newArrayList(schemaArgs));
+ }
+}
diff --git a/src/java/org/apache/cassandra/repair/messages/RepairOption.java b/src/java/org/apache/cassandra/repair/messages/RepairOption.java
index adcd776..f7ed052 100644
--- a/src/java/org/apache/cassandra/repair/messages/RepairOption.java
+++ b/src/java/org/apache/cassandra/repair/messages/RepairOption.java
@@ -57,6 +57,30 @@ public class RepairOption
private static final Logger logger = LoggerFactory.getLogger(RepairOption.class);
+    /**
+     * Parses a comma separated list of {@code start:end} token pairs into ranges.
+     * Entries without a ':' separator are silently skipped.
+     *
+     * @param rangesStr the ranges to parse; may be null or empty
+     * @param partitioner supplies the token factory used to parse each bound
+     * @return the parsed ranges; an empty set if rangesStr is null or empty
+     * @throws IllegalArgumentException if an entry's start and end tokens are equal
+     */
+    public static Set<Range<Token>> parseRanges(String rangesStr, IPartitioner partitioner)
+    {
+        if (rangesStr == null || rangesStr.isEmpty())
+            return Collections.emptySet();
+
+        Set<Range<Token>> parsed = new HashSet<>();
+        for (String entry : rangesStr.split(","))
+        {
+            String[] bounds = entry.split(":", 2);
+            if (bounds.length >= 2)
+            {
+                Token begin = partitioner.getTokenFactory().fromString(bounds[0].trim());
+                Token end = partitioner.getTokenFactory().fromString(bounds[1].trim());
+                if (begin.equals(end))
+                {
+                    throw new IllegalArgumentException("Start and end tokens must be different.");
+                }
+                parsed.add(new Range<>(begin, end));
+            }
+        }
+        return parsed;
+    }
/**
* Construct RepairOptions object from given map of Strings.
* <p>
@@ -165,28 +189,10 @@ public class RepairOption
}
catch (NumberFormatException ignore) {}
}
+
// ranges
- String rangesStr = options.get(RANGES_KEY);
- Set<Range<Token>> ranges = new HashSet<>();
- if (rangesStr != null)
- {
- StringTokenizer tokenizer = new StringTokenizer(rangesStr, ",");
- while (tokenizer.hasMoreTokens())
- {
- String[] rangeStr = tokenizer.nextToken().split(":", 2);
- if (rangeStr.length < 2)
- {
- continue;
- }
- Token parsedBeginToken = partitioner.getTokenFactory().fromString(rangeStr[0].trim());
- Token parsedEndToken = partitioner.getTokenFactory().fromString(rangeStr[1].trim());
- if (parsedBeginToken.equals(parsedEndToken))
- {
- throw new IllegalArgumentException("Start and end tokens must be different.");
- }
- ranges.add(new Range<>(parsedBeginToken, parsedEndToken));
- }
- }
+ Set<Range<Token>> ranges = parseRanges(options.get(RANGES_KEY), partitioner);
+
boolean asymmetricSyncing = Boolean.parseBoolean(options.get(OPTIMISE_STREAMS_KEY));
RepairOption option = new RepairOption(parallelism, primaryRange, incremental, trace, jobThreads, ranges, !ranges.isEmpty(), pullRepair, force, previewKind, asymmetricSyncing);
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index e61bd38..3b13907 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -22,6 +22,7 @@ import java.net.UnknownHostException;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
+import javax.management.openmbean.CompositeData;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
@@ -31,7 +32,6 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Multimap;
import com.google.common.util.concurrent.AbstractFuture;
-
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
@@ -55,9 +55,9 @@ import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.EndpointState;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.gms.Gossiper;
-import org.apache.cassandra.gms.IFailureDetector;
import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
import org.apache.cassandra.gms.IFailureDetectionEventListener;
+import org.apache.cassandra.gms.IFailureDetector;
import org.apache.cassandra.gms.VersionedValue;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.TokenMetadata;
@@ -66,14 +66,23 @@ import org.apache.cassandra.net.RequestCallback;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.repair.CommonRange;
-import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.repair.RepairJobDesc;
import org.apache.cassandra.repair.RepairParallelism;
import org.apache.cassandra.repair.RepairSession;
import org.apache.cassandra.repair.consistent.CoordinatorSessions;
import org.apache.cassandra.repair.consistent.LocalSessions;
-import org.apache.cassandra.repair.messages.*;
+import org.apache.cassandra.repair.consistent.admin.CleanupSummary;
+import org.apache.cassandra.repair.consistent.admin.PendingStats;
+import org.apache.cassandra.repair.consistent.admin.RepairStats;
+import org.apache.cassandra.repair.consistent.RepairedState;
+import org.apache.cassandra.repair.consistent.admin.SchemaArgsParser;
+import org.apache.cassandra.repair.messages.PrepareMessage;
+import org.apache.cassandra.repair.messages.RepairMessage;
+import org.apache.cassandra.repair.messages.RepairOption;
+import org.apache.cassandra.repair.messages.SyncResponse;
+import org.apache.cassandra.repair.messages.ValidationResponse;
import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.MBeanWrapper;
import org.apache.cassandra.utils.Pair;
@@ -210,9 +219,10 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
}
@Override
- public List<Map<String, String>> getSessions(boolean all)
+ public List<Map<String, String>> getSessions(boolean all, String rangesStr)
{
- return consistent.local.sessionInfo(all);
+ Set<Range<Token>> ranges = RepairOption.parseRanges(rangesStr, DatabaseDescriptor.getPartitioner());
+ return consistent.local.sessionInfo(all, ranges);
}
@Override
@@ -234,6 +244,65 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
return DatabaseDescriptor.getRepairSessionSpaceInMegabytes();
}
+    /**
+     * Collects repaired-time statistics for each table selected by schemaArgs.
+     *
+     * @param schemaArgs table selection, resolved by SchemaArgsParser
+     * @param rangeString ranges to report on, in the format accepted by RepairOption.parseRanges;
+     *                    if null, each keyspace's local replica ranges are used instead
+     * @return one RepairStats composite per selected table
+     */
+    @Override
+    public List<CompositeData> getRepairStats(List<String> schemaArgs, String rangeString)
+    {
+        List<CompositeData> stats = new ArrayList<>();
+        Collection<Range<Token>> userRanges = rangeString != null
+                                              ? RepairOption.parseRanges(rangeString, DatabaseDescriptor.getPartitioner())
+                                              : null;
+
+        for (ColumnFamilyStore cfs : SchemaArgsParser.parse(schemaArgs))
+        {
+            String keyspace = cfs.keyspace.getName();
+            // no explicit ranges requested: fall back to what this node replicates for the keyspace
+            Collection<Range<Token>> ranges = userRanges != null
+                                              ? userRanges
+                                              : StorageService.instance.getLocalReplicas(keyspace).ranges();
+            RepairedState.Stats cfStats = consistent.local.getRepairedStats(cfs.metadata().id, ranges);
+            stats.add(RepairStats.fromRepairState(keyspace, cfs.name, cfStats).toComposite());
+        }
+
+        return stats;
+    }
+
+    /**
+     * Collects pending repair statistics for each table selected by schemaArgs.
+     *
+     * @param schemaArgs table selection, resolved by SchemaArgsParser
+     * @param rangeString ranges to report on, in the format accepted by RepairOption.parseRanges;
+     *                    if null, each keyspace's local replica ranges are used instead
+     * @return one PendingStats composite per selected table
+     */
+    @Override
+    public List<CompositeData> getPendingStats(List<String> schemaArgs, String rangeString)
+    {
+        List<CompositeData> results = new ArrayList<>();
+
+        Collection<Range<Token>> requested = null;
+        if (rangeString != null)
+            requested = RepairOption.parseRanges(rangeString, DatabaseDescriptor.getPartitioner());
+
+        for (ColumnFamilyStore cfs : SchemaArgsParser.parse(schemaArgs))
+        {
+            String ksName = cfs.keyspace.getName();
+
+            Collection<Range<Token>> effective = requested;
+            if (effective == null)
+                effective = StorageService.instance.getLocalReplicas(ksName).ranges();
+
+            PendingStats tableStats = consistent.local.getPendingStats(cfs.metadata().id, effective);
+            results.add(tableStats.toComposite());
+        }
+
+        return results;
+    }
+
+    /**
+     * Runs pending repair data cleanup for each table selected by schemaArgs.
+     *
+     * @param schemaArgs table selection, resolved by SchemaArgsParser
+     * @param rangeString ranges to clean up, in the format accepted by RepairOption.parseRanges;
+     *                    if null, each keyspace's local replica ranges are used instead
+     * @param force passed through to the per-table cleanup
+     * @return one CleanupSummary composite per selected table
+     */
+    @Override
+    public List<CompositeData> cleanupPending(List<String> schemaArgs, String rangeString, boolean force)
+    {
+        List<CompositeData> results = new ArrayList<>();
+
+        Collection<Range<Token>> requested = null;
+        if (rangeString != null)
+            requested = RepairOption.parseRanges(rangeString, DatabaseDescriptor.getPartitioner());
+
+        for (ColumnFamilyStore cfs : SchemaArgsParser.parse(schemaArgs))
+        {
+            String ksName = cfs.keyspace.getName();
+
+            Collection<Range<Token>> effective = requested;
+            if (effective == null)
+                effective = StorageService.instance.getLocalReplicas(ksName).ranges();
+
+            CleanupSummary tableSummary = consistent.local.cleanup(cfs.metadata().id, effective, force);
+            results.add(tableSummary.toComposite());
+        }
+        return results;
+    }
+
/**
* Requests repairs for the given keyspace and column families.
*
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairServiceMBean.java b/src/java/org/apache/cassandra/service/ActiveRepairServiceMBean.java
index f4d6c48..8cffecc 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairServiceMBean.java
@@ -20,12 +20,13 @@ package org.apache.cassandra.service;
import java.util.List;
import java.util.Map;
+import javax.management.openmbean.CompositeData;
public interface ActiveRepairServiceMBean
{
public static final String MBEAN_NAME = "org.apache.cassandra.db:type=RepairService";
- public List<Map<String, String>> getSessions(boolean all);
+ public List<Map<String, String>> getSessions(boolean all, String rangesStr);
public void failSession(String session, boolean force);
public void setRepairSessionSpaceInMegabytes(int sizeInMegabytes);
@@ -36,4 +37,8 @@ public interface ActiveRepairServiceMBean
public int getRepairPendingCompactionRejectThreshold();
public void setRepairPendingCompactionRejectThreshold(int value);
+
+ public List<CompositeData> getRepairStats(List<String> schemaArgs, String rangeString);
+ public List<CompositeData> getPendingStats(List<String> schemaArgs, String rangeString);
+ public List<CompositeData> cleanupPending(List<String> schemaArgs, String rangeString, boolean force);
}
diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java b/src/java/org/apache/cassandra/tools/NodeTool.java
index bf5e5cc..69cd04c 100644
--- a/src/java/org/apache/cassandra/tools/NodeTool.java
+++ b/src/java/org/apache/cassandra/tools/NodeTool.java
@@ -153,7 +153,6 @@ public class NodeTool
ReloadSeeds.class,
ResetFullQueryLog.class,
Repair.class,
- RepairAdmin.class,
ReplayBatchlog.class,
SetCacheCapacity.class,
SetConcurrency.class,
@@ -224,6 +223,15 @@ public class NodeTool
.withDefaultCommand(CassHelp.class)
.withCommand(BootstrapResume.class);
+ builder.withGroup("repair_admin")
+ .withDescription("list and fail incremental repair sessions")
+ .withDefaultCommand(RepairAdmin.ListCmd.class)
+ .withCommand(RepairAdmin.ListCmd.class)
+ .withCommand(RepairAdmin.CancelCmd.class)
+ .withCommand(RepairAdmin.CleanupDataCmd.class)
+ .withCommand(RepairAdmin.SummarizePendingCmd.class)
+ .withCommand(RepairAdmin.SummarizeRepairedCmd.class);
+
Cli<Consumer<INodeProbeFactory>> parser = builder.build();
int status = 0;
diff --git a/src/java/org/apache/cassandra/tools/nodetool/RepairAdmin.java b/src/java/org/apache/cassandra/tools/nodetool/RepairAdmin.java
index ba3cf62..b66c32a 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/RepairAdmin.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/RepairAdmin.java
@@ -22,13 +22,23 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import javax.management.openmbean.CompositeData;
+
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
+import org.apache.commons.lang3.StringUtils;
+
+import io.airlift.airline.Arguments;
import io.airlift.airline.Command;
import io.airlift.airline.Option;
import org.apache.cassandra.repair.consistent.LocalSessionInfo;
+import org.apache.cassandra.repair.consistent.admin.CleanupSummary;
+import org.apache.cassandra.repair.consistent.admin.PendingStat;
+import org.apache.cassandra.repair.consistent.admin.PendingStats;
+import org.apache.cassandra.repair.consistent.admin.RepairStats;
import org.apache.cassandra.service.ActiveRepairServiceMBean;
import org.apache.cassandra.tools.NodeProbe;
import org.apache.cassandra.tools.NodeTool;
@@ -37,113 +47,287 @@ import org.apache.cassandra.utils.FBUtilities;
/**
* Supports listing and failing incremental repair sessions
*/
-@Command(name = "repair_admin", description = "list and fail incremental repair sessions")
-public class RepairAdmin extends NodeTool.NodeToolCmd
+public abstract class RepairAdmin extends NodeTool.NodeToolCmd
{
- @Option(title = "list", name = {"-l", "--list"}, description = "list repair sessions (default behavior)")
- private boolean list = false;
-
- @Option(title = "all", name = {"-a", "--all"}, description = "include completed and failed sessions")
- private boolean all = false;
+ @Command(name = "list", description = "list repair sessions")
+ public static class ListCmd extends RepairAdmin
+ {
+ @Option(title = "all", name = {"-a", "--all"}, description = "include completed and failed sessions")
+ private boolean all;
- @Option(title = "cancel", name = {"-x", "--cancel"}, description = "cancel an incremental repair session")
- private String cancel = null;
+ @Option(title = "start_token", name = {"-st", "--start-token"}, description = "Use -st to specify a token at which the repair range starts")
+ private String startToken = StringUtils.EMPTY;
- @Option(title = "force", name = {"-f", "--force"}, description = "cancel repair session from a node other than the repair coordinator." +
- " Attempting to cancel FINALIZED or FAILED sessions is an error.")
- private boolean force = false;
+ @Option(title = "end_token", name = {"-et", "--end-token"}, description = "Use -et to specify a token at which repair range ends")
+ private String endToken = StringUtils.EMPTY;
- private static final List<String> header = Lists.newArrayList("id",
- "state",
- "last activity",
- "coordinator",
- "participants",
- "participants_wp");
+ protected void execute(NodeProbe probe)
+ {
+ List<Map<String, String>> sessions = probe.getRepairServiceProxy().getSessions(all, getRangeString(startToken, endToken));
+ if (sessions.isEmpty())
+ {
+ System.out.println("no sessions");
+ }
+ else
+ {
+ List<List<String>> rows = new ArrayList<>();
+ rows.add(Lists.newArrayList("id",
+ "state",
+ "last activity",
+ "coordinator",
+ "participants",
+ "participants_wp"));
+ int now = FBUtilities.nowInSeconds();
+ for (Map<String, String> session : sessions)
+ {
+ int updated = Integer.parseInt(session.get(LocalSessionInfo.LAST_UPDATE));
+ List<String> values = Lists.newArrayList(session.get(LocalSessionInfo.SESSION_ID),
+ session.get(LocalSessionInfo.STATE),
+ (now - updated) + " (s)",
+ session.get(LocalSessionInfo.COORDINATOR),
+ session.get(LocalSessionInfo.PARTICIPANTS),
+ session.get(LocalSessionInfo.PARTICIPANTS_WP));
+ rows.add(values);
+ }
- private List<String> sessionValues(Map<String, String> session, int now)
- {
- int updated = Integer.parseInt(session.get(LocalSessionInfo.LAST_UPDATE));
- return Lists.newArrayList(session.get(LocalSessionInfo.SESSION_ID),
- session.get(LocalSessionInfo.STATE),
- Integer.toString(now - updated) + " (s)",
- session.get(LocalSessionInfo.COORDINATOR),
- session.get(LocalSessionInfo.PARTICIPANTS),
- session.get(LocalSessionInfo.PARTICIPANTS_WP));
+ printTable(rows);
+ }
+ }
}
-
- private void listSessions(ActiveRepairServiceMBean repairServiceProxy)
+ /**
+  * Prints one row per table with the data currently marked pending repair; with {@code --verbose}
+  * the row also carries separate pending, finalized and failed columns.
+  */
+ @Command(name = "summarize-pending", description = "report the amount of data marked pending repair for the given token " +
+                                                    "range (or all replicated ranges if no tokens are provided)")
+ public static class SummarizePendingCmd extends RepairAdmin
{
- Preconditions.checkArgument(cancel == null);
- Preconditions.checkArgument(!force, "-f/--force only valid for session cancel");
- List<Map<String, String>> sessions = repairServiceProxy.getSessions(all);
- if (sessions.isEmpty())
- {
- System.out.println("no sessions");
+ @Option(title = "verbose", name = {"-v", "--verbose"}, description = "print additional info ")
+ private boolean verbose;
- }
- else
+ @Option(title = "start_token", name = {"-st", "--start-token"}, description = "Use -st to specify a token at which the repair range starts")
+ private String startToken = StringUtils.EMPTY;
+
+ @Option(title = "end_token", name = {"-et", "--end-token"}, description = "Use -et to specify a token at which repair range ends")
+ private String endToken = StringUtils.EMPTY;
+
+ @Arguments(usage = "[<keyspace> <tables>...]", description = "The keyspace followed by one or many tables")
+ private List<String> schemaArgs = new ArrayList<>();
+
+ protected void execute(NodeProbe probe)
{
- List<List<String>> rows = new ArrayList<>();
- rows.add(header);
- int now = FBUtilities.nowInSeconds();
- for (Map<String, String> session : sessions)
+ List<CompositeData> cds = probe.getRepairServiceProxy().getPendingStats(schemaArgs, getRangeString(startToken, endToken));
+ List<PendingStats> stats = new ArrayList<>(cds.size());
+ cds.forEach(cd -> stats.add(PendingStats.fromComposite(cd)));
+
+ stats.sort((l, r) -> {
+ int cmp = l.keyspace.compareTo(r.keyspace);
+ if (cmp != 0)
+ return cmp;
+
+ return l.table.compareTo(r.table);
+ });
+
+ List<String> header = Lists.newArrayList("keyspace", "table", "total");
+ if (verbose)
{
- rows.add(sessionValues(session, now));
+ header.addAll(Lists.newArrayList("pending", "finalized", "failed"));
}
- // get max col widths
- int[] widths = new int[header.size()];
- for (List<String> row : rows)
+ List<List<String>> rows = new ArrayList<>(stats.size() + 1);
+ rows.add(header);
+
+ for (PendingStats stat : stats)
{
- assert row.size() == widths.length;
- for (int i = 0; i < widths.length; i++)
+ List<String> row = new ArrayList<>(header.size());
+
+ row.add(stat.keyspace);
+ row.add(stat.table);
+ row.add(stat.total.sizeString());
+ if (verbose)
{
- widths[i] = Math.max(widths[i], row.get(i).length());
+ row.add(stat.pending.sizeString());
+ row.add(stat.finalized.sizeString());
+ row.add(stat.failed.sizeString());
}
+ rows.add(row);
}
- List<String> fmts = new ArrayList<>(widths.length);
- for (int i = 0; i < widths.length; i++)
+ printTable(rows);
+ }
+ }
+
+ @Command(name = "summarize-repaired", description = "return the most recent repairedAt timestamp for the given token range " +
+ "(or all replicated ranges if no tokens are provided)")
+ public static class SummarizeRepairedCmd extends RepairAdmin
+ {
+ @Option(title = "verbose", name = {"-v", "--verbose"}, description = "print additional info ")
+ private boolean verbose = false;
+
+ @Option(title = "start_token", name = {"-st", "--start-token"}, description = "Use -st to specify a token at which the repair range starts")
+ private String startToken = StringUtils.EMPTY;
+
+ @Option(title = "end_token", name = {"-et", "--end-token"}, description = "Use -et to specify a token at which repair range ends")
+ private String endToken = StringUtils.EMPTY;
+
+ @Arguments(usage = "[<keyspace> <tables>...]", description = "The keyspace followed by one or many tables")
+ private List<String> schemaArgs = new ArrayList<>();
+
+ protected void execute(NodeProbe probe)
+ {
+ List<CompositeData> compositeData = probe.getRepairServiceProxy().getRepairStats(schemaArgs, getRangeString(startToken, endToken));
+
+ if (compositeData.isEmpty())
{
- fmts.add("%-" + Integer.toString(widths[i]) + "s");
+ System.out.println("no stats");
+ return;
}
+ List<RepairStats> stats = new ArrayList<>(compositeData.size());
+ compositeData.forEach(cd -> stats.add(RepairStats.fromComposite(cd)));
+
+ stats.sort((l, r) -> {
+ int cmp = l.keyspace.compareTo(r.keyspace);
+ if (cmp != 0)
+ return cmp;
+
+ return l.table.compareTo(r.table);
+ });
+
+ List<String> header = Lists.newArrayList("keyspace", "table", "min_repaired", "max_repaired");
+ if (verbose)
+ header.add("detail");
- // print
- for (List<String> row : rows)
+ List<List<String>> rows = new ArrayList<>(stats.size() + 1);
+ rows.add(header);
+
+ for (RepairStats stat : stats)
{
- List<String> formatted = new ArrayList<>(row.size());
- for (int i = 0; i < widths.length; i++)
+ List<String> row = Lists.newArrayList(stat.keyspace,
+ stat.table,
+ Long.toString(stat.minRepaired),
+ Long.toString(stat.maxRepaired));
+ if (verbose)
{
- formatted.add(String.format(fmts.get(i), row.get(i)));
+ row.add(Joiner.on(", ").join(Iterables.transform(stat.sections, RepairStats.Section::toString)));
}
- System.out.println(Joiner.on(" | ").join(formatted));
+ rows.add(row);
}
+
+ printTable(rows);
}
}
- private void cancelSession(ActiveRepairServiceMBean repairServiceProxy)
+ @Command(name = "cleanup", description = "cleans up pending data from completed sessions. " +
+ "This happens automatically, but the command is provided " +
+ "for situations where it needs to be expedited." +
+ " Use --force to cancel compactions that are preventing promotion")
+ public static class CleanupDataCmd extends RepairAdmin
{
- Preconditions.checkArgument(!list);
- Preconditions.checkArgument(!all, "-a/--all only valid for session list");
- repairServiceProxy.failSession(cancel, force);
+ @Option(title = "force", name = {"-f", "--force"}, description = "Force a cleanup.")
+ private boolean force = false;
+
+ @Option(title = "start_token", name = {"-st", "--start-token"}, description = "Use -st to specify a token at which the repair range starts")
+ private String startToken = StringUtils.EMPTY;
+
+ @Option(title = "end_token", name = {"-et", "--end-token"}, description = "Use -et to specify a token at which repair range ends")
+ private String endToken = StringUtils.EMPTY;
+
+ @Arguments(usage = "[<keyspace> <tables>...]", description = "The keyspace followed by one or many tables")
+ private List<String> schemaArgs = new ArrayList<>();
+
+ /**
+  * Asks the repair service to clean up pending data for the selected tables and token range, then
+  * prints the number of successful and unsuccessful sessions per table, sorted by keyspace and table.
+  *
+  * @param probe connection used to reach the repair service MBean
+  */
+ protected void execute(NodeProbe probe)
+ {
+     System.out.println("Cleaning up data from completed sessions...");
+     ActiveRepairServiceMBean proxy = probe.getRepairServiceProxy();
+     List<CompositeData> response = proxy.cleanupPending(schemaArgs, getRangeString(startToken, endToken), force);
+
+     List<CleanupSummary> results = new ArrayList<>(response.size());
+     for (CompositeData data : response)
+         results.add(CleanupSummary.fromComposite(data));
+
+     // order by keyspace first, then by table within a keyspace
+     results.sort((a, b) -> {
+         int byKeyspace = a.keyspace.compareTo(b.keyspace);
+         return byKeyspace != 0 ? byKeyspace : a.table.compareTo(b.table);
+     });
+
+     List<List<String>> table = new ArrayList<>(results.size() + 1);
+     table.add(Lists.newArrayList("keyspace", "table", "successful sessions", "unsuccessful sessions"));
+
+     boolean incomplete = false;
+     for (CleanupSummary result : results)
+     {
+         if (!result.unsuccessful.isEmpty())
+             incomplete = true;
+
+         table.add(Lists.newArrayList(result.keyspace,
+                                      result.table,
+                                      Integer.toString(result.successful.size()),
+                                      Integer.toString(result.unsuccessful.size())));
+     }
+
+     // the warning is printed ahead of the table, not after it
+     if (incomplete)
+         System.out.println("Some tables couldn't be cleaned up completely");
+
+     printTable(table);
+ }
}
- protected void execute(NodeProbe probe)
+ /**
+  * Fails the incremental repair session named by {@code --session}, passing {@code --force}
+  * through to the repair service.
+  */
+ @Command(name = "cancel", description = "cancel an incremental repair session." +
+                                         " Use --force to cancel from a node other than the repair coordinator." +
+                                         " Attempting to cancel FINALIZED or FAILED sessions is an error.")
+ public static class CancelCmd extends RepairAdmin
{
- if (list && cancel != null)
+ @Option(title = "force", name = {"-f", "--force"}, description = "Force a cancellation.")
+ private boolean force = false;
+
+ @Option(title = "session", name = {"-s", "--session"}, description = "The session to cancel", required = true)
+ private String sessionToCancel;
+
+ protected void execute(NodeProbe probe)
{
- throw new RuntimeException("Can either list, or cancel sessions, not both");
+ probe.getRepairServiceProxy().failSession(sessionToCancel, force);
}
- else if (cancel != null)
+ }
+
+ private static void printTable(List<List<String>> rows)
+ {
+ if (rows.isEmpty())
+ return;
+
+ // get max col widths
+ int[] widths = new int[rows.get(0).size()];
+ for (List<String> row : rows)
{
- cancelSession(probe.getRepairServiceProxy());
+ assert row.size() == widths.length;
+ for (int i = 0; i < widths.length; i++)
+ {
+ widths[i] = Math.max(widths[i], row.get(i).length());
+ }
}
- else
+
+ List<String> fmts = new ArrayList<>(widths.length);
+ for (int i = 0; i < widths.length; i++)
{
- // default
- listSessions(probe.getRepairServiceProxy());
+ fmts.add("%-" + widths[i] + "s");
}
+
+ // print
+ for (List<String> row : rows)
+ {
+ List<String> formatted = new ArrayList<>(row.size());
+ for (int i = 0; i < widths.length; i++)
+ {
+ formatted.add(String.format(fmts.get(i), row.get(i)));
+ }
+ System.out.println(Joiner.on(" | ").join(formatted));
+ }
+ }
+
+ /**
+  * Joins the start/end token options into the {@code start:end} string sent to the repair service.
+  *
+  * @param startToken value of the start token option, empty when not supplied
+  * @param endToken   value of the end token option, empty when not supplied
+  * @return {@code startToken + ':' + endToken}, or {@code null} when both tokens are empty
+  */
+ static String getRangeString(String startToken, String endToken)
+ {
+     // neither option given: no range restriction
+     if (startToken.isEmpty() && endToken.isEmpty())
+         return null;
+
+     return startToken + ':' + endToken;
}
}
diff --git a/test/unit/org/apache/cassandra/dht/RangeTest.java b/test/unit/org/apache/cassandra/dht/RangeTest.java
index 29b120b..68113f0 100644
--- a/test/unit/org/apache/cassandra/dht/RangeTest.java
+++ b/test/unit/org/apache/cassandra/dht/RangeTest.java
@@ -27,7 +27,9 @@ import java.util.Random;
import java.util.Set;
import com.google.common.base.Joiner;
+import com.google.common.collect.Sets;
import org.apache.commons.lang3.StringUtils;
+import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -723,4 +725,19 @@ public class RangeTest
Range<Token> r1 = r(20, -5);
assertNotSame(r0.compareTo(r1), r1.compareTo(r0));
}
+
+ @Test
+ public void testGroupIntersect()
+ {
+ assertTrue(Range.intersects(asList(r(1, 5), r(10, 15)), asList(r(4, 6), r(20, 25))));
+ assertFalse(Range.intersects(asList(r(1, 5), r(10, 15)), asList(r(6, 7), r(20, 25))));
+ }
+
+ @Test
+ public void testGroupSubtract()
+ {
+ Collection<Range<Token>> ranges = Sets.newHashSet(r(1, 5), r(10, 15));
+ assertEquals(ranges, Range.subtract(ranges, asList(r(6, 7), r(20, 25))));
+ assertEquals(Sets.newHashSet(r(1, 4), r(11, 15)), Range.subtract(ranges, asList(r(4, 7), r(8, 11))));
+ }
}
diff --git a/test/unit/org/apache/cassandra/repair/AbstractRepairTest.java b/test/unit/org/apache/cassandra/repair/AbstractRepairTest.java
index 2c47137..d57607a 100644
--- a/test/unit/org/apache/cassandra/repair/AbstractRepairTest.java
+++ b/test/unit/org/apache/cassandra/repair/AbstractRepairTest.java
@@ -71,6 +71,11 @@ public abstract class AbstractRepairTest
return DatabaseDescriptor.getPartitioner().getToken(ByteBufferUtil.bytes(v));
}
+ protected static Range<Token> r(int l, int r)
+ {
+ return new Range<>(t(l), t(r));
+ }
+
protected static final Range<Token> RANGE1 = new Range<>(t(1), t(2));
protected static final Range<Token> RANGE2 = new Range<>(t(2), t(3));
protected static final Range<Token> RANGE3 = new Range<>(t(4), t(5));
diff --git a/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java
index 15fd1fc..cb420b7 100644
--- a/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java
+++ b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java
@@ -72,6 +72,8 @@ import static org.apache.cassandra.repair.consistent.ConsistentSession.State.*;
public class LocalSessionTest extends AbstractRepairTest
{
+ private static final UUID TID1 = UUIDGen.getTimeUUID();
+ private static final UUID TID2 = UUIDGen.getTimeUUID();
static LocalSession.Builder createBuilder()
{
@@ -79,7 +81,7 @@ public class LocalSessionTest extends AbstractRepairTest
builder.withState(PREPARING);
builder.withSessionID(UUIDGen.getTimeUUID());
builder.withCoordinator(COORDINATOR);
- builder.withUUIDTableIds(Sets.newHashSet(UUIDGen.getTimeUUID(), UUIDGen.getTimeUUID()));
+ builder.withUUIDTableIds(Sets.newHashSet(TID1, TID2));
builder.withRepairedAt(System.currentTimeMillis());
builder.withRanges(Sets.newHashSet(RANGE1, RANGE2, RANGE3));
builder.withParticipants(Sets.newHashSet(PARTICIPANT1, PARTICIPANT2, PARTICIPANT3));
@@ -870,6 +872,7 @@ public class LocalSessionTest extends AbstractRepairTest
{
LocalSession.Builder builder = createBuilder();
builder.withStartedAt(started);
+ builder.withRepairedAt(started);
builder.withLastUpdate(updated);
return builder.build();
}
@@ -940,11 +943,26 @@ public class LocalSessionTest extends AbstractRepairTest
sessions.cleanup();
+ // failed session should be gone, but finalized should not, since it hasn't been superseded
Assert.assertNull(sessions.getSession(failed.sessionID));
- Assert.assertNull(sessions.getSession(finalized.sessionID));
+ Assert.assertNotNull(sessions.getSession(finalized.sessionID));
Assert.assertNull(sessions.loadUnsafe(failed.sessionID));
+ Assert.assertNotNull(sessions.loadUnsafe(finalized.sessionID));
+
+ // add a finalized superseding session
+ LocalSession superseding = sessionWithTime(time, time + 1);
+ superseding.setState(FINALIZED);
+ sessions.putSessionUnsafe(superseding);
+
+ sessions.cleanup();
+
+ // old finalized should be removed, superseding should still be there
+ Assert.assertNull(sessions.getSession(finalized.sessionID));
+ Assert.assertNotNull(sessions.getSession(superseding.sessionID));
+
Assert.assertNull(sessions.loadUnsafe(finalized.sessionID));
+ Assert.assertNotNull(sessions.loadUnsafe(superseding.sessionID));
}
/**
diff --git a/test/unit/org/apache/cassandra/repair/consistent/PendingRepairStatTest.java b/test/unit/org/apache/cassandra/repair/consistent/PendingRepairStatTest.java
new file mode 100644
index 0000000..6c42724
--- /dev/null
+++ b/test/unit/org/apache/cassandra/repair/consistent/PendingRepairStatTest.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair.consistent;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Set;
+import java.util.UUID;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.dht.ByteOrderedPartitioner;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.repair.AbstractRepairTest;
+import org.apache.cassandra.repair.consistent.LocalSessionTest.InstrumentedLocalSessions;
+import org.apache.cassandra.repair.consistent.admin.PendingStats;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.UUIDGen;
+
+import static org.apache.cassandra.repair.consistent.ConsistentSession.State.FAILED;
+import static org.apache.cassandra.repair.consistent.ConsistentSession.State.FINALIZED;
+import static org.apache.cassandra.repair.consistent.ConsistentSession.State.PREPARING;
+import static org.apache.cassandra.service.ActiveRepairService.NO_PENDING_REPAIR;
+import static org.apache.cassandra.service.ActiveRepairService.UNREPAIRED_SSTABLE;
+
+ /**
+  * Checks the pending-repair statistics reported by {@code getPendingStats} as sstables are moved
+  * into pending sessions, the sessions change state, and the sstables leave the pending sets again.
+  */
+ public class PendingRepairStatTest extends AbstractRepairTest
+ {
+     // keyspace owned by this test class; used for both the table definition and keyspace creation
+     private static final String KEYSPACE = "pendingrepairstattest";
+
+     private static TableMetadata cfm;
+     private static ColumnFamilyStore cfs;
+
+     private static final Range<Token> FULL_RANGE;
+     private static final IPartitioner partitioner;
+
+     static
+     {
+         DatabaseDescriptor.daemonInitialization();
+         partitioner = DatabaseDescriptor.getPartitioner();
+         assert partitioner instanceof ByteOrderedPartitioner;
+         // NOTE(review): (min, min] is presumably interpreted as the whole ring - confirm against Range
+         FULL_RANGE = new Range<>(partitioner.getMinimumToken(), partitioner.getMinimumToken());
+     }
+
+     /** Creates the single-table keyspace used by the test and looks up its column family store. */
+     @BeforeClass
+     public static void setupClass()
+     {
+         SchemaLoader.prepareServer();
+         cfm = CreateTableStatement.parse("CREATE TABLE tbl (k INT PRIMARY KEY, v INT)", KEYSPACE).build();
+         SchemaLoader.createKeyspace(KEYSPACE, KeyspaceParams.simple(1), cfm);
+         cfs = Schema.instance.getColumnFamilyStoreInstance(cfm.id);
+     }
+
+     /** Re-enables auto compaction before each test; the test itself disables it again. */
+     @Before
+     public void setUp() throws Exception
+     {
+         cfs.enableAutoCompaction();
+     }
+
+     /**
+      * Builds a PREPARING session with a fresh session id, covering {@link #FULL_RANGE} for the test
+      * table, with started/updated timestamps set to the current time in seconds.
+      */
+     static LocalSession createSession()
+     {
+         LocalSession.Builder builder = LocalSession.builder();
+         builder.withState(PREPARING);
+         builder.withSessionID(UUIDGen.getTimeUUID());
+         builder.withCoordinator(COORDINATOR);
+         builder.withUUIDTableIds(Sets.newHashSet(cfm.id.asUUID()));
+         builder.withRepairedAt(System.currentTimeMillis());
+         builder.withRanges(Collections.singleton(FULL_RANGE));
+         builder.withParticipants(Sets.newHashSet(PARTICIPANT1, PARTICIPANT2, PARTICIPANT3));
+
+         int now = FBUtilities.nowInSeconds();
+         builder.withStartedAt(now);
+         builder.withLastUpdate(now);
+
+         return builder.build();
+     }
+
+     /**
+      * Inserts {@code keys} consecutive rows starting at {@code startKey}, flushes, and returns the
+      * one sstable that was not live before the call.
+      */
+     private static SSTableReader createSSTable(int startKey, int keys)
+     {
+         // snapshot the live set first so the newly flushed sstable can be found by set difference
+         Set<SSTableReader> existing = cfs.getLiveSSTables();
+         assert keys > 0;
+         for (int i=0; i<keys; i++)
+         {
+             int key = startKey + i;
+             QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES (?, ?)", cfm.keyspace, cfm.name), key, key);
+         }
+         cfs.forceBlockingFlush();
+         return Iterables.getOnlyElement(Sets.difference(cfs.getLiveSSTables(), existing));
+     }
+
+     /** Sets repairedAt / pendingRepair on a single sstable, rethrowing IOException unchecked. */
+     private static void mutateRepaired(SSTableReader sstable, long repairedAt, UUID pendingRepair)
+     {
+         try
+         {
+             cfs.getCompactionStrategyManager().mutateRepaired(Collections.singleton(sstable), repairedAt, pendingRepair, false);
+         }
+         catch (IOException e)
+         {
+             throw new RuntimeException(e);
+         }
+     }
+
+     @Test
+     public void pendingRepairStats()
+     {
+         InstrumentedLocalSessions sessions = new InstrumentedLocalSessions();
+         sessions.start();
+         cfs.disableAutoCompaction();
+         SSTableReader sstable1 = createSSTable(0, 10);
+         SSTableReader sstable2 = createSSTable(10, 10);
+         // NOTE(review): (10, 20) writes keys 10..29, overlapping sstable2; (20, 10) may have been
+         // intended. Only sstable counts are asserted below, so the outcome does not depend on it.
+         SSTableReader sstable3 = createSSTable(10, 20);
+
+         LocalSession session1 = createSession();
+         sessions.putSessionUnsafe(session1);
+         LocalSession session2 = createSession();
+         sessions.putSessionUnsafe(session2);
+
+         // nothing has been marked pending yet
+         PendingStats stats;
+         stats = sessions.getPendingStats(cfm.id, Collections.singleton(FULL_RANGE));
+         Assert.assertEquals(0, stats.total.numSSTables);
+
+         // set all sstables to pending
+         mutateRepaired(sstable1, UNREPAIRED_SSTABLE, session1.sessionID);
+         mutateRepaired(sstable2, UNREPAIRED_SSTABLE, session2.sessionID);
+         mutateRepaired(sstable3, UNREPAIRED_SSTABLE, session2.sessionID);
+
+         stats = sessions.getPendingStats(cfm.id, Collections.singleton(FULL_RANGE));
+         Assert.assertEquals(Sets.newHashSet(session1.sessionID, session2.sessionID), stats.total.sessions);
+         Assert.assertEquals(3, stats.total.numSSTables);
+         Assert.assertEquals(3, stats.pending.numSSTables);
+         Assert.assertEquals(0, stats.failed.numSSTables);
+         Assert.assertEquals(0, stats.finalized.numSSTables);
+
+         // set the 2 sessions to failed and finalized
+         session1.setState(FAILED);
+         sessions.save(session1);
+         session2.setState(FINALIZED);
+         sessions.save(session2);
+
+         // session1 owns one sstable (failed), session2 owns two (finalized)
+         stats = sessions.getPendingStats(cfm.id, Collections.singleton(FULL_RANGE));
+         Assert.assertEquals(3, stats.total.numSSTables);
+         Assert.assertEquals(0, stats.pending.numSSTables);
+         Assert.assertEquals(1, stats.failed.numSSTables);
+         Assert.assertEquals(2, stats.finalized.numSSTables);
+
+         // remove sstables from pending sets
+         mutateRepaired(sstable1, UNREPAIRED_SSTABLE, NO_PENDING_REPAIR);
+         mutateRepaired(sstable2, session2.repairedAt, NO_PENDING_REPAIR);
+         mutateRepaired(sstable3, session2.repairedAt, NO_PENDING_REPAIR);
+
+         stats = sessions.getPendingStats(cfm.id, Collections.singleton(FULL_RANGE));
+         Assert.assertTrue(stats.total.sessions.isEmpty());
+     }
+ }
diff --git a/test/unit/org/apache/cassandra/repair/consistent/RepairStateTest.java b/test/unit/org/apache/cassandra/repair/consistent/RepairStateTest.java
new file mode 100644
index 0000000..db425de
--- /dev/null
+++ b/test/unit/org/apache/cassandra/repair/consistent/RepairStateTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair.consistent;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+
+import static org.apache.cassandra.repair.consistent.RepairedState.getRepairedStats;
+
+ /**
+  * Unit tests for how {@link RepairedState} merges successive repairs over token ranges and how it
+  * answers repairedAt queries. Tokens are plain {@link Murmur3Partitioner.LongToken}s.
+  */
+ public class RepairStateTest
+ {
+     /** Wraps a long in a Murmur3 token. */
+     private static Token tk(long t)
+     {
+         return new Murmur3Partitioner.LongToken(t);
+     }
+
+     /** Builds the range with the given left and right token values. */
+     private static Range<Token> range(long left, long right)
+     {
+         return new Range<>(tk(left), tk(right));
+     }
+
+     /**
+      * Builds a list of ranges from consecutive (left, right) pairs; an odd number of tokens is a
+      * programming error in the test.
+      */
+     private static List<Range<Token>> ranges(long... tokens)
+     {
+         assert tokens.length %2 == 0;
+         List<Range<Token>> ranges = new ArrayList<>();
+         for (int i=0; i<tokens.length; i+=2)
+         {
+             ranges.add(range(tokens[i], tokens[i+1]));
+         }
+         return ranges;
+     }
+
+     private static RepairedState.Level level(Collection<Range<Token>> ranges, long repairedAt)
+     {
+         return new RepairedState.Level(ranges, repairedAt);
+     }
+
+     private static RepairedState.Section sect(Range<Token> range, long repairedAt)
+     {
+         return new RepairedState.Section(range, repairedAt);
+     }
+
+     private static RepairedState.Section sect(int l, int r, long time)
+     {
+         return sect(range(l, r), time);
+     }
+
+     /**
+      * Shorthand for building an expected-value list. The varargs array is only read when copying
+      * it into a new list, hence {@code @SafeVarargs}.
+      */
+     @SafeVarargs
+     private static <T> List<T> l(T... contents)
+     {
+         return Lists.newArrayList(contents);
+     }
+
+     /** A newer repair overlapping an older one takes the overlap; the older keeps the remainder. */
+     @Test
+     public void mergeOverlapping()
+     {
+         RepairedState repairs = new RepairedState();
+
+         repairs.add(ranges(100, 300), 5);
+         repairs.add(ranges(200, 400), 6);
+
+         RepairedState.State state = repairs.state();
+         Assert.assertEquals(l(level(ranges(200, 400), 6), level(ranges(100, 200), 5)), state.levels);
+         Assert.assertEquals(l(sect(range(100, 200), 5), sect(range(200, 400), 6)), state.sections);
+         Assert.assertEquals(ranges(100, 400), state.covered);
+     }
+
+     /** A newer repair of exactly the same range replaces the older one entirely. */
+     @Test
+     public void mergeSameRange()
+     {
+         RepairedState repairs = new RepairedState();
+
+         repairs.add(ranges(100, 400), 5);
+         repairs.add(ranges(100, 400), 6);
+
+         RepairedState.State state = repairs.state();
+         Assert.assertEquals(l(level(ranges(100, 400), 6)), state.levels);
+         Assert.assertEquals(l(sect(range(100, 400), 6)), state.sections);
+         Assert.assertEquals(ranges(100, 400), state.covered);
+     }
+
+     /** A newer repair that fully contains an older, smaller one leaves only the newer level. */
+     @Test
+     public void mergeLargeRange()
+     {
+         RepairedState repairs = new RepairedState();
+
+         repairs.add(ranges(200, 300), 5);
+         repairs.add(ranges(100, 400), 6);
+
+         RepairedState.State state = repairs.state();
+         Assert.assertEquals(l(level(ranges(100, 400), 6)), state.levels);
+         Assert.assertEquals(l(sect(range(100, 400), 6)), state.sections);
+         Assert.assertEquals(ranges(100, 400), state.covered);
+     }
+
+     /** A newer repair inside an older, larger one splits the older level around it. */
+     @Test
+     public void mergeSmallRange()
+     {
+         RepairedState repairs = new RepairedState();
+
+         repairs.add(ranges(100, 400), 5);
+         repairs.add(ranges(200, 300), 6);
+
+         RepairedState.State state = repairs.state();
+         Assert.assertEquals(l(level(ranges(200, 300), 6), level(ranges(100, 200, 300, 400), 5)), state.levels);
+         Assert.assertEquals(l(sect(range(100, 200), 5), sect(range(200, 300), 6), sect(range(300, 400), 5)), state.sections);
+         Assert.assertEquals(ranges(100, 400), state.covered);
+     }
+
+     /**
+      * {@code minRepairedAt} is expected to give the lowest repairedAt over the queried ranges, and
+      * 0 as soon as any part of the query falls outside what has been repaired.
+      */
+     @Test
+     public void repairedAt()
+     {
+         RepairedState rs;
+
+         // overlapping
+         rs = new RepairedState();
+         rs.add(ranges(100, 300), 5);
+         rs.add(ranges(200, 400), 6);
+
+         Assert.assertEquals(5, rs.minRepairedAt(ranges(150, 250)));
+         Assert.assertEquals(5, rs.minRepairedAt(ranges(150, 160)));
+         Assert.assertEquals(5, rs.minRepairedAt(ranges(100, 200)));
+         Assert.assertEquals(6, rs.minRepairedAt(ranges(200, 400)));
+         // queries reaching past either end of the covered (100, 400] span, or entirely outside it
+         Assert.assertEquals(0, rs.minRepairedAt(ranges(200, 401)));
+         Assert.assertEquals(0, rs.minRepairedAt(ranges(99, 200)));
+         Assert.assertEquals(0, rs.minRepairedAt(ranges(50, 450)));
+         Assert.assertEquals(0, rs.minRepairedAt(ranges(50, 60)));
+         Assert.assertEquals(0, rs.minRepairedAt(ranges(450, 460)));
+     }
+
+     /** {@code getRepairedStats} is expected to fill unrepaired gaps in the query with repairedAt 0. */
+     @Test
+     public void stats()
+     {
+         Assert.assertEquals(l(sect(100, 200, 5), sect(200, 300, 0), sect(300, 400, 5)),
+                             getRepairedStats(l(sect(100, 200, 5), sect(300, 400, 5)), ranges(100, 400)));
+
+         Assert.assertEquals(l(sect(100, 200, 0), sect(200, 300, 5), sect(300, 400, 0)),
+                             getRepairedStats(l(sect(200, 300, 5)), ranges(100, 400)));
+
+         Assert.assertEquals(l(sect(200, 300, 5)), getRepairedStats(l(sect(200, 300, 5)), ranges(200, 300)));
+     }
+ }
diff --git a/test/unit/org/apache/cassandra/repair/consistent/admin/SchemaArgsParserTest.java b/test/unit/org/apache/cassandra/repair/consistent/admin/SchemaArgsParserTest.java
new file mode 100644
index 0000000..9d98c9d
--- /dev/null
+++ b/test/unit/org/apache/cassandra/repair/consistent/admin/SchemaArgsParserTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair.consistent.admin;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import com.google.common.collect.Sets;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableMetadata;
+import org.assertj.core.util.Lists;
+
+/**
+ * Tests for {@code SchemaArgsParser.parse}, run against a keyspace holding {@code NUM_TBL}
+ * tables that is created once in {@link #setupClass()}.
+ */
+public class SchemaArgsParserTest
+{
+    private static final String KEYSPACE = "schemaargsparsertest";
+    private static final int NUM_TBL = 3;
+
+    // Both arrays are filled in once by setupClass(); index i corresponds to table "tbl" + i.
+    private static final TableMetadata[] cfm = new TableMetadata[NUM_TBL];
+    private static final ColumnFamilyStore[] cfs = new ColumnFamilyStore[NUM_TBL];
+
+    /**
+     * Prepares the server, creates {@code KEYSPACE} with tables tbl0..tbl2, and records the
+     * {@link ColumnFamilyStore} of each table so the tests can compare against them.
+     */
+    @BeforeClass
+    public static void setupClass()
+    {
+        SchemaLoader.prepareServer();
+        for (int i = 0; i < NUM_TBL; i++)
+            cfm[i] = CreateTableStatement.parse("CREATE TABLE tbl" + i + " (k INT PRIMARY KEY, v INT)", KEYSPACE).build();
+        SchemaLoader.createKeyspace(KEYSPACE, KeyspaceParams.simple(1), cfm);
+        // the stores can only be looked up after the keyspace has been created
+        for (int i = 0; i < NUM_TBL; i++)
+            cfs[i] = Schema.instance.getColumnFamilyStoreInstance(cfm[i].id);
+    }
+
+    /**
+     * Specifying only the keyspace should return all tables in that keyspace
+     */
+    @Test
+    public void keyspaceOnly()
+    {
+        Set<ColumnFamilyStore> tables = Sets.newHashSet(SchemaArgsParser.parse(KEYSPACE));
+        Assert.assertEquals(Sets.newHashSet(cfs), tables);
+    }
+
+    /**
+     * Specifying the keyspace followed by table names should return only the named tables
+     */
+    @Test
+    public void someTables()
+    {
+        Set<ColumnFamilyStore> tables = Sets.newHashSet(SchemaArgsParser.parse(KEYSPACE, "tbl1", "tbl2"));
+        Assert.assertEquals(Sets.newHashSet(cfs[1], cfs[2]), tables);
+    }
+
+    /**
+     * Specifying no arguments should return a set of tables that includes every table of the test keyspace.
+     * Only containment is checked, so tables from other keyspaces may be present as well.
+     */
+    @Test
+    public void noKeyspace()
+    {
+        Set<ColumnFamilyStore> allTables = Sets.newHashSet(SchemaArgsParser.parse().iterator());
+        Assert.assertTrue(allTables.containsAll(Sets.newHashSet(cfs)));
+    }
+
+    /**
+     * Consuming the result for a keyspace name this test never created should throw
+     */
+    @Test( expected = IllegalArgumentException.class )
+    public void invalidKeyspace()
+    {
+        Sets.newHashSet(SchemaArgsParser.parse("SOME_KEYSPACE"));
+    }
+
+    /**
+     * Consuming the result for a table name that was never created in the test keyspace should throw
+     */
+    @Test( expected = IllegalArgumentException.class )
+    public void invalidTables()
+    {
+        // the result is deliberately discarded: only the expected exception matters here
+        Sets.newHashSet(SchemaArgsParser.parse(KEYSPACE, "sometable"));
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org