You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2014/02/08 08:48:58 UTC
[2/3] Avoid repairing already repaired data.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
index acc8aab..bb66b69 100644
--- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
+++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
@@ -17,15 +17,30 @@
*/
package org.apache.cassandra.repair;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.Future;
+
+import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.repair.messages.AnticompactionRequest;
+import org.apache.cassandra.repair.messages.PrepareMessage;
import org.apache.cassandra.repair.messages.RepairMessage;
import org.apache.cassandra.repair.messages.SyncRequest;
import org.apache.cassandra.repair.messages.ValidationRequest;
import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Handles all repair related message.
@@ -34,16 +49,33 @@ import org.apache.cassandra.service.ActiveRepairService;
*/
public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage>
{
+ private static final Logger logger = LoggerFactory.getLogger(RepairMessageVerbHandler.class);
public void doVerb(MessageIn<RepairMessage> message, int id)
{
// TODO add cancel/interrupt message
RepairJobDesc desc = message.payload.desc;
switch (message.payload.messageType)
{
+ case PREPARE_MESSAGE:
+ PrepareMessage prepareMessage = (PrepareMessage) message.payload;
+ List<ColumnFamilyStore> columnFamilyStores = new ArrayList<>(prepareMessage.cfIds.size());
+ for (UUID cfId : prepareMessage.cfIds)
+ {
+ Pair<String, String> kscf = Schema.instance.getCF(cfId);
+ ColumnFamilyStore columnFamilyStore = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
+ columnFamilyStores.add(columnFamilyStore);
+ }
+ ActiveRepairService.instance.registerParentRepairSession(prepareMessage.parentRepairSession,
+ columnFamilyStores,
+ prepareMessage.ranges);
+ MessagingService.instance().sendReply(new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE), id, message.from);
+ break;
+
case VALIDATION_REQUEST:
ValidationRequest validationRequest = (ValidationRequest) message.payload;
// trigger read-only compaction
ColumnFamilyStore store = Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily);
+
Validator validator = new Validator(desc, message.from, validationRequest.gcBefore);
CompactionManager.instance.submitValidation(store, validator);
break;
@@ -55,6 +87,21 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage>
task.run();
break;
+ case ANTICOMPACTION_REQUEST:
+ logger.debug("Got anticompaction request");
+ AnticompactionRequest anticompactionRequest = (AnticompactionRequest) message.payload;
+ try
+ {
+ List<Future<?>> futures = ActiveRepairService.instance.doAntiCompaction(anticompactionRequest.parentRepairSession);
+ FBUtilities.waitOnFutures(futures);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+
+ break;
+
default:
ActiveRepairService.instance.handleMessage(message.from, message.payload);
break;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/repair/RepairSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java
index 3933a88..75d5209 100644
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@ -93,6 +93,7 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan
private final SimpleCondition completed = new SimpleCondition();
public final Condition differencingDone = new SimpleCondition();
+ public final UUID parentRepairSession;
private volatile boolean terminated = false;
@@ -102,23 +103,24 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan
* @param range range to repair
* @param keyspace name of keyspace
* @param isSequential true if performing repair on snapshots sequentially
- * @param dataCenters the data centers that should be part of the repair; null for all DCs
+ * @param endpoints the endpoints that take part in the repair (callers pass the neighbours computed by ActiveRepairService.getNeighbors)
* @param cfnames names of columnfamilies
*/
- public RepairSession(Range<Token> range, String keyspace, boolean isSequential, Collection<String> dataCenters, String... cfnames)
+ public RepairSession(UUID parentRepairSession, Range<Token> range, String keyspace, boolean isSequential, Set<InetAddress> endpoints, String... cfnames)
{
- this(UUIDGen.getTimeUUID(), range, keyspace, isSequential, dataCenters, cfnames);
+ this(parentRepairSession, UUIDGen.getTimeUUID(), range, keyspace, isSequential, endpoints, cfnames);
}
- public RepairSession(UUID id, Range<Token> range, String keyspace, boolean isSequential, Collection<String> dataCenters, String[] cfnames)
+ public RepairSession(UUID parentRepairSession, UUID id, Range<Token> range, String keyspace, boolean isSequential, Set<InetAddress> endpoints, String[] cfnames)
{
+ this.parentRepairSession = parentRepairSession;
this.id = id;
this.isSequential = isSequential;
this.keyspace = keyspace;
this.cfnames = cfnames;
assert cfnames.length > 0 : "Repairing no column families seems pointless, doesn't it";
this.range = range;
- this.endpoints = ActiveRepairService.getNeighbors(keyspace, range, dataCenters);
+ this.endpoints = endpoints;
}
public UUID getId()
@@ -260,15 +262,16 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan
// Create and queue a RepairJob for each column family
for (String cfname : cfnames)
{
- RepairJob job = new RepairJob(id, keyspace, cfname, range, isSequential);
+ RepairJob job = new RepairJob(parentRepairSession, id, keyspace, cfname, range, isSequential);
jobs.offer(job);
}
-
+ logger.debug("Sending tree requests to endpoints {}", endpoints);
jobs.peek().sendTreeRequests(endpoints);
// block whatever thread started this session until all requests have been returned:
// if this thread dies, the session will still complete in the background
completed.await();
+
if (exception == null)
{
logger.info(String.format("[repair #%s] session completed successfully", getId()));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
index 1fd2b4f..636568c 100644
--- a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
+++ b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
@@ -23,6 +23,7 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.repair.messages.SyncComplete;
import org.apache.cassandra.repair.messages.SyncRequest;
+import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.streaming.*;
import org.apache.cassandra.utils.FBUtilities;
@@ -65,8 +66,12 @@ public class StreamingRepairTask implements Runnable, StreamEventHandler
private void initiateStreaming()
{
+ long repairedAt = ActiveRepairService.UNREPAIRED_SSTABLE;
+ if (desc.parentSessionId != null && ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId) != null)
+ repairedAt = ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId).repairedAt;
+
logger.info(String.format("[streaming task #%s] Performing streaming repair of %d ranges with %s", desc.sessionId, request.ranges.size(), request.dst));
- StreamResultFuture op = new StreamPlan("Repair")
+ StreamResultFuture op = new StreamPlan("Repair", repairedAt)
.flushBeforeTransfer(true)
// request ranges from the remote node
.requestRanges(request.dst, desc.keyspace, request.ranges, desc.columnFamily)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/RepairMessage.java b/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
index f546410..b195852 100644
--- a/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
+++ b/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
@@ -42,7 +42,9 @@ public abstract class RepairMessage
VALIDATION_REQUEST(0, ValidationRequest.serializer),
VALIDATION_COMPLETE(1, ValidationComplete.serializer),
SYNC_REQUEST(2, SyncRequest.serializer),
- SYNC_COMPLETE(3, SyncComplete.serializer);
+ SYNC_COMPLETE(3, SyncComplete.serializer),
+ ANTICOMPACTION_REQUEST(4, AnticompactionRequest.serializer),
+ PREPARE_MESSAGE(5, PrepareMessage.serializer);
private final byte type;
private final MessageSerializer<RepairMessage> serializer;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index b77f216..dc4c66a 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -17,6 +17,8 @@
*/
package org.apache.cassandra.service;
+import java.io.File;
+import java.io.IOException;
import java.net.InetAddress;
import java.util.*;
import java.util.concurrent.*;
@@ -24,19 +26,34 @@ import java.util.concurrent.*;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.concurrent.JMXConfigurableThreadPoolExecutor;
import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.dht.Bounds;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.net.IAsyncCallback;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.repair.*;
+import org.apache.cassandra.repair.messages.AnticompactionRequest;
+import org.apache.cassandra.repair.messages.PrepareMessage;
import org.apache.cassandra.repair.messages.RepairMessage;
import org.apache.cassandra.repair.messages.SyncComplete;
import org.apache.cassandra.repair.messages.ValidationComplete;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.UUIDGen;
/**
* ActiveRepairService is the starting point for manual "active" repairs.
@@ -54,9 +71,12 @@ import org.apache.cassandra.utils.FBUtilities;
*/
public class ActiveRepairService
{
+ private static final Logger logger = LoggerFactory.getLogger(ActiveRepairService.class);
// singleton enforcement
public static final ActiveRepairService instance = new ActiveRepairService();
+ public static final long UNREPAIRED_SSTABLE = 0;
+
private static final ThreadPoolExecutor executor;
static
{
@@ -74,16 +94,20 @@ public class ActiveRepairService
}
/**
- * A map of active session.
+ * A map of active coordinator sessions.
*/
private final ConcurrentMap<UUID, RepairSession> sessions;
+ private final ConcurrentMap<UUID, ParentRepairSession> parentRepairSessions;
+
+ private CountDownLatch prepareLatch = null;
/**
* Protected constructor. Use ActiveRepairService.instance.
*/
protected ActiveRepairService()
{
sessions = new ConcurrentHashMap<>();
+ parentRepairSessions = new ConcurrentHashMap<>();
}
/**
@@ -91,9 +115,9 @@ public class ActiveRepairService
*
* @return Future for asynchronous call or null if there is no need to repair
*/
- public RepairFuture submitRepairSession(Range<Token> range, String keyspace, boolean isSequential, Collection<String> dataCenters, String... cfnames)
+ public RepairFuture submitRepairSession(UUID parentRepairSession, Range<Token> range, String keyspace, boolean isSequential, Set<InetAddress> endpoints, String... cfnames)
{
- RepairSession session = new RepairSession(range, keyspace, isSequential, dataCenters, cfnames);
+ RepairSession session = new RepairSession(parentRepairSession, range, keyspace, isSequential, endpoints, cfnames);
if (session.endpoints.isEmpty())
return null;
RepairFuture futureTask = new RepairFuture(session);
@@ -121,13 +145,16 @@ public class ActiveRepairService
{
session.forceShutdown();
}
+ parentRepairSessions.clear();
}
// for testing only. Create a session corresponding to a fake request and
// add it to the sessions (avoid NPE in tests)
RepairFuture submitArtificialRepairSession(RepairJobDesc desc)
{
- RepairSession session = new RepairSession(desc.sessionId, desc.range, desc.keyspace, false, null, new String[]{desc.columnFamily});
+ Set<InetAddress> neighbours = new HashSet<>();
+ neighbours.addAll(ActiveRepairService.getNeighbors(desc.keyspace, desc.range, null));
+ RepairSession session = new RepairSession(desc.parentSessionId, desc.sessionId, desc.range, desc.keyspace, false, neighbours, new String[]{desc.columnFamily});
sessions.put(session.getId(), session);
RepairFuture futureTask = new RepairFuture(session);
executor.execute(futureTask);
@@ -186,6 +213,122 @@ public class ActiveRepairService
return neighbors;
}
+    /**
+     * Registers a new parent repair session on this node and asks every given endpoint to
+     * register the same session, blocking until all of them have replied.
+     *
+     * @param endpoints endpoints that are sent a {@link PrepareMessage}
+     * @param ranges ranges covered by the parent session
+     * @param columnFamilyStores column families covered by the parent session
+     * @return the id generated for the new parent repair session
+     * @throws RuntimeException if the wait is interrupted, or if not every endpoint has replied
+     *         within one hour; the local registration is removed before the exception is thrown
+     */
+    public UUID prepareForRepair(Set<InetAddress> endpoints, Collection<Range<Token>> ranges, List<ColumnFamilyStore> columnFamilyStores)
+    {
+        UUID parentRepairSession = UUIDGen.getTimeUUID();
+        registerParentRepairSession(parentRepairSession, columnFamilyStores, ranges);
+
+        // One latch per call. A latch kept in a field of the shared service instance could be
+        // replaced by a concurrent prepareForRepair(), making replies count down the wrong latch.
+        final CountDownLatch latch = new CountDownLatch(endpoints.size());
+        IAsyncCallback callback = new IAsyncCallback()
+        {
+            @Override
+            public void response(MessageIn msg)
+            {
+                latch.countDown();
+            }
+
+            @Override
+            public boolean isLatencyForSnitch()
+            {
+                return false;
+            }
+        };
+
+        List<UUID> cfIds = new ArrayList<>(columnFamilyStores.size());
+        for (ColumnFamilyStore cfs : columnFamilyStores)
+            cfIds.add(cfs.metadata.cfId);
+
+        for (InetAddress neighbour : endpoints)
+        {
+            PrepareMessage message = new PrepareMessage(parentRepairSession, cfIds, ranges);
+            MessageOut<RepairMessage> msg = message.createMessage();
+            MessagingService.instance().sendRR(msg, neighbour, callback);
+        }
+
+        boolean allReplied;
+        try
+        {
+            // await() returns false when the timeout elapses; that has to fail the prepare
+            // phase exactly like an interrupt does, otherwise a timeout is reported as success.
+            allReplied = latch.await(1, TimeUnit.HOURS);
+        }
+        catch (InterruptedException e)
+        {
+            parentRepairSessions.remove(parentRepairSession);
+            // keep the interrupt visible to whoever owns this thread
+            Thread.currentThread().interrupt();
+            throw new RuntimeException("Did not get replies from all endpoints.", e);
+        }
+        if (!allReplied)
+        {
+            parentRepairSessions.remove(parentRepairSession);
+            throw new RuntimeException("Did not get replies from all endpoints.");
+        }
+        return parentRepairSession;
+    }
+
+    /**
+     * Records a parent repair session under the given id.
+     * <p>
+     * For every column family the session remembers the sstables whose token bounds intersect
+     * {@code ranges} and that are not yet marked repaired. The time of registration is stored
+     * as the session's {@code repairedAt}.
+     *
+     * @param parentRepairSession id to register the session under
+     * @param columnFamilyStores column families taking part in the repair
+     * @param ranges ranges being repaired
+     */
+    public void registerParentRepairSession(UUID parentRepairSession, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges)
+    {
+        Map<UUID, Set<SSTableReader>> unrepairedByCf = new HashMap<>();
+        for (ColumnFamilyStore store : columnFamilyStores)
+        {
+            Set<SSTableReader> candidates = new HashSet<>();
+            for (SSTableReader reader : store.getSSTables())
+            {
+                boolean overlapsRanges = new Bounds<>(reader.first.token, reader.last.token).intersects(ranges);
+                if (overlapsRanges && !reader.isRepaired())
+                    candidates.add(reader);
+            }
+            unrepairedByCf.put(store.metadata.cfId, candidates);
+        }
+
+        long registeredAt = System.currentTimeMillis();
+        parentRepairSessions.put(parentRepairSession, new ParentRepairSession(columnFamilyStores, ranges, unrepairedByCf, registeredAt));
+    }
+
+    /**
+     * Finishes a parent repair session: sends a one-way {@link AnticompactionRequest} to every
+     * neighbor, then runs the local anticompaction and waits for it.
+     * <p>
+     * The session is removed from the registry whether or not the local anticompaction succeeds.
+     *
+     * @param parentSession id of the parent repair session to finish
+     * @param neighbors endpoints that are sent the anticompaction request
+     */
+    public void finishParentSession(UUID parentSession, Set<InetAddress> neighbors) throws InterruptedException, ExecutionException, IOException
+    {
+        for (InetAddress endpoint : neighbors)
+        {
+            MessageOut<RepairMessage> request = new AnticompactionRequest(parentSession).createMessage();
+            MessagingService.instance().sendOneWay(request, endpoint);
+        }
+
+        try
+        {
+            FBUtilities.waitOnFutures(doAntiCompaction(parentSession));
+        }
+        finally
+        {
+            parentRepairSessions.remove(parentSession);
+        }
+    }
+
+    /**
+     * Looks up a registered parent repair session.
+     *
+     * @param parentSessionId id of the parent repair session
+     * @return the session registered under the id, or null if the registry has no entry for it
+     */
+    public ParentRepairSession getParentRepairSession(UUID parentSessionId)
+    {
+        ParentRepairSession registered = parentRepairSessions.get(parentSessionId);
+        return registered;
+    }
+
+    /**
+     * Submits an anticompaction for every column family of the given parent repair session.
+     * <p>
+     * For each column family the session's sstables are referenced, those currently being
+     * compacted are dropped from the set (and their reference released), and the remainder is
+     * marked as compacting before being handed to the compaction manager.
+     *
+     * @param parentRepairSession id of a registered parent repair session
+     * @return one future per column family, as returned by submitAntiCompaction
+     * @throws IllegalStateException if no parent repair session is registered under the id
+     */
+    public List<Future<?>> doAntiCompaction(UUID parentRepairSession) throws InterruptedException, ExecutionException, IOException
+    {
+        assert parentRepairSession != null;
+        ParentRepairSession prs = getParentRepairSession(parentRepairSession);
+        // The registry can be emptied or the entry removed while a request is still in flight;
+        // fail with a message that names the session instead of a bare NullPointerException.
+        if (prs == null)
+            throw new IllegalStateException("No parent repair session registered for " + parentRepairSession);
+
+        List<Future<?>> futures = new ArrayList<>();
+        for (Map.Entry<UUID, ColumnFamilyStore> columnFamilyStoreEntry : prs.columnFamilyStores.entrySet())
+        {
+            Collection<SSTableReader> sstables = new HashSet<>(prs.getAndReferenceSSTables(columnFamilyStoreEntry.getKey()));
+            ColumnFamilyStore cfs = columnFamilyStoreEntry.getValue();
+            boolean success = false;
+            // Loop until nothing is left to anticompact or markCompacting(...) reports success;
+            // every pass first drops the sstables that are already part of another compaction.
+            while (!success)
+            {
+                for (SSTableReader compactingSSTable : cfs.getDataTracker().getCompacting())
+                {
+                    if (sstables.remove(compactingSSTable))
+                        SSTableReader.releaseReferences(Arrays.asList(compactingSSTable));
+                }
+                success = sstables.isEmpty() || cfs.getDataTracker().markCompacting(sstables);
+            }
+
+            futures.add(CompactionManager.instance.submitAntiCompaction(cfs, prs.ranges, sstables, prs.repairedAt));
+        }
+
+        return futures;
+    }
+
public void handleMessage(InetAddress endpoint, RepairMessage message)
{
RepairJobDesc desc = message.desc;
@@ -207,4 +350,41 @@ public class ActiveRepairService
break;
}
}
+
+    /**
+     * State kept for one parent repair session: the column families and ranges involved, the
+     * sstables recorded per column family, and the timestamp passed in as {@code repairedAt}.
+     */
+    public static class ParentRepairSession
+    {
+        public final Map<UUID, ColumnFamilyStore> columnFamilyStores = new HashMap<>();
+        public final Collection<Range<Token>> ranges;
+        public final Map<UUID, Set<SSTableReader>> sstableMap;
+        public final long repairedAt;
+
+        public ParentRepairSession(List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, Map<UUID, Set<SSTableReader>> sstables, long repairedAt)
+        {
+            this.ranges = ranges;
+            this.sstableMap = sstables;
+            this.repairedAt = repairedAt;
+            // index the stores by column family id
+            for (ColumnFamilyStore store : columnFamilyStores)
+                this.columnFamilyStores.put(store.metadata.cfId, store);
+        }
+
+        /**
+         * Returns the sstables recorded for the given column family after pruning the set in place.
+         * <p>
+         * An sstable is dropped from the recorded set when its data file no longer exists or when
+         * a reference to it cannot be acquired; a reference has been acquired on every sstable
+         * that remains.
+         *
+         * @param cfId id of the column family
+         * @return the (pruned) set stored in {@code sstableMap} for that column family
+         */
+        public Collection<SSTableReader> getAndReferenceSSTables(UUID cfId)
+        {
+            Set<SSTableReader> recorded = sstableMap.get(cfId);
+            for (Iterator<SSTableReader> it = recorded.iterator(); it.hasNext(); )
+            {
+                SSTableReader reader = it.next();
+                boolean dataFilePresent = new File(reader.descriptor.filenameFor(Component.DATA)).exists();
+                // short-circuit: a reference is only attempted when the data file is still there
+                if (!dataFilePresent || !reader.acquireReference())
+                    it.remove();
+            }
+            return recorded;
+        }
+    }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 4d6e13f..99090b9 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -1393,7 +1393,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
* Handle node bootstrap
*
* @param endpoint bootstrapping node
- * @param pieces STATE_BOOTSTRAPPING,bootstrap token as string
*/
private void handleStateBootstrap(InetAddress endpoint)
{
@@ -2418,13 +2417,14 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
sendNotification(jmxNotification);
}
- public int forceRepairAsync(final String keyspace, final boolean isSequential, final Collection<String> dataCenters, final boolean primaryRange, final String... columnFamilies)
+ public int forceRepairAsync(final String keyspace, final boolean isSequential, final Collection<String> dataCenters, final boolean primaryRange, final boolean fullRepair, final String... columnFamilies) throws IOException
{
final Collection<Range<Token>> ranges = primaryRange ? getLocalPrimaryRanges(keyspace) : getLocalRanges(keyspace);
- return forceRepairAsync(keyspace, isSequential, dataCenters, ranges, columnFamilies);
+
+ return forceRepairAsync(keyspace, isSequential, dataCenters, ranges, fullRepair, columnFamilies);
}
- public int forceRepairAsync(final String keyspace, final boolean isSequential, final Collection<String> dataCenters, final Collection<Range<Token>> ranges, final String... columnFamilies)
+ public int forceRepairAsync(final String keyspace, final boolean isSequential, final Collection<String> dataCenters, final Collection<Range<Token>> ranges, final boolean fullRepair, final String... columnFamilies) throws IOException
{
if (Keyspace.SYSTEM_KS.equals(keyspace) || ranges.isEmpty())
return 0;
@@ -2432,18 +2432,18 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
final int cmd = nextRepairCommand.incrementAndGet();
if (ranges.size() > 0)
{
- new Thread(createRepairTask(cmd, keyspace, ranges, isSequential, dataCenters, columnFamilies)).start();
+ new Thread(createRepairTask(cmd, keyspace, ranges, isSequential, dataCenters, fullRepair, columnFamilies)).start();
}
return cmd;
}
- public int forceRepairAsync(final String keyspace, final boolean isSequential, final boolean isLocal, final boolean primaryRange, final String... columnFamilies)
+ public int forceRepairAsync(final String keyspace, final boolean isSequential, final boolean isLocal, final boolean primaryRange, final boolean fullRepair, final String... columnFamilies)
{
final Collection<Range<Token>> ranges = primaryRange ? getLocalPrimaryRanges(keyspace) : getLocalRanges(keyspace);
- return forceRepairAsync(keyspace, isSequential, isLocal, ranges, columnFamilies);
+ return forceRepairAsync(keyspace, isSequential, isLocal, ranges, fullRepair, columnFamilies);
}
- public int forceRepairAsync(final String keyspace, final boolean isSequential, final boolean isLocal, final Collection<Range<Token>> ranges, final String... columnFamilies)
+ public int forceRepairAsync(final String keyspace, final boolean isSequential, final boolean isLocal, final Collection<Range<Token>> ranges, final boolean fullRepair, final String... columnFamilies)
{
if (Keyspace.SYSTEM_KS.equals(keyspace) || ranges.isEmpty())
return 0;
@@ -2451,29 +2451,29 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
final int cmd = nextRepairCommand.incrementAndGet();
if (ranges.size() > 0)
{
- new Thread(createRepairTask(cmd, keyspace, ranges, isSequential, isLocal, columnFamilies)).start();
+ new Thread(createRepairTask(cmd, keyspace, ranges, isSequential, isLocal, fullRepair, columnFamilies)).start();
}
return cmd;
}
- public int forceRepairRangeAsync(String beginToken, String endToken, final String keyspaceName, boolean isSequential, Collection<String> dataCenters, final String... columnFamilies)
+ public int forceRepairRangeAsync(String beginToken, String endToken, final String keyspaceName, boolean isSequential, Collection<String> dataCenters, boolean fullRepair, final String... columnFamilies) throws IOException
{
Token parsedBeginToken = getPartitioner().getTokenFactory().fromString(beginToken);
Token parsedEndToken = getPartitioner().getTokenFactory().fromString(endToken);
logger.info("starting user-requested repair of range ({}, {}] for keyspace {} and column families {}",
parsedBeginToken, parsedEndToken, keyspaceName, columnFamilies);
- return forceRepairAsync(keyspaceName, isSequential, dataCenters, Collections.singleton(new Range<Token>(parsedBeginToken, parsedEndToken)), columnFamilies);
+ return forceRepairAsync(keyspaceName, isSequential, dataCenters, Collections.singleton(new Range<Token>(parsedBeginToken, parsedEndToken)), fullRepair, columnFamilies);
}
- public int forceRepairRangeAsync(String beginToken, String endToken, final String keyspaceName, boolean isSequential, boolean isLocal, final String... columnFamilies)
+ public int forceRepairRangeAsync(String beginToken, String endToken, final String keyspaceName, boolean isSequential, boolean isLocal, boolean fullRepair, final String... columnFamilies)
{
Token parsedBeginToken = getPartitioner().getTokenFactory().fromString(beginToken);
Token parsedEndToken = getPartitioner().getTokenFactory().fromString(endToken);
logger.info("starting user-requested repair of range ({}, {}] for keyspace {} and column families {}",
parsedBeginToken, parsedEndToken, keyspaceName, columnFamilies);
- return forceRepairAsync(keyspaceName, isSequential, isLocal, Collections.singleton(new Range<Token>(parsedBeginToken, parsedEndToken)), columnFamilies);
+ return forceRepairAsync(keyspaceName, isSequential, isLocal, Collections.singleton(new Range<Token>(parsedBeginToken, parsedEndToken)), fullRepair, columnFamilies);
}
@@ -2483,53 +2483,72 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
* @param columnFamilies
* @throws IOException
*/
- public void forceKeyspaceRepair(final String keyspaceName, boolean isSequential, boolean isLocal, final String... columnFamilies) throws IOException
+ public void forceKeyspaceRepair(final String keyspaceName, boolean isSequential, boolean isLocal, boolean fullRepair, final String... columnFamilies) throws IOException
{
- forceKeyspaceRepairRange(keyspaceName, getLocalRanges(keyspaceName), isSequential, isLocal, columnFamilies);
+ forceKeyspaceRepairRange(keyspaceName, getLocalRanges(keyspaceName), isSequential, isLocal, fullRepair, columnFamilies);
}
- public void forceKeyspaceRepairPrimaryRange(final String keyspaceName, boolean isSequential, boolean isLocal, final String... columnFamilies) throws IOException
+ public void forceKeyspaceRepairPrimaryRange(final String keyspaceName, boolean isSequential, boolean isLocal, boolean fullRepair, final String... columnFamilies) throws IOException
{
- forceKeyspaceRepairRange(keyspaceName, getLocalPrimaryRanges(keyspaceName), isSequential, isLocal, columnFamilies);
+ forceKeyspaceRepairRange(keyspaceName, getLocalPrimaryRanges(keyspaceName), isSequential, isLocal, fullRepair, columnFamilies);
}
- public void forceKeyspaceRepairRange(String beginToken, String endToken, final String keyspaceName, boolean isSequential, boolean isLocal, final String... columnFamilies) throws IOException
+ public void forceKeyspaceRepairRange(String beginToken, String endToken, final String keyspaceName, boolean isSequential, boolean isLocal, boolean fullRepair, final String... columnFamilies) throws IOException
{
Token parsedBeginToken = getPartitioner().getTokenFactory().fromString(beginToken);
Token parsedEndToken = getPartitioner().getTokenFactory().fromString(endToken);
logger.info("starting user-requested repair of range ({}, {}] for keyspace {} and column families {}",
parsedBeginToken, parsedEndToken, keyspaceName, columnFamilies);
- forceKeyspaceRepairRange(keyspaceName, Collections.singleton(new Range<Token>(parsedBeginToken, parsedEndToken)), isSequential, isLocal, columnFamilies);
+ forceKeyspaceRepairRange(keyspaceName, Collections.singleton(new Range<Token>(parsedBeginToken, parsedEndToken)), isSequential, isLocal, fullRepair, columnFamilies);
}
- public void forceKeyspaceRepairRange(final String keyspaceName, final Collection<Range<Token>> ranges, boolean isSequential, boolean isLocal, final String... columnFamilies) throws IOException
+ public void forceKeyspaceRepairRange(final String keyspaceName, final Collection<Range<Token>> ranges, boolean isSequential, boolean isLocal, boolean fullRepair, final String... columnFamilies) throws IOException
{
if (Keyspace.SYSTEM_KS.equalsIgnoreCase(keyspaceName))
return;
- createRepairTask(nextRepairCommand.incrementAndGet(), keyspaceName, ranges, isSequential, isLocal, columnFamilies).run();
+ createRepairTask(nextRepairCommand.incrementAndGet(), keyspaceName, ranges, isSequential, isLocal, fullRepair, columnFamilies).run();
}
- private FutureTask<Object> createRepairTask(final int cmd, final String keyspace, final Collection<Range<Token>> ranges, final boolean isSequential, final boolean isLocal, final String... columnFamilies)
+ private FutureTask<Object> createRepairTask(final int cmd,
+ final String keyspace,
+ final Collection<Range<Token>> ranges,
+ final boolean isSequential,
+ final boolean isLocal,
+ final boolean fullRepair,
+ final String... columnFamilies)
{
Set<String> dataCenters = null;
if (isLocal)
{
dataCenters = Sets.newHashSet(DatabaseDescriptor.getLocalDataCenter());
}
- return createRepairTask(cmd, keyspace, ranges, isSequential, dataCenters, columnFamilies);
+ return createRepairTask(cmd, keyspace, ranges, isSequential, dataCenters, fullRepair, columnFamilies);
}
- private FutureTask<Object> createRepairTask(final int cmd, final String keyspace, final Collection<Range<Token>> ranges, final boolean isSequential, final Collection<String> dataCenters, final String... columnFamilies)
+ private FutureTask<Object> createRepairTask(final int cmd,
+ final String keyspace,
+ final Collection<Range<Token>> ranges,
+ final boolean isSequential,
+ final Collection<String> dataCenters,
+ final boolean fullRepair,
+ final String... columnFamilies)
{
- return new FutureTask<Object>(new WrappedRunnable()
+ return new FutureTask<>(new WrappedRunnable()
{
protected void runMayThrow() throws Exception
{
- String message = String.format("Starting repair command #%d, repairing %d ranges for keyspace %s", cmd, ranges.size(), keyspace);
+ String message = String.format("Starting repair command #%d, repairing %d ranges for keyspace %s (seq=%b, full=%b)", cmd, ranges.size(), keyspace, isSequential, fullRepair);
logger.info(message);
sendNotification("repair", message, new int[]{cmd, ActiveRepairService.Status.STARTED.ordinal()});
+ if (isSequential && !fullRepair)
+ {
+ message = "It is not possible to mix sequential repair and incremental repairs.";
+ logger.error(message);
+ sendNotification("repair", message, new int[]{cmd, ActiveRepairService.Status.FINISHED.ordinal()});
+ return;
+ }
if (dataCenters != null && !dataCenters.contains(DatabaseDescriptor.getLocalDataCenter()))
{
message = String.format("Cancelling repair command #%d (the local data center must be part of the repair)", cmd);
@@ -2538,13 +2557,25 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
return;
}
- List<RepairFuture> futures = new ArrayList<RepairFuture>(ranges.size());
+ Set<InetAddress> neighbours = new HashSet<>();
+ for (Range<Token> range : ranges)
+ neighbours.addAll(ActiveRepairService.getNeighbors(keyspace, range, dataCenters));
+
+ List<ColumnFamilyStore> columnFamilyStores = new ArrayList<>();
+ for (ColumnFamilyStore cfs : getValidColumnFamilies(false, false, keyspace, columnFamilies))
+ columnFamilyStores.add(cfs);
+
+ UUID parentSession = null;
+ if (!fullRepair)
+ parentSession = ActiveRepairService.instance.prepareForRepair(neighbours, ranges, columnFamilyStores);
+
+ List<RepairFuture> futures = new ArrayList<>(ranges.size());
for (Range<Token> range : ranges)
{
RepairFuture future;
try
{
- future = forceKeyspaceRepair(range, keyspace, isSequential, dataCenters, columnFamilies);
+ future = forceKeyspaceRepair(parentSession, range, keyspace, isSequential, neighbours, columnFamilies);
}
catch (IllegalArgumentException e)
{
@@ -2567,6 +2598,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
sendNotification("repair", message, new int[]{cmd, ActiveRepairService.Status.SESSION_FAILED.ordinal()});
}
}
+
for (RepairFuture future : futures)
{
try
@@ -2589,14 +2621,22 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
sendNotification("repair", message, new int[]{cmd, ActiveRepairService.Status.SESSION_FAILED.ordinal()});
}
}
+ if (!fullRepair)
+ ActiveRepairService.instance.finishParentSession(parentSession, neighbours);
sendNotification("repair", String.format("Repair command #%d finished", cmd), new int[]{cmd, ActiveRepairService.Status.FINISHED.ordinal()});
}
}, null);
}
- public RepairFuture forceKeyspaceRepair(final Range<Token> range, final String keyspaceName, boolean isSequential, Collection<String> dataCenters, final String... columnFamilies) throws IOException
+
+ public RepairFuture forceKeyspaceRepair(final UUID parentRepairSession,
+ final Range<Token> range,
+ final String keyspaceName,
+ boolean isSequential,
+ Set<InetAddress> endpoints,
+ String ... columnFamilies) throws IOException
{
- ArrayList<String> names = new ArrayList<String>();
+ ArrayList<String> names = new ArrayList<>();
for (ColumnFamilyStore cfStore : getValidColumnFamilies(false, false, keyspaceName, columnFamilies))
{
names.add(cfStore.name);
@@ -2608,7 +2648,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
return null;
}
- return ActiveRepairService.instance.submitRepairSession(range, keyspaceName, isSequential, dataCenters, names.toArray(new String[names.size()]));
+ return ActiveRepairService.instance.submitRepairSession(parentRepairSession, range, keyspaceName, isSequential, endpoints, names.toArray(new String[names.size()]));
}
public void forceTerminateAllRepairSessions() {
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 5a1bb22..66afaa1 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -270,12 +270,12 @@ public interface StorageServiceMBean extends NotificationEmitter
*
* @return Repair command number, or 0 if nothing to repair
*/
- public int forceRepairAsync(String keyspace, boolean isSequential, Collection<String> dataCenters, boolean primaryRange, String... columnFamilies);
+ public int forceRepairAsync(String keyspace, boolean isSequential, Collection<String> dataCenters, boolean primaryRange, boolean repairedAt, String... columnFamilies) throws IOException;
/**
* Same as forceRepairAsync, but handles a specified range
*/
- public int forceRepairRangeAsync(String beginToken, String endToken, final String keyspaceName, boolean isSequential, Collection<String> dataCenters, final String... columnFamilies);
+ public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, boolean isSequential, Collection<String> dataCenters, boolean repairedAt, String... columnFamilies) throws IOException;
/**
@@ -286,14 +286,14 @@ public interface StorageServiceMBean extends NotificationEmitter
* userObject: int array of length 2, [0]=command number, [1]=ordinal of AntiEntropyService.Status
*
* @return Repair command number, or 0 if nothing to repair
- * @see #forceKeyspaceRepair(String, boolean, boolean, String...)
+ * @see #forceKeyspaceRepair(String, boolean, boolean, boolean, String...)
*/
- public int forceRepairAsync(String keyspace, boolean isSequential, boolean isLocal, boolean primaryRange, String... columnFamilies);
+ public int forceRepairAsync(String keyspace, boolean isSequential, boolean isLocal, boolean primaryRange, boolean fullRepair, String... columnFamilies);
/**
* Same as forceRepairAsync, but handles a specified range
*/
- public int forceRepairRangeAsync(String beginToken, String endToken, final String keyspaceName, boolean isSequential, boolean isLocal, final String... columnFamilies);
+ public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, boolean isSequential, boolean isLocal, boolean repairedAt, String... columnFamilies);
/**
* Triggers proactive repair for given column families, or all columnfamilies for the given keyspace
@@ -302,12 +302,12 @@ public interface StorageServiceMBean extends NotificationEmitter
* @param columnFamilies
* @throws IOException
*/
- public void forceKeyspaceRepair(String keyspaceName, boolean isSequential, boolean isLocal, String... columnFamilies) throws IOException;
+ public void forceKeyspaceRepair(String keyspaceName, boolean isSequential, boolean isLocal, boolean repairedAt, String... columnFamilies) throws IOException;
/**
* Triggers proactive repair but only for the node primary range.
*/
- public void forceKeyspaceRepairPrimaryRange(String keyspaceName, boolean isSequential, boolean isLocal, String... columnFamilies) throws IOException;
+ public void forceKeyspaceRepairPrimaryRange(String keyspaceName, boolean isSequential, boolean isLocal, boolean repairedAt, String... columnFamilies) throws IOException;
/**
* Perform repair of a specific range.
@@ -315,7 +315,7 @@ public interface StorageServiceMBean extends NotificationEmitter
* This allows incremental repair to be performed by having an external controller submitting repair jobs.
* Note that the provided range must be a subset of one of the node's local ranges.
*/
- public void forceKeyspaceRepairRange(String beginToken, String endToken, String keyspaceName, boolean isSequential, boolean isLocal, String... columnFamilies) throws IOException;
+ public void forceKeyspaceRepairRange(String beginToken, String endToken, String keyspaceName, boolean isSequential, boolean isLocal, boolean repairedAt, String... columnFamilies) throws IOException;
public void forceTerminateAllRepairSessions();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/streaming/StreamPlan.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamPlan.java b/src/java/org/apache/cassandra/streaming/StreamPlan.java
index 288929c..ff78e84 100644
--- a/src/java/org/apache/cassandra/streaming/StreamPlan.java
+++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java
@@ -22,6 +22,7 @@ import java.util.*;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.utils.UUIDGen;
/**
@@ -36,6 +37,7 @@ public class StreamPlan
// sessions per InetAddress of the other end.
private final Map<InetAddress, StreamSession> sessions = new HashMap<>();
+ private final long repairedAt;
private boolean flushBeforeTransfer = true;
@@ -46,9 +48,16 @@ public class StreamPlan
*/
public StreamPlan(String description)
{
+ this(description, ActiveRepairService.UNREPAIRED_SSTABLE);
+ }
+
+ public StreamPlan(String description, long repairedAt)
+ {
this.description = description;
+ this.repairedAt = repairedAt;
}
+
/**
* Request data in {@code keyspace} and {@code ranges} from specific node.
*
@@ -74,7 +83,7 @@ public class StreamPlan
public StreamPlan requestRanges(InetAddress from, String keyspace, Collection<Range<Token>> ranges, String... columnFamilies)
{
StreamSession session = getOrCreateSession(from);
- session.addStreamRequest(keyspace, ranges, Arrays.asList(columnFamilies));
+ session.addStreamRequest(keyspace, ranges, Arrays.asList(columnFamilies), repairedAt);
return this;
}
@@ -103,7 +112,7 @@ public class StreamPlan
public StreamPlan transferRanges(InetAddress to, String keyspace, Collection<Range<Token>> ranges, String... columnFamilies)
{
StreamSession session = getOrCreateSession(to);
- session.addTransferRanges(keyspace, ranges, Arrays.asList(columnFamilies), flushBeforeTransfer);
+ session.addTransferRanges(keyspace, ranges, Arrays.asList(columnFamilies), flushBeforeTransfer, repairedAt);
return this;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java
index 72c239c..d805bf3 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@ -27,6 +27,9 @@ import java.util.Collection;
import java.util.UUID;
import com.google.common.base.Throwables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import com.ning.compress.lzf.LZFInputStream;
import org.apache.cassandra.config.Schema;
@@ -37,6 +40,7 @@ import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTableWriter;
+import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.streaming.messages.FileMessageHeader;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -48,11 +52,13 @@ import org.apache.cassandra.utils.Pair;
*/
public class StreamReader
{
+ private static final Logger logger = LoggerFactory.getLogger(StreamReader.class);
protected final UUID cfId;
protected final long estimatedKeys;
protected final Collection<Pair<Long, Long>> sections;
protected final StreamSession session;
protected final Descriptor.Version inputVersion;
+ protected final long repairedAt;
protected Descriptor desc;
@@ -63,6 +69,7 @@ public class StreamReader
this.estimatedKeys = header.estimatedKeys;
this.sections = header.sections;
this.inputVersion = new Descriptor.Version(header.version);
+ this.repairedAt = header.repairedAt;
}
/**
@@ -72,12 +79,13 @@ public class StreamReader
*/
public SSTableWriter read(ReadableByteChannel channel) throws IOException
{
+ logger.info("reading file from {}, repairedAt = {}", session.peer, repairedAt);
long totalSize = totalSize();
Pair<String, String> kscf = Schema.instance.getCF(cfId);
ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
- SSTableWriter writer = createWriter(cfs, totalSize);
+ SSTableWriter writer = createWriter(cfs, totalSize, repairedAt);
DataInputStream dis = new DataInputStream(new LZFInputStream(Channels.newInputStream(channel)));
BytesReadTracker in = new BytesReadTracker(dis);
try
@@ -101,14 +109,14 @@ public class StreamReader
}
}
- protected SSTableWriter createWriter(ColumnFamilyStore cfs, long totalSize) throws IOException
+ protected SSTableWriter createWriter(ColumnFamilyStore cfs, long totalSize, long repairedAt) throws IOException
{
Directories.DataDirectory localDir = cfs.directories.getWriteableLocation();
if (localDir == null)
throw new IOException("Insufficient disk space to store " + totalSize + " bytes");
desc = Descriptor.fromFilename(cfs.getTempSSTablePath(cfs.directories.getLocationForDisk(localDir)));
- return new SSTableWriter(desc.filenameFor(Component.DATA), estimatedKeys);
+ return new SSTableWriter(desc.filenameFor(Component.DATA), estimatedKeys, repairedAt);
}
protected void drain(InputStream dis, long bytesRead) throws IOException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 9a2568d..b4d5392 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -27,6 +27,7 @@ import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.sstable.SSTableWriter;
+import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.Pair;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/streaming/StreamRequest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamRequest.java b/src/java/org/apache/cassandra/streaming/StreamRequest.java
index 9d3fdb2..e8a3fcb 100644
--- a/src/java/org/apache/cassandra/streaming/StreamRequest.java
+++ b/src/java/org/apache/cassandra/streaming/StreamRequest.java
@@ -29,6 +29,7 @@ import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.service.ActiveRepairService;
public class StreamRequest
{
@@ -37,12 +38,13 @@ public class StreamRequest
public final String keyspace;
public final Collection<Range<Token>> ranges;
public final Collection<String> columnFamilies = new HashSet<>();
-
- public StreamRequest(String keyspace, Collection<Range<Token>> ranges, Collection<String> columnFamilies)
+ public final long repairedAt;
+ public StreamRequest(String keyspace, Collection<Range<Token>> ranges, Collection<String> columnFamilies, long repairedAt)
{
this.keyspace = keyspace;
this.ranges = ranges;
this.columnFamilies.addAll(columnFamilies);
+ this.repairedAt = repairedAt;
}
public static class StreamRequestSerializer implements IVersionedSerializer<StreamRequest>
@@ -50,6 +52,7 @@ public class StreamRequest
public void serialize(StreamRequest request, DataOutput out, int version) throws IOException
{
out.writeUTF(request.keyspace);
+ out.writeLong(request.repairedAt);
out.writeInt(request.ranges.size());
for (Range<Token> range : request.ranges)
{
@@ -64,6 +67,7 @@ public class StreamRequest
public StreamRequest deserialize(DataInput in, int version) throws IOException
{
String keyspace = in.readUTF();
+ long repairedAt = in.readLong();
int rangeCount = in.readInt();
List<Range<Token>> ranges = new ArrayList<>(rangeCount);
for (int i = 0; i < rangeCount; i++)
@@ -76,12 +80,13 @@ public class StreamRequest
List<String> columnFamilies = new ArrayList<>(cfCount);
for (int i = 0; i < cfCount; i++)
columnFamilies.add(in.readUTF());
- return new StreamRequest(keyspace, ranges, columnFamilies);
+ return new StreamRequest(keyspace, ranges, columnFamilies, repairedAt);
}
public long serializedSize(StreamRequest request, int version)
{
int size = TypeSizes.NATIVE.sizeof(request.keyspace);
+ size += TypeSizes.NATIVE.sizeof(request.repairedAt);
size += TypeSizes.NATIVE.sizeof(request.ranges.size());
for (Range<Token> range : request.ranges)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index e65f2db..f766bb6 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -39,6 +39,7 @@ import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.metrics.StreamingMetrics;
+import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.streaming.messages.*;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
@@ -215,19 +216,34 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
* @param ranges Ranges to retrieve data
* @param columnFamilies ColumnFamily names. Can be empty if requesting all CF under the keyspace.
*/
- public void addStreamRequest(String keyspace, Collection<Range<Token>> ranges, Collection<String> columnFamilies)
+ public void addStreamRequest(String keyspace, Collection<Range<Token>> ranges, Collection<String> columnFamilies, long repairedAt)
{
- requests.add(new StreamRequest(keyspace, ranges, columnFamilies));
+ requests.add(new StreamRequest(keyspace, ranges, columnFamilies, repairedAt));
}
/**
* Set up transfer for specific keyspace/ranges/CFs
*
+ * Used in repair - a streamed sstable in repair will be marked with the given repairedAt time
+ *
* @param keyspace Transfer keyspace
* @param ranges Transfer ranges
* @param columnFamilies Transfer ColumnFamilies
+ * @param flushTables flush tables?
+ * @param repairedAt the time the repair started.
*/
- public void addTransferRanges(String keyspace, Collection<Range<Token>> ranges, Collection<String> columnFamilies, boolean flushTables)
+ public void addTransferRanges(String keyspace, Collection<Range<Token>> ranges, Collection<String> columnFamilies, boolean flushTables, long repairedAt)
+ {
+ Collection<ColumnFamilyStore> stores = getColumnFamilyStores(keyspace, columnFamilies);
+ if (flushTables)
+ flushSSTables(stores);
+
+ List<Range<Token>> normalizedRanges = Range.normalize(ranges);
+ List<SSTableReader> sstables = getSSTablesForRanges(normalizedRanges, stores);
+ addTransferFiles(normalizedRanges, sstables, repairedAt);
+ }
+
+ private Collection<ColumnFamilyStore> getColumnFamilyStores(String keyspace, Collection<String> columnFamilies)
{
Collection<ColumnFamilyStore> stores = new HashSet<>();
// if columnfamilies are not specified, we add all cf under the keyspace
@@ -240,11 +256,11 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
for (String cf : columnFamilies)
stores.add(Keyspace.open(keyspace).getColumnFamilyStore(cf));
}
+ return stores;
+ }
- if (flushTables)
- flushSSTables(stores);
-
- List<Range<Token>> normalizedRanges = Range.normalize(ranges);
+ private List<SSTableReader> getSSTablesForRanges(Collection<Range<Token>> normalizedRanges, Collection<ColumnFamilyStore> stores)
+ {
List<SSTableReader> sstables = Lists.newLinkedList();
for (ColumnFamilyStore cfStore : stores)
{
@@ -254,7 +270,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
ColumnFamilyStore.ViewFragment view = cfStore.markReferenced(rowBoundsList);
sstables.addAll(view.sstables);
}
- addTransferFiles(normalizedRanges, sstables);
+ return sstables;
}
/**
@@ -263,12 +279,21 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
*
* @param ranges Transfer ranges
* @param sstables Transfer files
+ * @param overriddenRepairedAt use this repairedAt time, for use in repair.
*/
- public void addTransferFiles(Collection<Range<Token>> ranges, Collection<SSTableReader> sstables)
+ public void addTransferFiles(Collection<Range<Token>> ranges, Collection<SSTableReader> sstables, long overriddenRepairedAt)
{
List<SSTableStreamingSections> sstableDetails = new ArrayList<>(sstables.size());
for (SSTableReader sstable : sstables)
- sstableDetails.add(new SSTableStreamingSections(sstable, sstable.getPositionsForRanges(ranges), sstable.estimatedKeysForRanges(ranges)));
+ {
+ long repairedAt = overriddenRepairedAt;
+ if (overriddenRepairedAt == ActiveRepairService.UNREPAIRED_SSTABLE)
+ repairedAt = sstable.getSSTableMetadata().repairedAt;
+ sstableDetails.add(new SSTableStreamingSections(sstable,
+ sstable.getPositionsForRanges(ranges),
+ sstable.estimatedKeysForRanges(ranges),
+ repairedAt));
+ }
addTransferFiles(sstableDetails);
}
@@ -291,7 +316,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
task = new StreamTransferTask(this, cfId);
transfers.put(cfId, task);
}
- task.addTransferFile(details.sstable, details.estimatedKeys, details.sections);
+ task.addTransferFile(details.sstable, details.estimatedKeys, details.sections, details.repairedAt);
}
}
@@ -300,12 +325,14 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
public final SSTableReader sstable;
public final List<Pair<Long, Long>> sections;
public final long estimatedKeys;
+ public final long repairedAt;
- public SSTableStreamingSections(SSTableReader sstable, List<Pair<Long, Long>> sections, long estimatedKeys)
+ public SSTableStreamingSections(SSTableReader sstable, List<Pair<Long, Long>> sections, long estimatedKeys, long repairedAt)
{
this.sstable = sstable;
this.sections = sections;
this.estimatedKeys = estimatedKeys;
+ this.repairedAt = repairedAt;
}
}
@@ -407,7 +434,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
startStreamingFiles();
}
- /**
+ /**
* Call back for handling exception during streaming.
*
* @param e thrown exception
@@ -430,7 +457,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
// prepare tasks
state(State.PREPARING);
for (StreamRequest request : requests)
- addTransferRanges(request.keyspace, request.ranges, request.columnFamilies, true); // always flush on stream request
+ addTransferRanges(request.keyspace, request.ranges, request.columnFamilies, true, request.repairedAt); // always flush on stream request
for (StreamSummary summary : summaries)
prepareReceiving(summary);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
index 8e461cc..13171f4 100644
--- a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
@@ -40,10 +40,10 @@ public class StreamTransferTask extends StreamTask
super(session, cfId);
}
- public void addTransferFile(SSTableReader sstable, long estimatedKeys, List<Pair<Long, Long>> sections)
+ public void addTransferFile(SSTableReader sstable, long estimatedKeys, List<Pair<Long, Long>> sections, long repairedAt)
{
assert sstable != null && cfId.equals(sstable.metadata.cfId);
- OutgoingFileMessage message = new OutgoingFileMessage(sstable, sequenceNumber.getAndIncrement(), estimatedKeys, sections);
+ OutgoingFileMessage message = new OutgoingFileMessage(sstable, sequenceNumber.getAndIncrement(), estimatedKeys, sections, repairedAt);
files.put(message.header.sequenceNumber, message);
totalSize += message.header.size();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index d294e4a..3c13d11 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@ -24,11 +24,15 @@ import java.nio.channels.ReadableByteChannel;
import com.google.common.base.Throwables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.io.compress.CompressionMetadata;
import org.apache.cassandra.io.sstable.SSTableWriter;
+import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.streaming.ProgressInfo;
import org.apache.cassandra.streaming.StreamReader;
import org.apache.cassandra.streaming.StreamSession;
@@ -41,6 +45,8 @@ import org.apache.cassandra.utils.Pair;
*/
public class CompressedStreamReader extends StreamReader
{
+ private static final Logger logger = LoggerFactory.getLogger(StreamReader.class);
+
protected final CompressionInfo compressionInfo;
public CompressedStreamReader(FileMessageHeader header, StreamSession session)
@@ -56,12 +62,13 @@ public class CompressedStreamReader extends StreamReader
@Override
public SSTableWriter read(ReadableByteChannel channel) throws IOException
{
+ logger.info("reading file from {}, repairedAt = {}", session.peer, repairedAt);
long totalSize = totalSize();
Pair<String, String> kscf = Schema.instance.getCF(cfId);
ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
- SSTableWriter writer = createWriter(cfs, totalSize);
+ SSTableWriter writer = createWriter(cfs, totalSize, repairedAt);
CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo, inputVersion.hasPostCompressionAdlerChecksums);
BytesReadTracker in = new BytesReadTracker(new DataInputStream(cis));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
index 24f1e04..3e86027 100644
--- a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
+++ b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
@@ -46,13 +46,15 @@ public class FileMessageHeader
public final long estimatedKeys;
public final List<Pair<Long, Long>> sections;
public final CompressionInfo compressionInfo;
+ public final long repairedAt;
public FileMessageHeader(UUID cfId,
int sequenceNumber,
String version,
long estimatedKeys,
List<Pair<Long, Long>> sections,
- CompressionInfo compressionInfo)
+ CompressionInfo compressionInfo,
+ long repairedAt)
{
this.cfId = cfId;
this.sequenceNumber = sequenceNumber;
@@ -60,6 +62,7 @@ public class FileMessageHeader
this.estimatedKeys = estimatedKeys;
this.sections = sections;
this.compressionInfo = compressionInfo;
+ this.repairedAt = repairedAt;
}
/**
@@ -92,6 +95,7 @@ public class FileMessageHeader
sb.append(", estimated keys: ").append(estimatedKeys);
sb.append(", transfer size: ").append(size());
sb.append(", compressed?: ").append(compressionInfo != null);
+ sb.append(", repairedAt: ").append(repairedAt);
sb.append(')');
return sb.toString();
}
@@ -129,6 +133,7 @@ public class FileMessageHeader
out.writeLong(section.right);
}
CompressionInfo.serializer.serialize(header.compressionInfo, out, version);
+ out.writeLong(header.repairedAt);
}
public FileMessageHeader deserialize(DataInput in, int version) throws IOException
@@ -142,7 +147,8 @@ public class FileMessageHeader
for (int k = 0; k < count; k++)
sections.add(Pair.create(in.readLong(), in.readLong()));
CompressionInfo compressionInfo = CompressionInfo.serializer.deserialize(in, MessagingService.current_version);
- return new FileMessageHeader(cfId, sequenceNumber, sstableVersion, estimatedKeys, sections, compressionInfo);
+ long repairedAt = in.readLong();
+ return new FileMessageHeader(cfId, sequenceNumber, sstableVersion, estimatedKeys, sections, compressionInfo, repairedAt);
}
public long serializedSize(FileMessageHeader header, int version)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
index 1fa115f..82f6c01 100644
--- a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
@@ -64,7 +64,7 @@ public class OutgoingFileMessage extends StreamMessage
public FileMessageHeader header;
public SSTableReader sstable;
- public OutgoingFileMessage(SSTableReader sstable, int sequenceNumber, long estimatedKeys, List<Pair<Long, Long>> sections)
+ public OutgoingFileMessage(SSTableReader sstable, int sequenceNumber, long estimatedKeys, List<Pair<Long, Long>> sections, long repairedAt)
{
super(Type.FILE);
this.sstable = sstable;
@@ -76,11 +76,12 @@ public class OutgoingFileMessage extends StreamMessage
compressionInfo = new CompressionInfo(meta.getChunksForSections(sections), meta.parameters);
}
this.header = new FileMessageHeader(sstable.metadata.cfId,
- sequenceNumber,
- sstable.descriptor.version.toString(),
- estimatedKeys,
- sections,
- compressionInfo);
+ sequenceNumber,
+ sstable.descriptor.version.toString(),
+ estimatedKeys,
+ sections,
+ compressionInfo,
+ repairedAt);
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index a342866..2fcdd63 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -211,19 +211,19 @@ public class NodeProbe implements AutoCloseable
ssProxy.forceKeyspaceFlush(keyspaceName, columnFamilies);
}
- public void forceKeyspaceRepair(String keyspaceName, boolean isSequential, boolean isLocal, String... columnFamilies) throws IOException
+ public void forceKeyspaceRepair(String keyspaceName, boolean isSequential, boolean isLocal, boolean fullRepair, String... columnFamilies) throws IOException
{
- ssProxy.forceKeyspaceRepair(keyspaceName, isSequential, isLocal, columnFamilies);
+ ssProxy.forceKeyspaceRepair(keyspaceName, isSequential, isLocal, fullRepair, columnFamilies);
}
- public void forceRepairAsync(final PrintStream out, final String keyspaceName, boolean isSequential, Collection<String> dataCenters, boolean primaryRange, String... columnFamilies) throws IOException
+ public void forceRepairAsync(final PrintStream out, final String keyspaceName, boolean isSequential, Collection<String> dataCenters, boolean primaryRange, boolean fullRepair, String... columnFamilies) throws IOException
{
RepairRunner runner = new RepairRunner(out, keyspaceName, columnFamilies);
try
{
jmxc.addConnectionNotificationListener(runner, null, null);
ssProxy.addNotificationListener(runner, null, null);
- if (!runner.repairAndWait(ssProxy, isSequential, dataCenters, primaryRange))
+ if (!runner.repairAndWait(ssProxy, isSequential, dataCenters, primaryRange, fullRepair))
failed = true;
}
catch (Exception e)
@@ -244,14 +244,14 @@ public class NodeProbe implements AutoCloseable
}
}
- public void forceRepairRangeAsync(final PrintStream out, final String keyspaceName, boolean isSequential, Collection<String> dataCenters, final String startToken, final String endToken, String... columnFamilies) throws IOException
+ public void forceRepairRangeAsync(final PrintStream out, final String keyspaceName, boolean isSequential, Collection<String> dataCenters, final String startToken, final String endToken, boolean fullRepair, String... columnFamilies) throws IOException
{
RepairRunner runner = new RepairRunner(out, keyspaceName, columnFamilies);
try
{
jmxc.addConnectionNotificationListener(runner, null, null);
ssProxy.addNotificationListener(runner, null, null);
- if (!runner.repairRangeAndWait(ssProxy, isSequential, dataCenters, startToken, endToken))
+ if (!runner.repairRangeAndWait(ssProxy, isSequential, dataCenters, startToken, endToken, fullRepair))
failed = true;
}
catch (Exception e)
@@ -272,14 +272,14 @@ public class NodeProbe implements AutoCloseable
}
}
- public void forceKeyspaceRepairPrimaryRange(String keyspaceName, boolean isSequential, boolean isLocal, String... columnFamilies) throws IOException
+ public void forceKeyspaceRepairPrimaryRange(String keyspaceName, boolean isSequential, boolean isLocal, boolean fullRepair, String... columnFamilies) throws IOException
{
- ssProxy.forceKeyspaceRepairPrimaryRange(keyspaceName, isSequential, isLocal, columnFamilies);
+ ssProxy.forceKeyspaceRepairPrimaryRange(keyspaceName, isSequential, isLocal, fullRepair, columnFamilies);
}
- public void forceKeyspaceRepairRange(String beginToken, String endToken, String keyspaceName, boolean isSequential, boolean isLocal, String... columnFamilies) throws IOException
+ public void forceKeyspaceRepairRange(String beginToken, String endToken, String keyspaceName, boolean isSequential, boolean isLocal, boolean fullRepair, String... columnFamilies) throws IOException
{
- ssProxy.forceKeyspaceRepairRange(beginToken, endToken, keyspaceName, isSequential, isLocal, columnFamilies);
+ ssProxy.forceKeyspaceRepairRange(beginToken, endToken, keyspaceName, isSequential, isLocal, fullRepair, columnFamilies);
}
public void invalidateCounterCache()
@@ -1237,16 +1237,16 @@ class RepairRunner implements NotificationListener
this.columnFamilies = columnFamilies;
}
- public boolean repairAndWait(StorageServiceMBean ssProxy, boolean isSequential, Collection<String> dataCenters, boolean primaryRangeOnly) throws Exception
+ public boolean repairAndWait(StorageServiceMBean ssProxy, boolean isSequential, Collection<String> dataCenters, boolean primaryRangeOnly, boolean fullRepair) throws Exception
{
- cmd = ssProxy.forceRepairAsync(keyspace, isSequential, dataCenters, primaryRangeOnly, columnFamilies);
+ cmd = ssProxy.forceRepairAsync(keyspace, isSequential, dataCenters, primaryRangeOnly, fullRepair, columnFamilies);
waitForRepair();
return success;
}
- public boolean repairRangeAndWait(StorageServiceMBean ssProxy, boolean isSequential, Collection<String> dataCenters, String startToken, String endToken) throws Exception
+ public boolean repairRangeAndWait(StorageServiceMBean ssProxy, boolean isSequential, Collection<String> dataCenters, String startToken, String endToken, boolean fullRepair) throws Exception
{
- cmd = ssProxy.forceRepairRangeAsync(startToken, endToken, keyspace, isSequential, dataCenters, columnFamilies);
+ cmd = ssProxy.forceRepairRangeAsync(startToken, endToken, keyspace, isSequential, dataCenters, fullRepair, columnFamilies);
waitForRepair();
return success;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/tools/NodeTool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java b/src/java/org/apache/cassandra/tools/NodeTool.java
index 10e581c..94bce74 100644
--- a/src/java/org/apache/cassandra/tools/NodeTool.java
+++ b/src/java/org/apache/cassandra/tools/NodeTool.java
@@ -1556,6 +1556,9 @@ public class NodeTool
@Option(title = "primary_range", name = {"-pr", "--partitioner-range"}, description = "Use -pr to repair only the first range returned by the partitioner")
private boolean primaryRange = false;
+ @Option(title = "incremental_repair", name = {"-inc", "--incremental"}, description = "Use -inc to use the new incremental repair")
+ private boolean incrementalRepair = false;
+
@Override
public void execute(NodeProbe probe)
{
@@ -1571,11 +1574,10 @@ public class NodeTool
dataCenters = newArrayList(specificDataCenters);
else if (localDC)
dataCenters = newArrayList(probe.getDataCenter());
-
if (!startToken.isEmpty() || !endToken.isEmpty())
- probe.forceRepairRangeAsync(System.out, keyspace, !parallel, dataCenters, startToken, endToken);
+ probe.forceRepairRangeAsync(System.out, keyspace, !parallel, dataCenters, startToken, endToken, !incrementalRepair);
else
- probe.forceRepairAsync(System.out, keyspace, !parallel, dataCenters, primaryRange, cfnames);
+ probe.forceRepairAsync(System.out, keyspace, !parallel, dataCenters, primaryRange, !incrementalRepair, cfnames);
} catch (Exception e)
{
throw new RuntimeException("Error occurred during repair", e);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/tools/SSTableImport.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/SSTableImport.java b/src/java/org/apache/cassandra/tools/SSTableImport.java
index 71b687b..2cb284e 100644
--- a/src/java/org/apache/cassandra/tools/SSTableImport.java
+++ b/src/java/org/apache/cassandra/tools/SSTableImport.java
@@ -46,6 +46,7 @@ import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.io.sstable.SSTableWriter;
import org.apache.cassandra.serializers.MarshalException;
+import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.JsonParser;
@@ -366,7 +367,7 @@ public class SSTableImport
Object[] data = parser.readValueAs(new TypeReference<Object[]>(){});
keyCountToImport = (keyCountToImport == null) ? data.length : keyCountToImport;
- SSTableWriter writer = new SSTableWriter(ssTablePath, keyCountToImport);
+ SSTableWriter writer = new SSTableWriter(ssTablePath, keyCountToImport, ActiveRepairService.UNREPAIRED_SSTABLE);
System.out.printf("Importing %s keys...%n", keyCountToImport);
@@ -442,7 +443,7 @@ public class SSTableImport
System.out.printf("Importing %s keys...%n", keyCountToImport);
parser = getParser(jsonFile); // renewing parser
- SSTableWriter writer = new SSTableWriter(ssTablePath, keyCountToImport);
+ SSTableWriter writer = new SSTableWriter(ssTablePath, keyCountToImport, ActiveRepairService.UNREPAIRED_SSTABLE);
int lineNumber = 1;
DecoratedKey prevStoredKey = null;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java b/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
index 0ab94c4..374ef79 100644
--- a/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
+++ b/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
@@ -66,6 +66,7 @@ public class SSTableMetadataViewer
out.printf("Compression ratio: %s%n", stats.compressionRatio);
out.printf("Estimated droppable tombstones: %s%n", stats.getEstimatedDroppableTombstoneRatio((int) (System.currentTimeMillis() / 1000)));
out.printf("SSTable Level: %d%n", stats.sstableLevel);
+ out.printf("Repaired at: %d%n", stats.repairedAt);
out.println(stats.replayPosition);
out.println("Estimated tombstone drop times:%n");
for (Map.Entry<Double, Long> entry : stats.estimatedTombstoneDropTime.getAsMap().entrySet())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java b/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
index 54fc22f..90e7123 100644
--- a/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
+++ b/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
@@ -72,7 +72,7 @@ public class LongLeveledCompactionStrategyTest extends SchemaLoader
{
while (true)
{
- final AbstractCompactionTask t = lcs.getMaximalTask(Integer.MIN_VALUE);
+ final AbstractCompactionTask t = lcs.getMaximalTask(Integer.MIN_VALUE).iterator().next();
if (t == null)
break;
tasks.add(new Runnable()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/test/unit/org/apache/cassandra/AbstractSerializationsTester.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/AbstractSerializationsTester.java b/test/unit/org/apache/cassandra/AbstractSerializationsTester.java
index 9bba196..311d21b 100644
--- a/test/unit/org/apache/cassandra/AbstractSerializationsTester.java
+++ b/test/unit/org/apache/cassandra/AbstractSerializationsTester.java
@@ -36,18 +36,19 @@ import java.util.Map;
public class AbstractSerializationsTester extends SchemaLoader
{
- protected static final String CUR_VER = System.getProperty("cassandra.version", "2.0");
+ protected static final String CUR_VER = System.getProperty("cassandra.version", "2.1");
protected static final Map<String, Integer> VERSION_MAP = new HashMap<String, Integer> ()
{{
put("0.7", 1);
put("1.0", 3);
put("1.2", MessagingService.VERSION_12);
put("2.0", MessagingService.VERSION_20);
+ put("2.1", MessagingService.VERSION_21);
}};
protected static final boolean EXECUTE_WRITES = Boolean.getBoolean("cassandra.test-serialization-writes");
- protected final int getVersion()
+ protected static int getVersion()
{
return VERSION_MAP.get(CUR_VER);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
index 92ca14e..7bc0256 100644
--- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
+++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
@@ -50,6 +50,7 @@ import org.apache.cassandra.db.marshal.LongType;
import org.apache.cassandra.dht.*;
import org.apache.cassandra.io.sstable.*;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.thrift.*;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -1596,6 +1597,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
collector.addAncestor(sstable1.descriptor.generation); // add ancestor from previously written sstable
return new SSTableWriter(makeFilename(directory, metadata.ksName, metadata.cfName),
0,
+ ActiveRepairService.UNREPAIRED_SSTABLE,
metadata,
StorageService.getPartitioner(),
collector);
@@ -1652,6 +1654,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
String file = new Descriptor(directory, ks, cf, 3, true).filenameFor(Component.DATA);
return new SSTableWriter(file,
0,
+ ActiveRepairService.UNREPAIRED_SSTABLE,
metadata,
StorageService.getPartitioner(),
collector);