You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2012/04/09 18:30:20 UTC
[2/3] git commit: add commitlog archiving and pitr patch by Vijay and
jbellis for CASSANDRA-3690
add commitlog archiving and pitr
patch by Vijay and jbellis for CASSANDRA-3690
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5923d329
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5923d329
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5923d329
Branch: refs/heads/trunk
Commit: 5923d32959ff419821dbb7fb36114a0604324498
Parents: 044e17a
Author: Jonathan Ellis <jb...@apache.org>
Authored: Mon Apr 9 10:01:50 2012 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Mon Apr 9 11:23:02 2012 -0500
----------------------------------------------------------------------
CHANGES.txt | 2 +
conf/cassandra.yaml | 8 +
conf/commitlog_archiving.properties | 37 ++
src/java/org/apache/cassandra/config/Config.java | 1 +
.../cassandra/config/DatabaseDescriptor.java | 8 +
.../apache/cassandra/db/commitlog/CommitLog.java | 253 ++------------
.../cassandra/db/commitlog/CommitLogAllocator.java | 20 +-
.../cassandra/db/commitlog/CommitLogArchiver.java | 147 ++++++++
.../cassandra/db/commitlog/CommitLogMBean.java | 17 +
.../cassandra/db/commitlog/CommitLogReplayer.java | 269 +++++++++++++++
.../cassandra/db/commitlog/CommitLogSegment.java | 16 +-
src/java/org/apache/cassandra/utils/CLibrary.java | 33 +--
.../org/apache/cassandra/utils/FBUtilities.java | 34 ++-
13 files changed, 581 insertions(+), 264 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5923d329/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e47bb95..b80368a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,6 @@
1.1.1-dev
+ * add support for commitlog archiving and point-in-time recovery
+ (CASSANDRA-3690)
* update caches to use byte[] keys to reduce memory overhead (CASSANDRA-3966)
* add column limit to cli (CASSANDRA-3012, 4098)
* clean up and optimize DataOutputBuffer, used by CQL compression and
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5923d329/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index af6072a..611c7a4 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -150,6 +150,14 @@ saved_caches_directory: /var/lib/cassandra/saved_caches
commitlog_sync: periodic
commitlog_sync_period_in_ms: 10000
+# Configure the size of the individual commitlog segment files. The
+# default is 128 MB, which is almost always fine, but if you are
+# archiving commitlog segments (see commitlog_archiving.properties),
+# then you probably want a finer granularity of archiving; 16 MB
+# is reasonable.
+#
+# commitlog_segment_size_in_mb: 128
+
# any class that implements the SeedProvider interface and has a
# constructor that takes a Map<String, String> of parameters will do.
seed_provider:
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5923d329/conf/commitlog_archiving.properties
----------------------------------------------------------------------
diff --git a/conf/commitlog_archiving.properties b/conf/commitlog_archiving.properties
new file mode 100644
index 0000000..b4dfd79
--- /dev/null
+++ b/conf/commitlog_archiving.properties
@@ -0,0 +1,37 @@
+# commitlog archiving configuration. Leave blank to disable.
+
+# Command to execute to archive a commitlog segment
+# Parameters: %path => Fully qualified path of the segment to archive
+# %name => Name of the commit log.
+# Example: archive_command=/bin/ln %path /backup/%name
+#
+# commitlog archiving configuration. Leave blank to disable.
+
+# Command to execute to archive a commitlog segment
+# Parameters: %path => Fully qualified path of the segment to archive
+# %name => Name of the commit log.
+# Example: archive_command=/bin/ln %path /backup/%name
+#
+# Limitation: *_command= expects a single command with arguments. The command
+# is not run through a shell, so STDIN/STDOUT redirection and multiple commands
+# are not supported. To run several commands, wrap them in a script and invoke that script here.
+archive_command=
+
+# Command to execute to make an archived commitlog live again.
+# Parameters: %from is the full path to an archived commitlog segment (from restore_directories)
+# %to is the live commitlog directory
+# Example: restore_command=cp -f %from %to
+restore_command=
+
+# Comma-separated list of directories to scan for archived commitlog segments to restore.
+restore_directories=
+
+# Restore mutations created up to and including this timestamp.
+# Format: yyyy:MM:dd HH:mm:ss (e.g. 2012:04:30 20:43:12)
+#
+# Note! Recovery will stop when the first client-supplied timestamp
+# greater than this time is encountered. Since the order Cassandra
+# receives mutations does not always strictly follow timestamp order,
+# this may leave some mutations with timestamps earlier than the
+# point-in-time unrecovered.
+restore_point_in_time=
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5923d329/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 91b96f1..fe8eb84 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -100,6 +100,7 @@ public class Config
public CommitLogSync commitlog_sync;
public Double commitlog_sync_batch_window_in_ms;
public Integer commitlog_sync_period_in_ms;
+ public int commitlog_segment_size_in_mb = 128;
public String endpoint_snitch;
public Boolean dynamic_snitch = true;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5923d329/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 2eb89ba..259d6e8 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -782,6 +782,14 @@ public class DatabaseDescriptor
return conf.commitlog_directory;
}
+ /**
+ * size of commitlog segments to allocate
+ */
+ public static int getCommitLogSegmentSize()
+ {
+ return conf.commitlog_segment_size_in_mb * 1024 * 1024;
+ }
+
public static String getSavedCachesLocation()
{
return conf.saved_caches_directory;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5923d329/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index 69dd0b1..3c34772 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -23,31 +23,17 @@ import java.lang.management.ManagementFactory;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.zip.Checksum;
-import com.google.common.collect.Ordering;
-
-import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.config.ConfigurationException;
import org.apache.cassandra.db.*;
-import org.apache.cassandra.io.IColumnSerializer;
import org.apache.cassandra.io.util.*;
import org.apache.cassandra.net.MessagingService;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.concurrent.Stage;
-import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.io.util.FastByteArrayInputStream;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.PureJavaCrc32;
-import org.apache.cassandra.utils.WrappedRunnable;
-import org.cliffc.high_scale_lib.NonBlockingHashSet;
import javax.management.MBeanServer;
import javax.management.ObjectName;
@@ -58,9 +44,7 @@ import javax.management.ObjectName;
*/
public class CommitLog implements CommitLogMBean
{
- private static final int MAX_OUTSTANDING_REPLAY_COUNT = 1024;
-
- static final Logger logger = LoggerFactory.getLogger(CommitLog.class);
+ private static final Logger logger = LoggerFactory.getLogger(CommitLog.class);
public static final CommitLog instance = new CommitLog();
@@ -68,11 +52,11 @@ public class CommitLog implements CommitLogMBean
public final CommitLogAllocator allocator;
+ public final CommitLogArchiver archiver = new CommitLogArchiver();
+
public static final int END_OF_SEGMENT_MARKER = 0; // this is written out at the end of a segment
public static final int END_OF_SEGMENT_MARKER_SIZE = 4; // number of bytes of ^^^
- /** size of commitlog segments to allocate */
- public static final int SEGMENT_SIZE = 128 * 1024 * 1024;
public CommitLogSegment activeSegment;
private CommitLog()
@@ -120,6 +104,8 @@ public class CommitLog implements CommitLogMBean
*/
public int recover() throws IOException
{
+ archiver.maybeRestoreArchive();
+
File[] files = new File(DatabaseDescriptor.getCommitLogLocation()).listFiles(new FilenameFilter()
{
public boolean accept(File dir, String name)
@@ -157,203 +143,19 @@ public class CommitLog implements CommitLogMBean
* @param clogs the list of commit log files to replay
* @return the number of mutations replayed
*/
- public int recover(File[] clogs) throws IOException
+ public int recover(File... clogs) throws IOException
{
- final Set<Table> tablesRecovered = new NonBlockingHashSet<Table>();
- List<Future<?>> futures = new ArrayList<Future<?>>();
- byte[] bytes = new byte[4096];
- Map<Integer, AtomicInteger> invalidMutations = new HashMap<Integer, AtomicInteger>();
-
- // count the number of replayed mutation. We don't really care about atomicity, but we need it to be a reference.
- final AtomicInteger replayedCount = new AtomicInteger();
-
- // compute per-CF and global replay positions
- final Map<Integer, ReplayPosition> cfPositions = new HashMap<Integer, ReplayPosition>();
- for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
- {
- // it's important to call RP.gRP per-cf, before aggregating all the positions w/ the Ordering.min call
- // below: gRP will return NONE if there are no flushed sstables, which is important to have in the
- // list (otherwise we'll just start replay from the first flush position that we do have, which is not correct).
- ReplayPosition rp = ReplayPosition.getReplayPosition(cfs.getSSTables());
- cfPositions.put(cfs.metadata.cfId, rp);
- }
- final ReplayPosition globalPosition = Ordering.from(ReplayPosition.comparator).min(cfPositions.values());
-
- Checksum checksum = new PureJavaCrc32();
- for (final File file : clogs)
- {
- logger.info("Replaying " + file.getPath());
-
- final long segment = CommitLogSegment.idFromFilename(file.getName());
-
- RandomAccessReader reader = RandomAccessReader.open(new File(file.getAbsolutePath()), true);
- assert reader.length() <= Integer.MAX_VALUE;
-
- try
- {
- int replayPosition;
- if (globalPosition.segment < segment)
- replayPosition = 0;
- else if (globalPosition.segment == segment)
- replayPosition = globalPosition.position;
- else
- replayPosition = (int) reader.length();
-
- if (replayPosition < 0 || replayPosition >= reader.length())
- {
- // replayPosition > reader.length() can happen if some data gets flushed before it is written to the commitlog
- // (see https://issues.apache.org/jira/browse/CASSANDRA-2285)
- logger.debug("skipping replay of fully-flushed {}", file);
- continue;
- }
-
- reader.seek(replayPosition);
-
- if (logger.isDebugEnabled())
- logger.debug("Replaying " + file + " starting at " + reader.getFilePointer());
-
- /* read the logs populate RowMutation and apply */
- while (!reader.isEOF())
- {
- if (logger.isDebugEnabled())
- logger.debug("Reading mutation at " + reader.getFilePointer());
-
- long claimedCRC32;
- int serializedSize;
- try
- {
- // any of the reads may hit EOF
- serializedSize = reader.readInt();
- if (serializedSize == CommitLog.END_OF_SEGMENT_MARKER)
- {
- logger.debug("Encountered end of segment marker at " + reader.getFilePointer());
- break;
- }
-
- // RowMutation must be at LEAST 10 bytes:
- // 3 each for a non-empty Table and Key (including the 2-byte length from
- // writeUTF/writeWithShortLength) and 4 bytes for column count.
- // This prevents CRC by being fooled by special-case garbage in the file; see CASSANDRA-2128
- if (serializedSize < 10)
- break;
- long claimedSizeChecksum = reader.readLong();
- checksum.reset();
- checksum.update(serializedSize);
- if (checksum.getValue() != claimedSizeChecksum)
- break; // entry wasn't synced correctly/fully. that's ok.
-
- if (serializedSize > bytes.length)
- bytes = new byte[(int) (1.2 * serializedSize)];
- reader.readFully(bytes, 0, serializedSize);
- claimedCRC32 = reader.readLong();
- }
- catch(EOFException eof)
- {
- break; // last CL entry didn't get completely written. that's ok.
- }
-
- checksum.update(bytes, 0, serializedSize);
- if (claimedCRC32 != checksum.getValue())
- {
- // this entry must not have been fsynced. probably the rest is bad too,
- // but just in case there is no harm in trying them (since we still read on an entry boundary)
- continue;
- }
-
- /* deserialize the commit log entry */
- FastByteArrayInputStream bufIn = new FastByteArrayInputStream(bytes, 0, serializedSize);
- RowMutation rm = null;
- try
- {
- // assuming version here. We've gone to lengths to make sure what gets written to the CL is in
- // the current version. so do make sure the CL is drained prior to upgrading a node.
- rm = RowMutation.serializer().deserialize(new DataInputStream(bufIn), MessagingService.version_, IColumnSerializer.Flag.LOCAL);
- }
- catch (UnknownColumnFamilyException ex)
- {
- AtomicInteger i = invalidMutations.get(ex.cfId);
- if (i == null)
- {
- i = new AtomicInteger(1);
- invalidMutations.put(ex.cfId, i);
- }
- else
- i.incrementAndGet();
- continue;
- }
-
- if (logger.isDebugEnabled())
- logger.debug(String.format("replaying mutation for %s.%s: %s",
- rm.getTable(),
- ByteBufferUtil.bytesToHex(rm.key()),
- "{" + StringUtils.join(rm.getColumnFamilies().iterator(), ", ") + "}"));
-
- final long entryLocation = reader.getFilePointer();
- final RowMutation frm = rm;
- Runnable runnable = new WrappedRunnable()
- {
- public void runMayThrow() throws IOException
- {
- if (Schema.instance.getKSMetaData(frm.getTable()) == null)
- return;
- final Table table = Table.open(frm.getTable());
- RowMutation newRm = new RowMutation(frm.getTable(), frm.key());
-
- // Rebuild the row mutation, omitting column families that a) have already been flushed,
- // b) are part of a cf that was dropped. Keep in mind that the cf.name() is suspect. do every
- // thing based on the cfid instead.
- for (ColumnFamily columnFamily : frm.getColumnFamilies())
- {
- if (Schema.instance.getCF(columnFamily.id()) == null)
- // null means the cf has been dropped
- continue;
-
- ReplayPosition rp = cfPositions.get(columnFamily.id());
-
- // replay if current segment is newer than last flushed one or, if it is the last known
- // segment, if we are after the replay position
- if (segment > rp.segment || (segment == rp.segment && entryLocation > rp.position))
- {
- newRm.add(columnFamily);
- replayedCount.incrementAndGet();
- }
- }
- if (!newRm.isEmpty())
- {
- Table.open(newRm.getTable()).apply(newRm, false);
- tablesRecovered.add(table);
- }
- }
- };
- futures.add(StageManager.getStage(Stage.MUTATION).submit(runnable));
- if (futures.size() > MAX_OUTSTANDING_REPLAY_COUNT)
- {
- FBUtilities.waitOnFutures(futures);
- futures.clear();
- }
- }
- }
- finally
- {
- FileUtils.closeQuietly(reader);
- logger.info("Finished reading " + file);
- }
- }
-
- for (Map.Entry<Integer, AtomicInteger> entry : invalidMutations.entrySet())
- logger.info(String.format("Skipped %d mutations from unknown (probably removed) CF with id %d", entry.getValue().intValue(), entry.getKey()));
-
- // wait for all the writes to finish on the mutation stage
- FBUtilities.waitOnFutures(futures);
- logger.debug("Finished waiting on mutations from recovery");
-
- // flush replayed tables
- futures.clear();
- for (Table table : tablesRecovered)
- futures.addAll(table.flush());
- FBUtilities.waitOnFutures(futures);
+ CommitLogReplayer recovery = new CommitLogReplayer();
+ recovery.recover(clogs);
+ return recovery.blockForWrites();
+ }
- return replayedCount.get();
+ /**
+ * Perform recovery on a single commit log.
+ */
+ public void recover(String path) throws IOException
+ {
+ recover(new File(path));
}
/**
@@ -400,7 +202,7 @@ public class CommitLog implements CommitLogMBean
public void add(RowMutation rm) throws IOException
{
long totalSize = RowMutation.serializer().serializedSize(rm, MessagingService.version_) + CommitLogSegment.ENTRY_OVERHEAD_SIZE;
- if (totalSize > CommitLog.SEGMENT_SIZE)
+ if (totalSize > DatabaseDescriptor.getCommitLogSegmentSize())
{
logger.warn("Skipping commitlog append of extremely large mutation ({} bytes)", totalSize);
return;
@@ -538,6 +340,19 @@ public class CommitLog implements CommitLogMBean
activeSegment = allocator.fetchSegment();
}
+ public List<String> getActiveSegmentNames()
+ {
+ List<String> segmentNames = new ArrayList<String>();
+ for (CommitLogSegment segment : allocator.getActiveSegments())
+ segmentNames.add(segment.getName());
+ return segmentNames;
+ }
+
+ public List<String> getArchivingSegmentNames()
+ {
+ return new ArrayList<String>(archiver.archivePending.keySet());
+ }
+
/**
* Shuts down the threads used by the commit log, blocking until completion.
*/
@@ -565,7 +380,13 @@ public class CommitLog implements CommitLogMBean
try
{
if (!activeSegment.hasCapacityFor(rowMutation))
+ {
+ CommitLogSegment oldSegment = activeSegment;
activateNextSegment();
+ // Now that we have switched to the new segment, run the user defined archive command on the old one.
+ // (Do this here instead of in the recycle call so we can get a head start on the archive.)
+ archiver.maybeArchive(oldSegment.getPath(), oldSegment.getName());
+ }
activeSegment.write(rowMutation);
}
catch (IOException e)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5923d329/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java
index da30fe1..963e41e 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java
@@ -145,13 +145,17 @@ public class CommitLogAllocator
public void recycleSegment(final CommitLogSegment segment)
{
activeSegments.remove(segment);
-
+ if (!CommitLog.instance.archiver.maybeWaitForArchiving(segment.getName()))
+ {
+ // if archiving (command) was not successful then leave the file alone. don't delete or recycle.
+ discardSegment(segment, false);
+ return;
+ }
if (isCapExceeded())
{
- discardSegment(segment);
+ discardSegment(segment, true);
return;
}
-
queue.add(new Runnable()
{
public void run()
@@ -171,7 +175,7 @@ public class CommitLogAllocator
public void recycleSegment(final File file)
{
// check against SEGMENT_SIZE avoids recycling odd-sized or empty segments from old C* versions and unit tests
- if (isCapExceeded() || file.length() != CommitLog.SEGMENT_SIZE)
+ if (isCapExceeded() || file.length() != DatabaseDescriptor.getCommitLogSegmentSize())
{
try
{
@@ -199,15 +203,15 @@ public class CommitLogAllocator
*
* @param segment segment to be discarded
*/
- private void discardSegment(final CommitLogSegment segment)
+ private void discardSegment(final CommitLogSegment segment, final boolean deleteFile)
{
- size.addAndGet(-CommitLog.SEGMENT_SIZE);
+ size.addAndGet(-DatabaseDescriptor.getCommitLogSegmentSize());
queue.add(new Runnable()
{
public void run()
{
- segment.discard();
+ segment.discard(deleteFile);
}
});
}
@@ -240,7 +244,7 @@ public class CommitLogAllocator
*/
private CommitLogSegment createFreshSegment()
{
- size.addAndGet(CommitLog.SEGMENT_SIZE);
+ size.addAndGet(DatabaseDescriptor.getCommitLogSegmentSize());
return internalAddReadySegment(CommitLogSegment.freshSegment());
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5923d329/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
new file mode 100644
index 0000000..a2204c6
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
@@ -0,0 +1,147 @@
+package org.apache.cassandra.db.commitlog;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.*;
+
+import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
+import org.apache.cassandra.config.ConfigurationException;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.WrappedRunnable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Strings;
+
+public class CommitLogArchiver
+{
+ private static final Logger logger = LoggerFactory.getLogger(CommitLogArchiver.class);
+ public final Map<String, Future<?>> archivePending = new ConcurrentHashMap<String, Future<?>>();
+ public final ExecutorService executor = new JMXEnabledThreadPoolExecutor("commitlog_archiver");
+ private final String archiveCommand;
+ private final String restoreCommand;
+ private final String restoreDirectories;
+ public final long restorePointInTime;
+
+ public CommitLogArchiver()
+ {
+ Properties commitlog_commands = new Properties();
+ InputStream stream = null;
+ try
+ {
+ stream = getClass().getClassLoader().getResourceAsStream("commitlog_archiving.properties");
+
+ if (stream == null)
+ {
+ logger.debug("No commitlog_archiving properties found; archive + pitr will be disabled");
+ archiveCommand = null;
+ restoreCommand = null;
+ restoreDirectories = null;
+ restorePointInTime = Long.MAX_VALUE;
+ }
+ else
+ {
+ commitlog_commands.load(stream);
+ archiveCommand = commitlog_commands.getProperty("archive_command");
+ restoreCommand = commitlog_commands.getProperty("restore_command");
+ restoreDirectories = commitlog_commands.getProperty("restore_directories");
+ String targetTime = commitlog_commands.getProperty("restore_point_in_time");
+ try
+ {
+ restorePointInTime = Strings.isNullOrEmpty(targetTime) ? Long.MAX_VALUE : new SimpleDateFormat("yyyy:MM:dd HH:mm:ss").parse(targetTime).getTime();
+ }
+ catch (ParseException e)
+ {
+ throw new RuntimeException("Unable to parse restore target time", e);
+ }
+ }
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException("Unable to load commitlog_archiving.properties", e);
+ }
+ finally
+ {
+ FileUtils.closeQuietly(stream);
+ }
+ }
+
+ public void maybeArchive(final String path, final String name)
+ {
+ if (Strings.isNullOrEmpty(archiveCommand))
+ return;
+
+ archivePending.put(name, executor.submit(new WrappedRunnable()
+ {
+ protected void runMayThrow() throws IOException
+ {
+ String command = archiveCommand.replace("%name", name);
+ command = command.replace("%path", path);
+ exec(command);
+ }
+ }));
+ }
+
+ public boolean maybeWaitForArchiving(String name)
+ {
+ Future<?> f = archivePending.remove(name);
+ if (f == null)
+ return true; // archiving disabled
+
+ try
+ {
+ f.get();
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
+ catch (ExecutionException e)
+ {
+ if (e.getCause() instanceof IOException)
+ {
+ logger.info("Looks like the archiving of file {} failed earlier, cassandra is going to ignore this segment for now.", name);
+ return false;
+ }
+ throw new RuntimeException(e);
+ }
+
+ return true;
+ }
+
+ public void maybeRestoreArchive() throws IOException
+ {
+ if (Strings.isNullOrEmpty(restoreDirectories))
+ return;
+
+ for (String dir : restoreDirectories.split(","))
+ {
+ File[] files = new File(dir).listFiles();
+ for (File fromFile : files)
+ {
+ File toFile = new File(DatabaseDescriptor.getCommitLogLocation(),
+ CommitLogSegment.FILENAME_PREFIX +
+ System.nanoTime() +
+ CommitLogSegment.FILENAME_EXTENSION);
+ String command = restoreCommand.replace("%from", fromFile.getPath());
+ command = command.replace("%to", toFile.getPath());
+ exec(command);
+ }
+ }
+ }
+
+ private void exec(String command) throws IOException
+ {
+ ProcessBuilder pb = new ProcessBuilder(command.split(" "));
+ pb.redirectErrorStream(true);
+ FBUtilities.exec(pb);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5923d329/src/java/org/apache/cassandra/db/commitlog/CommitLogMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogMBean.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogMBean.java
index 8f8fa19..29f95a7 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogMBean.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogMBean.java
@@ -19,6 +19,8 @@
package org.apache.cassandra.db.commitlog;
+import java.io.IOException;
+import java.util.List;
public interface CommitLogMBean
{
@@ -36,4 +38,19 @@ public interface CommitLogMBean
* Get the current size used by all the commitlog segments.
*/
public long getTotalCommitlogSize();
+
+ /**
+ * Recover a single file.
+ */
+ public void recover(String path) throws IOException;
+
+ /**
+ * @return file names (not full paths) of active commit log segments (segments containing unflushed data)
+ */
+ public List<String> getActiveSegmentNames();
+
+ /**
+ * @return Files which are pending for archival attempt. Does NOT include failed archive attempts.
+ */
+ public List<String> getArchivingSegmentNames();
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5923d329/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
new file mode 100644
index 0000000..eb997fc
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -0,0 +1,269 @@
+package org.apache.cassandra.db.commitlog;
+
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.zip.Checksum;
+
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.UnknownColumnFamilyException;
+import org.apache.cassandra.io.IColumnSerializer;
+import org.apache.cassandra.io.util.FastByteArrayInputStream;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.PureJavaCrc32;
+import org.apache.cassandra.utils.WrappedRunnable;
+import org.apache.commons.lang.StringUtils;
+import org.cliffc.high_scale_lib.NonBlockingHashSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Ordering;
+
+/**
+ * Replays commit log segment files: reads each serialized RowMutation entry, verifies its checksums,
+ * and resubmits it on the MUTATION stage, keeping only the column families whose entry lies past the
+ * replay position computed for them from their sstables. Mutations containing a column family whose
+ * maxTimestamp is later than the archiver's restorePointInTime are not applied.
+ *
+ * Usage as far as this class shows: construct, call recover(...) for the segment files, then call
+ * blockForWrites() to wait for the submitted mutations and flush the recovered tables.
+ */
+public class CommitLogReplayer
+{
+ private static final Logger logger = LoggerFactory.getLogger(CommitLogReplayer.class);
+ // once more than this many replay tasks are pending, recover(File) waits for all of them before reading on
+ private static final int MAX_OUTSTANDING_REPLAY_COUNT = 1024;
+
+ // tables that had at least one mutation applied during replay; flushed by blockForWrites()
+ private final Set<Table> tablesRecovered;
+ // pending tasks submitted to the MUTATION stage (reused for the flush futures in blockForWrites())
+ private final List<Future<?>> futures;
+ // cfId -> number of entries skipped because deserialization threw UnknownColumnFamilyException
+ private final Map<Integer, AtomicInteger> invalidMutations;
+ // incremented once for every column family that is re-added to a replayed mutation
+private final AtomicInteger replayedCount;
+ // cfId -> replay position obtained from that column family's sstables
+ private final Map<Integer, ReplayPosition> cfPositions;
+ // minimum of all cfPositions values; decides where reading starts in each segment
+ private final ReplayPosition globalPosition;
+ // reused for every entry; reset before each entry's size checksum is computed
+ private final Checksum checksum;
+ // scratch space for one serialized entry; replaced by a larger array when an entry does not fit
+ private byte[] buffer;
+
+ /**
+ * Sets up the replay state and computes the per-column-family replay positions, and their minimum,
+ * for every store returned by ColumnFamilyStore.all().
+ */
+ public CommitLogReplayer()
+ {
+ this.tablesRecovered = new NonBlockingHashSet<Table>();
+ this.futures = new ArrayList<Future<?>>();
+ this.buffer = new byte[4096];
+ this.invalidMutations = new HashMap<Integer, AtomicInteger>();
+ // count the number of replayed mutation. We don't really care about atomicity, but we need it to be a reference.
+ // (the increment in recover(File) happens per column family added, not per RowMutation)
+ this.replayedCount = new AtomicInteger();
+ // compute per-CF and global replay positions
+ this.cfPositions = new HashMap<Integer, ReplayPosition>();
+ for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
+ {
+ // it's important to call RP.gRP per-cf, before aggregating all the positions w/ the Ordering.min call
+ // below: gRP will return NONE if there are no flushed sstables, which is important to have in the
+ // list (otherwise we'll just start replay from the first flush position that we do have, which is not correct).
+ ReplayPosition rp = ReplayPosition.getReplayPosition(cfs.getSSTables());
+ cfPositions.put(cfs.metadata.cfId, rp);
+ }
+ this.globalPosition = Ordering.from(ReplayPosition.comparator).min(cfPositions.values());
+ this.checksum = new PureJavaCrc32();
+ }
+
+ /**
+ * Replays the given files one after another, in array order.
+ *
+ * @param clogs the commit log files to replay
+ * @throws IOException propagated from recover(File)
+ */
+ public void recover(File[] clogs) throws IOException
+ {
+ for (final File file : clogs)
+ recover(file);
+ }
+
+ /**
+ * Logs how many entries were skipped per unknown column family id, waits for every pending replay
+ * task, then flushes each table that received a replayed mutation and waits for those flushes.
+ *
+ * @return the current value of replayedCount
+ */
+ public int blockForWrites() throws IOException
+ {
+ for (Map.Entry<Integer, AtomicInteger> entry : invalidMutations.entrySet())
+ logger.info(String.format("Skipped %d mutations from unknown (probably removed) CF with id %d", entry.getValue().intValue(), entry.getKey()));
+
+ // wait for all the writes to finish on the mutation stage
+ FBUtilities.waitOnFutures(futures);
+ logger.debug("Finished waiting on mutations from recovery");
+
+ // flush replayed tables
+ futures.clear();
+ for (Table table : tablesRecovered)
+ futures.addAll(table.flush());
+ FBUtilities.waitOnFutures(futures);
+ return replayedCount.get();
+ }
+
+ /**
+ * Replays a single segment file.
+ *
+ * Entry layout as read here: a 4-byte size, an 8-byte checksum of that size, the serialized
+ * RowMutation of that many bytes, and an 8-byte checksum that covers the size and the payload
+ * together (the checksum is not reset between the two updates).
+ *
+ * Reading stops at the end-of-segment marker, at a size below 10, at a size-checksum mismatch, or
+ * at EOF. An entry whose payload checksum does not match, or whose deserialization reports an
+ * unknown column family, is skipped and reading continues with the next entry.
+ *
+ * @param file the segment to replay; its segment id is parsed from the file name
+ * @throws IOException on read failures other than the EOF cases handled inside
+ */
+ public void recover(File file) throws IOException
+ {
+ logger.info("Replaying " + file.getPath());
+ final long segment = CommitLogSegment.idFromFilename(file.getName());
+ RandomAccessReader reader = RandomAccessReader.open(new File(file.getAbsolutePath()), true);
+ // positions are handled as int below, so the file must not be larger than Integer.MAX_VALUE bytes
+ assert reader.length() <= Integer.MAX_VALUE;
+ try
+ {
+ // segment newer than the global position: read from the start; same segment: read from the
+ // global position; older segment: start at the file length, which triggers the skip below
+ int replayPosition;
+ if (globalPosition.segment < segment)
+ replayPosition = 0;
+ else if (globalPosition.segment == segment)
+ replayPosition = globalPosition.position;
+ else
+ replayPosition = (int) reader.length();
+
+ if (replayPosition < 0 || replayPosition >= reader.length())
+ {
+ // replayPosition > reader.length() can happen if some data gets flushed before it is written to the commitlog
+ // (see https://issues.apache.org/jira/browse/CASSANDRA-2285)
+ // replayPosition == reader.length() is also caught here: there is nothing left to read
+ logger.debug("skipping replay of fully-flushed {}", file);
+ return;
+ }
+
+ reader.seek(replayPosition);
+
+ if (logger.isDebugEnabled())
+ logger.debug("Replaying " + file + " starting at " + reader.getFilePointer());
+
+ /* read the logs populate RowMutation and apply */
+ while (!reader.isEOF())
+ {
+ if (logger.isDebugEnabled())
+ logger.debug("Reading mutation at " + reader.getFilePointer());
+
+ long claimedCRC32;
+ int serializedSize;
+ try
+ {
+ // any of the reads may hit EOF
+ serializedSize = reader.readInt();
+ if (serializedSize == CommitLog.END_OF_SEGMENT_MARKER)
+ {
+ logger.debug("Encountered end of segment marker at " + reader.getFilePointer());
+ break;
+ }
+
+ // RowMutation must be at LEAST 10 bytes:
+ // 3 each for a non-empty Table and Key (including the
+ // 2-byte length from writeUTF/writeWithShortLength) and 4 bytes for column count.
+ // This prevents CRC from being fooled by special-case garbage in the file; see CASSANDRA-2128
+ if (serializedSize < 10)
+ break;
+ long claimedSizeChecksum = reader.readLong();
+ checksum.reset();
+ // NOTE(review): java.util.zip.Checksum.update(int) is specified to consume a single byte;
+ // confirm that this matches how the writer checksums the size
+ checksum.update(serializedSize);
+ if (checksum.getValue() != claimedSizeChecksum)
+ break; // entry wasn't synced correctly/fully. that's
+ // ok.
+
+ // grow the scratch buffer with 20% headroom when the entry does not fit
+ if (serializedSize > buffer.length)
+ buffer = new byte[(int) (1.2 * serializedSize)];
+ reader.readFully(buffer, 0, serializedSize);
+ claimedCRC32 = reader.readLong();
+ }
+ catch (EOFException eof)
+ {
+ break; // last CL entry didn't get completely written. that's ok.
+ }
+
+ // no reset here: the checksum continues from the size update above
+ checksum.update(buffer, 0, serializedSize);
+ if (claimedCRC32 != checksum.getValue())
+ {
+ // this entry must not have been fsynced. probably the rest is bad too,
+ // but just in case there is no harm in trying them (since we still read on an entry boundary)
+ continue;
+ }
+
+ /* deserialize the commit log entry */
+ FastByteArrayInputStream bufIn = new FastByteArrayInputStream(buffer, 0, serializedSize);
+ RowMutation rm;
+ try
+ {
+ // assuming version here. We've gone to lengths to make sure what gets written to the CL is in
+ // the current version. so do make sure the CL is drained prior to upgrading a node.
+ rm = RowMutation.serializer().deserialize(new DataInputStream(bufIn), MessagingService.version_, IColumnSerializer.Flag.LOCAL);
+ }
+ catch (UnknownColumnFamilyException ex)
+ {
+ // count the skipped entry under the unknown cfId; blockForWrites() logs the totals
+ AtomicInteger i = invalidMutations.get(ex.cfId);
+ if (i == null)
+ {
+ i = new AtomicInteger(1);
+ invalidMutations.put(ex.cfId, i);
+ }
+ else
+ i.incrementAndGet();
+ continue;
+ }
+
+ if (logger.isDebugEnabled())
+ logger.debug(String.format("replaying mutation for %s.%s: %s", rm.getTable(), ByteBufferUtil.bytesToHex(rm.key()), "{" + StringUtils.join(rm.getColumnFamilies().iterator(), ", ")
+ + "}"));
+
+ // file pointer after the whole entry (including its trailing checksum) has been read
+ final long entryLocation = reader.getFilePointer();
+ final RowMutation frm = rm;
+ // the task below is submitted to the MUTATION stage, not run on the reading thread
+ Runnable runnable = new WrappedRunnable()
+ {
+ public void runMayThrow() throws IOException
+ {
+ // nothing to do when the schema has no metadata for the mutation's keyspace
+ if (Schema.instance.getKSMetaData(frm.getTable()) == null)
+ return;
+ // the whole mutation is dropped when it is newer than the restore target
+ if (pointInTimeExceeded(frm))
+ return;
+
+ final Table table = Table.open(frm.getTable());
+ RowMutation newRm = new RowMutation(frm.getTable(), frm.key());
+
+ // Rebuild the row mutation, omitting column families that
+ // a) have already been flushed,
+ // b) are part of a cf that was dropped. Keep in mind that the cf.name() is suspect. do every thing based on the cfid instead.
+ for (ColumnFamily columnFamily : frm.getColumnFamilies())
+ {
+ if (Schema.instance.getCF(columnFamily.id()) == null)
+ // null means the cf has been dropped
+ continue;
+
+ ReplayPosition rp = cfPositions.get(columnFamily.id());
+
+ // replay if current segment is newer than last flushed one or,
+ // if it is the last known segment, if we are after the replay position
+ if (segment > rp.segment || (segment == rp.segment && entryLocation > rp.position))
+ {
+ newRm.add(columnFamily);
+ replayedCount.incrementAndGet();
+ }
+ }
+ if (!newRm.isEmpty())
+ {
+ Table.open(newRm.getTable()).apply(newRm, false);
+ tablesRecovered.add(table);
+ }
+ }
+ };
+ futures.add(StageManager.getStage(Stage.MUTATION).submit(runnable));
+ // too many pending tasks: wait for all of them before reading further entries
+ if (futures.size() > MAX_OUTSTANDING_REPLAY_COUNT)
+ {
+ FBUtilities.waitOnFutures(futures);
+ futures.clear();
+ }
+ }
+ }
+ finally
+ {
+ // also reached on the early "fully-flushed" return above
+ FileUtils.closeQuietly(reader);
+ logger.info("Finished reading " + file);
+ }
+ }
+
+ /**
+ * Tells whether a mutation is newer than the restore target configured on the commit log archiver.
+ *
+ * @param frm the mutation to check
+ * @return true if any column family in the mutation has a maxTimestamp greater than
+ *         CommitLog.instance.archiver.restorePointInTime, false otherwise
+ */
+ protected boolean pointInTimeExceeded(RowMutation frm)
+ {
+ long restoreTarget = CommitLog.instance.archiver.restorePointInTime;
+
+ for (ColumnFamily families : frm.getColumnFamilies())
+ {
+ if (families.maxTimestamp() > restoreTarget)
+ return true;
+ }
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5923d329/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
index 45408ec..2ee90ad 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
@@ -53,9 +53,9 @@ public class CommitLogSegment
{
private static final Logger logger = LoggerFactory.getLogger(CommitLogSegment.class);
- private static final String FILENAME_PREFIX = "CommitLog-";
- private static final String FILENAME_EXTENSION = ".log";
- private static Pattern COMMIT_LOG_FILE_PATTERN = Pattern.compile(FILENAME_PREFIX + "(\\d+)" + FILENAME_EXTENSION);
+ static final String FILENAME_PREFIX = "CommitLog-";
+ static final String FILENAME_EXTENSION = ".log";
+ private static final Pattern COMMIT_LOG_FILE_PATTERN = Pattern.compile(FILENAME_PREFIX + "(\\d+)" + FILENAME_EXTENSION);
// The commit log entry overhead in bytes (int: length + long: head checksum + long: tail checksum)
static final int ENTRY_OVERHEAD_SIZE = 4 + 8 + 8;
@@ -115,9 +115,9 @@ public class CommitLogSegment
}
// Map the segment, extending or truncating it to the standard segment size
- logFileAccessor.setLength(CommitLog.SEGMENT_SIZE);
+ logFileAccessor.setLength(DatabaseDescriptor.getCommitLogSegmentSize());
- buffer = logFileAccessor.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, CommitLog.SEGMENT_SIZE);
+ buffer = logFileAccessor.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, DatabaseDescriptor.getCommitLogSegmentSize());
buffer.putInt(CommitLog.END_OF_SEGMENT_MARKER);
buffer.position(0);
@@ -163,12 +163,14 @@ public class CommitLogSegment
/**
* Completely discards a segment file by deleting it. (Potentially blocking operation)
*/
- public void discard()
+ public void discard(boolean deleteFile)
{
+ // TODO shouldn't we close the file when we're done writing to it, which comes (potentially) much earlier than it's eligible for recycling?
close();
try
{
- FileUtils.deleteWithConfirm(logFile);
+ if (deleteFile)
+ FileUtils.deleteWithConfirm(logFile);
}
catch (IOException e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5923d329/src/java/org/apache/cassandra/utils/CLibrary.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/CLibrary.java b/src/java/org/apache/cassandra/utils/CLibrary.java
index 18140a2..d970d61 100644
--- a/src/java/org/apache/cassandra/utils/CLibrary.java
+++ b/src/java/org/apache/cassandra/utils/CLibrary.java
@@ -18,18 +18,14 @@
*/
package org.apache.cassandra.utils;
-import java.io.BufferedReader;
import java.io.File;
import java.io.FileDescriptor;
import java.io.IOException;
-import java.io.InputStreamReader;
import java.lang.reflect.Field;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.commons.lang.StringUtils;
-
import com.sun.jna.LastErrorException;
import com.sun.jna.Native;
@@ -188,7 +184,7 @@ public final class CLibrary
}
try
{
- exec(pb);
+ FBUtilities.exec(pb);
}
catch (IOException ex)
{
@@ -197,33 +193,6 @@ public final class CLibrary
}
}
- private static void exec(ProcessBuilder pb) throws IOException
- {
- Process p = pb.start();
- try
- {
- int errCode = p.waitFor();
- if (errCode != 0)
- {
- BufferedReader in = new BufferedReader(new InputStreamReader(p.getInputStream()));
- BufferedReader err = new BufferedReader(new InputStreamReader(p.getErrorStream()));
- StringBuffer buff = new StringBuffer();
- String str;
- while ((str = in.readLine()) != null)
- buff.append(str).append(System.getProperty("line.separator"));
- while ((str = err.readLine()) != null)
- buff.append(str).append(System.getProperty("line.separator"));
- throw new IOException("Exception while executing the command: "+ StringUtils.join(pb.command(), " ") +
- ", command error Code: " + errCode +
- ", command output: "+ buff.toString());
- }
- }
- catch (InterruptedException e)
- {
- throw new RuntimeException(e);
- }
- }
-
public static void trySkipCache(int fd, long offset, int len)
{
if (fd < 0)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5923d329/src/java/org/apache/cassandra/utils/FBUtilities.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java
index 38df00e..c0fe9bd 100644
--- a/src/java/org/apache/cassandra/utils/FBUtilities.java
+++ b/src/java/org/apache/cassandra/utils/FBUtilities.java
@@ -35,6 +35,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import com.google.common.base.Joiner;
import com.google.common.collect.AbstractIterator;
+import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -320,7 +321,7 @@ public class FBUtilities
public static String resourceToFile(String filename) throws ConfigurationException
{
- ClassLoader loader = PropertyFileSnitch.class.getClassLoader();
+ ClassLoader loader = FBUtilities.class.getClassLoader();
URL scpurl = loader.getResource(filename);
if (scpurl == null)
throw new ConfigurationException("unable to locate " + filename);
@@ -549,6 +550,37 @@ public class FBUtilities
}
}
+ /**
+  * Starts the process described by {@code pb} and waits for it to finish.
+  *
+  * The child's stdout and stderr are consumed while it runs, so a command that prints more than the
+  * pipe buffer can hold is not left blocked waiting for a reader. The process streams are closed
+  * before this method returns.
+  *
+  * @param pb the configured process to run
+  * @throws java.io.IOException if the process cannot be started, or on non-zero exit code; in the
+  *         latter case the message carries the command, the exit code and the captured output
+  *         (stdout first, then stderr)
+  */
+ public static void exec(ProcessBuilder pb) throws IOException
+ {
+     final Process p = pb.start();
+     try
+     {
+         final BufferedReader in = new BufferedReader(new InputStreamReader(p.getInputStream()));
+         final BufferedReader err = new BufferedReader(new InputStreamReader(p.getErrorStream()));
+
+         // stderr is drained on its own thread: reading the two pipes one after the other could
+         // still leave the child blocked on whichever pipe is not being read at the moment
+         final StringBuffer errOutput = new StringBuffer();
+         Thread errDrainer = new Thread("exec-stderr-drainer")
+         {
+             public void run()
+             {
+                 try
+                 {
+                     drainLines(err, errOutput);
+                 }
+                 catch (IOException ignored)
+                 {
+                     // stderr is only collected for the failure message; a partial capture is acceptable
+                 }
+             }
+         };
+         errDrainer.setDaemon(true);
+         errDrainer.start();
+
+         StringBuffer output = new StringBuffer();
+         drainLines(in, output);
+
+         int errCode = p.waitFor();
+         errDrainer.join();
+         if (errCode != 0)
+         {
+             throw new IOException("Exception while executing the command: "+ StringUtils.join(pb.command(), " ") +
+                                   ", command error Code: " + errCode +
+                                   ", command output: "+ output.toString() + errOutput.toString());
+         }
+     }
+     catch (InterruptedException e)
+     {
+         // keep the interrupt visible to whoever handles the error
+         Thread.currentThread().interrupt();
+         throw new AssertionError(e);
+     }
+     finally
+     {
+         closeProcessStreams(p);
+     }
+ }
+
+ /** Appends every line read from {@code reader} to {@code sink}, each followed by the platform line separator. */
+ private static void drainLines(BufferedReader reader, StringBuffer sink) throws IOException
+ {
+     String separator = System.getProperty("line.separator");
+     String str;
+     while ((str = reader.readLine()) != null)
+         sink.append(str).append(separator);
+ }
+
+ /** Closes the three pipes of {@code p} so their file descriptors are released right away. */
+ private static void closeProcessStreams(Process p)
+ {
+     try
+     {
+         p.getInputStream().close();
+     }
+     catch (IOException ignored)
+     {
+         // best effort: nothing useful can be done if closing a pipe fails
+     }
+     try
+     {
+         p.getErrorStream().close();
+     }
+     catch (IOException ignored)
+     {
+         // best effort, see above
+     }
+     try
+     {
+         p.getOutputStream().close();
+     }
+     catch (IOException ignored)
+     {
+         // best effort, see above
+     }
+ }
+
private static final class WrappedCloseableIterator<T>
extends AbstractIterator<T> implements CloseableIterator<T>
{