You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/11/07 20:54:46 UTC

[2/3] git commit: Allow restoring specific columnfamilies from archived CL patch by Carl Yeksigian, Lyuben Todorov, and jbellis for CASSANDRA-4809

Allow restoring specific columnfamilies from archived CL
patch by Carl Yeksigian, Lyuben Todorov, and jbellis for CASSANDRA-4809


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/66df206d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/66df206d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/66df206d

Branch: refs/heads/trunk
Commit: 66df206d8df74fa9c5380e2ecfd3746fd6b3316e
Parents: 0e9906e
Author: Jonathan Ellis <jb...@apache.org>
Authored: Thu Nov 7 13:53:15 2013 -0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Thu Nov 7 13:53:35 2013 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../db/commitlog/CommitLogReplayer.java         | 102 ++++++++++++++++---
 2 files changed, 90 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/66df206d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 77749d6..6ff2799 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0.3
+ * Allow restoring specific columnfamilies from archived CL (CASSANDRA-4809)
  * Avoid flushing compaction_history after each operation (CASSANDRA-6287)
  * Fix repair assertion error when tombstones expire (CASSANDRA-6277)
  * Skip loading corrupt key cache (CASSANDRA-6260)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/66df206d/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index 250e516..74daecb 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -24,6 +24,10 @@ import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.zip.Checksum;
 
+import com.google.common.base.Predicate;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Multimap;
 import com.google.common.collect.Ordering;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
@@ -110,8 +114,72 @@ public class CommitLogReplayer
         return replayedCount.get();
     }
 
+    private abstract static class ReplayFilter // selects which column families of a mutation get replayed
+    {
+        public abstract Iterable<ColumnFamily> filter(RowMutation rm); // the column families of rm to replay
+
+        public static ReplayFilter create()
+        {
+            String replayList = System.getProperty("cassandra.replayList"); // comma-separated keyspace.table pairs
+            if (replayList == null) // nothing requested: replay everything
+                return new AlwaysReplayFilter();
+
+            Multimap<String, String> toReplay = HashMultimap.create();
+            for (String rawPair : replayList.split(","))
+            {
+                String[] pair = rawPair.trim().split("\\."); // split() takes a regex: a bare "." matches every character
+                if (pair.length != 2)
+                    throw new IllegalArgumentException("Each table to be replayed must be fully qualified with keyspace name, e.g., 'system.peers'");
+
+                Keyspace ks = Schema.instance.getKeyspaceInstance(pair[0]);
+                if (ks == null)
+                    throw new IllegalArgumentException("Unknown keyspace " + pair[0]);
+                if (ks.getColumnFamilyStore(pair[1]) == null)
+                    throw new IllegalArgumentException(String.format("Unknown table %s.%s", pair[0], pair[1]));
+
+                toReplay.put(pair[0], pair[1]);
+            }
+            return new CustomReplayFilter(toReplay);
+        }
+    }
+
+    private static class AlwaysReplayFilter extends ReplayFilter // pass-through filter: nothing is excluded
+    {
+        public Iterable<ColumnFamily> filter(RowMutation rm)
+        {
+            return rm.getColumnFamilies(); // every column family carried by the mutation
+        }
+    }
+
+    private static class CustomReplayFilter extends ReplayFilter // keeps only the column families listed in toReplay
+    {
+        private final Multimap<String, String> toReplay; // looked up by keyspace name; values are cf names to keep
+
+        public CustomReplayFilter(Multimap<String, String> toReplay)
+        {
+            this.toReplay = toReplay;
+        }
+
+        public Iterable<ColumnFamily> filter(RowMutation rm)
+        {
+            final Collection<String> cfNames = toReplay.get(rm.getKeyspaceName());
+            if (cfNames.isEmpty()) // Multimap.get() never returns null; an absent key yields an empty collection
+                return Collections.emptySet();
+
+            return Iterables.filter(rm.getColumnFamilies(), new Predicate<ColumnFamily>()
+            {
+                public boolean apply(ColumnFamily cf)
+                {
+                    return cfNames.contains(cf.metadata().cfName);
+                }
+            });
+        }
+    }
+
     public void recover(File file) throws IOException
     {
+        final ReplayFilter replayFilter = ReplayFilter.create();
+
         logger.info("Replaying " + file.getPath());
         CommitLogDescriptor desc = CommitLogDescriptor.fromFileName(file.getName());
         final long segment = desc.id;
@@ -195,7 +263,7 @@ public class CommitLogReplayer
 
                 /* deserialize the commit log entry */
                 FastByteArrayInputStream bufIn = new FastByteArrayInputStream(buffer, 0, serializedSize);
-                RowMutation rm;
+                final RowMutation rm;
                 try
                 {
                     // assuming version here. We've gone to lengths to make sure what gets written to the CL is in
@@ -243,36 +311,36 @@ public class CommitLogReplayer
                             + "}"));
 
                 final long entryLocation = reader.getFilePointer();
-                final RowMutation frm = rm;
                 Runnable runnable = new WrappedRunnable()
                 {
                     public void runMayThrow() throws IOException
                     {
-                        if (Schema.instance.getKSMetaData(frm.getKeyspaceName()) == null)
+                        if (Schema.instance.getKSMetaData(rm.getKeyspaceName()) == null)
                             return;
-                        if (pointInTimeExceeded(frm))
+                        if (pointInTimeExceeded(rm))
                             return;
 
-                        final Keyspace keyspace = Keyspace.open(frm.getKeyspaceName());
+                        final Keyspace keyspace = Keyspace.open(rm.getKeyspaceName());
 
-                        // Rebuild the row mutation, omitting column families that 
-                        // a) have already been flushed,
-                        // b) are part of a cf that was dropped. Keep in mind that the cf.name() is suspect. do every thing based on the cfid instead.
+                        // Rebuild the row mutation, omitting column families that
+                        //    a) the user has requested that we ignore,
+                        //    b) have already been flushed,
+                        // or c) are part of a cf that was dropped.
+                        // Keep in mind that the cf.name() is suspect. do every thing based on the cfid instead.
                         RowMutation newRm = null;
-                        for (ColumnFamily columnFamily : frm.getColumnFamilies())
+                        for (ColumnFamily columnFamily : replayFilter.filter(rm))
                         {
                             if (Schema.instance.getCF(columnFamily.id()) == null)
-                                // null means the cf has been dropped
-                                continue;
+                                continue; // dropped
 
                             ReplayPosition rp = cfPositions.get(columnFamily.id());
 
-                            // replay if current segment is newer than last flushed one or, 
+                            // replay if current segment is newer than last flushed one or,
                             // if it is the last known segment, if we are after the replay position
                             if (segment > rp.segment || (segment == rp.segment && entryLocation > rp.position))
                             {
                                 if (newRm == null)
-                                    newRm = new RowMutation(frm.getKeyspaceName(), frm.key());
+                                    newRm = new RowMutation(rm.getKeyspaceName(), rm.key());
                                 newRm.add(columnFamily);
                                 replayedCount.incrementAndGet();
                             }
@@ -311,4 +379,12 @@ public class CommitLogReplayer
         }
         return false;
     }
+
+    private ColumnFamily getCFToRecover(String cfName, Collection<ColumnFamily> cfs) // first cf in cfs named cfName, else null
+    { // NOTE(review): no caller is visible in this patch — confirm it is used before keeping it
+        for (ColumnFamily cf : cfs)
+            if (cf.metadata().cfName.equals(cfName))
+                return cf;
+        return null; // no column family in cfs has that name
+    }
 }