Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/11/22 23:39:44 UTC
svn commit: r1205203 - in /cassandra/trunk: ./
src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/db/commitlog/
src/java/org/apache/cassandra/service/ test/unit/org/apache/cassandra/db/
Author: jbellis
Date: Tue Nov 22 22:39:39 2011
New Revision: 1205203
URL: http://svn.apache.org/viewvc?rev=1205203&view=rev
Log:
Recycle commitlog segments for improved performance
patch by Rick Branson and jbellis for CASSANDRA-3411
Added:
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java
Modified:
cassandra/trunk/CHANGES.txt
cassandra/trunk/NEWS.txt
cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
cassandra/trunk/test/unit/org/apache/cassandra/db/CommitLogTest.java
cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManager2Test.java
cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManager3Test.java
cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1205203&r1=1205202&r2=1205203&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Tue Nov 22 22:39:39 2011
@@ -1,4 +1,5 @@
1.1-dev
+ * Recycle commitlog segments for improved performance (CASSANDRA-3411)
* update size-tiered compaction to prioritize small tiers (CASSANDRA-2407)
* add message expiration logic to OutboundTcpConnection (CASSANDRA-3005)
* off-heap cache to use sun.misc.Unsafe instead of JNA (CASSANDRA-3271)
Modified: cassandra/trunk/NEWS.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/NEWS.txt?rev=1205203&r1=1205202&r2=1205203&view=diff
==============================================================================
--- cassandra/trunk/NEWS.txt (original)
+++ cassandra/trunk/NEWS.txt Tue Nov 22 22:39:39 2011
@@ -21,6 +21,12 @@ Upgrading
throw an InvalidRequestException when used for reads. (Previous
versions would silently perform a ONE read for range queries;
single-row and multiget reads already rejected ANY.)
+ - The largest mutation batch accepted by the commitlog is now 128MB.
+ (In practice, batches larger than ~10MB always caused poor
+ performance due to load volatility and GC promotion failures.)
+ Larger batches will still be accepted, but the commitlog will skip
+ them, so they will not be durable. Consider setting
+ durable_writes=false if you really want to use such large batches.
1.0.4
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java?rev=1205203&r1=1205202&r2=1205203&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java Tue Nov 22 22:39:39 2011
@@ -366,7 +366,7 @@ public class RowMutation implements IMut
}
// We need to deserialize at least once for counters to cleanup the delta
- if (!hasCounters)
+ if (!hasCounters && version == MessagingService.version_)
rm.preserializedBuffers.put(version, raw);
return rm;
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java?rev=1205203&r1=1205202&r2=1205203&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java Tue Nov 22 22:39:39 2011
@@ -32,6 +32,7 @@ import com.google.common.collect.Orderin
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.*;
+import org.apache.cassandra.io.IColumnSerializer;
import org.apache.cassandra.io.util.*;
import org.apache.cassandra.net.MessagingService;
import org.apache.commons.lang.StringUtils;
@@ -42,9 +43,7 @@ import org.apache.cassandra.concurrent.S
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.io.IColumnSerializer;
import org.apache.cassandra.io.util.FastByteArrayInputStream;
-import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.WrappedRunnable;
@@ -53,32 +52,8 @@ import javax.management.MBeanServer;
import javax.management.ObjectName;
/*
- * Commit Log tracks every write operation into the system. The aim
- * of the commit log is to be able to successfully recover data that was
- * not stored to disk via the Memtable. Every Commit Log maintains a
- * header represented by the abstraction CommitLogHeader. The header
- * contains a bit array and an array of longs and both the arrays are
- * of size, #column families for the Table, the Commit Log represents.
- *
- * Whenever a ColumnFamily is written to, for the first time its bit flag
- * is set to one in the CommitLogHeader. When it is flushed to disk by the
- * Memtable its corresponding bit in the header is set to zero. This helps
- * track which CommitLogs can be thrown away as a result of Memtable flushes.
- * Additionally, when a ColumnFamily is flushed and written to disk, its
- * entry in the array of longs is updated with the offset in the Commit Log
- * file where it was written. This helps speed up recovery since we can seek
- * to these offsets and start processing the commit log.
- *
- * Every Commit Log is rolled over everytime it reaches its threshold in size;
- * the new log inherits the "dirty" bits from the old.
- *
- * Over time there could be a number of commit logs that would be generated.
- * To allow cleaning up non-active commit logs, whenever we flush a column family and update its bit flag in
- * the active CL, we take the dirty bit array and bitwise & it with the headers of the older logs.
- * If the result is 0, then it is safe to remove the older file. (Since the new CL
- * inherited the old's dirty bitflags, getting a zero for any given bit in the anding
- * means that either the CF was clean in the old CL or it has been flushed since the
- * switch in the new.)
+ * Commit Log tracks every write operation into the system. The aim of the commit log is to be able to
+ * successfully recover data that was not stored to disk via the Memtable.
*/
public class CommitLog implements CommitLogMBean
{
@@ -88,33 +63,31 @@ public class CommitLog implements Commit
public static final CommitLog instance = new CommitLog();
- private final Deque<CommitLogSegment> segments = new ArrayDeque<CommitLogSegment>();
-
private final ICommitLogExecutorService executor;
- private static final int SEGMENT_SIZE = 128*1024*1024; // roll after log gets this big
+ private final CommitLogAllocator allocator;
+
+ public static final int END_OF_SEGMENT_MARKER = 0; // this is written out at the end of a segment
+ public static final int END_OF_SEGMENT_MARKER_SIZE = 4; // number of bytes of ^^^
+
+ /** size of commitlog segments to allocate */
+ public static final int SEGMENT_SIZE = 128 * 1024 * 1024;
+ public CommitLogSegment activeSegment;
- /**
- * param @ table - name of table for which we are maintaining
- * this commit log.
- * param @ recoverymode - is commit log being instantiated in
- * in recovery mode.
- */
private CommitLog()
{
try
{
DatabaseDescriptor.createAllDirectories();
+
+ allocator = new CommitLogAllocator();
+ activateNextSegment();
}
catch (IOException e)
{
throw new IOError(e);
}
- // all old segments are recovered and deleted before CommitLog is instantiated.
- // All we need to do is create a new one.
- segments.add(new CommitLogSegment());
-
executor = DatabaseDescriptor.getCommitLogSync() == Config.CommitLogSync.batch
? new BatchCommitLogExecutorService()
: new PeriodicCommitLogExecutorService(this);
@@ -130,57 +103,56 @@ public class CommitLog implements Commit
}
}
- public void resetUnsafe()
- {
- for (CommitLogSegment segment : segments)
- segment.close();
- segments.clear();
- segments.add(new CommitLogSegment());
- }
-
- private boolean manages(String name)
+ /**
+ * FOR TESTING PURPOSES. See CommitLogAllocator.
+ */
+ public void resetUnsafe() throws IOException
{
- for (CommitLogSegment segment : segments)
- {
- if (segment.getPath().endsWith(name))
- return true;
- }
- return false;
+ allocator.resetUnsafe();
+ activateNextSegment();
}
- public static int recover() throws IOException
+ /**
+ * Perform recovery on commit logs located in the directory specified by the config file.
+ *
+ * @return the number of mutations replayed
+ */
+ public int recover() throws IOException
{
- String directory = DatabaseDescriptor.getCommitLogLocation();
- File[] files = new File(directory).listFiles(new FilenameFilter()
+ File[] files = new File(DatabaseDescriptor.getCommitLogLocation()).listFiles(new FilenameFilter()
{
public boolean accept(File dir, String name)
{
// we used to try to avoid instantiating commitlog (thus creating an empty segment ready for writes)
// until after recover was finished. this turns out to be fragile; it is less error-prone to go
// ahead and allow writes before recover(), and just skip active segments when we do.
- return CommitLogSegment.possibleCommitLogFile(name) && !instance.manages(name);
+ return CommitLogSegment.possibleCommitLogFile(name) && !instance.allocator.manages(name);
}
});
+
if (files.length == 0)
{
logger.info("No commitlog files found; skipping replay");
return 0;
}
-
+
Arrays.sort(files, new FileUtils.FileComparator());
logger.info("Replaying " + StringUtils.join(files, ", "));
int replayed = recover(files);
- for (File f : files)
- {
- if (!f.delete())
- logger.error("Unable to remove " + f + "; you should remove it manually or next restart will replay it again (harmless, but time-consuming)");
- }
logger.info("Log replay complete, " + replayed + " replayed mutations");
+
+ for (File f : files)
+ CommitLog.instance.allocator.recycleSegment(f);
return replayed;
}
- // returns the number of replayed mutation (useful for tests in particular)
- public static int recover(File[] clogs) throws IOException
+ /**
+ * Perform recovery on a list of commit log files.
+ *
+ * @param clogs the list of commit log files to replay
+ * @return the number of mutations replayed
+ */
+ public int recover(File[] clogs) throws IOException
{
final Set<Table> tablesRecovered = new HashSet<Table>();
List<Future<?>> futures = new ArrayList<Future<?>>();
@@ -205,6 +177,8 @@ public class CommitLog implements Commit
Checksum checksum = new CRC32();
for (final File file : clogs)
{
+ logger.info("Replaying " + file.getPath());
+
final long segment = CommitLogSegment.idFromFilename(file.getName());
RandomAccessReader reader = RandomAccessReader.open(new File(file.getAbsolutePath()), true);
@@ -245,6 +219,12 @@ public class CommitLog implements Commit
{
// any of the reads may hit EOF
serializedSize = reader.readInt();
+ if (serializedSize == CommitLog.END_OF_SEGMENT_MARKER)
+ {
+ logger.debug("Encountered end of segment marker at " + reader.getFilePointer());
+ break;
+ }
+
// RowMutation must be at LEAST 10 bytes:
// 3 each for a non-empty Table and Key (including the 2-byte length from
// writeUTF/writeWithShortLength) and 4 bytes for column count.
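A note on this marker scheme: because segments are now pre-allocated to a fixed size, replay can no longer rely on EOF to find the end of the written data. Below is a minimal sketch (an illustration of the file layout only, not the committed replay code; all names except END_OF_SEGMENT_MARKER are hypothetical) of scanning a segment until the zero length-prefix marker:

    import java.io.DataInputStream;
    import java.io.EOFException;
    import java.io.FileInputStream;
    import java.io.IOException;

    public class SegmentScanSketch
    {
        static final int END_OF_SEGMENT_MARKER = 0; // matches CommitLog.END_OF_SEGMENT_MARKER

        /** Counts the entries in a segment, stopping at the end-of-segment marker. */
        public static int countEntries(String path) throws IOException
        {
            DataInputStream in = new DataInputStream(new FileInputStream(path));
            int entries = 0;
            try
            {
                while (true)
                {
                    int serializedSize = in.readInt(); // length prefix of the next entry
                    if (serializedSize == END_OF_SEGMENT_MARKER)
                        break; // the rest of the pre-allocated file is unused
                    // skip the head checksum, the payload, and the tail checksum
                    in.skipBytes(8 + serializedSize + 8);
                    entries++;
                }
            }
            catch (EOFException e)
            {
                // segments from older versions may simply end at EOF instead of a marker
            }
            finally
            {
                in.close();
            }
            return entries;
        }
    }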
@@ -301,7 +281,7 @@ public class CommitLog implements Commit
logger.debug(String.format("replaying mutation for %s.%s: %s",
rm.getTable(),
ByteBufferUtil.bytesToHex(rm.key()),
- "{" + StringUtils.join(rm.getColumnFamilies(), ", ") + "}"));
+ "{" + StringUtils.join(rm.getColumnFamilies().iterator(), ", ") + "}"));
final long entryLocation = reader.getFilePointer();
final RowMutation frm = rm;
@@ -368,21 +348,21 @@ public class CommitLog implements Commit
futures.addAll(table.flush());
FBUtilities.waitOnFutures(futures);
+ allocator.enableReserveSegmentCreation();
+
return replayedCount.get();
}
- private CommitLogSegment currentSegment()
- {
- return segments.getLast();
- }
-
+ /**
+ * @return the ReplayPosition of the current (active) segment file
+ */
public ReplayPosition getContext()
{
Callable<ReplayPosition> task = new Callable<ReplayPosition>()
{
public ReplayPosition call() throws Exception
{
- return currentSegment().getContext();
+ return activeSegment.getContext();
}
};
try
@@ -399,39 +379,83 @@ public class CommitLog implements Commit
}
}
- // for tests mainly
- public int segmentsCount()
+ /**
+ * Used by tests.
+ *
+ * @return the number of active segments (segments with unflushed data in them)
+ */
+ public int activeSegments()
{
- return segments.size();
+ return allocator.activeSegments.size();
}
- /*
- * Adds the specified row to the commit log. This method will reset the
- * file offset to what it is before the start of the operation in case
- * of any problems. This way we can assume that the subsequent commit log
- * entry will override the garbage left over by the previous write.
- */
- public void add(RowMutation rowMutation) throws IOException
+ /**
+ * Add a RowMutation to the commit log.
+ *
+ * @param rm the RowMutation to add to the log
+ */
+ public void add(RowMutation rm) throws IOException
{
- executor.add(new LogRecordAdder(rowMutation));
+ long totalSize = RowMutation.serializer().serializedSize(rm, MessagingService.version_) + CommitLogSegment.ENTRY_OVERHEAD_SIZE;
+ if (totalSize > CommitLog.SEGMENT_SIZE)
+ {
+ logger.warn("Skipping commitlog append of extremely large mutation ({} bytes)", totalSize);
+ return;
+ }
+
+ executor.add(new LogRecordAdder(rm));
}
- /*
- * This is called on Memtable flush to add to the commit log
- * a token indicating that this column family has been flushed.
- * The bit flag associated with this column family is set in the
- * header and this is used to decide if the log file can be deleted.
- */
+ /**
+ * Modifies the per-CF dirty cursors of any commit log segments for the column family according to the position
+ * given. Discards any commit log segments that are no longer used.
+ *
+ * @param cfId the column family ID that was flushed
+ * @param context the replay position of the flush
+ */
public void discardCompletedSegments(final Integer cfId, final ReplayPosition context) throws IOException
{
Callable task = new Callable()
{
public Object call() throws IOException
{
- discardCompletedSegmentsInternal(context, cfId);
+ logger.debug("discard completed log segments for {}, column family {}", context, cfId);
+
+ // Go thru the active segment files, which are ordered oldest to newest, marking the
+ // flushed CF as clean, until we reach the segment file containing the ReplayPosition passed
+ // in the arguments. Any segments that become unused after they are marked clean will be
+ // recycled or discarded.
+ for (Iterator<CommitLogSegment> iter = allocator.activeSegments.iterator(); iter.hasNext(); )
+ {
+ CommitLogSegment segment = iter.next();
+ segment.markClean(cfId, context);
+
+ // If the segment is no longer needed, and we have another spare segment in the hopper
+ // (to keep the last segment from getting discarded), pursue either recycling or deleting
+ // this segment file.
+ if (segment.isUnused() && iter.hasNext())
+ {
+ logger.debug("Commit log segment {} is unused", segment);
+ iter.remove();
+ allocator.recycleSegment(segment);
+ }
+ else
+ {
+ if (logger.isDebugEnabled())
+ logger.debug(String.format("Not safe to delete commit log %s; dirty is %s; hasNext: %s",
+ segment, segment.dirtyString(), iter.hasNext()));
+ }
+
+ // Don't mark or try to delete any newer segments once we've reached the one containing the
+ // position of the flush.
+ if (segment.contains(context))
+ break;
+ }
+
return null;
}
};
+
try
{
executor.submit(task).get();
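The loop above replaces the old discardCompletedSegmentsInternal. A condensed sketch of the same walk, using simplified stand-in types rather than the real CommitLogSegment/ReplayPosition classes (a single dirty flag stands in for the per-CF cursors):

    import java.util.ArrayDeque;
    import java.util.Iterator;
    import java.util.Queue;

    public class DiscardWalkSketch
    {
        // Stand-in for CommitLogSegment: one dirty flag and an id
        static class Segment
        {
            final long id;
            boolean dirty = true;
            Segment(long id) { this.id = id; }
            boolean isUnused() { return !dirty; }
            boolean contains(long ctxSegmentId) { return ctxSegmentId == id; }
        }

        // Walk oldest-to-newest: mark clean, recycle unused segments (but never
        // the last one), and stop at the segment containing the flush position.
        static void discardCompleted(Queue<Segment> active, long ctxSegmentId)
        {
            for (Iterator<Segment> iter = active.iterator(); iter.hasNext(); )
            {
                Segment segment = iter.next();
                segment.dirty = false;             // markClean(cfId, context)
                if (segment.isUnused() && iter.hasNext())
                    iter.remove();                 // would go back to the allocator
                if (segment.contains(ctxSegmentId))
                    break;                         // newer segments may hold later writes
            }
        }

        public static void main(String[] args)
        {
            Queue<Segment> active = new ArrayDeque<Segment>();
            active.add(new Segment(1));
            active.add(new Segment(2));
            active.add(new Segment(3));            // still being written to
            discardCompleted(active, 2);           // flush position lives in segment 2
            System.out.println(active.size());     // prints 1: segments 1 and 2 recycled
        }
    }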
@@ -447,101 +471,54 @@ public class CommitLog implements Commit
}
/**
- * Delete log segments whose contents have been turned into SSTables. NOT threadsafe.
- *
- * param @ context The commitLog context .
- * param @ id id of the columnFamily being flushed to disk.
- *
- */
- private void discardCompletedSegmentsInternal(ReplayPosition context, Integer id) throws IOException
+ * Forces a disk flush on the commit log files that need it.
+ */
+ public void sync() throws IOException
{
- if (logger.isDebugEnabled())
- logger.debug("discard completed log segments for " + context + ", column family " + id + ".");
-
- /*
- * Loop through all the commit log files in the history. Now process
- * all files that are older than the one in the context. For each of
- * these files the header needs to modified by resetting the dirty
- * bit corresponding to the flushed CF.
- */
- Iterator<CommitLogSegment> iter = segments.iterator();
- while (iter.hasNext())
+ for (CommitLogSegment segment : allocator.activeSegments)
{
- CommitLogSegment segment = iter.next();
- if (segment.id == context.segment)
+ if (segment.needsSync())
{
- // Only unmark this segment if there were not write since the
- // ReplayPosition was grabbed.
- segment.turnOffIfNotWritten(id, context.position);
- maybeDiscardSegment(segment, iter);
- break;
+ segment.sync();
}
-
- segment.turnOff(id);
- maybeDiscardSegment(segment, iter);
- }
- }
-
- private void maybeDiscardSegment(CommitLogSegment segment, Iterator<CommitLogSegment> iter)
- {
- if (segment.isSafeToDelete() && iter.hasNext())
- {
- logger.info("Discarding obsolete commit log:" + segment);
- FileUtils.deleteAsync(segment.getPath());
- // usually this will be the first (remaining) segment, but not always, if segment A contains
- // writes to a CF that is unflushed but is followed by segment B whose CFs are all flushed.
- iter.remove();
- }
- else
- {
- if (logger.isDebugEnabled())
- logger.debug("Not safe to delete commit log " + segment + "; dirty is " + segment.dirtyString() + "; hasNext: " + iter.hasNext());
}
}
- void sync() throws IOException
- {
- currentSegment().sync();
- }
-
/**
- * @return the total size occupied by the commitlog segments expressed in bytes.
+ * @return the number of tasks completed by the commit log executor
*/
- public long getSize()
- {
- long commitlogTotalSize = 0;
-
- for (CommitLogSegment segment : segments)
- {
- commitlogTotalSize += segment.length();
- }
-
- return commitlogTotalSize;
- }
-
public long getCompletedTasks()
{
return executor.getCompletedTasks();
}
+ /**
+ * @return the depth of the pending commit log executor queue
+ */
public long getPendingTasks()
{
return executor.getPendingTasks();
}
+ /**
+ * @return the total size occupied by commitlog segments, expressed in bytes (used by the MBean)
+ */
public long getTotalCommitlogSize()
{
- return getSize();
+ return allocator.bytesUsed();
}
+ /**
+ * Forces a new segment file to be allocated and activated. Used mainly by truncate.
+ */
public void forceNewSegment() throws ExecutionException, InterruptedException
{
Callable<?> task = new Callable()
{
public Object call() throws IOException
{
- if (currentSegment().length() > 0)
- createNewSegment();
+ if (activeSegment.position() > 0)
+ activateNextSegment();
return null;
}
};
@@ -549,36 +526,25 @@ public class CommitLog implements Commit
executor.submit(task).get();
}
- private void createNewSegment() throws IOException
+ /**
+ * Fetches a new segment file from the allocator and activates it.
+ *
+ * @return the newly activated segment
+ */
+ private void activateNextSegment() throws IOException
{
- assert !segments.isEmpty();
- sync();
- segments.getLast().close();
- segments.add(new CommitLogSegment());
-
- // Maintain desired CL size cap
- if (getSize() >= DatabaseDescriptor.getTotalCommitlogSpaceInMB() * 1024 * 1024)
- {
- // Force a flush on all CFs keeping the oldest segment from being removed
- CommitLogSegment oldestSegment = segments.peek();
- assert oldestSegment != null; // has to be at least the one we just added
- for (Integer dirtyCFId : oldestSegment.cfLastWrite.keySet())
- {
- String keypace = Schema.instance.getCF(dirtyCFId).left;
- final ColumnFamilyStore cfs = Table.open(keypace).getColumnFamilyStore(dirtyCFId);
- // flush shouldn't run on the commitlog executor, since it acquires Table.switchLock,
- // which may already be held by a thread waiting for the CL executor (via getContext),
- // causing deadlock
- Runnable runnable = new Runnable()
- {
- public void run()
- {
- cfs.forceFlush();
- }
- };
- StorageService.optionalTasks.execute(runnable);
- }
- }
+ activeSegment = allocator.fetchSegment();
+ }
+
+ /**
+ * Shuts down the threads used by the commit log, blocking until completion.
+ */
+ public void shutdownBlocking() throws InterruptedException
+ {
+ executor.shutdown();
+ executor.awaitTermination();
+ allocator.shutdown();
+ allocator.awaitTermination();
}
// TODO this should be a Runnable since it doesn't actually return anything, but it's difficult to do that
@@ -596,10 +562,9 @@ public class CommitLog implements Commit
{
try
{
- currentSegment().write(rowMutation);
- // roll log if necessary
- if (currentSegment().length() >= SEGMENT_SIZE)
- createNewSegment();
+ if (!activeSegment.hasCapacityFor(rowMutation))
+ activateNextSegment();
+ activeSegment.write(rowMutation);
}
catch (IOException e)
{
@@ -613,10 +578,4 @@ public class CommitLog implements Commit
return null;
}
}
-
- public void shutdownBlocking() throws InterruptedException
- {
- executor.shutdown();
- executor.awaitTermination();
- }
}
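Taken together, the new write path in LogRecordAdder checks capacity before writing instead of rolling after a segment overflows. A toy sketch of that roll-before-write rule (tiny sizes for illustration, entry framing elided; none of these names come from the patch):

    import java.nio.ByteBuffer;

    public class RollBeforeWriteSketch
    {
        static final int SEGMENT_SIZE = 32;               // tiny, for illustration only
        static final int ENTRY_OVERHEAD_SIZE = 4 + 8 + 8; // length + head CRC + tail CRC

        private ByteBuffer active = ByteBuffer.allocate(SEGMENT_SIZE);
        int segmentsUsed = 1;

        boolean hasCapacityFor(byte[] payload)
        {
            return payload.length + ENTRY_OVERHEAD_SIZE <= active.remaining();
        }

        void append(byte[] payload)
        {
            // roll to a fresh segment *before* writing, so no entry spans two files
            if (!hasCapacityFor(payload))
            {
                active = ByteBuffer.allocate(SEGMENT_SIZE);
                segmentsUsed++;
            }
            active.position(active.position() + ENTRY_OVERHEAD_SIZE); // framing elided
            active.put(payload);
        }

        public static void main(String[] args)
        {
            RollBeforeWriteSketch log = new RollBeforeWriteSketch();
            log.append(new byte[4]);
            log.append(new byte[4]); // 24 bytes with overhead; only 8 remain, so this rolls
            System.out.println("segments used: " + log.segmentsUsed); // prints 2
        }
    }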
Added: cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java?rev=1205203&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java Tue Nov 22 22:39:39 2011
@@ -0,0 +1,327 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.commitlog;
+
+import java.io.File;
+
+import java.io.IOError;
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.collect.Iterables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.WrappedRunnable;
+
+/**
+ * Performs the pre-allocation of commit log segments in a background thread. All the
+ * public methods are thread safe.
+ */
+public class CommitLogAllocator
+{
+ static final Logger logger = LoggerFactory.getLogger(CommitLogAllocator.class);
+
+ /** The (theoretical) max milliseconds between loop runs to perform janitorial tasks */
+ public final static int TICK_CYCLE_TIME = 100;
+
+ /** Segments that are ready to be used */
+ private final BlockingQueue<CommitLogSegment> availableSegments = new LinkedBlockingQueue<CommitLogSegment>();
+
+ /** Allocations to be run by the thread */
+ private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
+
+ /** Active segments, containing unflushed data */
+ final ConcurrentLinkedQueue<CommitLogSegment> activeSegments = new ConcurrentLinkedQueue<CommitLogSegment>();
+
+ /**
+ * Tracks commitlog size, in multiples of the segment size. We need to do this so we can "promise" size
+ * adjustments ahead of actually adding/freeing segments on disk, so that the "evict oldest segment" logic
+ * can see the effect of recycling segments immediately (even though they're really happening asynchronously
+ * on the allocator thread, which will take a ms or two).
+ */
+ private final AtomicLong size = new AtomicLong();
+
+ /**
+ * New segment creation is initially disabled because we'll typically get some "free" segments
+ * recycled after log replay.
+ */
+ private volatile boolean createReserveSegments = false;
+
+ private final Thread allocationThread;
+ private volatile boolean run = true;
+
+ public CommitLogAllocator()
+ {
+ // The run loop for the allocation thread
+ Runnable runnable = new WrappedRunnable()
+ {
+ public void runMayThrow() throws Exception
+ {
+ while (run)
+ {
+ Runnable r = queue.poll(TICK_CYCLE_TIME, TimeUnit.MILLISECONDS);
+
+ if (r != null)
+ {
+ r.run();
+ }
+ else
+ {
+ // no job, so we're clear to check to see if we're out of segments
+ // and ready a new one if needed. This has the effect of ensuring there's
+ // almost always a segment available when it's needed.
+ if (availableSegments.isEmpty() && (activeSegments.isEmpty() || createReserveSegments))
+ {
+ createFreshSegment();
+ }
+ }
+ }
+ }
+ };
+
+ allocationThread = new Thread(runnable, "COMMIT-LOG-ALLOCATOR");
+ allocationThread.start();
+ }
+
+ /**
+ * Fetches an empty segment file.
+ *
+ * @return the next writeable segment
+ */
+ public CommitLogSegment fetchSegment()
+ {
+ CommitLogSegment next;
+ try
+ {
+ next = availableSegments.take();
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
+
+ assert !activeSegments.contains(next);
+ activeSegments.add(next);
+ if (isCapExceeded())
+ flushOldestTables();
+
+ return next;
+ }
+
+ /**
+ * Indicates that a segment is no longer in use and that it should be recycled.
+ *
+ * @param segment segment that is no longer in use
+ */
+ public void recycleSegment(final CommitLogSegment segment)
+ {
+ if (isCapExceeded())
+ {
+ discardSegment(segment);
+ return;
+ }
+
+ queue.add(new Runnable()
+ {
+ public void run()
+ {
+ segment.recycle();
+ }
+ });
+ }
+
+ /**
+ * Differs from the above because it can work on any file instead of just existing
+ * commit log segments managed by this allocator.
+ *
+ * @param file segment file that is no longer in use.
+ */
+ public void recycleSegment(final File file)
+ {
+ // check against SEGMENT_SIZE avoids recycling odd-sized or empty segments from old C* versions and unit tests
+ if (isCapExceeded() || file.length() != CommitLog.SEGMENT_SIZE)
+ {
+ try
+ {
+ FileUtils.deleteWithConfirm(file);
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
+ }
+ return;
+ }
+
+ queue.add(new Runnable()
+ {
+ public void run()
+ {
+ CommitLogSegment segment = new CommitLogSegment(file.getPath());
+ internalAddReadySegment(segment);
+ }
+ });
+ }
+
+ /**
+ * Indicates that a segment file should be deleted.
+ *
+ * @param segment segment to be discarded
+ */
+ private void discardSegment(final CommitLogSegment segment)
+ {
+ size.addAndGet(-CommitLog.SEGMENT_SIZE);
+ queue.add(new Runnable()
+ {
+ public void run()
+ {
+ activeSegments.remove(segment);
+ segment.discard();
+ }
+ });
+ }
+
+ /**
+ * @return the space (in bytes) used by all segment files.
+ */
+ public long bytesUsed()
+ {
+ return size.get();
+ }
+
+ /**
+ * @param name the filename to check
+ * @return true if file is managed by this allocator.
+ */
+ public boolean manages(String name)
+ {
+ for (CommitLogSegment segment : Iterables.concat(activeSegments, availableSegments))
+ if (segment.getName().equals(name))
+ return true;
+
+ return false;
+ }
+
+ /**
+ * Creates and readies a brand new segment.
+ *
+ * @return the newly minted segment
+ */
+ private CommitLogSegment createFreshSegment()
+ {
+ size.addAndGet(CommitLog.SEGMENT_SIZE);
+ return internalAddReadySegment(CommitLogSegment.freshSegment());
+ }
+
+ /**
+ * Adds a segment to our internal tracking list and makes it ready for consumption.
+ *
+ * @param segment the segment to add
+ * @return the newly added segment
+ */
+ private CommitLogSegment internalAddReadySegment(CommitLogSegment segment)
+ {
+ assert !activeSegments.contains(segment);
+ assert !availableSegments.contains(segment);
+ availableSegments.add(segment);
+ return segment;
+ }
+
+ public boolean isCapExceeded()
+ {
+ return size.get() > DatabaseDescriptor.getTotalCommitlogSpaceInMB() * 1024 * 1024;
+ }
+
+ public void enableReserveSegmentCreation()
+ {
+ createReserveSegments = true;
+ }
+
+ /**
+ * Force a flush on all dirty CFs represented in the oldest commitlog segment
+ */
+ private void flushOldestTables()
+ {
+ CommitLogSegment oldestSegment = activeSegments.peek();
+
+ if (oldestSegment != null)
+ {
+ for (Integer dirtyCFId : oldestSegment.getDirtyCFIDs())
+ {
+ String keyspace = Schema.instance.getCF(dirtyCFId).left;
+ final ColumnFamilyStore cfs = Table.open(keyspace).getColumnFamilyStore(dirtyCFId);
+ // flush shouldn't run on the commitlog executor, since it acquires Table.switchLock,
+ // which may already be held by a thread waiting for the CL executor (via getContext),
+ // causing deadlock
+ Runnable runnable = new Runnable()
+ {
+ public void run()
+ {
+ cfs.forceFlush();
+ }
+ };
+ StorageService.optionalTasks.execute(runnable);
+ }
+ }
+ }
+
+ /**
+ * Resets all the segments, for testing purposes. DO NOT USE THIS OUTSIDE OF TESTS.
+ */
+ public void resetUnsafe()
+ {
+ logger.debug("Closing and clearing existing commit log segments...");
+
+ while (!queue.isEmpty())
+ Thread.yield();
+
+ for (CommitLogSegment segment : activeSegments)
+ segment.close();
+
+ activeSegments.clear();
+ availableSegments.clear();
+ }
+
+ /**
+ * Initiates the shutdown process for the allocator thread.
+ */
+ public void shutdown()
+ {
+ run = false;
+ }
+
+ /**
+ * Returns when the allocator thread terminates.
+ */
+ public void awaitTermination() throws InterruptedException
+ {
+ allocationThread.join();
+ }
+}
+
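One detail worth noting in the allocator above is the size bookkeeping described in the comment on the `size` field: the byte count is adjusted before the file work is queued, so the eviction logic sees the effect immediately. A stripped-down sketch of that "promise the size change first, do the file work later" pattern (hypothetical names, cap hard-coded):

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.atomic.AtomicLong;

    public class SizePromiseSketch
    {
        static final long SEGMENT_SIZE = 128L * 1024 * 1024;
        static final long CAP = 4 * SEGMENT_SIZE; // stand-in for total commitlog space

        private final AtomicLong size = new AtomicLong();  // "promised" bytes on disk
        private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();

        boolean isCapExceeded()
        {
            return size.get() > CAP;
        }

        /** Deleting a segment: account for the freed space *before* the async delete runs. */
        void discardSegment(final String path)
        {
            size.addAndGet(-SEGMENT_SIZE); // promise the adjustment immediately...
            queue.add(new Runnable()
            {
                public void run()
                {
                    // ...even though the actual deletion happens a moment later
                    // on the allocator thread
                    new java.io.File(path).delete();
                }
            });
        }

        public static void main(String[] args) throws Exception
        {
            SizePromiseSketch a = new SizePromiseSketch();
            a.size.addAndGet(5 * SEGMENT_SIZE);    // pretend five segments exist on disk
            System.out.println(a.isCapExceeded()); // true
            a.discardSegment("/tmp/CommitLog-1.log");
            System.out.println(a.isCapExceeded()); // false: freed space counted immediately
            a.queue.take().run();                  // the allocator thread would do this
        }
    }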
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java?rev=1205203&r1=1205202&r2=1205203&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java Tue Nov 22 22:39:39 2011
@@ -23,53 +23,115 @@ package org.apache.cassandra.db.commitlo
import java.io.File;
import java.io.IOError;
import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+import java.nio.MappedByteBuffer;
+import java.util.Collection;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.zip.CRC32;
-import java.util.zip.Checksum;
+import java.util.HashMap;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.io.util.SequentialWriter;
-import org.apache.cassandra.net.MessagingService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.RowMutation;
import org.apache.cassandra.db.ColumnFamily;
+/*
+ * A single commit log file on disk. Manages creation of the file and writing row mutations to disk,
+ * as well as tracking the last mutation position of any "dirty" CFs covered by the segment file. Segment
+ * files are initially allocated to a fixed size and can grow to accommodate a larger value if necessary.
+ */
public class CommitLogSegment
{
private static final Logger logger = LoggerFactory.getLogger(CommitLogSegment.class);
- private static Pattern COMMIT_LOG_FILE_PATTERN = Pattern.compile("CommitLog-(\\d+).log");
+
+ private static final String FILENAME_PREFIX = "CommitLog-";
+ private static final String FILENAME_EXTENSION = ".log";
+ private static Pattern COMMIT_LOG_FILE_PATTERN = Pattern.compile(FILENAME_PREFIX + "(\\d+)" + FILENAME_EXTENSION);
+
+ // The commit log entry overhead in bytes (int: length + long: head checksum + long: tail checksum)
+ static final int ENTRY_OVERHEAD_SIZE = 4 + 8 + 8;
+
+ // cache which cf is dirty in this segment to avoid having to lookup all ReplayPositions to decide if we can delete this segment
+ private final HashMap<Integer, Integer> cfLastWrite = new HashMap<Integer, Integer>();
public final long id;
- private final SequentialWriter logWriter;
- private long finalSize = -1;
- // cache which cf is dirty in this segment to avoid having to lookup all ReplayPositions to decide if we could delete this segment
- public final Map<Integer, Integer> cfLastWrite = new HashMap<Integer, Integer>();
+ private File logFile;
+ private RandomAccessFile logFileAccessor;
+
+ private boolean needsSync = false;
+
+ private final MappedByteBuffer buffer;
+ private boolean closed;
- public CommitLogSegment()
+ /**
+ * @return a newly minted segment file
+ */
+ public static CommitLogSegment freshSegment()
{
- id = System.currentTimeMillis();
- String logFile = DatabaseDescriptor.getCommitLogLocation() + File.separator + "CommitLog-" + id + ".log";
- logger.info("Creating new commitlog segment " + logFile);
+ return new CommitLogSegment(null);
+ }
+
+ /**
+ * Constructs a new segment file.
+ *
+ * @param filePath if not null, recycles the existing file by renaming it and truncating it to CommitLog.SEGMENT_SIZE.
+ */
+ CommitLogSegment(String filePath)
+ {
+ id = System.nanoTime();
+ logFile = new File(DatabaseDescriptor.getCommitLogLocation(), FILENAME_PREFIX + id + FILENAME_EXTENSION);
+ boolean isCreating = true;
try
{
- logWriter = createWriter(logFile);
+ if (filePath != null)
+ {
+ File oldFile = new File(filePath);
+
+ if (oldFile.exists())
+ {
+ logger.debug("Re-using discarded CommitLog segment for " + id + " from " + filePath);
+ oldFile.renameTo(logFile);
+ isCreating = false;
+ }
+ }
+
+ // Open the segment file
+ logFileAccessor = new RandomAccessFile(logFile, "rw");
+
+ if (isCreating)
+ {
+ logger.debug("Creating new commit log segment " + logFile.getPath());
+ }
+
+ // Map the segment, extending or truncating it to the standard segment size
+ logFileAccessor.setLength(CommitLog.SEGMENT_SIZE);
+
+ buffer = logFileAccessor.getChannel().map(FileChannel.MapMode.READ_WRITE, (long) 0, (long) CommitLog.SEGMENT_SIZE);
+ buffer.putInt(CommitLog.END_OF_SEGMENT_MARKER);
+ buffer.position(0);
}
catch (IOException e)
{
throw new IOError(e);
- }
+ }
}
- // assume filename is a 'possibleCommitLogFile()'
+ /**
+ * Extracts the commit log ID from the given filename.
+ *
+ * @param filename the filename of the commit log file
+ * @return the extracted commit log ID
+ */
public static long idFromFilename(String filename)
{
Matcher matcher = COMMIT_LOG_FILE_PATTERN.matcher(filename);
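The constructor above replaces the old SequentialWriter with a memory-mapped, fixed-size file. A self-contained sketch of that allocate-then-map pattern (temp file and a small size for illustration; not the committed class):

    import java.io.File;
    import java.io.IOException;
    import java.io.RandomAccessFile;
    import java.nio.MappedByteBuffer;
    import java.nio.channels.FileChannel;

    public class MappedSegmentSketch
    {
        static final int SEGMENT_SIZE = 16 * 1024; // small, for illustration only

        public static void main(String[] args) throws IOException
        {
            File f = File.createTempFile("segment-sketch", ".log");
            RandomAccessFile raf = new RandomAccessFile(f, "rw");

            // extend (or truncate) the file to the fixed segment size up front,
            // then map it; writes go through the buffer and sync is buffer.force()
            raf.setLength(SEGMENT_SIZE);
            MappedByteBuffer buffer = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, SEGMENT_SIZE);

            buffer.putInt(0);    // end-of-segment marker: an empty, writeable segment
            buffer.position(0);  // rewind so the first entry overwrites the marker
            buffer.force();      // flush the written page(s) to disk

            raf.close();
            f.delete();
        }
    }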
@@ -86,102 +148,151 @@ public class CommitLogSegment
}
}
+ /**
+ * @param filename the filename to check
+ * @return true if the filename could be that of a commit log segment
+ */
public static boolean possibleCommitLogFile(String filename)
{
return COMMIT_LOG_FILE_PATTERN.matcher(filename).matches();
}
- private static SequentialWriter createWriter(String file) throws IOException
+ /**
+ * Completely discards a segment file by deleting it. (Potentially blocking operation)
+ */
+ public void discard()
{
- return SequentialWriter.open(new File(file), true);
+ close();
+ try
+ {
+ FileUtils.deleteWithConfirm(logFile);
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
+ }
}
- public ReplayPosition write(RowMutation rowMutation) throws IOException
+ /**
+ * Recycles an unneeded segment file for reuse: the segment is reset to an
+ * empty, writeable state rather than deleted and re-created.
+ */
+ public void recycle()
+ {
+ // writes an end-of-segment marker at the very beginning of the file and rewinds to the start
+ buffer.position(0);
+ buffer.putInt(CommitLog.END_OF_SEGMENT_MARKER);
+ buffer.position(0);
+ needsSync = true;
+ }
+
+ /**
+ * @param mutation the mutation to check against this segment's remaining capacity
+ * @return true if there is room to write the mutation to this segment
+ */
+ public boolean hasCapacityFor(RowMutation mutation)
{
- ReplayPosition cLogCtx = getContext();
+ long totalSize = RowMutation.serializer().serializedSize(mutation, MessagingService.version_) + ENTRY_OVERHEAD_SIZE;
+ return totalSize <= buffer.remaining();
+ }
- try
+ /**
+ * mark all of the column families we're modifying as dirty at this position
+ */
+ private void markDirty(RowMutation rowMutation, ReplayPosition repPos)
+ {
+ for (ColumnFamily columnFamily : rowMutation.getColumnFamilies())
{
- for (ColumnFamily columnFamily : rowMutation.getColumnFamilies())
+ // check for null cfm in case a cl write goes through after the cf is
+ // defined but before a new segment is created.
+ CFMetaData cfm = Schema.instance.getCFMetaData(columnFamily.id());
+ if (cfm == null)
{
- // check for null cfm in case a cl write goes through after the cf is
- // defined but before a new segment is created.
- CFMetaData cfm = Schema.instance.getCFMetaData(columnFamily.id());
- if (cfm == null)
- {
- logger.error("Attempted to write commit log entry for unrecognized column family: " + columnFamily.id());
- }
- else
- {
- turnOn(cfm.cfId, cLogCtx.position);
- }
+ logger.error("Attempted to write commit log entry for unrecognized column family: " + columnFamily.id());
+ }
+ else
+ {
+ markCFDirty(cfm.cfId, repPos.position);
}
+ }
+ }
- // write mutation, w/ checksum on the size and data
- Checksum checksum = new CRC32();
- byte[] serializedRow = rowMutation.getSerializedBuffer(MessagingService.version_);
- checksum.update(serializedRow.length);
- logWriter.stream.writeInt(serializedRow.length);
- logWriter.stream.writeLong(checksum.getValue());
- logWriter.write(serializedRow);
- checksum.update(serializedRow, 0, serializedRow.length);
- logWriter.stream.writeLong(checksum.getValue());
+ /**
+ * Appends a row mutation to the commit log. Requires that hasCapacityFor() has already been checked.
+ *
+ * @param rowMutation the mutation to append to the commit log.
+ * @return the position of the appended mutation
+ */
+ public ReplayPosition write(RowMutation rowMutation) throws IOException
+ {
+ assert !closed;
+ ReplayPosition repPos = getContext();
+ markDirty(rowMutation, repPos);
+
+ CRC32 checksum = new CRC32();
+ byte[] serializedRow = rowMutation.getSerializedBuffer(MessagingService.version_);
+
+ checksum.update(serializedRow.length);
+ buffer.putInt(serializedRow.length);
+ buffer.putLong(checksum.getValue());
+
+ buffer.put(serializedRow);
+ checksum.update(serializedRow);
+ buffer.putLong(checksum.getValue());
+
+ // writes end of segment marker and rewinds back to position where it starts
+ buffer.putInt(CommitLog.END_OF_SEGMENT_MARKER);
+ buffer.position(buffer.position() - CommitLog.END_OF_SEGMENT_MARKER_SIZE);
- return cLogCtx;
- }
- catch (IOException e)
- {
- logWriter.truncate(cLogCtx.position);
- throw e;
- }
+ needsSync = true;
+ return repPos;
}
+ /**
+ * Forces a disk flush for this segment file.
+ */
public void sync() throws IOException
{
- logWriter.sync();
+ buffer.force();
+ needsSync = false;
}
+ /**
+ * @return the current ReplayPosition for this log segment
+ */
public ReplayPosition getContext()
{
- long position = logWriter.getFilePointer();
- assert position <= Integer.MAX_VALUE;
- return new ReplayPosition(id, (int) position);
+ return new ReplayPosition(id, buffer.position());
}
+ /**
+ * @return the file path to this segment
+ */
public String getPath()
{
- return logWriter.getPath();
+ return logFile.getPath();
}
+ /**
+ * @return the file name of this segment
+ */
public String getName()
{
- return logWriter.getPath().substring(logWriter.getPath().lastIndexOf(File.separator) + 1);
- }
-
- public long length()
- {
- if (finalSize >= 0)
- return finalSize;
-
- try
- {
- return logWriter.length();
- }
- catch (IOException e)
- {
- throw new IOError(e);
- }
+ return logFile.getName();
}
+ /**
+ * Close the segment file.
+ */
public void close()
{
- if (finalSize >= 0)
+ if (closed)
return;
try
{
- finalSize = logWriter.length();
- logWriter.close();
+ logFileAccessor.close();
+ closed = true;
}
catch (IOException e)
{
@@ -189,29 +300,72 @@ public class CommitLogSegment
}
}
- void turnOn(Integer cfId, Integer position)
+ /**
+ * Records the CF as dirty at a certain position.
+ *
+ * @param cfId the column family ID that is now dirty
+ * @param position the position the last write for this CF was written at
+ */
+ private void markCFDirty(Integer cfId, Integer position)
{
cfLastWrite.put(cfId, position);
}
/**
- * Turn the dirty bit off only if there has been no write since the flush
- * position was grabbed.
+ * Marks the ColumnFamily specified by cfId as clean for this log segment. If the
+ * given context argument is contained in this file, it will only mark the CF as
+ * clean if no newer writes have taken place.
+ *
+ * @param cfId the column family ID that is now clean
+ * @param context the optional clean offset
*/
- void turnOffIfNotWritten(Integer cfId, Integer flushPosition)
+ public void markClean(Integer cfId, ReplayPosition context)
{
Integer lastWritten = cfLastWrite.get(cfId);
- if (lastWritten == null || lastWritten < flushPosition)
+
+ if (lastWritten != null && (!contains(context) || lastWritten < context.position))
+ {
cfLastWrite.remove(cfId);
+ }
+ }
+
+ /**
+ * @return a collection of dirty CFIDs for this segment file.
+ */
+ public Collection<Integer> getDirtyCFIDs()
+ {
+ return cfLastWrite.keySet();
+ }
+
+ /**
+ * @return true if this segment is unused and safe to recycle or delete
+ */
+ public boolean isUnused()
+ {
+ return cfLastWrite.isEmpty();
+ }
+
+ /**
+ * @return true if this segment file has unflushed writes
+ */
+ public boolean needsSync()
+ {
+ return needsSync;
}
- void turnOff(Integer cfId)
+ /**
+ * Check to see if a certain ReplayPosition is contained by this segment file.
+ *
+ * @param context the replay position to be checked
+ * @return true if the replay position is contained by this segment file.
+ */
+ public boolean contains(ReplayPosition context)
{
- cfLastWrite.remove(cfId);
+ return context.segment == id;
}
// For debugging, not fast
- String dirtyString()
+ public String dirtyString()
{
StringBuilder sb = new StringBuilder();
for (Integer cfId : cfLastWrite.keySet())
@@ -222,14 +376,21 @@ public class CommitLogSegment
return sb.toString();
}
- boolean isSafeToDelete()
+ @Override
+ public String toString()
{
- return cfLastWrite.isEmpty();
+ return "CommitLogSegment(" + getPath() + ')';
}
- @Override
- public String toString()
+ public int position()
{
- return "CommitLogSegment(" + logWriter.getPath() + ')';
+ try
+ {
+ return (int) logFileAccessor.getFilePointer();
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
+ }
}
}
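The markClean()/contains() pair above is the heart of the new dirty tracking. A compact sketch of when a CF actually goes clean, with a ReplayPosition simplified to two scalars (segment id and position) and everything else hypothetical:

    import java.util.HashMap;
    import java.util.Map;

    public class MarkCleanSketch
    {
        final long id; // segment id; a ReplayPosition is (segment id, position)
        final Map<Integer, Integer> cfLastWrite = new HashMap<Integer, Integer>();

        MarkCleanSketch(long id) { this.id = id; }

        boolean contains(long ctxSegment) { return ctxSegment == id; }

        // Mirrors markClean(): keep the CF dirty only when the flush position
        // falls inside this segment and a newer write to the CF exists past it.
        void markClean(Integer cfId, long ctxSegment, int ctxPosition)
        {
            Integer lastWritten = cfLastWrite.get(cfId);
            if (lastWritten != null && (!contains(ctxSegment) || lastWritten < ctxPosition))
                cfLastWrite.remove(cfId);
        }

        public static void main(String[] args)
        {
            MarkCleanSketch seg = new MarkCleanSketch(1L);
            seg.cfLastWrite.put(42, 100);      // CF 42 last written at position 100
            seg.markClean(42, 1L, 50);         // flush grabbed before the write: still dirty
            System.out.println(seg.cfLastWrite.containsKey(42)); // true
            seg.markClean(42, 1L, 150);        // flush covers the write: clean
            System.out.println(seg.cfLastWrite.containsKey(42)); // false
        }
    }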
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java?rev=1205203&r1=1205202&r2=1205203&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java Tue Nov 22 22:39:39 2011
@@ -193,7 +193,7 @@ public abstract class AbstractCassandraD
}
// replay the log if necessary
- CommitLog.recover();
+ CommitLog.instance.recover();
// check to see if CL.recovery modified the lastMigrationId. if it did, we need to re apply migrations. this isn't
// the same as merely reloading the schema (which wouldn't perform file deletion after a DROP). The solution
Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/CommitLogTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/CommitLogTest.java?rev=1205203&r1=1205202&r2=1205203&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/CommitLogTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/CommitLogTest.java Tue Nov 22 22:39:39 2011
@@ -28,8 +28,9 @@ import org.junit.Test;
import org.apache.cassandra.CleanupHelper;
import org.apache.cassandra.db.commitlog.CommitLog;
+import org.apache.cassandra.db.commitlog.CommitLogSegment;
import org.apache.cassandra.db.filter.QueryPath;
-import org.apache.cassandra.utils.Pair;
+
import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
public class CommitLogTest extends CleanupHelper
@@ -37,7 +38,7 @@ public class CommitLogTest extends Clean
@Test
public void testRecoveryWithEmptyLog() throws Exception
{
- CommitLog.recover(new File[] {tmpFile()});
+ CommitLog.instance.recover(new File[]{ tmpFile() });
}
@Test
@@ -109,13 +110,13 @@ public class CommitLogTest extends Clean
rm2.add(new QueryPath("Standard2", null, bytes("c1")), ByteBuffer.allocate(4), 0);
CommitLog.instance.add(rm2);
- assert CommitLog.instance.segmentsCount() == 2 : "Expecting 2 segments, got " + CommitLog.instance.segmentsCount();
+ assert CommitLog.instance.activeSegments() == 2 : "Expecting 2 segments, got " + CommitLog.instance.activeSegments();
int cfid2 = rm2.getColumnFamilyIds().iterator().next();
CommitLog.instance.discardCompletedSegments(cfid2, CommitLog.instance.getContext());
// Assert we still have both our segment
- assert CommitLog.instance.segmentsCount() == 2 : "Expecting 2 segments, got " + CommitLog.instance.segmentsCount();
+ assert CommitLog.instance.activeSegments() == 2 : "Expecting 2 segments, got " + CommitLog.instance.activeSegments();
}
@Test
@@ -130,21 +131,22 @@ public class CommitLogTest extends Clean
CommitLog.instance.add(rm);
CommitLog.instance.add(rm);
- assert CommitLog.instance.segmentsCount() == 1 : "Expecting 1 segment, got " + CommitLog.instance.segmentsCount();
+ assert CommitLog.instance.activeSegments() == 1 : "Expecting 1 segment, got " + CommitLog.instance.activeSegments();
// "Flush": this won't delete anything
int cfid1 = rm.getColumnFamilyIds().iterator().next();
CommitLog.instance.discardCompletedSegments(cfid1, CommitLog.instance.getContext());
- assert CommitLog.instance.segmentsCount() == 1 : "Expecting 1 segment, got " + CommitLog.instance.segmentsCount();
+ assert CommitLog.instance.activeSegments() == 1 : "Expecting 1 segment, got " + CommitLog.instance.activeSegments();
- // Adding new mutation on another CF so that a new segment is created
+ // Adding a new mutation on another CF, large enough (including CL entry overhead) that a new segment is created
RowMutation rm2 = new RowMutation("Keyspace1", bytes("k"));
rm2.add(new QueryPath("Standard2", null, bytes("c1")), ByteBuffer.allocate(64 * 1024 * 1024), 0);
CommitLog.instance.add(rm2);
+ // also forces a new segment, since each entry-with-overhead is just over half the CL size
CommitLog.instance.add(rm2);
- assert CommitLog.instance.segmentsCount() == 2 : "Expecting 2 segments, got " + CommitLog.instance.segmentsCount();
+ assert CommitLog.instance.activeSegments() == 3 : "Expecting 3 segments, got " + CommitLog.instance.activeSegments();
// "Flush" second cf: The first segment should be deleted since we
@@ -154,7 +156,7 @@ public class CommitLogTest extends Clean
CommitLog.instance.discardCompletedSegments(cfid2, CommitLog.instance.getContext());
// Assert we still have both our segment
- assert CommitLog.instance.segmentsCount() == 1 : "Expecting 1 segment, got " + CommitLog.instance.segmentsCount();
+ assert CommitLog.instance.activeSegments() == 1 : "Expecting 1 segment, got " + CommitLog.instance.activeSegments();
}
protected void testRecoveryWithBadSizeArgument(int size, int dataSize) throws Exception
@@ -189,6 +191,6 @@ public class CommitLogTest extends Clean
OutputStream lout = new FileOutputStream(logFile);
lout.write(logData);
//statics make it annoying to test things correctly
- CommitLog.recover(new File[] {logFile}); //CASSANDRA-1119 / CASSANDRA-1179 throw on failure*/
+ CommitLog.instance.recover(new File[]{ logFile }); //CASSANDRA-1119 / CASSANDRA-1179 throw on failure*/
}
}
Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManager2Test.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManager2Test.java?rev=1205203&r1=1205202&r2=1205203&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManager2Test.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManager2Test.java Tue Nov 22 22:39:39 2011
@@ -64,7 +64,7 @@ public class RecoveryManager2Test extend
// replay the commit log (nothing on Standard1 should be replayed since everything was flushed, so only the row on Standard2
// will be replayed)
CommitLog.instance.resetUnsafe();
- int replayed = CommitLog.recover();
+ int replayed = CommitLog.instance.recover();
assert replayed == 1 : "Expecting only 1 replayed mutation, got " + replayed;
}
Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManager3Test.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManager3Test.java?rev=1205203&r1=1205202&r2=1205203&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManager3Test.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManager3Test.java Tue Nov 22 22:39:39 2011
@@ -71,7 +71,7 @@ public class RecoveryManager3Test extend
}
CommitLog.instance.resetUnsafe(); // disassociate segments from live CL
- CommitLog.recover();
+ CommitLog.instance.recover();
assertColumns(Util.getColumnFamily(table1, dk, "Standard1"), "col1");
assertColumns(Util.getColumnFamily(table2, dk, "Standard3"), "col2");
Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java?rev=1205203&r1=1205202&r2=1205203&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java Tue Nov 22 22:39:39 2011
@@ -36,7 +36,7 @@ public class RecoveryManagerTest extends
{
@Test
public void testNothingToRecover() throws IOException {
- CommitLog.recover();
+ CommitLog.instance.recover();
}
@Test
@@ -65,7 +65,7 @@ public class RecoveryManagerTest extends
table2.getColumnFamilyStore("Standard3").clearUnsafe();
CommitLog.instance.resetUnsafe(); // disassociate segments from live CL
- CommitLog.recover();
+ CommitLog.instance.recover();
assertColumns(Util.getColumnFamily(table1, dk, "Standard1"), "col1");
assertColumns(Util.getColumnFamily(table2, dk, "Standard3"), "col2");
@@ -92,7 +92,7 @@ public class RecoveryManagerTest extends
table1.getColumnFamilyStore("Counter1").clearUnsafe();
CommitLog.instance.resetUnsafe(); // disassociate segments from live CL
- CommitLog.recover();
+ CommitLog.instance.recover();
cf = Util.getColumnFamily(table1, dk, "Counter1");
Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java?rev=1205203&r1=1205202&r2=1205203&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java Tue Nov 22 22:39:39 2011
@@ -61,7 +61,7 @@ public class RecoveryManagerTruncateTest
// and now truncate it
cfs.truncate().get();
CommitLog.instance.resetUnsafe();
- CommitLog.recover();
+ CommitLog.instance.recover();
// and validate truncation.
assertNull(getFromTable(table, "Standard1", "keymulti", "col1"));