You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/11/07 20:54:46 UTC
[2/3] git commit: Allow restoring specific columnfamilies from
archived CL patch by Carl Yeksigian, Lyuben Todorov,
and jbellis for CASSANDRA-4809
Allow restoring specific columnfamilies from archived CL
patch by Carl Yeksigian, Lyuben Todorov, and jbellis for CASSANDRA-4809
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/66df206d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/66df206d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/66df206d
Branch: refs/heads/trunk
Commit: 66df206d8df74fa9c5380e2ecfd3746fd6b3316e
Parents: 0e9906e
Author: Jonathan Ellis <jb...@apache.org>
Authored: Thu Nov 7 13:53:15 2013 -0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Thu Nov 7 13:53:35 2013 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../db/commitlog/CommitLogReplayer.java | 102 ++++++++++++++++---
2 files changed, 90 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/66df206d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 77749d6..6ff2799 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.0.3
+ * Allow restoring specific columnfamilies from archived CL (CASSANDRA-4809)
* Avoid flushing compaction_history after each operation (CASSANDRA-6287)
* Fix repair assertion error when tombstones expire (CASSANDRA-6277)
* Skip loading corrupt key cache (CASSANDRA-6260)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/66df206d/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index 250e516..74daecb 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -24,6 +24,10 @@ import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.Checksum;
+import com.google.common.base.Predicate;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Multimap;
import com.google.common.collect.Ordering;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
@@ -110,8 +114,72 @@ public class CommitLogReplayer
return replayedCount.get();
}
+    /**
+     * Decides which column families of a replayed RowMutation should actually be applied.
+     */
+    private abstract static class ReplayFilter
+    {
+        /**
+         * Returns the column families of {@code rm} that should be replayed.
+         */
+        public abstract Iterable<ColumnFamily> filter(RowMutation rm);
+
+        /**
+         * Builds a filter from the "cassandra.replayList" system property.
+         *
+         * When the property is unset, the returned filter replays every column family.
+         * Otherwise the property must be a comma-separated list of fully qualified
+         * "keyspace.table" names, and the returned filter keeps only those tables.
+         *
+         * @throws IllegalArgumentException if an entry is not of the form keyspace.table,
+         *         or names a keyspace or table that cannot be found.
+         */
+        public static ReplayFilter create()
+        {
+            String replayList = System.getProperty("cassandra.replayList");
+            // No replay list supplied: replay everything.
+            if (replayList == null)
+                return new AlwaysReplayFilter();
+
+            Multimap<String, String> toReplay = HashMultimap.create();
+            for (String rawPair : replayList.split(","))
+            {
+                // String.split takes a regex: an unescaped "." matches every character and
+                // would produce an empty array, so the dot must be escaped.
+                String[] pair = rawPair.trim().split("\\.");
+                if (pair.length != 2)
+                    throw new IllegalArgumentException("Each table to be replayed must be fully qualified with keyspace name, e.g., 'system.peers'");
+
+                Keyspace ks = Schema.instance.getKeyspaceInstance(pair[0]);
+                if (ks == null)
+                    throw new IllegalArgumentException("Unknown keyspace " + pair[0]);
+                if (ks.getColumnFamilyStore(pair[1]) == null)
+                    throw new IllegalArgumentException(String.format("Unknown table %s.%s", pair[0], pair[1]));
+
+                toReplay.put(pair[0], pair[1]);
+            }
+            return new CustomReplayFilter(toReplay);
+        }
+    }
+
+    /**
+     * Filter used when no replay list is configured: every column family in the
+     * mutation is handed back unchanged.
+     */
+    private static class AlwaysReplayFilter extends ReplayFilter
+    {
+        @Override
+        public Iterable<ColumnFamily> filter(RowMutation rm)
+        {
+            Iterable<ColumnFamily> everything = rm.getColumnFamilies();
+            return everything;
+        }
+    }
+
+    /**
+     * Filter that keeps only the tables explicitly listed for each keyspace.
+     */
+    private static class CustomReplayFilter extends ReplayFilter
+    {
+        // keyspace name -> names of the tables in that keyspace that should be replayed
+        private final Multimap<String, String> toReplay;
+
+        public CustomReplayFilter(Multimap<String, String> toReplay)
+        {
+            this.toReplay = toReplay;
+        }
+
+        /**
+         * Returns a lazy view of the column families in {@code rm} whose table name was
+         * listed for the mutation's keyspace, or an empty set if none was listed.
+         */
+        @Override
+        public Iterable<ColumnFamily> filter(RowMutation rm)
+        {
+            // Multimap.get never returns null: an unlisted keyspace yields an empty
+            // collection, so test for emptiness rather than null.
+            final Collection<String> cfNames = toReplay.get(rm.getKeyspaceName());
+            if (cfNames.isEmpty())
+                return Collections.emptySet();
+
+            return Iterables.filter(rm.getColumnFamilies(), new Predicate<ColumnFamily>()
+            {
+                public boolean apply(ColumnFamily cf)
+                {
+                    return cfNames.contains(cf.metadata().cfName);
+                }
+            });
+        }
+    }
+
public void recover(File file) throws IOException
{
+ final ReplayFilter replayFilter = ReplayFilter.create();
+
logger.info("Replaying " + file.getPath());
CommitLogDescriptor desc = CommitLogDescriptor.fromFileName(file.getName());
final long segment = desc.id;
@@ -195,7 +263,7 @@ public class CommitLogReplayer
/* deserialize the commit log entry */
FastByteArrayInputStream bufIn = new FastByteArrayInputStream(buffer, 0, serializedSize);
- RowMutation rm;
+ final RowMutation rm;
try
{
// assuming version here. We've gone to lengths to make sure what gets written to the CL is in
@@ -243,36 +311,36 @@ public class CommitLogReplayer
+ "}"));
final long entryLocation = reader.getFilePointer();
- final RowMutation frm = rm;
Runnable runnable = new WrappedRunnable()
{
public void runMayThrow() throws IOException
{
- if (Schema.instance.getKSMetaData(frm.getKeyspaceName()) == null)
+ if (Schema.instance.getKSMetaData(rm.getKeyspaceName()) == null)
return;
- if (pointInTimeExceeded(frm))
+ if (pointInTimeExceeded(rm))
return;
- final Keyspace keyspace = Keyspace.open(frm.getKeyspaceName());
+ final Keyspace keyspace = Keyspace.open(rm.getKeyspaceName());
- // Rebuild the row mutation, omitting column families that
- // a) have already been flushed,
- // b) are part of a cf that was dropped. Keep in mind that the cf.name() is suspect. do every thing based on the cfid instead.
+ // Rebuild the row mutation, omitting column families that
+ // a) the user has requested that we ignore,
+ // b) have already been flushed,
+ // or c) are part of a cf that was dropped.
+ // Keep in mind that the cf.name() is suspect. do every thing based on the cfid instead.
RowMutation newRm = null;
- for (ColumnFamily columnFamily : frm.getColumnFamilies())
+ for (ColumnFamily columnFamily : replayFilter.filter(rm))
{
if (Schema.instance.getCF(columnFamily.id()) == null)
- // null means the cf has been dropped
- continue;
+ continue; // dropped
ReplayPosition rp = cfPositions.get(columnFamily.id());
- // replay if current segment is newer than last flushed one or,
+ // replay if current segment is newer than last flushed one or,
// if it is the last known segment, if we are after the replay position
if (segment > rp.segment || (segment == rp.segment && entryLocation > rp.position))
{
if (newRm == null)
- newRm = new RowMutation(frm.getKeyspaceName(), frm.key());
+ newRm = new RowMutation(rm.getKeyspaceName(), rm.key());
newRm.add(columnFamily);
replayedCount.incrementAndGet();
}
@@ -311,4 +379,12 @@ public class CommitLogReplayer
}
return false;
}
+
+    /**
+     * Returns the first column family in {@code cfs} whose metadata table name equals
+     * {@code cfName}, or null if there is none.
+     * NOTE(review): no caller is visible in this patch; confirm it is still needed.
+     */
+    private ColumnFamily getCFToRecover(String cfName, Collection<ColumnFamily> cfs)
+    {
+        ColumnFamily match = null;
+        for (ColumnFamily candidate : cfs)
+        {
+            if (candidate.metadata().cfName.equals(cfName))
+            {
+                match = candidate;
+                break;
+            }
+        }
+        return match;
+    }
}