You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2017/02/07 03:11:44 UTC
[3/7] cassandra git commit: Fix consistency of incrementally repaired
data
http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index e7c6640..89e1954 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -18,29 +18,31 @@
package org.apache.cassandra.service;
import java.io.IOException;
+import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
import com.google.common.base.Predicate;
-import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.AbstractFuture;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
+
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.db.lifecycle.SSTableSet;
-import org.apache.cassandra.db.lifecycle.View;
import org.apache.cassandra.dht.Bounds;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
@@ -59,18 +61,17 @@ import org.apache.cassandra.net.IAsyncCallbackWithFailure;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.repair.AnticompactionTask;
import org.apache.cassandra.repair.RepairJobDesc;
import org.apache.cassandra.repair.RepairParallelism;
import org.apache.cassandra.repair.RepairSession;
+import org.apache.cassandra.repair.consistent.CoordinatorSessions;
+import org.apache.cassandra.repair.consistent.LocalSessions;
import org.apache.cassandra.repair.messages.*;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.utils.CassandraVersion;
import org.apache.cassandra.utils.Clock;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.UUIDGen;
-import org.apache.cassandra.utils.concurrent.Ref;
-import org.apache.cassandra.utils.concurrent.Refs;
/**
* ActiveRepairService is the starting point for manual "active" repairs.
@@ -86,7 +87,7 @@ import org.apache.cassandra.utils.concurrent.Refs;
* The creation of a repair session is done through the submitRepairSession that
* returns a future on the completion of that session.
*/
-public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFailureDetectionEventListener
+public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFailureDetectionEventListener, ActiveRepairServiceMBean
{
/**
* @deprecated this statuses are from the previous JMX notification service,
@@ -98,6 +99,15 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
{
STARTED, SESSION_SUCCESS, SESSION_FAILED, FINISHED
}
+
+ public class ConsistentSessions
+ {
+ public final LocalSessions local = new LocalSessions();
+ public final CoordinatorSessions coordinated = new CoordinatorSessions();
+ }
+
+ public final ConsistentSessions consistent = new ConsistentSessions();
+
private boolean registeredForEndpointChanges = false;
public static CassandraVersion SUPPORTS_GLOBAL_PREPARE_FLAG_VERSION = new CassandraVersion("2.2.1");
@@ -107,6 +117,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
public static final ActiveRepairService instance = new ActiveRepairService(FailureDetector.instance, Gossiper.instance);
public static final long UNREPAIRED_SSTABLE = 0;
+ public static final UUID NO_PENDING_REPAIR = null;
/**
* A map of active coordinator session.
@@ -122,6 +133,37 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
{
this.failureDetector = failureDetector;
this.gossiper = gossiper;
+
+ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ try
+ {
+ mbs.registerMBean(this, new ObjectName(MBEAN_NAME));
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void start()
+ {
+ consistent.local.start();
+ ScheduledExecutors.optionalTasks.scheduleAtFixedRate(consistent.local::cleanup, 0,
+ LocalSessions.CLEANUP_INTERVAL,
+ TimeUnit.SECONDS);
+ }
+
+ @Override
+ public List<Map<String, String>> getSessions(boolean all)
+ {
+ return consistent.local.sessionInfo(all);
+ }
+
+ @Override
+ public void failSession(String session, boolean force)
+ {
+ UUID sessionID = UUID.fromString(session);
+ consistent.local.cancelSession(sessionID, force);
}
/**
@@ -135,6 +177,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
RepairParallelism parallelismDegree,
Set<InetAddress> endpoints,
long repairedAt,
+ boolean isConsistent,
boolean pullRepair,
ListeningExecutorService executor,
String... cfnames)
@@ -145,7 +188,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
if (cfnames.length == 0)
return null;
- final RepairSession session = new RepairSession(parentRepairSession, UUIDGen.getTimeUUID(), range, keyspace, parallelismDegree, endpoints, repairedAt, pullRepair, cfnames);
+ final RepairSession session = new RepairSession(parentRepairSession, UUIDGen.getTimeUUID(), range, keyspace, parallelismDegree, endpoints, repairedAt, isConsistent, pullRepair, cfnames);
sessions.put(session.getId(), session);
// register listeners
@@ -283,8 +326,9 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
public synchronized UUID prepareForRepair(UUID parentRepairSession, InetAddress coordinator, Set<InetAddress> endpoints, RepairOption options, List<ColumnFamilyStore> columnFamilyStores)
{
- long timestamp = Clock.instance.currentTimeMillis();
- registerParentRepairSession(parentRepairSession, coordinator, columnFamilyStores, options.getRanges(), options.isIncremental(), timestamp, options.isGlobal());
+ // we only want repairedAt for incremental repairs, for non incremental repairs, UNREPAIRED_SSTABLE will preserve repairedAt on streamed sstables
+ long repairedAt = options.isIncremental() ? Clock.instance.currentTimeMillis() : ActiveRepairService.UNREPAIRED_SSTABLE;
+ registerParentRepairSession(parentRepairSession, coordinator, columnFamilyStores, options.getRanges(), options.isIncremental(), repairedAt, options.isGlobal());
final CountDownLatch prepareLatch = new CountDownLatch(endpoints.size());
final AtomicBoolean status = new AtomicBoolean(true);
final Set<String> failedNodes = Collections.synchronizedSet(new HashSet<String>());
@@ -316,7 +360,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
{
if (FailureDetector.instance.isAlive(neighbour))
{
- PrepareMessage message = new PrepareMessage(parentRepairSession, tableIds, options.getRanges(), options.isIncremental(), timestamp, options.isGlobal());
+ PrepareMessage message = new PrepareMessage(parentRepairSession, tableIds, options.getRanges(), options.isIncremental(), repairedAt, options.isGlobal());
MessageOut<RepairMessage> msg = message.createMessage();
MessagingService.instance().sendRR(msg, neighbour, callback, TimeUnit.HOURS.toMillis(1), true);
}
@@ -346,8 +390,9 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
return parentRepairSession;
}
- public void registerParentRepairSession(UUID parentRepairSession, InetAddress coordinator, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental, long timestamp, boolean isGlobal)
+ public void registerParentRepairSession(UUID parentRepairSession, InetAddress coordinator, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental, long repairedAt, boolean isGlobal)
{
+ assert isIncremental || repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE;
if (!registeredForEndpointChanges)
{
Gossiper.instance.register(this);
@@ -355,41 +400,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
registeredForEndpointChanges = true;
}
- parentRepairSessions.put(parentRepairSession, new ParentRepairSession(coordinator, columnFamilyStores, ranges, isIncremental, timestamp, isGlobal));
- }
-
- public Set<SSTableReader> currentlyRepairing(TableId tableId, UUID parentRepairSession)
- {
- Set<SSTableReader> repairing = new HashSet<>();
- for (Map.Entry<UUID, ParentRepairSession> entry : parentRepairSessions.entrySet())
- {
- Collection<SSTableReader> sstables = entry.getValue().getActiveSSTables(tableId);
- if (sstables != null && !entry.getKey().equals(parentRepairSession))
- repairing.addAll(sstables);
- }
- return repairing;
- }
-
- /**
- * Run final process of repair.
- * This removes all resources held by parent repair session, after performing anti compaction if necessary.
- *
- * @param parentSession Parent session ID
- * @param neighbors Repair participants (not including self)
- * @param successfulRanges Ranges that repaired successfully
- */
- public synchronized ListenableFuture finishParentSession(UUID parentSession, Set<InetAddress> neighbors, Collection<Range<Token>> successfulRanges)
- {
- List<ListenableFuture<?>> tasks = new ArrayList<>(neighbors.size() + 1);
- for (InetAddress neighbor : neighbors)
- {
- AnticompactionTask task = new AnticompactionTask(parentSession, neighbor, successfulRanges);
- registerOnFdAndGossip(task);
- tasks.add(task);
- task.run(); // 'run' is just sending message
- }
- tasks.add(doAntiCompaction(parentSession, successfulRanges));
- return Futures.successfulAsList(tasks);
+ parentRepairSessions.put(parentRepairSession, new ParentRepairSession(coordinator, columnFamilyStores, ranges, isIncremental, repairedAt, isGlobal));
}
public ParentRepairSession getParentRepairSession(UUID parentSessionId)
@@ -422,53 +433,6 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
return parentRepairSessions.remove(parentSessionId);
}
- /**
- * Submit anti-compaction jobs to CompactionManager.
- * When all jobs are done, parent repair session is removed whether those are suceeded or not.
- *
- * @param parentRepairSession parent repair session ID
- * @return Future result of all anti-compaction jobs.
- */
- @SuppressWarnings("resource")
- public ListenableFuture<List<Object>> doAntiCompaction(final UUID parentRepairSession, Collection<Range<Token>> successfulRanges)
- {
- assert parentRepairSession != null;
- ParentRepairSession prs = getParentRepairSession(parentRepairSession);
- //A repair will be marked as not global if it is a subrange repair to avoid many small anti-compactions
- //in addition to other scenarios such as repairs not involving all DCs or hosts
- if (!prs.isGlobal)
- {
- logger.info("[repair #{}] Not a global repair, will not do anticompaction", parentRepairSession);
- removeParentRepairSession(parentRepairSession);
- return Futures.immediateFuture(Collections.emptyList());
- }
- assert prs.ranges.containsAll(successfulRanges) : "Trying to perform anticompaction on unknown ranges";
-
- List<ListenableFuture<?>> futures = new ArrayList<>();
- // if we don't have successful repair ranges, then just skip anticompaction
- if (!successfulRanges.isEmpty())
- {
- for (Map.Entry<TableId, ColumnFamilyStore> columnFamilyStoreEntry : prs.columnFamilyStores.entrySet())
- {
- Refs<SSTableReader> sstables = prs.getActiveRepairedSSTableRefsForAntiCompaction(columnFamilyStoreEntry.getKey(), parentRepairSession);
- ColumnFamilyStore cfs = columnFamilyStoreEntry.getValue();
- futures.add(CompactionManager.instance.submitAntiCompaction(cfs, successfulRanges, sstables, prs.repairedAt, parentRepairSession));
- }
- }
-
- ListenableFuture<List<Object>> allAntiCompactionResults = Futures.successfulAsList(futures);
- allAntiCompactionResults.addListener(new Runnable()
- {
- @Override
- public void run()
- {
- removeParentRepairSession(parentRepairSession);
- }
- }, MoreExecutors.directExecutor());
-
- return allAntiCompactionResults;
- }
-
public void handleMessage(InetAddress endpoint, RepairMessage message)
{
RepairJobDesc desc = message.desc;
@@ -495,27 +459,15 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
* We keep a ParentRepairSession around for the duration of the entire repair, for example, on a 256 token vnode rf=3 cluster
* we would have 768 RepairSession but only one ParentRepairSession. We use the PRS to avoid anticompacting the sstables
* 768 times, instead we take all repaired ranges at the end of the repair and anticompact once.
- *
- * We do an optimistic marking of sstables - when we start an incremental repair we mark all unrepaired sstables as
- * repairing (@see markSSTablesRepairing), then while the repair is ongoing compactions might remove those sstables,
- * and when it is time for anticompaction we will only anticompact the sstables that are still on disk.
- *
- * Note that validation and streaming do not care about which sstables we have marked as repairing - they operate on
- * all unrepaired sstables (if it is incremental), otherwise we would not get a correct repair.
*/
public static class ParentRepairSession
{
private final Map<TableId, ColumnFamilyStore> columnFamilyStores = new HashMap<>();
private final Collection<Range<Token>> ranges;
- public final Map<TableId, Set<String>> sstableMap = new HashMap<>();
public final boolean isIncremental;
public final boolean isGlobal;
public final long repairedAt;
public final InetAddress coordinator;
- /**
- * Indicates whether we have marked sstables as repairing. Can only be done once per table per ParentRepairSession
- */
- private final Set<TableId> marked = new HashSet<>();
public ParentRepairSession(InetAddress coordinator, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental, long repairedAt, boolean isGlobal)
{
@@ -523,7 +475,6 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
for (ColumnFamilyStore cfs : columnFamilyStores)
{
this.columnFamilyStores.put(cfs.metadata.id, cfs);
- sstableMap.put(cfs.metadata.id, new HashSet<>());
}
this.ranges = ranges;
this.repairedAt = repairedAt;
@@ -531,97 +482,6 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
this.isGlobal = isGlobal;
}
- /**
- * Mark sstables repairing - either all sstables or only the unrepaired ones depending on
- *
- * whether this is an incremental or full repair
- *
- * @param tableId the table
- * @param parentSessionId the parent repair session id, used to make sure we don't start multiple repairs over the same sstables
- */
- public synchronized void markSSTablesRepairing(TableId tableId, UUID parentSessionId)
- {
- if (!marked.contains(tableId))
- {
- List<SSTableReader> sstables = columnFamilyStores.get(tableId).select(View.select(SSTableSet.CANONICAL, (s) -> !isIncremental || !s.isRepaired())).sstables;
- Set<SSTableReader> currentlyRepairing = ActiveRepairService.instance.currentlyRepairing(tableId, parentSessionId);
- if (!Sets.intersection(currentlyRepairing, Sets.newHashSet(sstables)).isEmpty())
- {
- logger.error("Cannot start multiple repair sessions over the same sstables");
- throw new RuntimeException("Cannot start multiple repair sessions over the same sstables");
- }
- addSSTables(tableId, sstables);
- marked.add(tableId);
- }
- }
-
- /**
- * Get the still active sstables we should run anticompaction on
- *
- * note that validation and streaming do not call this method - they have to work on the actual active sstables on the node, we only call this
- * to know which sstables are still there that were there when we started the repair
- *
- * @param tableId
- * @param parentSessionId for checking if there exists a snapshot for this repair
- * @return
- */
- @SuppressWarnings("resource")
- public synchronized Refs<SSTableReader> getActiveRepairedSSTableRefsForAntiCompaction(TableId tableId, UUID parentSessionId)
- {
- assert marked.contains(tableId);
- if (!columnFamilyStores.containsKey(tableId))
- throw new RuntimeException("Not possible to get sstables for anticompaction for " + tableId);
- boolean isSnapshotRepair = columnFamilyStores.get(tableId).snapshotExists(parentSessionId.toString());
- ImmutableMap.Builder<SSTableReader, Ref<SSTableReader>> references = ImmutableMap.builder();
- Iterable<SSTableReader> sstables = isSnapshotRepair ? getSSTablesForSnapshotRepair(tableId, parentSessionId) : getActiveSSTables(tableId);
- // we check this above - if columnFamilyStores contains the tableId sstables will not be null
- assert sstables != null;
- for (SSTableReader sstable : sstables)
- {
- Ref<SSTableReader> ref = sstable.tryRef();
- if (ref == null)
- sstableMap.get(tableId).remove(sstable.getFilename());
- else
- references.put(sstable, ref);
- }
- return new Refs<>(references.build());
- }
-
- /**
- * If we are running a snapshot repair we need to find the 'real' sstables when we start anticompaction
- *
- * We use the generation of the sstables as identifiers instead of the file name to avoid having to parse out the
- * actual filename.
- *
- * @param tableId
- * @param parentSessionId
- * @return
- */
- private Set<SSTableReader> getSSTablesForSnapshotRepair(TableId tableId, UUID parentSessionId)
- {
- Set<SSTableReader> activeSSTables = new HashSet<>();
- ColumnFamilyStore cfs = columnFamilyStores.get(tableId);
- if (cfs == null)
- return null;
-
- Set<Integer> snapshotGenerations = new HashSet<>();
- try (Refs<SSTableReader> snapshottedSSTables = cfs.getSnapshotSSTableReader(parentSessionId.toString()))
- {
- for (SSTableReader sstable : snapshottedSSTables)
- {
- snapshotGenerations.add(sstable.descriptor.generation);
- }
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- for (SSTableReader sstable : cfs.getSSTables(SSTableSet.CANONICAL))
- if (snapshotGenerations.contains(sstable.descriptor.generation))
- activeSSTables.add(sstable);
- return activeSSTables;
- }
-
public synchronized void maybeSnapshot(TableId tableId, UUID parentSessionId)
{
String snapshotName = parentSessionId.toString();
@@ -637,75 +497,29 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(ranges);
}
}, true, false);
-
- if (isAlreadyRepairing(tableId, parentSessionId, snapshottedSSTables))
- {
- columnFamilyStores.get(tableId).clearSnapshot(snapshotName);
- logger.error("Cannot start multiple repair sessions over the same sstables");
- throw new RuntimeException("Cannot start multiple repair sessions over the same sstables");
- }
- addSSTables(tableId, snapshottedSSTables);
- marked.add(tableId);
}
}
-
- /**
- * Compares other repairing sstables *generation* to the ones we just snapshotted
- *
- * we compare generations since the sstables have different paths due to snapshot names
- *
- * @param tableId id of table store
- * @param parentSessionId parent repair session
- * @param sstables the newly snapshotted sstables
- * @return
- */
- private boolean isAlreadyRepairing(TableId tableId, UUID parentSessionId, Collection<SSTableReader> sstables)
+ public long getRepairedAt()
{
- Set<SSTableReader> currentlyRepairing = ActiveRepairService.instance.currentlyRepairing(tableId, parentSessionId);
- Set<Integer> currentlyRepairingGenerations = new HashSet<>();
- Set<Integer> newRepairingGenerations = new HashSet<>();
- for (SSTableReader sstable : currentlyRepairing)
- currentlyRepairingGenerations.add(sstable.descriptor.generation);
- for (SSTableReader sstable : sstables)
- newRepairingGenerations.add(sstable.descriptor.generation);
-
- return !Sets.intersection(currentlyRepairingGenerations, newRepairingGenerations).isEmpty();
+ if (isGlobal)
+ return repairedAt;
+ return ActiveRepairService.UNREPAIRED_SSTABLE;
}
- private Set<SSTableReader> getActiveSSTables(TableId tableId)
+ public Collection<ColumnFamilyStore> getColumnFamilyStores()
{
- if (!columnFamilyStores.containsKey(tableId))
- return null;
-
- Set<String> repairedSSTables = sstableMap.get(tableId);
- Set<SSTableReader> activeSSTables = new HashSet<>();
- Set<String> activeSSTableNames = new HashSet<>();
- ColumnFamilyStore cfs = columnFamilyStores.get(tableId);
- for (SSTableReader sstable : cfs.getSSTables(SSTableSet.CANONICAL))
- {
- if (repairedSSTables.contains(sstable.getFilename()))
- {
- activeSSTables.add(sstable);
- activeSSTableNames.add(sstable.getFilename());
- }
- }
- sstableMap.put(tableId, activeSSTableNames);
- return activeSSTables;
+ return ImmutableSet.<ColumnFamilyStore>builder().addAll(columnFamilyStores.values()).build();
}
- private void addSSTables(TableId tableId, Collection<SSTableReader> sstables)
+ public Set<TableId> getTableIds()
{
- for (SSTableReader sstable : sstables)
- sstableMap.get(tableId).add(sstable.getFilename());
+ return ImmutableSet.copyOf(Iterables.transform(getColumnFamilyStores(), cfs -> cfs.metadata.id));
}
-
- public long getRepairedAt()
+ public Collection<Range<Token>> getRanges()
{
- if (isGlobal)
- return repairedAt;
- return ActiveRepairService.UNREPAIRED_SSTABLE;
+ return ImmutableSet.copyOf(ranges);
}
@Override
@@ -714,7 +528,6 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
return "ParentRepairSession{" +
"columnFamilyStores=" + columnFamilyStores +
", ranges=" + ranges +
- ", sstableMap=" + sstableMap +
", repairedAt=" + repairedAt +
'}';
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/service/ActiveRepairServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairServiceMBean.java b/src/java/org/apache/cassandra/service/ActiveRepairServiceMBean.java
new file mode 100644
index 0000000..53b0acb
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/ActiveRepairServiceMBean.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.util.List;
+import java.util.Map;
+
+public interface ActiveRepairServiceMBean
+{
+ public static final String MBEAN_NAME = "org.apache.cassandra.db:type=RepairService";
+
+ public List<Map<String, String>> getSessions(boolean all);
+ public void failSession(String session, boolean force);
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index fe84082..03156ae 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -322,6 +322,7 @@ public class CassandraDaemon
}
SystemKeyspace.finishStartup();
+ ActiveRepairService.instance.start();
// Prepared statements
QueryProcessor.preloadPreparedStatement();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
index 556748d..10c5827 100644
--- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
+++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
@@ -198,7 +198,8 @@ public class ConnectionHandler
session.description(),
!isOutgoingHandler,
session.keepSSTableLevel(),
- session.isIncremental());
+ session.isIncremental(),
+ session.getPendingRepair());
ByteBuffer messageBuf = message.createMessage(false, protocolVersion);
DataOutputStreamPlus out = getWriteChannel(socket);
out.write(messageBuf);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
index 2cb75f7..81d0498 100644
--- a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
+++ b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
@@ -49,15 +49,17 @@ public class StreamCoordinator
private final boolean keepSSTableLevel;
private final boolean isIncremental;
private Iterator<StreamSession> sessionsToConnect = null;
+ private final UUID pendingRepair;
public StreamCoordinator(int connectionsPerHost, boolean keepSSTableLevel, boolean isIncremental,
- StreamConnectionFactory factory, boolean connectSequentially)
+ StreamConnectionFactory factory, boolean connectSequentially, UUID pendingRepair)
{
this.connectionsPerHost = connectionsPerHost;
this.factory = factory;
this.keepSSTableLevel = keepSSTableLevel;
this.isIncremental = isIncremental;
this.connectSequentially = connectSequentially;
+ this.pendingRepair = pendingRepair;
}
public void setConnectionFactory(StreamConnectionFactory factory)
@@ -288,7 +290,7 @@ public class StreamCoordinator
// create
if (streamSessions.size() < connectionsPerHost)
{
- StreamSession session = new StreamSession(peer, connecting, factory, streamSessions.size(), keepSSTableLevel, isIncremental);
+ StreamSession session = new StreamSession(peer, connecting, factory, streamSessions.size(), keepSSTableLevel, isIncremental, pendingRepair);
streamSessions.put(++lastReturned, session);
return session;
}
@@ -320,7 +322,7 @@ public class StreamCoordinator
StreamSession session = streamSessions.get(id);
if (session == null)
{
- session = new StreamSession(peer, connecting, factory, id, keepSSTableLevel, isIncremental);
+ session = new StreamSession(peer, connecting, factory, id, keepSSTableLevel, isIncremental, pendingRepair);
streamSessions.put(id, session);
}
return session;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/streaming/StreamPlan.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamPlan.java b/src/java/org/apache/cassandra/streaming/StreamPlan.java
index e9d43cb..5526da8 100644
--- a/src/java/org/apache/cassandra/streaming/StreamPlan.java
+++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java
@@ -48,21 +48,21 @@ public class StreamPlan
*/
public StreamPlan(String description)
{
- this(description, ActiveRepairService.UNREPAIRED_SSTABLE, 1, false, false, false);
+ this(description, ActiveRepairService.UNREPAIRED_SSTABLE, 1, false, false, false, null);
}
public StreamPlan(String description, boolean keepSSTableLevels, boolean connectSequentially)
{
- this(description, ActiveRepairService.UNREPAIRED_SSTABLE, 1, keepSSTableLevels, false, connectSequentially);
+ this(description, ActiveRepairService.UNREPAIRED_SSTABLE, 1, keepSSTableLevels, false, connectSequentially, null);
}
public StreamPlan(String description, long repairedAt, int connectionsPerHost, boolean keepSSTableLevels,
- boolean isIncremental, boolean connectSequentially)
+ boolean isIncremental, boolean connectSequentially, UUID pendingRepair)
{
this.description = description;
this.repairedAt = repairedAt;
this.coordinator = new StreamCoordinator(connectionsPerHost, keepSSTableLevels, isIncremental, new DefaultConnectionFactory(),
- connectSequentially);
+ connectSequentially, pendingRepair);
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java
index baf5ec9..fdc2ae2 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@ -21,6 +21,7 @@ import java.io.*;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.util.Collection;
+import java.util.UUID;
import com.google.common.base.Throwables;
import com.google.common.collect.UnmodifiableIterator;
@@ -95,16 +96,16 @@ public class StreamReader
throw new IOException("CF " + tableId + " was dropped during streaming");
}
- logger.debug("[Stream #{}] Start receiving file #{} from {}, repairedAt = {}, size = {}, ks = '{}', table = '{}'.",
+ logger.debug("[Stream #{}] Start receiving file #{} from {}, repairedAt = {}, size = {}, ks = '{}', table = '{}', pendingRepair = '{}'.",
session.planId(), fileSeqNum, session.peer, repairedAt, totalSize, cfs.keyspace.getName(),
- cfs.getTableName());
+ cfs.getTableName(), session.getPendingRepair());
TrackedInputStream in = new TrackedInputStream(new LZFInputStream(Channels.newInputStream(channel)));
StreamDeserializer deserializer = new StreamDeserializer(cfs.metadata(), in, inputVersion, getHeader(cfs.metadata()));
SSTableMultiWriter writer = null;
try
{
- writer = createWriter(cfs, totalSize, repairedAt, format);
+ writer = createWriter(cfs, totalSize, repairedAt, session.getPendingRepair(), format);
while (in.getBytesRead() < totalSize)
{
writePartition(deserializer, writer);
@@ -138,13 +139,13 @@ public class StreamReader
return header != null? header.toHeader(metadata) : null; //pre-3.0 sstable have no SerializationHeader
}
- protected SSTableMultiWriter createWriter(ColumnFamilyStore cfs, long totalSize, long repairedAt, SSTableFormat.Type format) throws IOException
+ protected SSTableMultiWriter createWriter(ColumnFamilyStore cfs, long totalSize, long repairedAt, UUID pendingRepair, SSTableFormat.Type format) throws IOException
{
Directories.DataDirectory localDir = cfs.getDirectories().getWriteableLocation(totalSize);
if (localDir == null)
throw new IOException(String.format("Insufficient disk space to store %s", FBUtilities.prettyPrintMemory(totalSize)));
- RangeAwareSSTableWriter writer = new RangeAwareSSTableWriter(cfs, estimatedKeys, repairedAt, format, sstableLevel, totalSize, session.getTransaction(tableId), getHeader(cfs.metadata()));
+ RangeAwareSSTableWriter writer = new RangeAwareSSTableWriter(cfs, estimatedKeys, repairedAt, pendingRepair, format, sstableLevel, totalSize, session.getTransaction(tableId), getHeader(cfs.metadata()));
StreamHook.instance.reportIncomingFile(cfs, writer, session, fileSeqNum);
return writer;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
index 61a1c8c..6d0c03b 100644
--- a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
+++ b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
@@ -71,10 +71,9 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
set(getCurrentState());
}
- private StreamResultFuture(UUID planId, String description, boolean keepSSTableLevels, boolean isIncremental)
+ private StreamResultFuture(UUID planId, String description, boolean keepSSTableLevels, boolean isIncremental, UUID pendingRepair)
{
- this(planId, description, new StreamCoordinator(0, keepSSTableLevels, isIncremental,
- new DefaultConnectionFactory(), false));
+ this(planId, description, new StreamCoordinator(0, keepSSTableLevels, isIncremental, new DefaultConnectionFactory(), false, pendingRepair));
}
static StreamResultFuture init(UUID planId, String description, Collection<StreamEventHandler> listeners,
@@ -108,7 +107,8 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
boolean isForOutgoing,
int version,
boolean keepSSTableLevel,
- boolean isIncremental) throws IOException
+ boolean isIncremental,
+ UUID pendingRepair) throws IOException
{
StreamResultFuture future = StreamManager.instance.getReceivingStream(planId);
if (future == null)
@@ -116,7 +116,7 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
logger.info("[Stream #{} ID#{}] Creating new streaming plan for {}", planId, sessionIndex, description);
// The main reason we create a StreamResultFuture on the receiving side is for JMX exposure.
- future = new StreamResultFuture(planId, description, keepSSTableLevel, isIncremental);
+ future = new StreamResultFuture(planId, description, keepSSTableLevel, isIncremental, pendingRepair);
StreamManager.instance.registerReceiving(future);
}
future.attachConnection(from, sessionIndex, connection, isForOutgoing, version);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index faa05d1..b7db2b2 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -163,6 +163,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
private final boolean keepSSTableLevel;
private final boolean isIncremental;
private ScheduledFuture<?> keepAliveFuture = null;
+ private final UUID pendingRepair;
public static enum State
{
@@ -184,7 +185,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
* @param connecting Actual connecting address
* @param factory is used for establishing connection
*/
- public StreamSession(InetAddress peer, InetAddress connecting, StreamConnectionFactory factory, int index, boolean keepSSTableLevel, boolean isIncremental)
+ public StreamSession(InetAddress peer, InetAddress connecting, StreamConnectionFactory factory, int index, boolean keepSSTableLevel, boolean isIncremental, UUID pendingRepair)
{
this.peer = peer;
this.connecting = connecting;
@@ -196,6 +197,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
this.metrics = StreamingMetrics.get(connecting);
this.keepSSTableLevel = keepSSTableLevel;
this.isIncremental = isIncremental;
+ this.pendingRepair = pendingRepair;
}
public UUID planId()
@@ -223,6 +225,10 @@ public class StreamSession implements IEndpointStateChangeSubscriber
return isIncremental;
}
+ public UUID getPendingRepair()
+ {
+ return pendingRepair;
+ }
public LifecycleTransaction getTransaction(TableId tableId)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index a15d2ff..d8e329c 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@ -73,7 +73,7 @@ public class CompressedStreamReader extends StreamReader
}
- logger.debug("[Stream #{}] Start receiving file #{} from {}, repairedAt = {}, size = {}, ks = '{}', table = '{}'.",
- session.planId(), fileSeqNum, session.peer, repairedAt, totalSize, cfs.keyspace.getName(),
+ logger.debug("[Stream #{}] Start receiving file #{} from {}, repairedAt = {}, size = {}, ks = '{}', pendingRepair = {}, table = '{}'.",
+ session.planId(), fileSeqNum, session.peer, repairedAt, totalSize, cfs.keyspace.getName(), session.getPendingRepair(),
cfs.getTableName());
CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo,
@@ -84,7 +84,7 @@ public class CompressedStreamReader extends StreamReader
SSTableMultiWriter writer = null;
try
{
- writer = createWriter(cfs, totalSize, repairedAt, format);
+ writer = createWriter(cfs, totalSize, repairedAt, session.getPendingRepair(), format);
String filename = writer.getFilename();
int sectionIdx = 0;
for (Pair<Long, Long> section : sections)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
index 6d807e9..3b4b512 100644
--- a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
@@ -49,8 +49,9 @@ public class StreamInitMessage
public final boolean isForOutgoing;
public final boolean keepSSTableLevel;
public final boolean isIncremental;
+ public final UUID pendingRepair;
- public StreamInitMessage(InetAddress from, int sessionIndex, UUID planId, String description, boolean isForOutgoing, boolean keepSSTableLevel, boolean isIncremental)
+ public StreamInitMessage(InetAddress from, int sessionIndex, UUID planId, String description, boolean isForOutgoing, boolean keepSSTableLevel, boolean isIncremental, UUID pendingRepair)
{
this.from = from;
this.sessionIndex = sessionIndex;
@@ -59,6 +60,7 @@ public class StreamInitMessage
this.isForOutgoing = isForOutgoing;
this.keepSSTableLevel = keepSSTableLevel;
this.isIncremental = isIncremental;
+ this.pendingRepair = pendingRepair;
}
/**
@@ -114,6 +116,12 @@ public class StreamInitMessage
out.writeBoolean(message.isForOutgoing);
out.writeBoolean(message.keepSSTableLevel);
out.writeBoolean(message.isIncremental);
+
+ out.writeBoolean(message.pendingRepair != null);
+ if (message.pendingRepair != null)
+ {
+ UUIDSerializer.serializer.serialize(message.pendingRepair, out, MessagingService.current_version);
+ }
}
public StreamInitMessage deserialize(DataInputPlus in, int version) throws IOException
@@ -124,8 +132,10 @@ public class StreamInitMessage
String description = in.readUTF();
boolean sentByInitiator = in.readBoolean();
boolean keepSSTableLevel = in.readBoolean();
+
boolean isIncremental = in.readBoolean();
- return new StreamInitMessage(from, sessionIndex, planId, description, sentByInitiator, keepSSTableLevel, isIncremental);
+ UUID pendingRepair = in.readBoolean() ? UUIDSerializer.serializer.deserialize(in, version) : null;
+ return new StreamInitMessage(from, sessionIndex, planId, description, sentByInitiator, keepSSTableLevel, isIncremental, pendingRepair);
}
public long serializedSize(StreamInitMessage message, int version)
@@ -137,6 +147,11 @@ public class StreamInitMessage
size += TypeSizes.sizeof(message.isForOutgoing);
size += TypeSizes.sizeof(message.keepSSTableLevel);
size += TypeSizes.sizeof(message.isIncremental);
+ size += TypeSizes.sizeof(message.pendingRepair != null);
+ if (message.pendingRepair != null)
+ {
+ size += UUIDSerializer.serializer.serializedSize(message.pendingRepair, MessagingService.current_version);
+ }
return size;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 5463255..865665c 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -74,6 +74,7 @@ import org.apache.cassandra.metrics.TableMetrics;
import org.apache.cassandra.metrics.ThreadPoolMetrics;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.MessagingServiceMBean;
+import org.apache.cassandra.service.ActiveRepairServiceMBean;
import org.apache.cassandra.service.CacheService;
import org.apache.cassandra.service.CacheServiceMBean;
import org.apache.cassandra.service.GCInspector;
@@ -122,6 +123,7 @@ public class NodeProbe implements AutoCloseable
private StorageProxyMBean spProxy;
private HintedHandOffManagerMBean hhProxy;
private BatchlogManagerMBean bmProxy;
+ private ActiveRepairServiceMBean arsProxy;
private boolean failed;
/**
@@ -214,6 +216,8 @@ public class NodeProbe implements AutoCloseable
gossProxy = JMX.newMBeanProxy(mbeanServerConn, name, GossiperMBean.class);
name = new ObjectName(BatchlogManager.MBEAN_NAME);
bmProxy = JMX.newMBeanProxy(mbeanServerConn, name, BatchlogManagerMBean.class);
+ name = new ObjectName(ActiveRepairServiceMBean.MBEAN_NAME);
+ arsProxy = JMX.newMBeanProxy(mbeanServerConn, name, ActiveRepairServiceMBean.class);
}
catch (MalformedObjectNameException e)
{
@@ -1511,6 +1515,11 @@ public class NodeProbe implements AutoCloseable
throw new RuntimeException(e);
}
}
+
+ public ActiveRepairServiceMBean getRepairServiceProxy()
+ {
+ return arsProxy;
+ }
}
class ColumnFamilyStoreMBeanIterator implements Iterator<Map.Entry<String, ColumnFamilyStoreMBean>>
http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/tools/NodeTool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java b/src/java/org/apache/cassandra/tools/NodeTool.java
index 0c55f76..6812a27 100644
--- a/src/java/org/apache/cassandra/tools/NodeTool.java
+++ b/src/java/org/apache/cassandra/tools/NodeTool.java
@@ -99,6 +99,7 @@ public class NodeTool
RemoveNode.class,
Assassinate.class,
Repair.class,
+ RepairAdmin.class,
ReplayBatchlog.class,
SetCacheCapacity.class,
SetHintedHandoffThrottleInKB.class,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java b/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
index 019e053..63c7f96 100644
--- a/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
+++ b/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
@@ -135,6 +135,7 @@ public class SSTableMetadataViewer
out.printf("Estimated droppable tombstones: %s%n", stats.getEstimatedDroppableTombstoneRatio((int) (System.currentTimeMillis() / 1000) - gcgs));
out.printf("SSTable Level: %d%n", stats.sstableLevel);
out.printf("Repaired at: %d%n", stats.repairedAt);
+ out.printf("Pending repair: %s%n", stats.pendingRepair);
out.printf("Replay positions covered: %s%n", stats.commitLogIntervals);
out.printf("totalColumnsSet: %s%n", stats.totalColumnsSet);
out.printf("totalRows: %s%n", stats.totalRows);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/tools/SSTableRepairedAtSetter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/SSTableRepairedAtSetter.java b/src/java/org/apache/cassandra/tools/SSTableRepairedAtSetter.java
index a130177..8056ff8 100644
--- a/src/java/org/apache/cassandra/tools/SSTableRepairedAtSetter.java
+++ b/src/java/org/apache/cassandra/tools/SSTableRepairedAtSetter.java
@@ -27,7 +27,6 @@ import java.util.List;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.service.ActiveRepairService;
/**
* Set repairedAt status on a given set of sstables.
@@ -89,11 +88,11 @@ public class SSTableRepairedAtSetter
if (setIsRepaired)
{
FileTime f = Files.getLastModifiedTime(new File(descriptor.filenameFor(Component.DATA)).toPath());
- descriptor.getMetadataSerializer().mutateRepairedAt(descriptor, f.toMillis());
+ descriptor.getMetadataSerializer().mutateRepaired(descriptor, f.toMillis(), null);
}
else
{
- descriptor.getMetadataSerializer().mutateRepairedAt(descriptor, ActiveRepairService.UNREPAIRED_SSTABLE);
+ descriptor.getMetadataSerializer().mutateRepaired(descriptor, 0 /* ActiveRepairService.UNREPAIRED_SSTABLE */, null);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/tools/nodetool/RepairAdmin.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/RepairAdmin.java b/src/java/org/apache/cassandra/tools/nodetool/RepairAdmin.java
new file mode 100644
index 0000000..bb201a2
--- /dev/null
+++ b/src/java/org/apache/cassandra/tools/nodetool/RepairAdmin.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools.nodetool;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import io.airlift.command.Command;
+import io.airlift.command.Option;
+import org.apache.cassandra.repair.consistent.LocalSessionInfo;
+import org.apache.cassandra.service.ActiveRepairServiceMBean;
+import org.apache.cassandra.tools.NodeProbe;
+import org.apache.cassandra.tools.NodeTool;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * Supports listing and failing incremental repair sessions
+ */
+@Command(name = "repair_admin", description = "list and fail incremental repair sessions")
+public class RepairAdmin extends NodeTool.NodeToolCmd
+{
+ @Option(title = "list", name = {"-l", "--list"}, description = "list repair sessions (default behavior)")
+ private boolean list = false;
+
+ @Option(title = "all", name = {"-a", "--all"}, description = "include completed and failed sessions")
+ private boolean all = false;
+
+ @Option(title = "cancel", name = {"-x", "--cancel"}, description = "cancel an incremental repair session")
+ private String cancel = null;
+
+ @Option(title = "force", name = {"-f", "--force"}, description = "cancel repair session from a node other than the repair coordinator." +
+ " Attempting to cancel FINALIZED or FAILED sessions is an error.")
+ private boolean force = false;
+
+ private static final List<String> header = Lists.newArrayList("id",
+ "state",
+ "last activity",
+ "coordinator",
+ "participants");
+
+
+ private List<String> sessionValues(Map<String, String> session, int now)
+ {
+ int updated = Integer.parseInt(session.get(LocalSessionInfo.LAST_UPDATE));
+ return Lists.newArrayList(session.get(LocalSessionInfo.SESSION_ID),
+ session.get(LocalSessionInfo.STATE),
+ Integer.toString(now - updated) + " (s)",
+ session.get(LocalSessionInfo.COORDINATOR),
+ session.get(LocalSessionInfo.PARTICIPANTS));
+ }
+
+ private void listSessions(ActiveRepairServiceMBean repairServiceProxy)
+ {
+ Preconditions.checkArgument(cancel == null);
+ Preconditions.checkArgument(!force, "-f/--force only valid for session cancel");
+ List<Map<String, String>> sessions = repairServiceProxy.getSessions(all);
+ if (sessions.isEmpty())
+ {
+ System.out.println("no sessions");
+
+ }
+ else
+ {
+ List<List<String>> rows = new ArrayList<>();
+ rows.add(header);
+ int now = FBUtilities.nowInSeconds();
+ for (Map<String, String> session : sessions)
+ {
+ rows.add(sessionValues(session, now));
+ }
+
+ // get max col widths
+ int[] widths = new int[header.size()];
+ for (List<String> row : rows)
+ {
+ assert row.size() == widths.length;
+ for (int i = 0; i < widths.length; i++)
+ {
+ widths[i] = Math.max(widths[i], row.get(i).length());
+ }
+ }
+
+ List<String> fmts = new ArrayList<>(widths.length);
+ for (int i = 0; i < widths.length; i++)
+ {
+ fmts.add("%-" + Integer.toString(widths[i]) + "s");
+ }
+
+
+ // print
+ for (List<String> row : rows)
+ {
+ List<String> formatted = new ArrayList<>(row.size());
+ for (int i = 0; i < widths.length; i++)
+ {
+ formatted.add(String.format(fmts.get(i), row.get(i)));
+ }
+ System.out.println(Joiner.on(" | ").join(formatted));
+ }
+ }
+ }
+
+ private void cancelSession(ActiveRepairServiceMBean repairServiceProxy)
+ {
+ Preconditions.checkArgument(!list);
+ Preconditions.checkArgument(!all, "-a/--all only valid for session list");
+ repairServiceProxy.failSession(cancel, force);
+ }
+
+ protected void execute(NodeProbe probe)
+ {
+ if (list && cancel != null)
+ {
+ throw new RuntimeException("Can either list, or cancel sessions, not both");
+ }
+ else if (cancel != null)
+ {
+ cancelSession(probe.getRepairServiceProxy());
+ }
+ else
+ {
+ // default
+ listSessions(probe.getRepairServiceProxy());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/test/unit/org/apache/cassandra/db/RepairedDataTombstonesTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RepairedDataTombstonesTest.java b/test/unit/org/apache/cassandra/db/RepairedDataTombstonesTest.java
index b814ea6..e01088d 100644
--- a/test/unit/org/apache/cassandra/db/RepairedDataTombstonesTest.java
+++ b/test/unit/org/apache/cassandra/db/RepairedDataTombstonesTest.java
@@ -308,7 +308,7 @@ public class RepairedDataTombstonesTest extends CQLTester
public static void repair(ColumnFamilyStore cfs, SSTableReader sstable) throws IOException
{
- sstable.descriptor.getMetadataSerializer().mutateRepairedAt(sstable.descriptor, 1);
+ sstable.descriptor.getMetadataSerializer().mutateRepaired(sstable.descriptor, 1, null);
sstable.reloadSSTableMetadata();
cfs.getTracker().notifySSTableRepairedStatusChanged(Collections.singleton(sstable));
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/test/unit/org/apache/cassandra/db/ScrubTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ScrubTest.java b/test/unit/org/apache/cassandra/db/ScrubTest.java
index d9f6433..f0873b9 100644
--- a/test/unit/org/apache/cassandra/db/ScrubTest.java
+++ b/test/unit/org/apache/cassandra/db/ScrubTest.java
@@ -642,7 +642,7 @@ public class ScrubTest
{
SerializationHeader header = new SerializationHeader(true, metadata.get(), metadata.get().regularAndStaticColumns(), EncodingStats.NO_STATS);
MetadataCollector collector = new MetadataCollector(metadata.get().comparator).sstableLevel(0);
- return new TestMultiWriter(new TestWriter(descriptor, keyCount, 0, metadata, collector, header, txn), txn);
+ return new TestMultiWriter(new TestWriter(descriptor, keyCount, 0, null, metadata, collector, header, txn), txn);
}
private static class TestMultiWriter extends SimpleSSTableMultiWriter
@@ -658,10 +658,10 @@ public class ScrubTest
*/
private static class TestWriter extends BigTableWriter
{
- TestWriter(Descriptor descriptor, long keyCount, long repairedAt, TableMetadataRef metadata,
+ TestWriter(Descriptor descriptor, long keyCount, long repairedAt, UUID pendingRepair, TableMetadataRef metadata,
MetadataCollector collector, SerializationHeader header, LifecycleTransaction txn)
{
- super(descriptor, keyCount, repairedAt, metadata, collector, header, Collections.emptySet(), txn);
+ super(descriptor, keyCount, repairedAt, pendingRepair, metadata, collector, header, Collections.emptySet(), txn);
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/test/unit/org/apache/cassandra/db/compaction/AbstractPendingRepairTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/AbstractPendingRepairTest.java b/test/unit/org/apache/cassandra/db/compaction/AbstractPendingRepairTest.java
new file mode 100644
index 0000000..08be550
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/compaction/AbstractPendingRepairTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+
+import com.google.common.collect.Iterables;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.cql3.statements.CreateTableStatement;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.net.IMessageSink;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.repair.consistent.AbstractConsistentSessionTest;
+import org.apache.cassandra.repair.consistent.LocalSessionAccessor;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.service.ActiveRepairService;
+
+@Ignore
+public class AbstractPendingRepairTest extends AbstractConsistentSessionTest
+{
+ protected String ks;
+ protected final String tbl = "tbl";
+ protected TableMetadata cfm;
+ protected ColumnFamilyStore cfs;
+ protected CompactionStrategyManager csm;
+ protected static ActiveRepairService ARS;
+
+ private int nextSSTableKey = 0;
+
+ @BeforeClass
+ public static void setupClass()
+ {
+ SchemaLoader.prepareServer();
+ ARS = ActiveRepairService.instance;
+ LocalSessionAccessor.startup();
+
+ // cutoff messaging service
+ MessagingService.instance().addMessageSink(new IMessageSink()
+ {
+ public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to)
+ {
+ return false;
+ }
+
+ public boolean allowIncomingMessage(MessageIn message, int id)
+ {
+ return false;
+ }
+ });
+ }
+
+ @Before
+ public void setup()
+ {
+ ks = "ks_" + System.currentTimeMillis();
+ cfm = CreateTableStatement.parse(String.format("CREATE TABLE %s.%s (k INT PRIMARY KEY, v INT)", ks, tbl), ks).build();
+ SchemaLoader.createKeyspace(ks, KeyspaceParams.simple(1), cfm);
+ cfs = Schema.instance.getColumnFamilyStoreInstance(cfm.id);
+ csm = cfs.getCompactionStrategyManager();
+ nextSSTableKey = 0;
+ }
+
+ /**
+ * creates and returns an sstable
+ *
+ * @param orphan if true, the sstable will be removed from the unrepaired strategy
+ */
+ SSTableReader makeSSTable(boolean orphan)
+ {
+ int pk = nextSSTableKey++;
+ Set<SSTableReader> pre = cfs.getLiveSSTables();
+ QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES(?, ?)", ks, tbl), pk, pk);
+ cfs.forceBlockingFlush();
+ Set<SSTableReader> post = cfs.getLiveSSTables();
+ Set<SSTableReader> diff = new HashSet<>(post);
+ diff.removeAll(pre);
+ assert diff.size() == 1;
+ SSTableReader sstable = diff.iterator().next();
+ if (orphan)
+ {
+ assert Iterables.any(csm.getUnrepaired(), s -> s.getSSTables().contains(sstable));
+ csm.getUnrepaired().forEach(s -> s.removeSSTable(sstable));
+ }
+ return sstable;
+ }
+
+ protected static void mutateRepaired(SSTableReader sstable, long repairedAt, UUID pendingRepair)
+ {
+ try
+ {
+ sstable.descriptor.getMetadataSerializer().mutateRepaired(sstable.descriptor, repairedAt, pendingRepair);
+ sstable.reloadSSTableMetadata();
+ }
+ catch (IOException e)
+ {
+ throw new AssertionError(e);
+ }
+ }
+
+ protected static void mutateRepaired(SSTableReader sstable, long repairedAt)
+ {
+ mutateRepaired(sstable, repairedAt, ActiveRepairService.NO_PENDING_REPAIR);
+ }
+
+ protected static void mutateRepaired(SSTableReader sstable, UUID pendingRepair)
+ {
+ mutateRepaired(sstable, ActiveRepairService.UNREPAIRED_SSTABLE, pendingRepair);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
index 5a7bfed..41c090e 100644
--- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
@@ -29,6 +29,7 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import org.junit.BeforeClass;
import org.junit.After;
+import org.junit.Ignore;
import org.junit.Test;
import org.apache.cassandra.schema.TableMetadata;
@@ -47,9 +48,11 @@ import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.UUIDGen;
import org.apache.cassandra.utils.concurrent.Refs;
import org.apache.cassandra.UpdateBuilder;
+import static org.apache.cassandra.service.ActiveRepairService.UNREPAIRED_SSTABLE;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -57,6 +60,8 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
+import static org.apache.cassandra.service.ActiveRepairService.NO_PENDING_REPAIR;
+
public class AntiCompactionTest
{
@@ -80,9 +85,10 @@ public class AntiCompactionTest
store.truncateBlocking();
}
- @Test
- public void antiCompactOne() throws Exception
+ private void antiCompactOne(long repairedAt, UUID pendingRepair) throws Exception
{
+ assert repairedAt != UNREPAIRED_SSTABLE || pendingRepair != null;
+
ColumnFamilyStore store = prepareColumnFamilyStore();
Collection<SSTableReader> sstables = getUnrepairedSSTables(store);
assertEquals(store.getLiveSSTables().size(), sstables.size());
@@ -90,15 +96,15 @@ public class AntiCompactionTest
List<Range<Token>> ranges = Arrays.asList(range);
int repairedKeys = 0;
+ int pendingKeys = 0;
int nonRepairedKeys = 0;
try (LifecycleTransaction txn = store.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION);
Refs<SSTableReader> refs = Refs.ref(sstables))
{
if (txn == null)
throw new IllegalStateException();
- long repairedAt = 1000;
UUID parentRepairSession = UUID.randomUUID();
- CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, repairedAt, parentRepairSession);
+ CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, repairedAt, pendingRepair, parentRepairSession);
}
assertEquals(2, store.getLiveSSTables().size());
@@ -109,10 +115,11 @@ public class AntiCompactionTest
while (scanner.hasNext())
{
UnfilteredRowIterator row = scanner.next();
- if (sstable.isRepaired())
+ if (sstable.isRepaired() || sstable.isPendingRepair())
{
assertTrue(range.contains(row.partitionKey().getToken()));
- repairedKeys++;
+ repairedKeys += sstable.isRepaired() ? 1 : 0;
+ pendingKeys += sstable.isPendingRepair() ? 1 : 0;
}
else
{
@@ -128,11 +135,25 @@ public class AntiCompactionTest
assertEquals(1, sstable.selfRef().globalCount());
}
assertEquals(0, store.getTracker().getCompacting().size());
- assertEquals(repairedKeys, 4);
+ assertEquals(repairedKeys, repairedAt != UNREPAIRED_SSTABLE ? 4 : 0);
+ assertEquals(pendingKeys, pendingRepair != NO_PENDING_REPAIR ? 4 : 0);
assertEquals(nonRepairedKeys, 6);
}
@Test
+ public void antiCompactOneRepairedAt() throws Exception
+ {
+ antiCompactOne(1000, NO_PENDING_REPAIR);
+ }
+
+ @Test
+ public void antiCompactOnePendingRepair() throws Exception
+ {
+ antiCompactOne(UNREPAIRED_SSTABLE, UUIDGen.getTimeUUID());
+ }
+
+ @Ignore
+ @Test
public void antiCompactionSizeTest() throws InterruptedException, IOException
{
Keyspace keyspace = Keyspace.open(KEYSPACE1);
@@ -147,7 +168,7 @@ public class AntiCompactionTest
try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION);
Refs<SSTableReader> refs = Refs.ref(sstables))
{
- CompactionManager.instance.performAnticompaction(cfs, Arrays.asList(range), refs, txn, 12345, parentRepairSession);
+ CompactionManager.instance.performAnticompaction(cfs, Arrays.asList(range), refs, txn, 12345, NO_PENDING_REPAIR, parentRepairSession);
}
long sum = 0;
long rows = 0;
@@ -166,7 +187,7 @@ public class AntiCompactionTest
File dir = cfs.getDirectories().getDirectoryForNewSSTables();
Descriptor desc = cfs.newSSTableDescriptor(dir);
- try (SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, desc, 0, 0, new SerializationHeader(true, cfs.metadata(), cfs.metadata().regularAndStaticColumns(), EncodingStats.NO_STATS)))
+ try (SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, desc, 0, 0, NO_PENDING_REPAIR, new SerializationHeader(true, cfs.metadata(), cfs.metadata().regularAndStaticColumns(), EncodingStats.NO_STATS)))
{
for (int i = 0; i < count; i++)
{
@@ -230,7 +251,7 @@ public class AntiCompactionTest
try (LifecycleTransaction txn = store.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION);
Refs<SSTableReader> refs = Refs.ref(sstables))
{
- CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, repairedAt, parentRepairSession);
+ CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, repairedAt, NO_PENDING_REPAIR, parentRepairSession);
}
/*
Anticompaction will be anti-compacting 10 SSTables but will be doing this two at a time
@@ -267,8 +288,7 @@ public class AntiCompactionTest
assertEquals(nonRepairedKeys, 60);
}
- @Test
- public void shouldMutateRepairedAt() throws InterruptedException, IOException
+ private void shouldMutate(long repairedAt, UUID pendingRepair) throws InterruptedException, IOException
{
ColumnFamilyStore store = prepareColumnFamilyStore();
Collection<SSTableReader> sstables = getUnrepairedSSTables(store);
@@ -280,15 +300,27 @@ public class AntiCompactionTest
try (LifecycleTransaction txn = store.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION);
Refs<SSTableReader> refs = Refs.ref(sstables))
{
- CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, 1, parentRepairSession);
+ CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, repairedAt, pendingRepair, parentRepairSession);
}
assertThat(store.getLiveSSTables().size(), is(1));
- assertThat(Iterables.get(store.getLiveSSTables(), 0).isRepaired(), is(true));
+ assertThat(Iterables.get(store.getLiveSSTables(), 0).isRepaired(), is(repairedAt != UNREPAIRED_SSTABLE));
+ assertThat(Iterables.get(store.getLiveSSTables(), 0).isPendingRepair(), is(pendingRepair != NO_PENDING_REPAIR));
assertThat(Iterables.get(store.getLiveSSTables(), 0).selfRef().globalCount(), is(1));
assertThat(store.getTracker().getCompacting().size(), is(0));
}
+ // Anticompaction with repairedAt=1 and no pending session should mark the
+ // resulting sstable as repaired (exercised via the shared shouldMutate helper).
+ @Test
+ public void shouldMutateRepairedAt() throws InterruptedException, IOException
+ {
+ shouldMutate(1, NO_PENDING_REPAIR);
+ }
+
+ // Anticompaction with UNREPAIRED_SSTABLE but a pending repair session id should
+ // mark the resulting sstable pending-repair rather than repaired.
+ @Test
+ public void shouldMutatePendingRepair() throws InterruptedException, IOException
+ {
+ shouldMutate(UNREPAIRED_SSTABLE, UUIDGen.getTimeUUID());
+ }
@Test
public void shouldSkipAntiCompactionForNonIntersectingRange() throws InterruptedException, IOException
@@ -311,7 +343,7 @@ public class AntiCompactionTest
try (LifecycleTransaction txn = store.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION);
Refs<SSTableReader> refs = Refs.ref(sstables))
{
- CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, 1, parentRepairSession);
+ CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, 1, NO_PENDING_REPAIR, parentRepairSession);
}
assertThat(store.getLiveSSTables().size(), is(10));
@@ -348,5 +380,4 @@ public class AntiCompactionTest
return ImmutableSet.copyOf(cfs.getTracker().getView().sstables(SSTableSet.LIVE, (s) -> !s.isRepaired()));
}
-
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/test/unit/org/apache/cassandra/db/compaction/CompactionManagerGetSSTablesForValidationTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionManagerGetSSTablesForValidationTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionManagerGetSSTablesForValidationTest.java
new file mode 100644
index 0000000..0ee85c6
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionManagerGetSSTablesForValidationTest.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import java.net.InetAddress;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.UUID;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.cql3.statements.CreateTableStatement;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.repair.RepairJobDesc;
+import org.apache.cassandra.repair.Validator;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.UUIDGen;
+
+/**
+ * Tests correct sstables are returned from CompactionManager.getSSTablesForValidation
+ * for consistent, legacy incremental, and full repairs
+ */
+public class CompactionManagerGetSSTablesForValidationTest
+{
+ // Unique keyspace name generated per test in setup() to isolate runs.
+ private String ks;
+ private static final String tbl = "tbl";
+ private ColumnFamilyStore cfs;
+ // Address registered as the repair coordinator for parent repair sessions.
+ private static InetAddress coordinator;
+
+ // Minimum token of the configured partitioner; (MT, MT] forms a full-ring range.
+ private static Token MT;
+
+ // One sstable per repair state, assigned by modifySSTables().
+ private SSTableReader repaired;
+ private SSTableReader unrepaired;
+ private SSTableReader pendingRepair;
+
+ // Parent repair session id and the job descriptor built from it in registerRepair().
+ private UUID sessionID;
+ private RepairJobDesc desc;
+
+ @BeforeClass
+ public static void setupClass() throws Exception
+ {
+ SchemaLoader.prepareServer();
+ coordinator = InetAddress.getByName("10.0.0.1");
+ MT = DatabaseDescriptor.getPartitioner().getMinimumToken();
+ }
+
+ @Before
+ public void setup() throws Exception
+ {
+ // Time-based keyspace name keeps each test's data separate from earlier runs.
+ ks = "ks_" + System.currentTimeMillis();
+ TableMetadata cfm = CreateTableStatement.parse(String.format("CREATE TABLE %s.%s (k INT PRIMARY KEY, v INT)", ks, tbl), ks).build();
+ SchemaLoader.createKeyspace(ks, KeyspaceParams.simple(1), cfm);
+ cfs = Schema.instance.getColumnFamilyStoreInstance(cfm.id);
+ }
+
+ // Writes three rows, flushing after each insert so exactly three live sstables exist.
+ private void makeSSTables()
+ {
+ for (int i=0; i<3; i++)
+ {
+ QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES(?, ?)", ks, tbl), i, i);
+ cfs.forceBlockingFlush();
+ }
+ Assert.assertEquals(3, cfs.getLiveSSTables().size());
+
+ }
+
+ // Registers a parent repair session covering the full ring. Incremental sessions
+ // get a real repairedAt timestamp; full repairs pass UNREPAIRED_SSTABLE.
+ private void registerRepair(boolean incremental) throws Exception
+ {
+ sessionID = UUIDGen.getTimeUUID();
+ Range<Token> range = new Range<>(MT, MT);
+ ActiveRepairService.instance.registerParentRepairSession(sessionID,
+ coordinator,
+ Lists.newArrayList(cfs),
+ Sets.newHashSet(range),
+ incremental,
+ incremental ? System.currentTimeMillis() : ActiveRepairService.UNREPAIRED_SSTABLE,
+ true);
+ desc = new RepairJobDesc(sessionID, UUIDGen.getTimeUUID(), ks, tbl, Collections.singleton(range));
+ }
+
+ // Puts the three sstables into distinct repair states: one repaired, one pending
+ // repair for this session, and the third left unrepaired. Metadata is mutated
+ // on disk and reloaded so the readers reflect the new state.
+ private void modifySSTables() throws Exception
+ {
+ Iterator<SSTableReader> iter = cfs.getLiveSSTables().iterator();
+
+ repaired = iter.next();
+ repaired.descriptor.getMetadataSerializer().mutateRepaired(repaired.descriptor, System.currentTimeMillis(), null);
+ repaired.reloadSSTableMetadata();
+
+ pendingRepair = iter.next();
+ pendingRepair.descriptor.getMetadataSerializer().mutateRepaired(pendingRepair.descriptor, ActiveRepairService.UNREPAIRED_SSTABLE, sessionID);
+ pendingRepair.reloadSSTableMetadata();
+
+ unrepaired = iter.next();
+
+ // Sanity check: exactly the three sstables created by makeSSTables() were seen.
+ Assert.assertFalse(iter.hasNext());
+ }
+
+ // A consistent (incremental) validation should select only the sstable that is
+ // pending repair for this session.
+ @Test
+ public void consistentRepair() throws Exception
+ {
+ makeSSTables();
+ registerRepair(true);
+ modifySSTables();
+
+ // get sstables for repair
+ Validator validator = new Validator(desc, coordinator, FBUtilities.nowInSeconds(), true);
+ Set<SSTableReader> sstables = Sets.newHashSet(CompactionManager.instance.getSSTablesToValidate(cfs, validator));
+ Assert.assertNotNull(sstables);
+ Assert.assertEquals(1, sstables.size());
+ Assert.assertTrue(sstables.contains(pendingRepair));
+ }
+
+ // A legacy incremental validation (incremental session, non-consistent validator)
+ // should include the pending and unrepaired sstables but exclude the repaired one.
+ @Test
+ public void legacyIncrementalRepair() throws Exception
+ {
+ makeSSTables();
+ registerRepair(true);
+ modifySSTables();
+
+ // get sstables for repair
+ Validator validator = new Validator(desc, coordinator, FBUtilities.nowInSeconds(), false);
+ Set<SSTableReader> sstables = Sets.newHashSet(CompactionManager.instance.getSSTablesToValidate(cfs, validator));
+ Assert.assertNotNull(sstables);
+ Assert.assertEquals(2, sstables.size());
+ Assert.assertTrue(sstables.contains(pendingRepair));
+ Assert.assertTrue(sstables.contains(unrepaired));
+ }
+
+ // A full repair validation should select all sstables regardless of repair state.
+ @Test
+ public void fullRepair() throws Exception
+ {
+ makeSSTables();
+ registerRepair(false);
+ modifySSTables();
+
+ // get sstables for repair
+ Validator validator = new Validator(desc, coordinator, FBUtilities.nowInSeconds(), false);
+ Set<SSTableReader> sstables = Sets.newHashSet(CompactionManager.instance.getSSTablesToValidate(cfs, validator));
+ Assert.assertNotNull(sstables);
+ Assert.assertEquals(3, sstables.size());
+ Assert.assertTrue(sstables.contains(pendingRepair));
+ Assert.assertTrue(sstables.contains(unrepaired));
+ Assert.assertTrue(sstables.contains(repaired));
+ }
+}