You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2012/02/24 16:49:23 UTC

[30/50] git commit: Staggering repair patch by Vijay and Sylvain Lebresne; reviewed by Sylvain Lebresne for CASSANDRA-3721

Staggering repair
patch by Vijay and Sylvain Lebresne; reviewed by Sylvain Lebresne for CASSANDRA-3721


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1ae32c93
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1ae32c93
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1ae32c93

Branch: refs/heads/cassandra-1.1
Commit: 1ae32c93eca24a5bdab7332a56418a12b4e6586b
Parents: c49a149
Author: Vijay Parthasarathy <vi...@gmail.com>
Authored: Tue Feb 14 10:48:12 2012 -0800
Committer: Vijay Parthasarathy <vi...@gmail.com>
Committed: Tue Feb 14 10:49:22 2012 -0800

----------------------------------------------------------------------
 .../org/apache/cassandra/db/ColumnFamilyStore.java |    8 +
 src/java/org/apache/cassandra/db/Directories.java  |   15 +
 .../cassandra/db/compaction/CompactionManager.java |   37 ++-
 .../cassandra/service/AntiEntropyService.java      |  223 ++++++++++++--
 .../apache/cassandra/service/StorageService.java   |   13 +-
 .../cassandra/service/StorageServiceMBean.java     |    4 +-
 src/java/org/apache/cassandra/tools/NodeCmd.java   |    7 +-
 src/java/org/apache/cassandra/tools/NodeProbe.java |    8 +-
 8 files changed, 256 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/1ae32c93/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index bf4a000..ab3b064 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -1423,6 +1423,14 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         }
     }
 
+    public List<SSTableReader> getSnapshotSSTableReader(String tag) throws IOException
+    {
+        List<SSTableReader> readers = new ArrayList<SSTableReader>();
+        for (Map.Entry<Descriptor, Set<Component>> entries : directories.sstableLister().snapshots(tag).list().entrySet())
+            readers.add(SSTableReader.open(entries.getKey(), entries.getValue(), metadata, partitioner));
+        return readers;
+    }
+
     /**
      * Take a snap shot of this columnfamily store.
      *

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1ae32c93/src/java/org/apache/cassandra/db/Directories.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Directories.java b/src/java/org/apache/cassandra/db/Directories.java
index 2afefd2..7c51830 100644
--- a/src/java/org/apache/cassandra/db/Directories.java
+++ b/src/java/org/apache/cassandra/db/Directories.java
@@ -185,6 +185,7 @@ public class Directories
         private int nbFiles;
         private final Map<Descriptor, Set<Component>> components = new HashMap<Descriptor, Set<Component>>();
         private boolean filtered;
+        private String snapshotName;
 
         public SSTableLister skipCompacted(boolean b)
         {
@@ -219,6 +220,14 @@ public class Directories
             return this;
         }
 
+        public SSTableLister snapshots(String sn)
+        {
+            if (filtered)
+                throw new IllegalStateException("list() has already been called");
+            snapshotName = sn;
+            return this;
+        }
+
         public Map<Descriptor, Set<Component>> list()
         {
             filter();
@@ -246,6 +255,12 @@ public class Directories
 
             for (File location : sstableDirectories)
             {
+                if (snapshotName != null)
+                {
+                    new File(location, join(SNAPSHOT_SUBDIR, snapshotName)).listFiles(getFilter());
+                    continue;
+                }
+
                 if (!onlyBackups)
                     location.listFiles(getFilter());
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1ae32c93/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index c02aed2..448c569 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -799,23 +799,33 @@ public class CompactionManager implements CompactionManagerMBean
         if (!cfs.isValid())
             return;
 
-        // flush first so everyone is validating data that is as similar as possible
-        try
-        {
-            StorageService.instance.forceTableFlush(cfs.table.name, cfs.getColumnFamilyName());
-        }
-        catch (ExecutionException e)
+        Collection<SSTableReader> sstables;
+        if (cfs.table.snapshotExists(validator.request.sessionid))
         {
-            throw new IOException(e);
+            // If there is a snapshot created for the session then read from there.
+            sstables = cfs.getSnapshotSSTableReader(validator.request.sessionid);
         }
-        catch (InterruptedException e)
+        else
         {
-            throw new AssertionError(e);
+            // flush first so everyone is validating data that is as similar as possible
+            try
+            {
+                StorageService.instance.forceTableFlush(cfs.table.name, cfs.getColumnFamilyName());
+            }
+            catch (ExecutionException e)
+            {
+                throw new IOException(e);
+            }
+            catch (InterruptedException e)
+            {
+                throw new AssertionError(e);
+            }
+
+            // we don't mark validating sstables as compacting in DataTracker, so we have to mark them referenced
+            // instead so they won't be cleaned up if they do get compacted during the validation
+            sstables = cfs.markCurrentSSTablesReferenced();
         }
 
-        // we don't mark validating sstables as compacting in DataTracker, so we have to mark them referenced
-        // instead so they won't be cleaned up if they do get compacted during the validation
-        Collection<SSTableReader> sstables = cfs.markCurrentSSTablesReferenced();
         CompactionIterable ci = new ValidationCompactionIterable(cfs, sstables, validator.request.range);
         CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
         validationExecutor.beginCompaction(ci);
@@ -838,6 +848,9 @@ public class CompactionManager implements CompactionManagerMBean
         {
             SSTableReader.releaseReferences(sstables);
             iter.close();
+            if (cfs.table.snapshotExists(validator.request.sessionid))
+                cfs.table.clearSnapshot(validator.request.sessionid);
+
             validationExecutor.finishCompaction(ci);
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1ae32c93/src/java/org/apache/cassandra/service/AntiEntropyService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AntiEntropyService.java b/src/java/org/apache/cassandra/service/AntiEntropyService.java
index 812c23a..10ea80c 100644
--- a/src/java/org/apache/cassandra/service/AntiEntropyService.java
+++ b/src/java/org/apache/cassandra/service/AntiEntropyService.java
@@ -38,6 +38,7 @@ import org.apache.cassandra.db.compaction.AbstractCompactedRow;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.SnapshotCommand;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.RandomPartitioner;
 import org.apache.cassandra.dht.Range;
@@ -47,6 +48,7 @@ import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.FastByteArrayInputStream;
 import org.apache.cassandra.io.util.FastByteArrayOutputStream;
 import org.apache.cassandra.net.CompactEndpointSerializationHelper;
+import org.apache.cassandra.net.IAsyncCallback;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
@@ -119,9 +121,9 @@ public class AntiEntropyService
     /**
      * Requests repairs for the given table and column families, and blocks until all repairs have been completed.
      */
-    public RepairFuture submitRepairSession(Range<Token> range, String tablename, String... cfnames)
+    public RepairFuture submitRepairSession(Range<Token> range, String tablename, boolean isSequential, String... cfnames)
     {
-        RepairFuture futureTask = new RepairSession(range, tablename, cfnames).getFuture();
+        RepairFuture futureTask = new RepairSession(range, tablename, isSequential, cfnames).getFuture();
         executor.execute(futureTask);
         return futureTask;
     }
@@ -209,16 +211,6 @@ public class AntiEntropyService
     }
 
     /**
-     * Requests a tree from the given node, and returns the request that was sent.
-     */
-    TreeRequest request(String sessionid, InetAddress remote, Range<Token> range, String ksname, String cfname)
-    {
-        TreeRequest request = new TreeRequest(sessionid, remote, range, new CFPair(ksname, cfname));
-        MessagingService.instance().sendOneWay(TreeRequestVerbHandler.makeVerb(request, Gossiper.instance.getVersion(remote)), remote);
-        return request;
-    }
-
-    /**
      * Responds to the node that requested the given valid tree.
      * @param validator A locally generated validator
      * @param local localhost (parameterized for testing)
@@ -598,6 +590,7 @@ public class AntiEntropyService
     static class RepairSession extends WrappedRunnable implements IEndpointStateChangeSubscriber, IFailureDetectionEventListener
     {
         private final String sessionName;
+        private final boolean isSequential;
         private final String tablename;
         private final String[] cfnames;
         private final Range<Token> range;
@@ -615,18 +608,19 @@ public class AntiEntropyService
 
         public RepairSession(TreeRequest req, String tablename, String... cfnames)
         {
-            this(req.sessionid, req.range, tablename, cfnames);
+            this(req.sessionid, req.range, tablename, false, cfnames);
             AntiEntropyService.instance.sessions.put(getName(), this);
         }
 
-        public RepairSession(Range<Token> range, String tablename, String... cfnames)
+        public RepairSession(Range<Token> range, String tablename, boolean isSequential, String... cfnames)
         {
-            this(UUIDGen.makeType1UUIDFromHost(FBUtilities.getBroadcastAddress()).toString(), range, tablename, cfnames);
+            this(UUIDGen.makeType1UUIDFromHost(FBUtilities.getBroadcastAddress()).toString(), range, tablename, isSequential, cfnames);
         }
 
-        private RepairSession(String id, Range<Token> range, String tablename, String[] cfnames)
+        private RepairSession(String id, Range<Token> range, String tablename, boolean isSequential, String[] cfnames)
         {
             this.sessionName = id;
+            this.isSequential = isSequential;
             this.tablename = tablename;
             this.cfnames = cfnames;
             assert cfnames.length > 0 : "Repairing no column families seems pointless, doesn't it";
@@ -674,6 +668,12 @@ public class AntiEntropyService
                     logger.info(String.format("[repair #%s] Cannot proceed on repair because a neighbor (%s) is dead: session failed", getName(), endpoint));
                     return;
                 }
+
+                if (Gossiper.instance.getVersion(endpoint) < MessagingService.VERSION_11 && isSequential)
+                {
+                    logger.info(String.format("[repair #%s] Cannot repair using snapshots as node %s is pre-1.1", getName(), endpoint));
+                    return;
+                }
             }
 
             AntiEntropyService.instance.sessions.put(getName(), this);
@@ -729,6 +729,8 @@ public class AntiEntropyService
         public void terminate()
         {
             terminated = true;
+            for (RepairJob job : jobs)
+                job.terminate();
             jobs.clear();
             activeJobs.clear();
         }
@@ -810,17 +812,32 @@ public class AntiEntropyService
         {
             private final String cfname;
             // first we send tree requests.  this tracks the endpoints remaining to hear from
-            private final Set<InetAddress> remainingEndpoints = new HashSet<InetAddress>();
+            private final RequestCoordinator<TreeRequest> treeRequests;
             // tree responses are then tracked here
             private final List<TreeResponse> trees = new ArrayList<TreeResponse>(endpoints.size() + 1);
             // once all responses are received, each tree is compared with each other, and differencer tasks
             // are submitted.  the job is done when all differencers are complete.
-            private final Set<Differencer> remainingDifferencers = new HashSet<Differencer>();
+            private final RequestCoordinator<Differencer> differencers;
             private final Condition requestsSent = new SimpleCondition();
+            private CountDownLatch snapshotLatch = null;
 
             public RepairJob(String cfname)
             {
                 this.cfname = cfname;
+                this.treeRequests = new RequestCoordinator<TreeRequest>(isSequential)
+                {
+                    public void send(TreeRequest r)
+                    {
+                        MessagingService.instance().sendOneWay(TreeRequestVerbHandler.makeVerb(r, Gossiper.instance.getVersion(r.endpoint)), r.endpoint);
+                    }
+                };
+                this.differencers = new RequestCoordinator<Differencer>(isSequential)
+                {
+                    public void send(Differencer d)
+                    {
+                        StageManager.getStage(Stage.ANTI_ENTROPY).execute(d);
+                    }
+                };
             }
 
             /**
@@ -828,17 +845,51 @@ public class AntiEntropyService
              */
             public void sendTreeRequests()
             {
-                remainingEndpoints.addAll(endpoints);
-                remainingEndpoints.add(FBUtilities.getBroadcastAddress());
-
                 // send requests to all nodes
-                for (InetAddress endpoint : remainingEndpoints)
-                    AntiEntropyService.instance.request(getName(), endpoint, range, tablename, cfname);
+                List<InetAddress> allEndpoints = new ArrayList<InetAddress>(endpoints);
+                allEndpoints.add(FBUtilities.getBroadcastAddress());
 
-                logger.info(String.format("[repair #%s] requests for merkle tree sent for %s (to %s)", getName(), cfname, remainingEndpoints));
+                if (isSequential)
+                    makeSnapshots(endpoints);
+
+                for (InetAddress endpoint : allEndpoints)
+                    treeRequests.add(new TreeRequest(getName(), endpoint, range, new CFPair(tablename, cfname)));
+
+                logger.info(String.format("[repair #%s] requesting merkle trees for %s (to %s)", getName(), cfname, allEndpoints));
+                treeRequests.start();
                 requestsSent.signalAll();
             }
 
+            public void makeSnapshots(Collection<InetAddress> endpoints)
+            {
+                try
+                {
+                    snapshotLatch = new CountDownLatch(endpoints.size());
+                    IAsyncCallback callback = new IAsyncCallback()
+                    {
+                        @Override
+                            public boolean isLatencyForSnitch()
+                            {
+                                return false;
+                            }
+
+                        @Override
+                            public void response(Message msg)
+                            {
+                                RepairJob.this.snapshotLatch.countDown();
+                            }
+                    };
+                    for (InetAddress endpoint : endpoints)
+                        MessagingService.instance().sendRR(new SnapshotCommand(tablename, cfname, sessionName, false), endpoint, callback);
+                    snapshotLatch.await();
+                    snapshotLatch = null;
+                }
+                catch (InterruptedException e)
+                {
+                    throw new RuntimeException(e);
+                }
+            }
+
             /**
              * Add a new received tree and return the number of remaining tree to
              * be received for the job to be complete.
@@ -859,8 +910,7 @@ public class AntiEntropyService
 
                 assert request.cf.right.equals(cfname);
                 trees.add(new TreeResponse(request.endpoint, tree));
-                remainingEndpoints.remove(request.endpoint);
-                return remainingEndpoints.size();
+                return treeRequests.completed(request);
             }
 
             /**
@@ -869,8 +919,6 @@ public class AntiEntropyService
              */
             public void submitDifferencers()
             {
-                assert remainingEndpoints.isEmpty();
-
                 // We need to difference all trees one against another
                 for (int i = 0; i < trees.size() - 1; ++i)
                 {
@@ -880,10 +928,10 @@ public class AntiEntropyService
                         TreeResponse r2 = trees.get(j);
                         Differencer differencer = new Differencer(cfname, r1, r2);
                         logger.debug("Queueing comparison {}", differencer);
-                        remainingDifferencers.add(differencer);
-                        StageManager.getStage(Stage.ANTI_ENTROPY).execute(differencer);
+                        differencers.add(differencer);
                     }
                 }
+                differencers.start();
                 trees.clear(); // allows gc to do its thing
             }
 
@@ -892,8 +940,16 @@ public class AntiEntropyService
              */
             synchronized boolean completedSynchronization(Differencer differencer)
             {
-                remainingDifferencers.remove(differencer);
-                return remainingDifferencers.isEmpty();
+                return differencers.completed(differencer) == 0;
+            }
+
+            public void terminate()
+            {
+                if (snapshotLatch != null)
+                {
+                    while (snapshotLatch.getCount() > 0)
+                        snapshotLatch.countDown();
+                }
             }
         }
 
@@ -992,4 +1048,107 @@ public class AntiEntropyService
             this.session = session;
         }
     }
+
+    public static abstract class RequestCoordinator<R>
+    {
+        private final Order<R> orderer;
+
+        protected RequestCoordinator(boolean isSequential)
+        {
+            this.orderer = isSequential ? new SequentialOrder<R>(this) : new ParallelOrder<R>(this);
+        }
+
+        public abstract void send(R request);
+
+        public void add(R request)
+        {
+            orderer.add(request);
+        }
+
+        public void start()
+        {
+            orderer.start();
+        }
+
+        // Returns how many requests remain
+        public int completed(R request)
+        {
+            return orderer.completed(request);
+        }
+
+        private static abstract class Order<R>
+        {
+            protected final RequestCoordinator<R> coordinator;
+
+            Order(RequestCoordinator<R> coordinator)
+            {
+                this.coordinator = coordinator;
+            }
+
+            public abstract void add(R request);
+            public abstract void start();
+            public abstract int completed(R request);
+        }
+
+        private static class SequentialOrder<R> extends Order<R>
+        {
+            private final Queue<R> requests = new LinkedList<R>();
+
+            SequentialOrder(RequestCoordinator<R> coordinator)
+            {
+                super(coordinator);
+            }
+
+            public void add(R request)
+            {
+                requests.add(request);
+            }
+
+            public void start()
+            {
+                if (requests.isEmpty())
+                    return;
+
+                coordinator.send(requests.peek());
+            }
+
+            public int completed(R request)
+            {
+                assert request.equals(requests.peek());
+                requests.poll();
+                int remaining = requests.size();
+                if (remaining != 0)
+                    coordinator.send(requests.peek());
+                return remaining;
+            }
+        }
+
+        private static class ParallelOrder<R> extends Order<R>
+        {
+            private final Set<R> requests = new HashSet<R>();
+
+            ParallelOrder(RequestCoordinator<R> coordinator)
+            {
+                super(coordinator);
+            }
+
+            public void add(R request)
+            {
+                requests.add(request);
+            }
+
+            public void start()
+            {
+                for (R request : requests)
+                    coordinator.send(request);
+            }
+
+            public int completed(R request)
+            {
+                requests.remove(request);
+                return requests.size();
+            }
+        }
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1ae32c93/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index c1681b9..58986a5 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -1827,12 +1827,11 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
      * @param columnFamilies
      * @throws IOException
      */
-    public void forceTableRepair(final String tableName, final String... columnFamilies) throws IOException
+    public void forceTableRepair(final String tableName, boolean isSequential, final String... columnFamilies) throws IOException
     {
         if (Table.SYSTEM_TABLE.equals(tableName))
             return;
 
-
         Collection<Range<Token>> ranges = getLocalRanges(tableName);
         int cmd = nextRepairCommand.incrementAndGet();
         logger_.info("Starting repair command #{}, repairing {} ranges.", cmd, ranges.size());
@@ -1840,7 +1839,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
         List<AntiEntropyService.RepairFuture> futures = new ArrayList<AntiEntropyService.RepairFuture>(ranges.size());
         for (Range<Token> range : ranges)
         {
-            AntiEntropyService.RepairFuture future = forceTableRepair(range, tableName, columnFamilies);
+            AntiEntropyService.RepairFuture future = forceTableRepair(range, tableName, isSequential, columnFamilies);
             futures.add(future);
             // wait for a session to be done with its differencing before starting the next one
             try
@@ -1875,12 +1874,12 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
             logger_.info("Repair command #{} completed successfully", cmd);
     }
 
-    public void forceTableRepairPrimaryRange(final String tableName, final String... columnFamilies) throws IOException
+    public void forceTableRepairPrimaryRange(final String tableName, boolean isSequential, final String... columnFamilies) throws IOException
     {
         if (Table.SYSTEM_TABLE.equals(tableName))
             return;
 
-        AntiEntropyService.RepairFuture future = forceTableRepair(getLocalPrimaryRange(), tableName, columnFamilies);
+        AntiEntropyService.RepairFuture future = forceTableRepair(getLocalPrimaryRange(), tableName, isSequential, columnFamilies);
         try
         {
             future.get();
@@ -1892,7 +1891,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
         }
     }
 
-    public AntiEntropyService.RepairFuture forceTableRepair(final Range<Token> range, final String tableName, final String... columnFamilies) throws IOException
+    public AntiEntropyService.RepairFuture forceTableRepair(final Range<Token> range, final String tableName, boolean isSequential, final String... columnFamilies) throws IOException
     {
         ArrayList<String> names = new ArrayList<String>();
         for (ColumnFamilyStore cfStore : getValidColumnFamilies(tableName, columnFamilies))
@@ -1900,7 +1899,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
             names.add(cfStore.getColumnFamilyName());
         }
 
-        return AntiEntropyService.instance.submitRepairSession(range, tableName, names.toArray(new String[names.size()]));
+        return AntiEntropyService.instance.submitRepairSession(range, tableName, isSequential, names.toArray(new String[names.size()]));
     }
 
     public void forceTerminateAllRepairSessions() {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1ae32c93/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 6af63b7..c5aa9fd 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -231,12 +231,12 @@ public interface StorageServiceMBean
      * @param columnFamilies
      * @throws IOException
      */
-    public void forceTableRepair(String tableName, String... columnFamilies) throws IOException;
+    public void forceTableRepair(String tableName, boolean isSequential, String... columnFamilies) throws IOException;
 
     /**
      * Triggers proactive repair but only for the node primary range.
      */
-    public void forceTableRepairPrimaryRange(String tableName, String... columnFamilies) throws IOException;
+    public void forceTableRepairPrimaryRange(String tableName, boolean isSequential, String... columnFamilies) throws IOException;
 
     public void forceTerminateAllRepairSessions();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1ae32c93/src/java/org/apache/cassandra/tools/NodeCmd.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeCmd.java b/src/java/org/apache/cassandra/tools/NodeCmd.java
index 2b19074..8a224b8 100644
--- a/src/java/org/apache/cassandra/tools/NodeCmd.java
+++ b/src/java/org/apache/cassandra/tools/NodeCmd.java
@@ -53,6 +53,7 @@ public class NodeCmd
     private static final Pair<String, String> PASSWORD_OPT = new Pair<String, String>("pw", "password");
     private static final Pair<String, String> TAG_OPT = new Pair<String, String>("t", "tag");
     private static final Pair<String, String> PRIMARY_RANGE_OPT = new Pair<String, String>("pr", "partitioner-range");
+    private static final Pair<String, String> SNAPSHOT_REPAIR_OPT = new Pair<String, String>("snapshot", "with-snapshot");
 
     private static final String DEFAULT_HOST = "127.0.0.1";
     private static final int DEFAULT_PORT = 7199;
@@ -71,6 +72,7 @@ public class NodeCmd
         options.addOption(PASSWORD_OPT, true, "remote jmx agent password");
         options.addOption(TAG_OPT,      true, "optional name to give a snapshot");
         options.addOption(PRIMARY_RANGE_OPT, false, "only repair the first range returned by the partitioner for the node");
+        options.addOption(SNAPSHOT_REPAIR_OPT, false, "repair one node at a time using snapshots");
     }
     
     public NodeCmd(NodeProbe probe)
@@ -921,10 +923,11 @@ public class NodeCmd
             switch (nc)
             {
                 case REPAIR  :
+                    boolean snapshot = cmd.hasOption(SNAPSHOT_REPAIR_OPT.left);
                     if (cmd.hasOption(PRIMARY_RANGE_OPT.left))
-                        probe.forceTableRepairPrimaryRange(keyspace, columnFamilies);
+                        probe.forceTableRepairPrimaryRange(keyspace, snapshot, columnFamilies);
                     else
-                        probe.forceTableRepair(keyspace, columnFamilies);
+                        probe.forceTableRepair(keyspace, snapshot, columnFamilies);
                     break;
                 case FLUSH   :
                     try { probe.forceTableFlush(keyspace, columnFamilies); }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1ae32c93/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 46d4c63..8739745 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -202,14 +202,14 @@ public class NodeProbe
         ssProxy.forceTableFlush(tableName, columnFamilies);
     }
 
-    public void forceTableRepair(String tableName, String... columnFamilies) throws IOException
+    public void forceTableRepair(String tableName, boolean isSequential, String... columnFamilies) throws IOException
     {
-        ssProxy.forceTableRepair(tableName, columnFamilies);
+        ssProxy.forceTableRepair(tableName, isSequential, columnFamilies);
     }
 
-    public void forceTableRepairPrimaryRange(String tableName, String... columnFamilies) throws IOException
+    public void forceTableRepairPrimaryRange(String tableName, boolean isSequential, String... columnFamilies) throws IOException
     {
-        ssProxy.forceTableRepairPrimaryRange(tableName, columnFamilies);
+        ssProxy.forceTableRepairPrimaryRange(tableName, isSequential, columnFamilies);
     }
 
     public void invalidateKeyCache() throws IOException