Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/12/02 20:09:24 UTC
[1/2] Multithreaded commitlog patch by Benedict Elliott Smith;
reviewed by jbellis for CASSANDRA-3578
Updated Branches:
refs/heads/trunk 679ec7e88 -> 22e18f5a3
http://git-wip-us.apache.org/repos/asf/cassandra/blob/22e18f5a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
index 2c7a848..bc5c7d1 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
@@ -17,19 +17,28 @@
*/
package org.apache.cassandra.db.commitlog;
-import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
-import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.zip.Checksum;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,11 +48,9 @@ import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.RowMutation;
import org.apache.cassandra.io.FSWriteError;
-import org.apache.cassandra.io.util.ByteBufferOutputStream;
-import org.apache.cassandra.io.util.ChecksummedOutputStream;
import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.utils.PureJavaCrc32;
+import org.apache.cassandra.utils.WaitQueue;
/*
* A single commit log file on disk. Manages creation of the file and writing row mutations to disk,
@@ -60,35 +67,54 @@ public class CommitLogSegment
// The commit log entry overhead in bytes (int: length + long: head checksum + long: tail checksum)
static final int ENTRY_OVERHEAD_SIZE = 4 + 8 + 8;
- // cache which cf is dirty in this segment to avoid having to lookup all ReplayPositions to decide if we can delete this segment
- private final HashMap<UUID, Integer> cfLastWrite = new HashMap<UUID, Integer>();
+ // The commit log (chained) sync marker/header size in bytes (int: length + long: checksum [segmentId, position])
+ static final int SYNC_MARKER_SIZE = 4 + 8;
+
+ // The current AppendLock object - i.e. the one all threads adding new log records should use to synchronise
+ private final AtomicReference<AppendLock> appendLock = new AtomicReference<>(new AppendLock());
+
+ private final AtomicInteger allocatePosition = new AtomicInteger();
+
+ // Everything before this offset has been synced and written. The SYNC_MARKER_SIZE bytes after
+ // each sync are reserved, and point forwards to the next such offset. The final
+ // sync marker in a segment will be zeroed out, or point to EOF.
+ private volatile int lastSyncedOffset;
+
+ // the amount of the tail of the file we have allocated but not used - this is used when we discard a log segment
+ // to ensure nobody writes to it after we've decided we're done with it
+ private int discardedTailFrom;
+
+ // a signal for writers to wait on to confirm the log message they provided has been written to disk
+ private final WaitQueue syncComplete = new WaitQueue();
+
+ // a map of Cf->dirty position; this is used to permit marking Cfs clean whilst the log is still in use
+ private final NonBlockingHashMap<UUID, AtomicInteger> cfDirty = new NonBlockingHashMap<>(1024);
+
+ // a map of Cf->clean position; this is used to permit marking Cfs clean whilst the log is still in use
+ private final ConcurrentHashMap<UUID, AtomicInteger> cfClean = new ConcurrentHashMap<>();
public final long id;
private final File logFile;
private final RandomAccessFile logFileAccessor;
- private boolean needsSync = false;
-
private final MappedByteBuffer buffer;
- private final Checksum checksum;
- private final DataOutputStream bufferStream;
- private boolean closed;
public final CommitLogDescriptor descriptor;
/**
* @return a newly minted segment file
*/
- public static CommitLogSegment freshSegment()
+ static CommitLogSegment freshSegment()
{
return new CommitLogSegment(null);
}
- public static long getNextId()
+ static long getNextId()
{
return idBase + nextId.getAndIncrement();
}
+
/**
* Constructs a new segment file.
*
@@ -122,16 +148,16 @@ public class CommitLogSegment
if (isCreating)
logger.debug("Creating new commit log segment {}", logFile.getPath());
- // Map the segment, extending or truncating it to the standard segment size
+ // Map the segment, extending or truncating it to the standard segment size.
+ // (We may have restarted after a segment size configuration change, leaving "incorrectly"
+ // sized segments on disk.)
logFileAccessor.setLength(DatabaseDescriptor.getCommitLogSegmentSize());
buffer = logFileAccessor.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, DatabaseDescriptor.getCommitLogSegmentSize());
- checksum = new PureJavaCrc32();
- bufferStream = new DataOutputStream(new ChecksummedOutputStream(new ByteBufferOutputStream(buffer), checksum));
- buffer.putInt(CommitLog.END_OF_SEGMENT_MARKER);
- buffer.position(0);
-
- needsSync = true;
+ // mark the initial header as uninitialised
+ buffer.putInt(0, 0);
+ buffer.putLong(4, 0);
+ allocatePosition.set(SYNC_MARKER_SIZE);
}
catch (IOException e)
{
@@ -140,123 +166,184 @@ public class CommitLogSegment
}
/**
- * Completely discards a segment file by deleting it. (Potentially blocking operation)
- */
- public void discard(boolean deleteFile)
- {
- // TODO shouldn't we close the file when we're done writing to it, which comes (potentially) much earlier than it's eligible for recyling?
- close();
- if (deleteFile)
- FileUtils.deleteWithConfirm(logFile);
- }
-
- /**
- * Recycle processes an unneeded segment file for reuse.
- *
- * @return a new CommitLogSegment representing the newly reusable segment.
+ * allocate space in this buffer for the provided row mutation, and populate the provided
+ * Allocation object, returning true on success. False indicates there is not enough room in
+ * this segment, and a new segment is needed
*/
- public CommitLogSegment recycle()
+ boolean allocate(RowMutation rowMutation, int size, Allocation alloc)
{
- // writes an end-of-segment marker at the very beginning of the file and closes it
- buffer.position(0);
- buffer.putInt(CommitLog.END_OF_SEGMENT_MARKER);
- buffer.position(0);
-
+ final AppendLock appendLock = lockForAppend();
try
{
- sync();
+ int position = allocate(size);
+ if (position < 0)
+ {
+ appendLock.unlock();
+ return false;
+ }
+ alloc.buffer = (ByteBuffer) buffer.duplicate().position(position).limit(position + size);
+ alloc.position = position;
+ alloc.segment = this;
+ alloc.appendLock = appendLock;
+ markDirty(rowMutation, position);
+ return true;
}
- catch (FSWriteError e)
+ catch (Throwable t)
{
- logger.error("I/O error flushing {} {}", this, e.getMessage());
- throw e;
+ appendLock.unlock();
+ throw t;
}
+ }
- close();
-
- return new CommitLogSegment(getPath());
+ // obtain the current AppendLock and lock it for record appending
+ private AppendLock lockForAppend()
+ {
+ while (true)
+ {
+ AppendLock appendLock = this.appendLock.get();
+ if (appendLock.lock())
+ return appendLock;
+ }
}
- /**
- * @return true if there is room to write() @param size to this segment
- */
- public boolean hasCapacityFor(long size)
+ // allocate bytes in the segment, or return -1 if not enough space
+ private int allocate(int size)
{
- return size <= buffer.remaining();
+ while (true)
+ {
+ int prev = allocatePosition.get();
+ int next = prev + size;
+ if (next >= buffer.capacity())
+ return -1;
+ if (allocatePosition.compareAndSet(prev, next))
+ return prev;
+ }
}
- /**
- * mark all of the column families we're modifying as dirty at this position
- */
- private void markDirty(RowMutation rowMutation, ReplayPosition repPos)
+ // ensures no more of this segment is writeable, by allocating any unused section at the end and marking it discarded
+ synchronized void discardUnusedTail()
{
- for (ColumnFamily columnFamily : rowMutation.getColumnFamilies())
+ if (discardedTailFrom > 0)
+ return;
+ while (true)
{
- // check for null cfm in case a cl write goes through after the cf is
- // defined but before a new segment is created.
- CFMetaData cfm = Schema.instance.getCFMetaData(columnFamily.id());
- if (cfm == null)
- {
- logger.error("Attempted to write commit log entry for unrecognized column family: {}", columnFamily.id());
- }
- else
+ int prev = allocatePosition.get();
+ int next = buffer.capacity();
+ if (allocatePosition.compareAndSet(prev, next))
{
- markCFDirty(cfm.cfId, repPos.position);
+ discardedTailFrom = prev;
+ return;
}
}
}
- /**
- * Appends a row mutation onto the commit log. Requres that hasCapacityFor has already been checked.
- *
- * @param mutation the mutation to append to the commit log.
- * @return the position of the appended mutation
+ /**
+ * Forces a disk flush for this segment file.
*/
- public ReplayPosition write(RowMutation mutation) throws IOException
+ synchronized void sync()
{
- assert !closed;
- ReplayPosition repPos = getContext();
- markDirty(mutation, repPos);
+ try
+ {
+ // check we have more work to do
+ if (allocatePosition.get() <= lastSyncedOffset + SYNC_MARKER_SIZE)
+ return;
+
+ // allocate a new sync marker; this is both necessary in itself, but also serves to demarcate
+ // the point at which we can safely consider records to have been completely written to
+ int nextMarker;
+ nextMarker = allocate(SYNC_MARKER_SIZE);
+ boolean close = false;
+ if (nextMarker < 0)
+ {
+ // ensure no more of this CLS is writeable, and mark ourselves for closing
+ discardUnusedTail();
+ close = true;
- checksum.reset();
+ if (discardedTailFrom < buffer.capacity() - SYNC_MARKER_SIZE)
+ {
+ // if there's room in the discard section to write an empty header, use that as the nextMarker
+ nextMarker = discardedTailFrom;
+ }
+ else
+ {
+ // not enough space left in the buffer, so mark the next sync marker as the EOF position
+ nextMarker = buffer.capacity();
+ }
+ }
- // checksummed length
- int length = (int) RowMutation.serializer.serializedSize(mutation, MessagingService.current_version);
- bufferStream.writeInt(length);
- buffer.putLong(checksum.getValue());
+ // swap the append lock
+ AppendLock curAppendLock = appendLock.get();
+ appendLock.set(new AppendLock());
+ curAppendLock.expireAndWaitForCompletion();
+
+ // write previous sync marker to point to next sync marker
+ // we don't chain the crcs here to ensure this method is idempotent if it fails
+ int offset = lastSyncedOffset;
+ final PureJavaCrc32 crc = new PureJavaCrc32();
+ crc.update((int) (id & 0xFFFFFFFFL));
+ crc.update((int) (id >>> 32));
+ crc.update(offset);
+ buffer.putInt(offset, nextMarker);
+ buffer.putLong(offset + 4, crc.getValue());
+
+ // zero out the next sync marker so replayer can cleanly exit
+ if (nextMarker < buffer.capacity())
+ {
+ buffer.putInt(nextMarker, 0);
+ buffer.putLong(nextMarker + 4, 0);
+ }
- // checksummed mutation
- RowMutation.serializer.serialize(mutation, bufferStream, MessagingService.current_version);
- buffer.putLong(checksum.getValue());
+ // actually perform the sync and signal those waiting for it
+ buffer.force();
+ syncComplete.signalAll();
- if (buffer.remaining() >= 4)
+ if (close)
+ {
+ close();
+ nextMarker = buffer.capacity();
+ }
+
+ lastSyncedOffset = nextMarker;
+ }
+ catch (Exception e) // MappedByteBuffer.force() does not declare IOException but can actually throw it
{
- // writes end of segment marker and rewinds back to position where it starts
- buffer.putInt(CommitLog.END_OF_SEGMENT_MARKER);
- buffer.position(buffer.position() - CommitLog.END_OF_SEGMENT_MARKER_SIZE);
+ throw new FSWriteError(e, getPath());
}
+ }
- needsSync = true;
- return repPos;
+ public boolean isFullySynced()
+ {
+ return lastSyncedOffset == buffer.capacity();
}
/**
- * Forces a disk flush for this segment file.
+ * Completely discards a segment file by deleting it. (Potentially blocking operation)
*/
- public void sync()
+ void delete()
{
- if (needsSync)
+ FileUtils.deleteWithConfirm(logFile);
+ }
+
+ /**
+ * Recycle processes an unneeded segment file for reuse.
+ *
+ * @return a new CommitLogSegment representing the newly reusable segment.
+ */
+ CommitLogSegment recycle()
+ {
+ try
{
- try
- {
- buffer.force();
- }
- catch (Exception e) // MappedByteBuffer.force() does not declare IOException but can actually throw it
- {
- throw new FSWriteError(e, getPath());
- }
- needsSync = false;
+ sync();
+ }
+ catch (FSWriteError e)
+ {
+ logger.error("I/O error flushing {} {}", this, e.getMessage());
+ throw e;
}
+
+ close();
+
+ return new CommitLogSegment(getPath());
}
/**
@@ -264,7 +351,7 @@ public class CommitLogSegment
*/
public ReplayPosition getContext()
{
- return new ReplayPosition(id, buffer.position());
+ return new ReplayPosition(id, allocatePosition.get());
}
/**
@@ -286,16 +373,12 @@ public class CommitLogSegment
/**
* Close the segment file.
*/
- public void close()
+ void close()
{
- if (closed)
- return;
-
try
{
FileUtils.clean(buffer);
logFileAccessor.close();
- closed = true;
}
catch (IOException e)
{
@@ -303,15 +386,17 @@ public class CommitLogSegment
}
}
- /**
- * Records the CF as dirty at a certain position.
- *
- * @param cfId the column family ID that is now dirty
- * @param position the position the last write for this CF was written at
- */
- private void markCFDirty(UUID cfId, Integer position)
+ void markDirty(RowMutation rowMutation, int allocatedPosition)
{
- cfLastWrite.put(cfId, position);
+ for (ColumnFamily columnFamily : rowMutation.getColumnFamilies())
+ {
+ // check for deleted CFS
+ CFMetaData cfm = columnFamily.metadata();
+ if (cfm.isPurged())
+ logger.error("Attempted to write commit log entry for unrecognized column family: {}", columnFamily.id());
+ else
+ ensureAtleast(cfDirty, cfm.cfId, allocatedPosition);
+ }
}
/**
@@ -324,20 +409,79 @@ public class CommitLogSegment
*/
public void markClean(UUID cfId, ReplayPosition context)
{
- Integer lastWritten = cfLastWrite.get(cfId);
+ if (!cfDirty.containsKey(cfId))
+ return;
+ if (context.segment == id)
+ markClean(cfId, context.position);
+ else if (context.segment > id)
+ markClean(cfId, Integer.MAX_VALUE);
+ }
+
+ private void markClean(UUID cfId, int position)
+ {
+ ensureAtleast(cfClean, cfId, position);
+ removeCleanFromDirty();
+ }
+
+ private static void ensureAtleast(ConcurrentMap<UUID, AtomicInteger> map, UUID cfId, int value)
+ {
+ AtomicInteger i = map.get(cfId);
+ if (i == null)
+ {
+ AtomicInteger i2 = map.putIfAbsent(cfId, i = new AtomicInteger());
+ if (i2 != null)
+ i = i2;
+ }
+ while (true)
+ {
+ int cur = i.get();
+ if (cur > value)
+ break;
+ if (i.compareAndSet(cur, value))
+ break;
+ }
+ }
+
+ private void removeCleanFromDirty()
+ {
+ // if we're still allocating from this segment, don't touch anything since it can't be done thread-safely
+ if (!isFullySynced())
+ return;
- if (lastWritten != null && (!contains(context) || lastWritten < context.position))
+ Iterator<Map.Entry<UUID, AtomicInteger>> iter = cfClean.entrySet().iterator();
+ while (iter.hasNext())
{
- cfLastWrite.remove(cfId);
+ Map.Entry<UUID, AtomicInteger> clean = iter.next();
+ UUID cfId = clean.getKey();
+ AtomicInteger cleanPos = clean.getValue();
+ AtomicInteger dirtyPos = cfDirty.get(cfId);
+ if (dirtyPos != null && dirtyPos.intValue() < cleanPos.intValue())
+ {
+ cfDirty.remove(cfId);
+ iter.remove();
+ }
}
}
+
/**
* @return a collection of dirty CFIDs for this segment file.
*/
public Collection<UUID> getDirtyCFIDs()
{
- return cfLastWrite.keySet();
+ removeCleanFromDirty();
+ if (cfClean.isEmpty() || cfDirty.isEmpty())
+ return cfDirty.keySet();
+ List<UUID> r = new ArrayList<>(cfDirty.size());
+ for (Map.Entry<UUID, AtomicInteger> dirty : cfDirty.entrySet())
+ {
+ UUID cfId = dirty.getKey();
+ AtomicInteger dirtyPos = dirty.getValue();
+ AtomicInteger cleanPos = cfClean.get(cfId);
+ if (cleanPos == null || cleanPos.intValue() < dirtyPos.intValue())
+ r.add(dirty.getKey());
+ }
+ return r;
}
/**
@@ -345,7 +489,12 @@ public class CommitLogSegment
*/
public boolean isUnused()
{
- return cfLastWrite.isEmpty();
+ // if it's not fully synced, we assume we're still in use as the active allocatingFrom
+ if (!isFullySynced())
+ return false;
+
+ removeCleanFromDirty();
+ return cfDirty.isEmpty();
}
/**
@@ -363,7 +512,7 @@ public class CommitLogSegment
public String dirtyString()
{
StringBuilder sb = new StringBuilder();
- for (UUID cfId : cfLastWrite.keySet())
+ for (UUID cfId : getDirtyCFIDs())
{
CFMetaData m = Schema.instance.getCFMetaData(cfId);
sb.append(m == null ? "<deleted>" : m.cfName).append(" (").append(cfId).append("), ");
@@ -377,9 +526,9 @@ public class CommitLogSegment
return "CommitLogSegment(" + getPath() + ')';
}
- public int position()
+ public boolean equals(Object that)
{
- return buffer.position();
+ return super.equals(that);
}
@@ -392,4 +541,92 @@ public class CommitLogSegment
return (int) (desc.id - desc2.id);
}
}
+
+ /**
+ * A relatively simple class for synchronising flushes() with log message writers:
+ * Log writers take the readLock prior to allocating themselves space in the segment;
+ * once they complete writing the record they release the read lock. A call to sync()
+ * will first check the position we have allocated space up until, then allocate a new AppendLock object,
+ * take the writeLock of the previous AppendLock, and invalidate it for further log writes. All appends are
+ * redirected to the new AppendLock so they do not block; only the sync() blocks waiting to obtain the writeLock.
+ * Once it obtains the lock it is guaranteed that all writes up to the allocation position it checked at
+ * the start have been completely written to.
+ */
+ private static final class AppendLock
+ {
+ final ReadWriteLock syncLock = new ReentrantReadWriteLock();
+ final Lock logLock = syncLock.readLock();
+ // a map of Cfs with log records that have not been synced to disk, so cannot be marked clean yet
+
+ boolean expired;
+
+ // false if the lock could not be acquired for adding a log record;
+ // a new AppendLock object will already be available, so fetch appendLock().get()
+ // and retry
+ boolean lock()
+ {
+ if (!logLock.tryLock())
+ return false;
+ if (expired)
+ {
+ logLock.unlock();
+ return false;
+ }
+ return true;
+ }
+
+ // release the lock so that a pending sync() may complete
+ void unlock()
+ {
+ logLock.unlock();
+ }
+
+ void expireAndWaitForCompletion()
+ {
+ // wait for log records to complete (take writeLock)
+ syncLock.writeLock().lock();
+ expired = true;
+ // release lock immediately, though effectively a NOOP since we use tryLock() for log record appends
+ syncLock.writeLock().unlock();
+ }
+ }
+
+ /**
+ * A simple class for tracking information about the portion of a segment that has been allocated to a log write.
+ * The constructor leaves the fields uninitialized for population by CommitLogSegmentManager, so that it can be
+ * stack-allocated by escape analysis in CommitLog.add.
+ */
+ static final class Allocation
+ {
+ private CommitLogSegment segment;
+ private AppendLock appendLock;
+ private int position;
+ private ByteBuffer buffer;
+
+ CommitLogSegment getSegment()
+ {
+ return segment;
+ }
+
+ ByteBuffer getBuffer()
+ {
+ return buffer;
+ }
+
+ // markWritten() MUST be called once we are done with the segment or the CL will never flush
+ void markWritten()
+ {
+ appendLock.unlock();
+ }
+
+ void awaitDiskSync()
+ {
+ while (segment.lastSyncedOffset < position)
+ {
+ WaitQueue.Signal signal = segment.syncComplete.register();
+ if (segment.lastSyncedOffset < position)
+ signal.awaitUninterruptibly();
+ }
+ }
+ }
}
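
To make the new write path concrete, here is a minimal sketch (not part of the patch) of how a caller such as CommitLog.add() - which appears in the second part of this patch - is expected to drive an Allocation: reserve space, serialize into the returned buffer slice, release the append lock, and (in batch mode) wait for the covering sync. The writeTo() helper and the append() method name are hypothetical stand-ins; the sketch assumes it lives in org.apache.cassandra.db.commitlog so the package-private Allocation API is visible.

    // Sketch only; assumes package org.apache.cassandra.db.commitlog.
    void append(RowMutation rm, CommitLogSegmentManager segmentManager) throws IOException
    {
        int size = (int) RowMutation.serializer.serializedSize(rm, MessagingService.current_version);
        CommitLogSegment.Allocation alloc = new CommitLogSegment.Allocation();

        // reserves ENTRY_OVERHEAD_SIZE + size bytes in the active segment, moving to a fresh
        // segment if necessary; the segment's AppendLock is held when this returns
        segmentManager.allocate(rm, CommitLogSegment.ENTRY_OVERHEAD_SIZE + size, alloc);
        try
        {
            ByteBuffer slice = alloc.getBuffer();   // private window into the mapped segment
            writeTo(rm, size, slice);               // hypothetical: writes length, checksums, mutation
        }
        finally
        {
            // MUST be called, or sync() will block forever waiting on the AppendLock
            alloc.markWritten();
        }

        // batch mode only: block until a sync marker covering this record has been forced to disk
        alloc.awaitDiskSync();
    }
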
http://git-wip-us.apache.org/repos/asf/cassandra/blob/22e18f5a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
new file mode 100644
index 0000000..dd96f35
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
@@ -0,0 +1,533 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.commitlog;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+
+import com.google.common.collect.Iterables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.WaitQueue;
+import org.apache.cassandra.utils.WrappedRunnable;
+
+import static org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation;
+
+/**
+ * Performs eager-creation of commit log segments in a background thread. All the
+ * public methods are thread safe.
+ */
+public class CommitLogSegmentManager
+{
+ static final Logger logger = LoggerFactory.getLogger(CommitLogSegmentManager.class);
+
+ /**
+ * Queue of work to be done by the manager thread. This is usually a recycle operation, which returns
+ * a CommitLogSegment, or a delete operation, which returns null.
+ */
+ private final BlockingQueue<Callable<CommitLogSegment>> segmentManagementTasks = new LinkedBlockingQueue<>();
+
+ /** Segments that are ready to be used. Head of the queue is the one we allocate writes to */
+ private final ConcurrentLinkedQueue<CommitLogSegment> availableSegments = new ConcurrentLinkedQueue<>();
+
+ /** Active segments, containing unflushed data */
+ private final ConcurrentLinkedQueue<CommitLogSegment> activeSegments = new ConcurrentLinkedQueue<>();
+
+ /** The segment we are currently allocating commit log records to */
+ private volatile CommitLogSegment allocatingFrom = null;
+
+ private final WaitQueue hasAvailableSegments = new WaitQueue();
+
+ private static final AtomicReferenceFieldUpdater<CommitLogSegmentManager, CommitLogSegment> allocatingFromUpdater = AtomicReferenceFieldUpdater.newUpdater(CommitLogSegmentManager.class, CommitLogSegment.class, "allocatingFrom");
+
+ /**
+ * Tracks commitlog size, in multiples of the segment size. We need to do this so we can "promise" size
+ * adjustments ahead of actually adding/freeing segments on disk, so that the "evict oldest segment" logic
+ * can see the effect of recycling segments immediately (even though they're really happening asynchronously
+ * on the manager thread, which will take a ms or two).
+ */
+ private final AtomicLong size = new AtomicLong();
+
+ /**
+ * New segment creation is initially disabled because we'll typically get some "free" segments
+ * recycled after log replay.
+ */
+ private volatile boolean createReserveSegments = false;
+
+ private final Thread managerThread;
+ private volatile boolean run = true;
+
+ public CommitLogSegmentManager()
+ {
+ // The run loop for the manager thread
+ Runnable runnable = new WrappedRunnable()
+ {
+ public void runMayThrow() throws Exception
+ {
+ while (run)
+ {
+ Callable<CommitLogSegment> task = segmentManagementTasks.poll();
+ if (task == null)
+ {
+ // if we have no more work to do, check if we should create a new segment
+ if (availableSegments.isEmpty() && (activeSegments.isEmpty() || createReserveSegments))
+ {
+ logger.debug("No segments in reserve; creating a fresh one");
+ size.addAndGet(DatabaseDescriptor.getCommitLogSegmentSize());
+ // TODO : some error handling in case we fail to create a new segment
+ availableSegments.add(CommitLogSegment.freshSegment());
+ hasAvailableSegments.signalAll();
+ }
+
+ // flush old Cfs if we're full
+ long unused = unusedCapacity();
+ if (unused < 0)
+ {
+ List<CommitLogSegment> segmentsToRecycle = new ArrayList<>();
+ long spaceToReclaim = 0;
+ for (CommitLogSegment segment : activeSegments)
+ {
+ if (segment == allocatingFrom)
+ break;
+ segmentsToRecycle.add(segment);
+ spaceToReclaim += DatabaseDescriptor.getCommitLogSegmentSize();
+ if (spaceToReclaim + unused >= 0)
+ break;
+ }
+ flushDataFrom(segmentsToRecycle);
+ }
+
+ try
+ {
+ // wait for new work to be provided
+ task = segmentManagementTasks.take();
+ }
+ catch (InterruptedException e)
+ {
+ // shutdown signal; exit cleanly
+ continue;
+ }
+ }
+
+ // TODO : some error handling in case we fail on executing call (e.g. recycling)
+ CommitLogSegment recycled = task.call();
+ if (recycled != null)
+ {
+ // if the work resulted in a segment to recycle, publish it
+ availableSegments.add(recycled);
+ hasAvailableSegments.signalAll();
+ }
+ }
+ }
+ };
+
+ managerThread = new Thread(runnable, "COMMIT-LOG-ALLOCATOR");
+ managerThread.start();
+ }
+
+ /**
+ * Reserve space in the current segment for the provided row mutation or, if there isn't space available,
+ * create a new segment.
+ *
+ * @return the provided Allocation object
+ */
+ public Allocation allocate(RowMutation rowMutation, int size, Allocation alloc)
+ {
+ CommitLogSegment segment = allocatingFrom();
+
+ while (!segment.allocate(rowMutation, size, alloc))
+ {
+ // failed to allocate, so move to a new segment with enough room
+ advanceAllocatingFrom(segment);
+ segment = allocatingFrom;
+ }
+
+ return alloc;
+ }
+
+ // simple wrapper to ensure non-null value for allocatingFrom; only necessary on first call
+ CommitLogSegment allocatingFrom()
+ {
+ CommitLogSegment r = allocatingFrom;
+ if (r == null)
+ {
+ advanceAllocatingFrom(null);
+ r = allocatingFrom;
+ }
+ return r;
+ }
+
+ /**
+ * Fetches a new segment from the queue, creating a new one if necessary, and activates it
+ */
+ private void advanceAllocatingFrom(CommitLogSegment old)
+ {
+ while (true)
+ {
+ Iterator<CommitLogSegment> iter = availableSegments.iterator();
+ if (iter.hasNext())
+ {
+ CommitLogSegment next;
+ if (!allocatingFromUpdater.compareAndSet(this, old, next = iter.next()))
+ // failed to swap so we should already be able to continue
+ return;
+
+ iter.remove();
+ activeSegments.add(next);
+
+ if (availableSegments.isEmpty())
+ {
+ // if we've emptied the queue of available segments, trigger the manager to maybe add another
+ wakeManager();
+ }
+
+ if (old != null)
+ {
+ // Now we can run the user defined command just after switching to the new commit log.
+ // (Do this here instead of in the recycle call so we can get a head start on the archive.)
+ CommitLog.instance.archiver.maybeArchive(old.getPath(), old.getName());
+ }
+
+ // ensure we don't continue to use the old file; not strictly necessary, but cleaner to enforce it
+ if (old != null)
+ old.discardUnusedTail();
+
+ // request that the CL be synced out-of-band, as we've finished a segment
+ CommitLog.instance.requestExtraSync();
+ return;
+ }
+
+ // no more segments, so register to receive a signal when not empty
+ WaitQueue.Signal signal = hasAvailableSegments.register();
+
+ // trigger the management thread; this must occur after registering
+ // the signal to ensure we are woken by any new segment creation
+ wakeManager();
+
+ // check if the queue has already been added to before waiting on the signal, to catch modifications
+ // that happened prior to registering the signal
+ if (availableSegments.isEmpty())
+ {
+ // check to see if we've been beaten to it
+ if (allocatingFrom != old)
+ return;
+
+ // can only reach here if the queue hasn't been inserted into
+ // before we registered the signal, as we only remove items from the queue
+ // after updating allocatingFrom. Can safely block until we are signalled
+ // by the allocator that new segments have been published
+ signal.awaitUninterruptibly();
+ }
+ }
+ }
+
+ private void wakeManager()
+ {
+ // put a NO-OP on the queue, to trigger management thread (and create a new segment if necessary)
+ segmentManagementTasks.add(new Callable<CommitLogSegment>()
+ {
+ public CommitLogSegment call()
+ {
+ return null;
+ }
+ });
+ }
+
+ /**
+ * Switch to a new segment, regardless of how much is left in the current one.
+ *
+ * Flushes any dirty CFs for this segment and any older segments, and then recycles
+ * the segments
+ */
+ void forceRecycleAll()
+ {
+ CommitLogSegment last = allocatingFrom;
+ last.discardUnusedTail();
+ List<CommitLogSegment> segmentsToRecycle = new ArrayList<>(activeSegments);
+ advanceAllocatingFrom(last);
+
+ // flush and wait for all CFs that are dirty in segments up-to and including 'last'
+ Future<?> future = flushDataFrom(segmentsToRecycle);
+ try
+ {
+ future.get();
+
+ // now recycle segments that are unused, as we may not have triggered a discardCompletedSegments()
+ // if the previous active segment was the only one to recycle (since an active segment isn't
+ // necessarily dirty, and we only call dCS after a flush).
+ for (CommitLogSegment segment : activeSegments)
+ if (segment.isUnused())
+ recycleSegment(segment);
+
+ CommitLogSegment first;
+ assert (first = activeSegments.peek()) == null || first.id > last.id;
+ }
+ catch (Throwable t)
+ {
+ // for now just log the error; the forced recycle is best-effort
+ logger.error("Failed waiting for a forced recycle of in-use commit log segments", t);
+ }
+ }
+
+ /**
+ * Indicates that a segment is no longer in use and that it should be recycled.
+ *
+ * @param segment segment that is no longer in use
+ */
+ void recycleSegment(final CommitLogSegment segment)
+ {
+ activeSegments.remove(segment);
+ if (!CommitLog.instance.archiver.maybeWaitForArchiving(segment.getName()))
+ {
+ // if archiving (command) was not successful then leave the file alone. don't delete or recycle.
+ discardSegment(segment, false);
+ return;
+ }
+ if (isCapExceeded())
+ {
+ discardSegment(segment, true);
+ return;
+ }
+
+ logger.debug("Recycling {}", segment);
+ segmentManagementTasks.add(new Callable<CommitLogSegment>()
+ {
+ public CommitLogSegment call()
+ {
+ return segment.recycle();
+ }
+ });
+ }
+
+ /**
+ * Differs from the above because it can work on any file instead of just existing
+ * commit log segments managed by this manager.
+ *
+ * @param file segment file that is no longer in use.
+ */
+ void recycleSegment(final File file)
+ {
+ if (isCapExceeded()
+ || CommitLogDescriptor.fromFileName(file.getName()).getMessagingVersion() != MessagingService.current_version)
+ {
+ // (don't decrease managed size, since this was never a "live" segment)
+ logger.debug("(Unopened) segment {} is no longer needed and will be deleted now", file);
+ FileUtils.deleteWithConfirm(file);
+ return;
+ }
+
+ logger.debug("Recycling {}", file);
+ // this wasn't previously a live segment, so add it to the managed size when we make it live
+ size.addAndGet(DatabaseDescriptor.getCommitLogSegmentSize());
+ segmentManagementTasks.add(new Callable<CommitLogSegment>()
+ {
+ public CommitLogSegment call()
+ {
+ return new CommitLogSegment(file.getPath());
+ }
+ });
+ }
+
+ /**
+ * Indicates that a segment file should be deleted.
+ *
+ * @param segment segment to be discarded
+ */
+ private void discardSegment(final CommitLogSegment segment, final boolean deleteFile)
+ {
+ logger.debug("Segment {} is no longer active and will be deleted {}", segment, deleteFile ? "now" : "by the archive script");
+ size.addAndGet(-DatabaseDescriptor.getCommitLogSegmentSize());
+
+ segmentManagementTasks.add(new Callable<CommitLogSegment>()
+ {
+ public CommitLogSegment call()
+ {
+ segment.close();
+ if (deleteFile)
+ segment.delete();
+ return null;
+ }
+ });
+ }
+
+ /**
+ * @return the space (in bytes) used by all segment files.
+ */
+ public long bytesUsed()
+ {
+ return size.get();
+ }
+
+ /**
+ * @param name the filename to check
+ * @return true if file is managed by this manager.
+ */
+ public boolean manages(String name)
+ {
+ for (CommitLogSegment segment : Iterables.concat(activeSegments, availableSegments))
+ if (segment.getName().equals(name))
+ return true;
+ return false;
+ }
+
+ /**
+ * Check to see if the speculative current size exceeds the cap.
+ *
+ * @return true if cap is exceeded
+ */
+ private boolean isCapExceeded()
+ {
+ return unusedCapacity() < 0;
+ }
+
+ private long unusedCapacity()
+ {
+ long currentSize = size.get();
+ logger.debug("Total active commitlog segment space used is {}", currentSize);
+ return DatabaseDescriptor.getTotalCommitlogSpaceInMB() * 1024 * 1024 - currentSize;
+ }
+
+ /**
+ * Sets a flag that enables the behavior of keeping at least one spare segment
+ * available at all times.
+ */
+ public void enableReserveSegmentCreation()
+ {
+ createReserveSegments = true;
+ wakeManager();
+ }
+
+ /**
+ * Force a flush on all CFs that are still dirty in @param segments.
+ *
+ * @return a Future that will finish when all the flushes are complete.
+ */
+ private Future<?> flushDataFrom(Collection<CommitLogSegment> segments)
+ {
+ // a map of CfId -> forceFlush() to ensure we only queue one flush per cf
+ final Map<UUID, Future<?>> flushes = new LinkedHashMap<>();
+
+ for (CommitLogSegment segment : segments)
+ {
+ for (UUID dirtyCFId : segment.getDirtyCFIDs())
+ {
+ Pair<String,String> pair = Schema.instance.getCF(dirtyCFId);
+ if (pair == null)
+ {
+ // even though we remove the schema entry before a final flush when dropping a CF,
+ // it's still possible for a writer to race and finish his append after the flush.
+ logger.debug("Marking clean CF {} that doesn't exist anymore", dirtyCFId);
+ segment.markClean(dirtyCFId, segment.getContext());
+ }
+ else if (!flushes.containsKey(dirtyCFId))
+ {
+ String keyspace = pair.left;
+ final ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(dirtyCFId);
+ // Push the flush out to another thread to avoid potential deadlock: Table.add
+ // acquires switchlock, and could be blocking for the manager thread. So if the manager
+ // thread itself tries to acquire switchlock (via flush -> switchMemtable) we'd have a problem.
+ Runnable runnable = new Runnable()
+ {
+ public void run()
+ {
+ cfs.forceFlush();
+ }
+ };
+ flushes.put(dirtyCFId, StorageService.optionalTasks.submit(runnable));
+ }
+ }
+ }
+
+ return new FutureTask<>(new Callable<Object>()
+ {
+ public Object call()
+ {
+ FBUtilities.waitOnFutures(flushes.values());
+ return null;
+ }
+ });
+ }
+
+ /**
+ * Resets all the segments, for testing purposes. DO NOT USE THIS OUTSIDE OF TESTS.
+ */
+ public void resetUnsafe()
+ {
+ logger.debug("Closing and clearing existing commit log segments...");
+
+ while (!segmentManagementTasks.isEmpty())
+ Thread.yield();
+
+ activeSegments.clear();
+ availableSegments.clear();
+ allocatingFrom = null;
+ }
+
+ /**
+ * Initiates the shutdown process for the management thread.
+ */
+ public void shutdown()
+ {
+ run = false;
+ managerThread.interrupt();
+ }
+
+ /**
+ * Returns when the management thread terminates.
+ */
+ public void awaitTermination() throws InterruptedException
+ {
+ managerThread.join();
+ }
+
+ /**
+ * @return a read-only collection of the active commit log segments
+ */
+ Collection<CommitLogSegment> getActiveSegments()
+ {
+ return Collections.unmodifiableCollection(activeSegments);
+ }
+}
+
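
The advanceAllocatingFrom() loop above leans on a register-then-recheck idiom: register a Signal, wake the manager, re-check the condition, and only then park. A condensed, self-contained illustration of that idiom follows; the SegmentWaiter name and Object payload are for illustration only, not code from the patch.

    import java.util.concurrent.ConcurrentLinkedQueue;
    import org.apache.cassandra.utils.WaitQueue;

    class SegmentWaiter
    {
        private final ConcurrentLinkedQueue<Object> available = new ConcurrentLinkedQueue<>();
        private final WaitQueue hasAvailable = new WaitQueue();

        // consumer: take an item, blocking until the producer publishes one
        Object takeBlocking()
        {
            while (true)
            {
                Object item = available.poll();
                if (item != null)
                    return item;
                // register *before* re-checking, so a publish that races between the
                // check and the park cannot be missed
                WaitQueue.Signal signal = hasAvailable.register();
                if (available.isEmpty())
                    signal.awaitUninterruptibly();
                else
                    signal.cancel();
            }
        }

        // producer: publish an item, then wake any registered waiters
        void publish(Object item)
        {
            available.add(item);
            hasAvailable.signalAll();
        }
    }
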
http://git-wip-us.apache.org/repos/asf/cassandra/blob/22e18f5a/src/java/org/apache/cassandra/db/commitlog/ICommitLogExecutorService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/ICommitLogExecutorService.java b/src/java/org/apache/cassandra/db/commitlog/ICommitLogExecutorService.java
deleted file mode 100644
index e2d0b0f..0000000
--- a/src/java/org/apache/cassandra/db/commitlog/ICommitLogExecutorService.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.commitlog;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.Future;
-
-/**
- * Like ExecutorService, but customized for batch and periodic commitlog execution.
- */
-public interface ICommitLogExecutorService
-{
- /**
- * Get the number of completed tasks
- */
- public long getCompletedTasks();
-
- /**
- * Get the number of tasks waiting to be executed
- */
- public long getPendingTasks();
-
-
- public <T> Future<T> submit(Callable<T> task);
-
- /**
- * submits the adder for execution and blocks for it to be synced, if necessary
- */
- public void add(CommitLog.LogRecordAdder adder);
-
- /** shuts down the CommitLogExecutor in an orderly fashion */
- public void shutdown();
-
- /** Blocks until shutdown is complete. */
- public void awaitTermination() throws InterruptedException;
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/22e18f5a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java b/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java
deleted file mode 100644
index 30f33b6..0000000
--- a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.commitlog;
-
-import java.io.IOException;
-import java.util.concurrent.*;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.WrappedRunnable;
-
-import com.google.common.util.concurrent.Uninterruptibles;
-
-class PeriodicCommitLogExecutorService implements ICommitLogExecutorService
-{
- private final BlockingQueue<Runnable> queue;
- protected volatile long completedTaskCount = 0;
- private final Thread appendingThread;
- private volatile boolean run = true;
-
- public PeriodicCommitLogExecutorService(final CommitLog commitLog)
- {
- queue = new LinkedBlockingQueue<Runnable>(DatabaseDescriptor.getCommitLogPeriodicQueueSize());
- Runnable runnable = new WrappedRunnable()
- {
- public void runMayThrow() throws Exception
- {
- while (run)
- {
- Runnable r = queue.poll(100, TimeUnit.MILLISECONDS);
- if (r == null)
- continue;
- r.run();
- completedTaskCount++;
- }
- commitLog.sync();
- }
- };
- appendingThread = new Thread(runnable, "COMMIT-LOG-WRITER");
- appendingThread.start();
-
- final Callable syncer = new Callable()
- {
- public Object call() throws Exception
- {
- commitLog.sync();
- return null;
- }
- };
-
- new Thread(new Runnable()
- {
- public void run()
- {
- while (run)
- {
- FBUtilities.waitOnFuture(submit(syncer));
- Uninterruptibles.sleepUninterruptibly(DatabaseDescriptor.getCommitLogSyncPeriod(), TimeUnit.MILLISECONDS);
- }
- }
- }, "PERIODIC-COMMIT-LOG-SYNCER").start();
-
- }
-
- public void add(CommitLog.LogRecordAdder adder)
- {
- try
- {
- queue.put(adder);
- }
- catch (InterruptedException e)
- {
- throw new RuntimeException(e);
- }
- }
-
- public <T> Future<T> submit(Callable<T> task)
- {
- FutureTask<T> ft = new FutureTask<T>(task);
- try
- {
- queue.put(ft);
- }
- catch (InterruptedException e)
- {
- throw new RuntimeException(e);
- }
- return ft;
- }
-
- public void shutdown()
- {
- new Thread(new WrappedRunnable()
- {
- public void runMayThrow() throws InterruptedException, IOException
- {
- while (!queue.isEmpty())
- Thread.sleep(100);
- run = false;
- appendingThread.join();
- }
- }, "Commitlog Shutdown").start();
- }
-
- public void awaitTermination() throws InterruptedException
- {
- appendingThread.join();
- }
-
- public long getPendingTasks()
- {
- return queue.size();
- }
-
- public long getCompletedTasks()
- {
- return completedTaskCount;
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/22e18f5a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java
new file mode 100644
index 0000000..19d2770
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.commitlog;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.utils.WaitQueue;
+
+class PeriodicCommitLogService extends AbstractCommitLogService
+{
+
+ private static final int blockWhenSyncLagsMillis = (int) (DatabaseDescriptor.getCommitLogSyncPeriod() * 1.5);
+
+ public PeriodicCommitLogService(final CommitLog commitLog)
+ {
+ super(commitLog, "PERIODIC-COMMIT-LOG-SYNCER", DatabaseDescriptor.getCommitLogSyncPeriod());
+ }
+
+ protected void maybeWaitForSync(CommitLogSegment.Allocation alloc)
+ {
+ if (waitForSyncToCatchUp(Long.MAX_VALUE))
+ {
+ // wait until periodic sync() catches up with its schedule
+ long started = System.currentTimeMillis();
+ pending.incrementAndGet();
+ while (waitForSyncToCatchUp(started))
+ {
+ WaitQueue.Signal signal = syncComplete.register();
+ if (waitForSyncToCatchUp(started))
+ signal.awaitUninterruptibly();
+ }
+ pending.decrementAndGet();
+ }
+ }
+
+ /**
+ * @return true if sync is currently lagging behind inserts
+ */
+ private boolean waitForSyncToCatchUp(long started)
+ {
+ return started > lastSyncedAt + blockWhenSyncLagsMillis;
+ }
+}
\ No newline at end of file
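
A concrete reading of the back-pressure rule above, with numbers chosen purely for illustration: with commitlog_sync_period_in_ms = 10000, blockWhenSyncLagsMillis is 15000, so a periodic-mode write only blocks when the last completed sync is more than 15 seconds older than the moment the write started.

    // illustration only; names mirror the fields of the class above
    long syncPeriodMillis        = 10_000;                               // commitlog_sync_period_in_ms
    long blockWhenSyncLagsMillis = (long) (syncPeriodMillis * 1.5);      // 15_000
    long lastSyncedAt            = 100_000;                              // time of last completed sync()
    long started                 = 116_000;                              // time this write began
    boolean mustWait = started > lastSyncedAt + blockWhenSyncLagsMillis; // true: 116_000 > 115_000
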
http://git-wip-us.apache.org/repos/asf/cassandra/blob/22e18f5a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
index 9a03480..8347cd9 100644
--- a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
@@ -365,6 +365,11 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu
return fileLength;
}
+ public long getPosition()
+ {
+ return current;
+ }
+
@Override
public void write(int value)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/22e18f5a/src/java/org/apache/cassandra/metrics/CommitLogMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/CommitLogMetrics.java b/src/java/org/apache/cassandra/metrics/CommitLogMetrics.java
index c18b3a2..7c8ca61 100644
--- a/src/java/org/apache/cassandra/metrics/CommitLogMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/CommitLogMetrics.java
@@ -20,8 +20,8 @@ package org.apache.cassandra.metrics;
import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.Gauge;
-import org.apache.cassandra.db.commitlog.CommitLogAllocator;
-import org.apache.cassandra.db.commitlog.ICommitLogExecutorService;
+import org.apache.cassandra.db.commitlog.AbstractCommitLogService;
+import org.apache.cassandra.db.commitlog.CommitLogSegmentManager;
/**
* Metrics for commit log
@@ -37,20 +37,20 @@ public class CommitLogMetrics
/** Current size used by all the commit log segments */
public final Gauge<Long> totalCommitLogSize;
- public CommitLogMetrics(final ICommitLogExecutorService executor, final CommitLogAllocator allocator)
+ public CommitLogMetrics(final AbstractCommitLogService service, final CommitLogSegmentManager allocator)
{
completedTasks = Metrics.newGauge(factory.createMetricName("CompletedTasks"), new Gauge<Long>()
{
public Long value()
{
- return executor.getCompletedTasks();
+ return service.getCompletedTasks();
}
});
pendingTasks = Metrics.newGauge(factory.createMetricName("PendingTasks"), new Gauge<Long>()
{
public Long value()
{
- return executor.getPendingTasks();
+ return service.getPendingTasks();
}
});
totalCommitLogSize = Metrics.newGauge(factory.createMetricName("TotalCommitLogSize"), new Gauge<Long>()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/22e18f5a/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 932829f..c16d906 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -29,7 +29,6 @@ import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.logging.Level;
import javax.management.MBeanServer;
import javax.management.Notification;
@@ -40,7 +39,6 @@ import static java.nio.charset.StandardCharsets.ISO_8859_1;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.*;
-import com.google.common.util.concurrent.AtomicDouble;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.Uninterruptibles;
@@ -3254,6 +3252,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}
FBUtilities.waitOnFutures(flushes);
+ // whilst we've flushed all the CFs, which will have recycled all completed segments, we want to ensure
+ // there are no segments to replay, so we force the recycling of any remaining (should be at most one)
+ CommitLog.instance.forceRecycleAllSegments();
+
ColumnFamilyStore.postFlushExecutor.shutdown();
ColumnFamilyStore.postFlushExecutor.awaitTermination(60, TimeUnit.SECONDS);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/22e18f5a/src/java/org/apache/cassandra/utils/AtomicLongArrayUpdater.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/AtomicLongArrayUpdater.java b/src/java/org/apache/cassandra/utils/AtomicLongArrayUpdater.java
new file mode 100644
index 0000000..69cdca2
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/AtomicLongArrayUpdater.java
@@ -0,0 +1,74 @@
+package org.apache.cassandra.utils;
+
+
+import sun.misc.Unsafe;
+
+import java.lang.reflect.Field;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+
+public final class AtomicLongArrayUpdater {
+
+ private static final long offset;
+ private static final int shift;
+
+ static final Unsafe theUnsafe;
+
+ static {
+ theUnsafe = (Unsafe) AccessController.doPrivileged(
+ new PrivilegedAction<Object>()
+ {
+ @Override
+ public Object run()
+ {
+ try
+ {
+ Field f = Unsafe.class.getDeclaredField("theUnsafe");
+ f.setAccessible(true);
+ return f.get(null);
+ } catch (NoSuchFieldException e)
+ {
+ // It doesn't matter what we throw;
+ // it's swallowed in getBestComparer().
+ throw new Error();
+ } catch (IllegalAccessException e)
+ {
+ throw new Error();
+ }
+ }
+ });
+ Class<?> clazz = long[].class;
+ offset = theUnsafe.arrayBaseOffset(clazz);
+ shift = shift(theUnsafe.arrayIndexScale(clazz));
+ }
+
+ private static int shift(int scale)
+ {
+ if (Integer.bitCount(scale) != 1)
+ throw new IllegalStateException();
+ return Integer.bitCount(scale - 1);
+ }
+
+ public AtomicLongArrayUpdater() { }
+
+ public final boolean compareAndSet(Object trg, int i, long exp, long upd) {
+ return theUnsafe.compareAndSwapLong(trg, offset + (i << shift), exp, upd);
+ }
+
+ public final void putVolatile(Object trg, int i, long val) {
+ theUnsafe.putLongVolatile(trg, offset + (i << shift), val);
+ }
+
+ public final void putOrdered(Object trg, int i, long val) {
+ theUnsafe.putOrderedLong(trg, offset + (i << shift), val);
+ }
+
+ public final long get(Object trg, int i) {
+ return theUnsafe.getLong(trg, offset + (i << shift));
+ }
+
+ public final long getVolatile(Object trg, int i) {
+ return theUnsafe.getLongVolatile(trg, offset + (i << shift));
+ }
+
+}
\ No newline at end of file
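
AtomicLongArrayUpdater is not exercised elsewhere in this part of the patch; the following minimal usage sketch (the LongSlots class is hypothetical, for illustration only) shows the intent: CAS directly against a plain long[] without the per-instance object overhead of AtomicLongArray.

    import org.apache.cassandra.utils.AtomicLongArrayUpdater;

    public class LongSlots
    {
        private static final AtomicLongArrayUpdater updater = new AtomicLongArrayUpdater();
        private final long[] slots = new long[16];

        // lock-free increment of slot i, roughly equivalent to AtomicLongArray.incrementAndGet
        long increment(int i)
        {
            while (true)
            {
                long cur = updater.getVolatile(slots, i);
                if (updater.compareAndSet(slots, i, cur, cur + 1))
                    return cur + 1;
            }
        }
    }
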
http://git-wip-us.apache.org/repos/asf/cassandra/blob/22e18f5a/src/java/org/apache/cassandra/utils/StatusLogger.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/StatusLogger.java b/src/java/org/apache/cassandra/utils/StatusLogger.java
index 7df94aa..66ff985 100644
--- a/src/java/org/apache/cassandra/utils/StatusLogger.java
+++ b/src/java/org/apache/cassandra/utils/StatusLogger.java
@@ -19,8 +19,6 @@ package org.apache.cassandra.utils;
import java.lang.management.ManagementFactory;
import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ThreadPoolExecutor;
import javax.management.JMX;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
@@ -36,9 +34,7 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutorMBean;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.Memtable;
import org.apache.cassandra.db.RowIndexEntry;
-import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.CacheService;
@@ -78,8 +74,6 @@ public class StatusLogger
// one offs
logger.info(String.format("%-25s%10s%10s",
"CompactionManager", CompactionManager.instance.getActiveCompactions(), CompactionManager.instance.getPendingTasks()));
- logger.info(String.format("%-25s%10s%10s",
- "Commitlog", "n/a", CommitLog.instance.getPendingTasks()));
int pendingCommands = 0;
for (int n : MessagingService.instance().getCommandPendingTasks().values())
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/22e18f5a/src/java/org/apache/cassandra/utils/WaitQueue.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/WaitQueue.java b/src/java/org/apache/cassandra/utils/WaitQueue.java
new file mode 100644
index 0000000..01b2559
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/WaitQueue.java
@@ -0,0 +1,264 @@
+package org.apache.cassandra.utils;
+
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.locks.AbstractQueuedSynchronizer;
+import java.util.concurrent.locks.LockSupport;
+
+/**
+ * <p>A relatively easy to use utility for general purpose thread signalling.</p>
+ * <p>Usage on a thread awaiting a state change using a WaitQueue q is:</p>
+ * <pre>
+ * {@code
+ * while (!conditionMet())
+ * {
+ *     WaitQueue.Signal s = q.register();
+ *     if (!conditionMet()) // or, perhaps more correctly, !conditionChanged()
+ *         s.await();
+ *     else
+ *         s.cancel();
+ * }
+ * }
+ * </pre>
+ * A signalling thread, AFTER changing the state, then calls q.signal() to wake up one, or q.signalAll()
+ * to wake up all, waiting threads.
+ *
+ * <p>A few notes on utilisation:</p>
+ * <p>1. A thread will only exit await() when it has been signalled, but this does
+ * not guarantee the condition has not been altered since it was signalled,
+ * and depending on your design it is likely the outer condition will need to be
+ * checked in a loop, though this is not always the case.</p>
+ * <p>2. Each signal is single use, so must be re-registered after each await(). This is true even if it times out.</p>
+ * <p>3. If you choose not to wait on the signal (because the condition has been met before you waited on it)
+ * you must cancel() the signal if the signalling thread uses signal() to awake waiters; otherwise signals will be
+ * lost</p>
+ * <p>4. Care must be taken when selecting conditionMet() to ensure we are waiting on the condition that actually
+ * indicates progress is possible. In some complex cases it may be tempting to wait on a condition that is only indicative
+ * of local progress, not progress on the task we are aiming to complete, and a race may leave us waiting for a condition
+ * to be met that we no longer need.
+ * <p>5. This scheme is not fair</p>
+ * <p>6. Only the thread that calls register() may call await()</p>
+ * <p>To understand intuitively how this class works, the idea is simply that a thread, once it considers itself
+ * incapable of making progress, registers itself to be awoken once that condition changes. However, that condition
+ * could have changed between checking and registering (in which case a thread updating the state would have been unable to signal it),
+ * so before going to sleep on the signal, it checks the condition again, sleeping only if it hasn't changed.</p>
+ */
+// TODO : switch to a Lock Free queue
+public final class WaitQueue
+{
+ public final class Signal
+ {
+ private final Thread thread = Thread.currentThread();
+ volatile int signalled;
+
+ private boolean isSignalled()
+ {
+ return signalled == 1;
+ }
+
+ public boolean isCancelled()
+ {
+ return signalled == -1;
+ }
+
+ private boolean signal()
+ {
+ if (signalledUpdater.compareAndSet(this, 0, 1))
+ {
+ LockSupport.unpark(thread);
+ return true;
+ }
+ return false;
+ }
+
+ public void awaitUninterruptibly()
+ {
+ assert !isCancelled();
+ if (thread != Thread.currentThread())
+ throw new IllegalStateException();
+ boolean interrupted = false;
+ while (!isSignalled())
+ {
+ if (Thread.interrupted())
+ interrupted = true;
+ LockSupport.park();
+ }
+ if (interrupted)
+ thread.interrupt();
+ }
+
+ public void await() throws InterruptedException
+ {
+ assert !isCancelled();
+ while (!isSignalled())
+ {
+ if (Thread.interrupted())
+ {
+ checkAndClear();
+ throw new InterruptedException();
+ }
+ if (thread != Thread.currentThread())
+ throw new IllegalStateException();
+ LockSupport.park();
+ }
+ }
+
+ public long awaitNanos(long nanosTimeout) throws InterruptedException
+ {
+     assert !isCancelled();
+     long start = System.nanoTime();
+     long remaining = nanosTimeout;
+     while (!isSignalled() && remaining > 0)
+     {
+         if (Thread.interrupted())
+         {
+             checkAndClear();
+             throw new InterruptedException();
+         }
+         LockSupport.parkNanos(remaining);
+         remaining = nanosTimeout - (System.nanoTime() - start);
+     }
+     return nanosTimeout - (System.nanoTime() - start);
+ }
+
+ public boolean await(long time, TimeUnit unit) throws InterruptedException
+ {
+ // ignores nanos atm
+ long until = System.currentTimeMillis() + unit.toMillis(time);
+ if (until < 0)
+ until = Long.MAX_VALUE;
+ return awaitUntil(until);
+ }
+
+ public boolean awaitUntil(long until) throws InterruptedException
+ {
+ assert !isCancelled();
+ while (until > System.currentTimeMillis() && !isSignalled())
+ {
+ if (Thread.interrupted())
+ {
+ checkAndClear();
+ throw new InterruptedException();
+ }
+ LockSupport.parkUntil(until);
+ }
+ return checkAndClear();
+ }
+
+ private boolean checkAndClear()
+ {
+ if (isSignalled())
+ {
+ signalled = -1;
+ return true;
+ }
+ else if (signalledUpdater.compareAndSet(this, 0, -1))
+ {
+ cleanUpCancelled();
+ return false;
+ }
+ else
+ {
+ // must now be signalled, as checkAndClear() can only be called by
+ // owning thread if used correctly
+ signalled = -1;
+ return true;
+ }
+ }
+
+ public void cancel()
+ {
+ if (signalled < 0)
+ return;
+ if (!signalledUpdater.compareAndSet(this, 0, -1))
+ {
+ signalled = -1;
+ signal();
+ cleanUpCancelled();
+ }
+ }
+
+ }
+
+ private static final AtomicIntegerFieldUpdater<Signal> signalledUpdater = AtomicIntegerFieldUpdater.newUpdater(Signal.class, "signalled");
+
+ // the waiting signals
+ private final ConcurrentLinkedQueue<Signal> queue = new ConcurrentLinkedQueue<>();
+
+ /**
+ * The calling thread MUST be the thread that uses the signal (for now)
+ * @return a new Signal registered with this queue, which only the calling thread may await or cancel
+ */
+ public Signal register()
+ {
+ Signal signal = new Signal();
+ queue.add(signal);
+ return signal;
+ }
+
+ /**
+ * Signal one waiting thread
+ */
+ public void signal()
+ {
+ if (queue.isEmpty())
+ return;
+ Iterator<Signal> iter = queue.iterator();
+ while (iter.hasNext())
+ {
+ Signal next = iter.next();
+ if (next.signal())
+ {
+ iter.remove();
+ return;
+ }
+ }
+ }
+
+ /**
+ * Signal all waiting threads
+ */
+ public void signalAll()
+ {
+ if (queue.isEmpty())
+ return;
+ Iterator<Signal> iter = queue.iterator();
+ while (iter.hasNext())
+ {
+ Signal next = iter.next();
+ if (next.signal())
+ iter.remove();
+ }
+ }
+
+ private void cleanUpCancelled()
+ {
+ Iterator<Signal> iter = queue.iterator();
+ while (iter.hasNext())
+ {
+ Signal next = iter.next();
+ if (next.isCancelled())
+ iter.remove();
+ }
+ }
+
+ /**
+ * Return how many threads are waiting
+ * @return the number of threads currently waiting on this queue
+ */
+ public int getWaiting()
+ {
+ if (queue.isEmpty())
+ return 0;
+ Iterator<Signal> iter = queue.iterator();
+ int count = 0;
+ while (iter.hasNext())
+ {
+ Signal next = iter.next();
+ if (next.isCancelled())
+ iter.remove();
+ else
+ count++;
+ }
+ return count;
+ }
+
+}
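
For readers skimming the patch, a minimal usage sketch of the WaitQueue added above (illustrative only; conditionMet() and the state it observes are assumptions, not part of the patch):

    WaitQueue queue = new WaitQueue();

    // waiting side: register, then re-check the condition before parking
    while (!conditionMet())
    {
        WaitQueue.Signal signal = queue.register();
        if (!conditionMet())
            signal.awaitUninterruptibly();  // parks until signal()/signalAll()
        else
            signal.cancel();                // state changed between checks; don't swallow a signal()
    }

    // signalling side: change the state FIRST, then wake waiters
    state = newValue;                       // assumed state change
    queue.signalAll();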
http://git-wip-us.apache.org/repos/asf/cassandra/blob/22e18f5a/test/long/org/apache/cassandra/db/commitlog/ComitLogStress.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/db/commitlog/ComitLogStress.java b/test/long/org/apache/cassandra/db/commitlog/ComitLogStress.java
new file mode 100644
index 0000000..3f65714
--- /dev/null
+++ b/test/long/org/apache/cassandra/db/commitlog/ComitLogStress.java
@@ -0,0 +1,72 @@
+package org.apache.cassandra.db.commitlog;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.UUIDGen;
+
+public class ComitLogStress
+{
+
+ public static final String format = "%s,%s,%s,%s,%s,%s";
+
+ public static void main(String[] args) throws Exception {
+ int NUM_THREADS = Runtime.getRuntime().availableProcessors();
+ if (args.length >= 1) {
+ NUM_THREADS = Integer.parseInt(args[0]);
+ System.out.println("Setting num threads to: " + NUM_THREADS);
+ }
+ ExecutorService executor = new JMXEnabledThreadPoolExecutor(NUM_THREADS, NUM_THREADS, 60,
+ TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10 * NUM_THREADS), new NamedThreadFactory(""), "");
+ ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1);
+
+ org.apache.cassandra.SchemaLoader.loadSchema();
+ final AtomicLong count = new AtomicLong();
+ final long start = System.currentTimeMillis();
+ System.out.println(String.format(format, "seconds", "max_mb", "allocated_mb", "free_mb", "diffrence", "count"));
+ scheduled.scheduleAtFixedRate(new Runnable() {
+ long lastUpdate = 0;
+
+ public void run() {
+ Runtime runtime = Runtime.getRuntime();
+ long maxMemory = mb(runtime.maxMemory());
+ long allocatedMemory = mb(runtime.totalMemory());
+ long freeMemory = mb(runtime.freeMemory());
+ long temp = count.get();
+ System.out.println(String.format(format, ((System.currentTimeMillis() - start) / 1000),
+ maxMemory, allocatedMemory, freeMemory, (temp - lastUpdate), lastUpdate));
+ lastUpdate = temp;
+ }
+ }, 1, 1, TimeUnit.SECONDS);
+
+ while (true) {
+ executor.execute(new CommitlogExecutor());
+ count.incrementAndGet();
+ }
+ }
+
+ private static long mb(long maxMemory) {
+ return maxMemory / (1024 * 1024);
+ }
+
+ static final String keyString = UUIDGen.getTimeUUID().toString();
+ public static class CommitlogExecutor implements Runnable {
+ public void run() {
+ String ks = "Keyspace1";
+ ByteBuffer key = ByteBufferUtil.bytes(keyString);
+ RowMutation mutation = new RowMutation(ks, key);
+ mutation.add("Standard1", ByteBufferUtil.bytes("name"), ByteBufferUtil.bytes("value"),
+ System.currentTimeMillis());
+ CommitLog.instance.add(mutation);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/22e18f5a/test/unit/org/apache/cassandra/concurrent/WaitQueueTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/concurrent/WaitQueueTest.java b/test/unit/org/apache/cassandra/concurrent/WaitQueueTest.java
new file mode 100644
index 0000000..c2888bc
--- /dev/null
+++ b/test/unit/org/apache/cassandra/concurrent/WaitQueueTest.java
@@ -0,0 +1,137 @@
+package org.apache.cassandra.concurrent;
+
+import org.apache.cassandra.utils.WaitQueue;
+import org.junit.*;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.Assert.*;
+
+public class WaitQueueTest
+{
+
+ @Test
+ public void testSerial() throws InterruptedException
+ {
+ testSerial(new WaitQueue());
+ }
+ public void testSerial(final WaitQueue queue) throws InterruptedException
+ {
+ Thread[] ts = new Thread[4];
+ for (int i = 0 ; i < ts.length ; i++)
+ ts[i] = new Thread(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ WaitQueue.Signal wait = queue.register();
+ try
+ {
+ wait.await();
+ } catch (InterruptedException e)
+ {
+ e.printStackTrace();
+ }
+ }
+ });
+ for (int i = 0 ; i < ts.length ; i++)
+ ts[i].start();
+ Thread.sleep(100);
+ queue.signal();
+ queue.signal();
+ queue.signal();
+ queue.signal();
+ for (int i = 0 ; i < ts.length ; i++)
+ {
+ ts[i].join(100);
+ assertFalse(queue.getClass().getName(), ts[i].isAlive());
+ }
+ }
+
+
+ @Test
+ public void testCondition1() throws InterruptedException
+ {
+ testCondition1(new WaitQueue());
+ }
+
+ public void testCondition1(final WaitQueue queue) throws InterruptedException
+ {
+ final AtomicBoolean cond1 = new AtomicBoolean(false);
+ final AtomicBoolean fail = new AtomicBoolean(false);
+ Thread t1 = new Thread(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ Thread.sleep(200);
+ } catch (InterruptedException e)
+ {
+ e.printStackTrace();
+ }
+ WaitQueue.Signal wait = queue.register();
+ if (!cond1.get())
+ {
+ System.err.println("Condition should have already been met");
+ fail.set(true);
+ }
+ }
+ });
+ t1.start();
+ Thread.sleep(50);
+ cond1.set(true);
+ Thread.sleep(300);
+ queue.signal();
+ t1.join(300);
+ assertFalse(queue.getClass().getName(), t1.isAlive());
+ assertFalse(fail.get());
+ }
+
+ @Test
+ public void testCondition2() throws InterruptedException
+ {
+ testCondition2(new WaitQueue());
+ }
+ public void testCondition2(final WaitQueue queue) throws InterruptedException
+ {
+ final AtomicBoolean condition = new AtomicBoolean(false);
+ final AtomicBoolean fail = new AtomicBoolean(false);
+ Thread t = new Thread(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ WaitQueue.Signal wait = queue.register();
+ if (condition.get())
+ {
+ System.err.println("");
+ fail.set(true);
+ }
+
+ try
+ {
+ Thread.sleep(200);
+ wait.await();
+ } catch (InterruptedException e)
+ {
+ e.printStackTrace();
+ }
+ if (!condition.get())
+ {
+ System.err.println("Woke up when condition not met");
+ fail.set(true);
+ }
+ }
+ });
+ t.start();
+ Thread.sleep(50);
+ condition.set(true);
+ queue.signal();
+ t.join(300);
+ assertFalse(queue.getClass().getName(), t.isAlive());
+ assertFalse(fail.get());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/22e18f5a/test/unit/org/apache/cassandra/db/CommitLogTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CommitLogTest.java b/test/unit/org/apache/cassandra/db/CommitLogTest.java
index 8e5f418..6c1b56b 100644
--- a/test/unit/org/apache/cassandra/db/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/CommitLogTest.java
@@ -125,10 +125,11 @@ public class CommitLogTest extends SchemaLoader
@Test
public void testDeleteIfNotDirty() throws Exception
{
+ DatabaseDescriptor.getCommitLogSegmentSize();
CommitLog.instance.resetUnsafe();
// Roughly 32 MB mutation
RowMutation rm = new RowMutation("Keyspace1", bytes("k"));
- rm.add("Standard1", bytes("c1"), ByteBuffer.allocate(DatabaseDescriptor.getCommitLogSegmentSize()/4), 0);
+ rm.add("Standard1", bytes("c1"), ByteBuffer.allocate((DatabaseDescriptor.getCommitLogSegmentSize()/4) - 1), 0);
// Adding it twice (won't change segment)
CommitLog.instance.add(rm);
@@ -138,15 +139,17 @@ public class CommitLogTest extends SchemaLoader
// "Flush": this won't delete anything
UUID cfid1 = rm.getColumnFamilyIds().iterator().next();
+ CommitLog.instance.sync(true);
CommitLog.instance.discardCompletedSegments(cfid1, CommitLog.instance.getContext().get());
assert CommitLog.instance.activeSegments() == 1 : "Expecting 1 segment, got " + CommitLog.instance.activeSegments();
// Adding new mutation on another CF, large enough (including CL entry overhead) that a new segment is created
RowMutation rm2 = new RowMutation("Keyspace1", bytes("k"));
- rm2.add("Standard2", bytes("c1"), ByteBuffer.allocate(DatabaseDescriptor.getCommitLogSegmentSize()/2), 0);
+ rm2.add("Standard2", bytes("c1"), ByteBuffer.allocate((DatabaseDescriptor.getCommitLogSegmentSize()/2) - 100), 0);
+ CommitLog.instance.add(rm2);
+ // also forces a new segment, since each entry-with-overhead is just under half the CL size
CommitLog.instance.add(rm2);
- // also forces a new segment, since each entry-with-overhead is just over half the CL size
CommitLog.instance.add(rm2);
assert CommitLog.instance.activeSegments() == 3 : "Expecting 3 segments, got " + CommitLog.instance.activeSegments();
[2/2] git commit: Multithreaded commitlog patch by Benedict Elliot Smith; reviewed by jbellis for CASSANDRA-3578
Posted by jb...@apache.org.
Multithreaded commitlog
patch by Benedict Elliot Smith; reviewed by jbellis for CASSANDRA-3578
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/22e18f5a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/22e18f5a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/22e18f5a
Branch: refs/heads/trunk
Commit: 22e18f5a348a911f89deed9f9984950de451d28a
Parents: 679ec7e
Author: Jonathan Ellis <jb...@apache.org>
Authored: Mon Dec 2 13:07:28 2013 -0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Mon Dec 2 13:09:19 2013 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/config/CFMetaData.java | 11 +
.../org/apache/cassandra/config/Schema.java | 1 +
.../org/apache/cassandra/db/DefsTables.java | 7 +
.../AbstractCommitLogExecutorService.java | 55 --
.../db/commitlog/AbstractCommitLogService.java | 153 ++++++
.../BatchCommitLogExecutorService.java | 176 ------
.../db/commitlog/BatchCommitLogService.java | 36 ++
.../cassandra/db/commitlog/CommitLog.java | 278 +++++-----
.../db/commitlog/CommitLogAllocator.java | 369 -------------
.../db/commitlog/CommitLogArchiver.java | 2 +-
.../db/commitlog/CommitLogReplayer.java | 351 +++++++-----
.../db/commitlog/CommitLogSegment.java | 487 ++++++++++++-----
.../db/commitlog/CommitLogSegmentManager.java | 533 +++++++++++++++++++
.../db/commitlog/ICommitLogExecutorService.java | 51 --
.../PeriodicCommitLogExecutorService.java | 135 -----
.../db/commitlog/PeriodicCommitLogService.java | 57 ++
.../cassandra/io/util/RandomAccessReader.java | 5 +
.../cassandra/metrics/CommitLogMetrics.java | 10 +-
.../cassandra/service/StorageService.java | 6 +-
.../cassandra/utils/AtomicLongArrayUpdater.java | 74 +++
.../apache/cassandra/utils/StatusLogger.java | 6 -
.../org/apache/cassandra/utils/WaitQueue.java | 264 +++++++++
.../cassandra/db/commitlog/ComitLogStress.java | 72 +++
.../cassandra/concurrent/WaitQueueTest.java | 137 +++++
.../org/apache/cassandra/db/CommitLogTest.java | 9 +-
26 files changed, 2059 insertions(+), 1227 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/22e18f5a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6efbfbc..bb3a98a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1
+ * Multithreaded commitlog (CASSANDRA-3578)
* allocate fixed index summary memory pool and resample cold index summaries
to use less memory (CASSANDRA-5519)
* Removed multithreaded compaction (CASSANDRA-6142)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/22e18f5a/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index d04dc25..0a33c20 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -405,6 +405,7 @@ public final class CFMetaData
private volatile boolean populateIoCacheOnFlush = DEFAULT_POPULATE_IO_CACHE_ON_FLUSH;
private volatile Map<ColumnIdentifier, Long> droppedColumns = new HashMap<>();
private volatile Map<String, TriggerDefinition> triggers = new HashMap<>();
+ private volatile boolean isPurged = false;
/*
* All CQL3 columns definition are stored in the columnMetadata map.
@@ -1546,6 +1547,16 @@ public final class CFMetaData
return rm;
}
+ public boolean isPurged()
+ {
+ return isPurged;
+ }
+
+ void markPurged()
+ {
+ isPurged = true;
+ }
+
public void toSchema(RowMutation rm, long timestamp)
{
toSchemaNoColumnsNoTriggers(rm, timestamp);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/22e18f5a/src/java/org/apache/cassandra/config/Schema.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Schema.java b/src/java/org/apache/cassandra/config/Schema.java
index 146b82b..a38c097 100644
--- a/src/java/org/apache/cassandra/config/Schema.java
+++ b/src/java/org/apache/cassandra/config/Schema.java
@@ -348,6 +348,7 @@ public class Schema
public void purge(CFMetaData cfm)
{
cfIdMap.remove(Pair.create(cfm.ksName, cfm.cfName));
+ cfm.markPurged();
}
/* Version control */
http://git-wip-us.apache.org/repos/asf/cassandra/blob/22e18f5a/src/java/org/apache/cassandra/db/DefsTables.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DefsTables.java b/src/java/org/apache/cassandra/db/DefsTables.java
index 3cd5156..828981e 100644
--- a/src/java/org/apache/cassandra/db/DefsTables.java
+++ b/src/java/org/apache/cassandra/db/DefsTables.java
@@ -24,6 +24,7 @@ import java.util.*;
import com.google.common.collect.Iterables;
import com.google.common.collect.MapDifference;
import com.google.common.collect.Maps;
+import org.apache.cassandra.db.commitlog.CommitLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -461,6 +462,10 @@ public class DefsTables
// remove the keyspace from the static instances.
Keyspace.clear(ksm.name);
Schema.instance.clearKeyspaceDefinition(ksm);
+
+ // force a new segment in the CL
+ CommitLog.instance.forceRecycleAllSegments();
+
if (!StorageService.instance.isClientMode())
{
MigrationManager.instance.notifyDropKeyspace(ksm);
@@ -482,6 +487,8 @@ public class DefsTables
CompactionManager.instance.interruptCompactionFor(Arrays.asList(cfm), true);
+ CommitLog.instance.forceRecycleAllSegments();
+
if (!StorageService.instance.isClientMode())
{
if (DatabaseDescriptor.isAutoSnapshot())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/22e18f5a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogExecutorService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogExecutorService.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogExecutorService.java
deleted file mode 100644
index ec43114..0000000
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogExecutorService.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.commitlog;
-
-import java.util.List;
-import java.util.concurrent.AbstractExecutorService;
-import java.util.concurrent.TimeUnit;
-
-public abstract class AbstractCommitLogExecutorService extends AbstractExecutorService implements ICommitLogExecutorService
-{
- protected volatile long completedTaskCount = 0;
-
- /**
- * Get the number of completed tasks
- */
- public long getCompletedTasks()
- {
- return completedTaskCount;
- }
-
- public boolean isTerminated()
- {
- throw new UnsupportedOperationException();
- }
-
- public boolean isShutdown()
- {
- throw new UnsupportedOperationException();
- }
-
- public List<Runnable> shutdownNow()
- {
- throw new UnsupportedOperationException();
- }
-
- public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException
- {
- throw new UnsupportedOperationException();
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/22e18f5a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
new file mode 100644
index 0000000..2f9b236
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.commitlog;
+
+import org.apache.cassandra.utils.WaitQueue;
+import org.slf4j.*;
+
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation;
+
+public abstract class AbstractCommitLogService
+{
+ private final Thread thread;
+ private volatile boolean shutdown = false;
+
+ // all Allocations written before this time will be synced
+ protected volatile long lastSyncedAt = System.currentTimeMillis();
+
+ // counts of total written, and pending, log messages
+ private final AtomicLong written = new AtomicLong(0);
+ protected final AtomicLong pending = new AtomicLong(0);
+
+ // signal that writers can wait on to be notified of a completed sync
+ protected final WaitQueue syncComplete = new WaitQueue();
+ private final Semaphore haveWork = new Semaphore(1);
+
+ private static final Logger logger = LoggerFactory.getLogger(AbstractCommitLogService.class);
+
+ /**
+ * CommitLogService provides a fsync service for Allocations, fulfilling either the
+ * Batch or Periodic contract.
+ *
+ * Subclasses may be notified when a sync finishes by using the syncComplete WaitQueue.
+ */
+ AbstractCommitLogService(final CommitLog commitLog, final String name, final long pollIntervalMillis)
+ {
+ if (pollIntervalMillis < 1)
+ throw new IllegalArgumentException(String.format("Commit log flush interval must be positive: %dms", pollIntervalMillis));
+
+ Runnable runnable = new Runnable()
+ {
+ public void run()
+ {
+ boolean run = true;
+ while (run)
+ {
+ try
+ {
+ // always run once after shutdown signalled
+ run = !shutdown;
+
+ // sync and signal
+ long syncStarted = System.currentTimeMillis();
+ commitLog.sync(shutdown);
+ lastSyncedAt = syncStarted;
+ syncComplete.signalAll();
+
+ // sleep any time we have left before the next one is due
+ long sleep = syncStarted + pollIntervalMillis - System.currentTimeMillis();
+ if (sleep < 0)
+ {
+ logger.warn(String.format("Commit log sync took longer than sync interval (by %.2fs), indicating it is a bottleneck", sleep / -1000d));
+ // don't sleep, as we probably have work to do
+ continue;
+ }
+ try
+ {
+ haveWork.tryAcquire(sleep, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError();
+ }
+ }
+ catch (Throwable t)
+ {
+ logger.error("Commit log sync failed", t);
+ // sleep for full poll-interval after an error, so we don't spam the log file
+ try
+ {
+ haveWork.tryAcquire(pollIntervalMillis, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError();
+ }
+ }
+ }
+ }
+ };
+
+ thread = new Thread(runnable, name);
+ thread.start();
+ }
+
+ /**
+ * Block for the given Allocation to be sync'd as necessary, and handle bookkeeping
+ */
+ public void finishWriteFor(Allocation alloc)
+ {
+ maybeWaitForSync(alloc);
+ written.incrementAndGet();
+ }
+
+ protected abstract void maybeWaitForSync(Allocation alloc);
+
+ /**
+ * Sync immediately, but don't block for the sync to complete
+ */
+ public void requestExtraSync()
+ {
+ haveWork.release();
+ }
+
+ public void shutdown()
+ {
+ shutdown = true;
+ haveWork.release(1);
+ }
+
+ public void awaitTermination() throws InterruptedException
+ {
+ thread.join();
+ }
+
+ public long getCompletedTasks()
+ {
+ return written.get();
+ }
+
+ public long getPendingTasks()
+ {
+ return pending.get();
+ }
+}
\ No newline at end of file
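
A rough sketch of how another subclass could use the hooks above (the protected lastSyncedAt, pending and syncComplete members). This is illustrative only and is not the patch's PeriodicCommitLogService; the class name and 10ms poll interval are assumptions:

    // must live in org.apache.cassandra.db.commitlog (the parent constructor is package-private)
    class SketchCommitLogService extends AbstractCommitLogService
    {
        SketchCommitLogService(CommitLog commitLog)
        {
            super(commitLog, "SKETCH-COMMIT-LOG-WRITER", 10); // hypothetical 10ms poll interval
        }

        protected void maybeWaitForSync(CommitLogSegment.Allocation alloc)
        {
            // block only until a sync that started after this write has completed
            long writtenAt = System.currentTimeMillis();
            while (lastSyncedAt < writtenAt)
            {
                pending.incrementAndGet();
                WaitQueue.Signal signal = syncComplete.register();
                if (lastSyncedAt < writtenAt)
                    signal.awaitUninterruptibly();
                else
                    signal.cancel();
                pending.decrementAndGet();
            }
        }
    }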
http://git-wip-us.apache.org/repos/asf/cassandra/blob/22e18f5a/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorService.java b/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorService.java
deleted file mode 100644
index d985f1f..0000000
--- a/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorService.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.commitlog;
-
-import java.util.ArrayList;
-import java.util.concurrent.*;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.WrappedRunnable;
-
-class BatchCommitLogExecutorService extends AbstractCommitLogExecutorService
-{
- private final BlockingQueue<CheaterFutureTask> queue;
- private final Thread appendingThread;
- private volatile boolean run = true;
-
- public BatchCommitLogExecutorService()
- {
- this(DatabaseDescriptor.getConcurrentWriters());
- }
-
- public BatchCommitLogExecutorService(int queueSize)
- {
- queue = new LinkedBlockingQueue<CheaterFutureTask>(queueSize);
- Runnable runnable = new WrappedRunnable()
- {
- public void runMayThrow() throws Exception
- {
- while (run)
- {
- if (processWithSyncBatch())
- completedTaskCount++;
- }
- }
- };
- appendingThread = new Thread(runnable, "COMMIT-LOG-WRITER");
- appendingThread.start();
-
- }
-
- public long getPendingTasks()
- {
- return queue.size();
- }
-
- private final ArrayList<CheaterFutureTask> incompleteTasks = new ArrayList<CheaterFutureTask>();
- private final ArrayList taskValues = new ArrayList(); // TODO not sure how to generify this
- private boolean processWithSyncBatch() throws Exception
- {
- CheaterFutureTask firstTask = queue.poll(100, TimeUnit.MILLISECONDS);
- if (firstTask == null)
- return false;
- if (!(firstTask.getRawCallable() instanceof CommitLog.LogRecordAdder))
- {
- firstTask.run();
- return true;
- }
-
- // attempt to do a bunch of LogRecordAdder ops before syncing
- // (this is a little clunky since there is no blocking peek method,
- // so we have to break it into firstTask / extra tasks)
- incompleteTasks.clear();
- taskValues.clear();
- long start = System.nanoTime();
- long window = (long)(1000000 * DatabaseDescriptor.getCommitLogSyncBatchWindow());
-
- // it doesn't seem worth bothering future-izing the exception
- // since if a commitlog op throws, we're probably screwed anyway
- incompleteTasks.add(firstTask);
- taskValues.add(firstTask.getRawCallable().call());
- while (!queue.isEmpty()
- && queue.peek().getRawCallable() instanceof CommitLog.LogRecordAdder
- && System.nanoTime() - start < window)
- {
- CheaterFutureTask task = queue.remove();
- incompleteTasks.add(task);
- taskValues.add(task.getRawCallable().call());
- }
-
- // now sync and set the tasks' values (which allows thread calling get() to proceed)
- CommitLog.instance.sync();
- for (int i = 0; i < incompleteTasks.size(); i++)
- {
- incompleteTasks.get(i).set(taskValues.get(i));
- }
- return true;
- }
-
-
- @Override
- protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value)
- {
- return newTaskFor(Executors.callable(runnable, value));
- }
-
- @Override
- protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable)
- {
- return new CheaterFutureTask(callable);
- }
-
- public void execute(Runnable command)
- {
- try
- {
- queue.put((CheaterFutureTask)command);
- }
- catch (InterruptedException e)
- {
- throw new RuntimeException(e);
- }
- }
-
- public void add(CommitLog.LogRecordAdder adder)
- {
- FBUtilities.waitOnFuture(submit((Callable)adder));
- }
-
- public void shutdown()
- {
- new Thread(new WrappedRunnable()
- {
- public void runMayThrow() throws InterruptedException
- {
- while (!queue.isEmpty())
- Thread.sleep(100);
- run = false;
- appendingThread.join();
- }
- }, "Commitlog Shutdown").start();
- }
-
- public void awaitTermination() throws InterruptedException
- {
- appendingThread.join();
- }
-
- private static class CheaterFutureTask<V> extends FutureTask<V>
- {
- private final Callable rawCallable;
-
- public CheaterFutureTask(Callable<V> callable)
- {
- super(callable);
- rawCallable = callable;
- }
-
- public Callable getRawCallable()
- {
- return rawCallable;
- }
-
- @Override
- public void set(V v)
- {
- super.set(v);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/22e18f5a/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogService.java
new file mode 100644
index 0000000..65bee40
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogService.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.commitlog;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+
+class BatchCommitLogService extends AbstractCommitLogService
+{
+ public BatchCommitLogService(CommitLog commitLog)
+ {
+ super(commitLog, "COMMIT-LOG-WRITER", (int) DatabaseDescriptor.getCommitLogSyncBatchWindow());
+ }
+
+ protected void maybeWaitForSync(CommitLogSegment.Allocation alloc)
+ {
+ // wait until record has been safely persisted to disk
+ pending.incrementAndGet();
+ alloc.awaitDiskSync();
+ pending.decrementAndGet();
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/22e18f5a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index 706df37..7240aee 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -19,23 +19,30 @@ package org.apache.cassandra.db.commitlog;
import java.io.*;
import java.lang.management.ManagementFactory;
+import java.nio.ByteBuffer;
import java.util.*;
-import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
+
import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.*;
import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.util.ByteBufferOutputStream;
+import org.apache.cassandra.io.util.ChecksummedOutputStream;
import org.apache.cassandra.metrics.CommitLogMetrics;
import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.PureJavaCrc32;
+
+import com.google.common.util.concurrent.Futures;
+
+import static org.apache.cassandra.db.commitlog.CommitLogSegment.*;
/*
* Commit Log tracks every write operation into the system. The aim of the commit log is to be able to
@@ -47,29 +54,24 @@ public class CommitLog implements CommitLogMBean
public static final CommitLog instance = new CommitLog();
- private final ICommitLogExecutorService executor;
-
- public final CommitLogAllocator allocator;
+ // we only permit records HALF the size of a commit log, to ensure we don't spin allocating many mostly
+ // empty segments when writing large records
+ private static final long MAX_MUTATION_SIZE = DatabaseDescriptor.getCommitLogSegmentSize() >> 1;
+ public final CommitLogSegmentManager allocator;
public final CommitLogArchiver archiver = new CommitLogArchiver();
-
- public static final int END_OF_SEGMENT_MARKER = 0; // this is written out at the end of a segment
- public static final int END_OF_SEGMENT_MARKER_SIZE = 4; // number of bytes of ^^^
-
- public CommitLogSegment activeSegment;
-
private final CommitLogMetrics metrics;
+ final AbstractCommitLogService executor;
private CommitLog()
{
DatabaseDescriptor.createAllDirectories();
- allocator = new CommitLogAllocator();
- activateNextSegment();
+ allocator = new CommitLogSegmentManager();
executor = DatabaseDescriptor.getCommitLogSync() == Config.CommitLogSync.batch
- ? new BatchCommitLogExecutorService()
- : new PeriodicCommitLogExecutorService(this);
+ ? new BatchCommitLogService(this)
+ : new PeriodicCommitLogService(this);
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
try
@@ -86,15 +88,6 @@ public class CommitLog implements CommitLogMBean
}
/**
- * FOR TESTING PURPOSES. See CommitLogAllocator.
- */
- public void resetUnsafe()
- {
- allocator.resetUnsafe();
- activateNextSegment();
- }
-
- /**
* Perform recovery on commit logs located in the directory specified by the config file.
*
* @return the number of mutations replayed
@@ -121,7 +114,7 @@ public class CommitLog implements CommitLogMBean
}
else
{
- Arrays.sort(files, new CommitLogSegment.CommitLogSegmentFileComparator());
+ Arrays.sort(files, new CommitLogSegmentFileComparator());
logger.info("Replaying {}", StringUtils.join(files, ", "));
replayed = recover(files);
logger.info("Log replay complete, {} replayed mutations", replayed);
@@ -157,38 +150,84 @@ public class CommitLog implements CommitLogMBean
/**
* @return a Future representing a ReplayPosition such that when it is ready,
- * all commitlog tasks enqueued prior to the getContext call will be complete (i.e., appended to the log)
+ * all Allocations created prior to the getContext call will be written to the log
*/
public Future<ReplayPosition> getContext()
{
- Callable<ReplayPosition> task = new Callable<ReplayPosition>()
+ return Futures.immediateFuture(allocator.allocatingFrom().getContext());
+ }
+
+ /**
+ * Flushes all dirty CFs, waiting for them to free and recycle any segments they were retaining
+ */
+ public void forceRecycleAllSegments()
+ {
+ allocator.forceRecycleAll();
+ }
+
+ /**
+ * Forces a disk flush on the commit log files that need it. Blocking.
+ */
+ public void sync(boolean syncAllSegments)
+ {
+ CommitLogSegment current = allocator.allocatingFrom();
+ for (CommitLogSegment segment : allocator.getActiveSegments())
{
- public ReplayPosition call()
- {
- return activeSegment.getContext();
- }
- };
- return executor.submit(task);
+ if (!syncAllSegments && segment.id > current.id)
+ return;
+ segment.sync();
+ }
}
/**
- * Used by tests.
- *
- * @return the number of active segments (segments with unflushed data in them)
+ * Preempts the CLExecutor, telling it to sync immediately
*/
- public int activeSegments()
+ public void requestExtraSync()
{
- return allocator.getActiveSegments().size();
+ executor.requestExtraSync();
}
/**
* Add a RowMutation to the commit log.
*
- * @param rm the RowMutation to add to the log
+ * @param rowMutation the RowMutation to add to the log
*/
- public void add(RowMutation rm)
+ public void add(RowMutation rowMutation)
{
- executor.add(new LogRecordAdder(rm));
+ long size = RowMutation.serializer.serializedSize(rowMutation, MessagingService.current_version);
+
+ long totalSize = size + ENTRY_OVERHEAD_SIZE;
+ if (totalSize > MAX_MUTATION_SIZE)
+ {
+ logger.warn("Skipping commitlog append of extremely large mutation ({} bytes)", totalSize);
+ return;
+ }
+
+ Allocation alloc = allocator.allocate(rowMutation, (int) totalSize, new Allocation());
+ try
+ {
+ PureJavaCrc32 checksum = new PureJavaCrc32();
+ final ByteBuffer buffer = alloc.getBuffer();
+ DataOutputStream dos = new DataOutputStream(new ChecksummedOutputStream(new ByteBufferOutputStream(buffer), checksum));
+
+ // checksummed length
+ dos.writeInt((int) size);
+ buffer.putLong(checksum.getValue());
+
+ // checksummed mutation
+ RowMutation.serializer.serialize(rowMutation, dos, MessagingService.current_version);
+ buffer.putLong(checksum.getValue());
+ }
+ catch (IOException e)
+ {
+ throw new FSWriteError(e, alloc.getSegment().getPath());
+ }
+ finally
+ {
+ alloc.markWritten();
+ }
+
+ executor.finishWriteFor(alloc);
}
/**
@@ -200,104 +239,58 @@ public class CommitLog implements CommitLogMBean
*/
public void discardCompletedSegments(final UUID cfId, final ReplayPosition context)
{
- Callable task = new Callable()
+ logger.debug("discard completed log segments for {}, column family {}", context, cfId);
+
+ // Go thru the active segment files, which are ordered oldest to newest, marking the
+ // flushed CF as clean, until we reach the segment file containing the ReplayPosition passed
+ // in the arguments. Any segments that become unused after they are marked clean will be
+ // recycled or discarded.
+ for (Iterator<CommitLogSegment> iter = allocator.getActiveSegments().iterator(); iter.hasNext();)
{
- public Object call()
+ CommitLogSegment segment = iter.next();
+ segment.markClean(cfId, context);
+
+ if (segment.isUnused())
{
- logger.debug("discard completed log segments for {}, column family {}", context, cfId);
-
- // Go thru the active segment files, which are ordered oldest to newest, marking the
- // flushed CF as clean, until we reach the segment file containing the ReplayPosition passed
- // in the arguments. Any segments that become unused after they are marked clean will be
- // recycled or discarded.
- for (Iterator<CommitLogSegment> iter = allocator.getActiveSegments().iterator(); iter.hasNext();)
- {
- CommitLogSegment segment = iter.next();
- segment.markClean(cfId, context);
-
- // If the segment is no longer needed, and we have another spare segment in the hopper
- // (to keep the last segment from getting discarded), pursue either recycling or deleting
- // this segment file.
- if (iter.hasNext())
- {
- if (segment.isUnused())
- {
- logger.debug("Commit log segment {} is unused", segment);
- allocator.recycleSegment(segment);
- }
- else
- {
- logger.debug("Not safe to delete commit log segment {}; dirty is {}",
- segment, segment.dirtyString());
- }
- }
- else
- {
- logger.debug("Not deleting active commitlog segment {}", segment);
- }
-
- // Don't mark or try to delete any newer segments once we've reached the one containing the
- // position of the flush.
- if (segment.contains(context))
- break;
- }
-
- return null;
+ logger.debug("Commit log segment {} is unused", segment);
+ allocator.recycleSegment(segment);
+ }
+ else
+ {
+ logger.debug("Not safe to delete{} commit log segment {}; dirty is {}",
+ (iter.hasNext() ? "" : " active"), segment, segment.dirtyString());
}
- };
-
- FBUtilities.waitOnFuture(executor.submit(task));
- }
- /**
- * Forces a disk flush on the commit log files that need it.
- */
- public void sync()
- {
- for (CommitLogSegment segment : allocator.getActiveSegments())
- {
- segment.sync();
+ // Don't mark or try to delete any newer segments once we've reached the one containing the
+ // position of the flush.
+ if (segment.contains(context))
+ break;
}
}
- /**
- * @return the number of tasks completed by the commit log executor
- */
+ @Override
public long getCompletedTasks()
{
return metrics.completedTasks.value();
}
- /**
- * @return the depth of pending commit log executor queue
- */
+ @Override
public long getPendingTasks()
{
return metrics.pendingTasks.value();
}
/**
- * @return the total size occupied by commitlo segments expressed in bytes. (used by MBean)
+ * @return the total size occupied by commitlog segments expressed in bytes. (used by MBean)
*/
public long getTotalCommitlogSize()
{
return metrics.totalCommitLogSize.value();
}
- /**
- * Fetches a new segment file from the allocator and activates it.
- *
- * @return the newly activated segment
- */
- private void activateNextSegment()
- {
- activeSegment = allocator.fetchSegment();
- logger.debug("Active segment is now {}", activeSegment);
- }
-
public List<String> getActiveSegmentNames()
{
- List<String> segmentNames = new ArrayList<String>();
+ List<String> segmentNames = new ArrayList<>();
for (CommitLogSegment segment : allocator.getActiveSegments())
segmentNames.add(segment.getName());
return segmentNames;
@@ -305,7 +298,7 @@ public class CommitLog implements CommitLogMBean
public List<String> getArchivingSegmentNames()
{
- return new ArrayList<String>(archiver.archivePending.keySet());
+ return new ArrayList<>(archiver.archivePending.keySet());
}
/**
@@ -319,48 +312,21 @@ public class CommitLog implements CommitLogMBean
allocator.awaitTermination();
}
- // TODO this should be a Runnable since it doesn't actually return anything, but it's difficult to do that
- // without breaking the fragile CheaterFutureTask in BatchCLES.
- class LogRecordAdder implements Callable, Runnable
+ /**
+ * FOR TESTING PURPOSES. See CommitLogSegmentManager.
+ */
+ public void resetUnsafe()
{
- final RowMutation rowMutation;
-
- LogRecordAdder(RowMutation rm)
- {
- this.rowMutation = rm;
- }
-
- public void run()
- {
- long totalSize = RowMutation.serializer.serializedSize(rowMutation, MessagingService.current_version) + CommitLogSegment.ENTRY_OVERHEAD_SIZE;
- if (totalSize > DatabaseDescriptor.getCommitLogSegmentSize())
- {
- logger.warn("Skipping commitlog append of extremely large mutation ({} bytes)", totalSize);
- return;
- }
-
- if (!activeSegment.hasCapacityFor(totalSize))
- {
- CommitLogSegment oldSegment = activeSegment;
- activateNextSegment();
- // Now we can run the user defined command just before switching to the new commit log.
- // (Do this here instead of in the recycle call so we can get a head start on the archive.)
- archiver.maybeArchive(oldSegment.getPath(), oldSegment.getName());
- }
- try
- {
- activeSegment.write(rowMutation);
- }
- catch (IOException e)
- {
- throw new FSWriteError(e, activeSegment.getPath());
- }
- }
+ allocator.resetUnsafe();
+ }
- public Object call()
- {
- run();
- return null;
- }
+ /**
+ * Used by tests.
+ *
+ * @return the number of active segments (segments with unflushed data in them)
+ */
+ public int activeSegments()
+ {
+ return allocator.getActiveSegments().size();
}
}
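
For reference, the rewritten add() above produces entries laid out as [int length][long CRC(length)][serialized mutation][long CRC(length + mutation)]. Below is a minimal read-back sketch, assuming a ByteBuffer 'buffer' positioned at one entry; it is illustrative only (the real replay logic lives in CommitLogReplayer) and the variable names are assumptions:

    PureJavaCrc32 checksum = new PureJavaCrc32();
    int length = buffer.getInt();
    // the writer checksums the length bytes via ChecksummedOutputStream, most significant byte first
    checksum.update((length >>> 24) & 0xFF);
    checksum.update((length >>> 16) & 0xFF);
    checksum.update((length >>> 8) & 0xFF);
    checksum.update(length & 0xFF);
    if (buffer.getLong() != checksum.getValue())
        throw new IOException("corrupt entry length");

    byte[] mutationBytes = new byte[length];
    buffer.get(mutationBytes);
    checksum.update(mutationBytes, 0, length);
    if (buffer.getLong() != checksum.getValue())
        throw new IOException("corrupt mutation body");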
http://git-wip-us.apache.org/repos/asf/cassandra/blob/22e18f5a/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java
deleted file mode 100644
index 706cf9e..0000000
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java
+++ /dev/null
@@ -1,369 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.commitlog;
-
-import java.io.File;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.UUID;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicLong;
-
-import com.google.common.collect.Iterables;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.Pair;
-import org.apache.cassandra.utils.WrappedRunnable;
-
-/**
- * Performs the pre-allocation of commit log segments in a background thread. All the
- * public methods are thread safe.
- */
-public class CommitLogAllocator
-{
- static final Logger logger = LoggerFactory.getLogger(CommitLogAllocator.class);
-
- /** The (theoretical) max milliseconds between loop runs to perform janitorial tasks */
- public final static int TICK_CYCLE_TIME = 100;
-
- /** Segments that are ready to be used */
- private final BlockingQueue<CommitLogSegment> availableSegments = new LinkedBlockingQueue<CommitLogSegment>();
-
- /** Allocations to be run by the thread */
- private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
-
- /** Active segments, containing unflushed data */
- private final ConcurrentLinkedQueue<CommitLogSegment> activeSegments = new ConcurrentLinkedQueue<CommitLogSegment>();
-
- /**
- * Tracks commitlog size, in multiples of the segment size. We need to do this so we can "promise" size
- * adjustments ahead of actually adding/freeing segments on disk, so that the "evict oldest segment" logic
- * can see the effect of recycling segments immediately (even though they're really happening asynchronously
- * on the allocator thread, which will take a ms or two).
- */
- private final AtomicLong size = new AtomicLong();
-
- /**
- * New segment creation is initially disabled because we'll typically get some "free" segments
- * recycled after log replay.
- */
- private volatile boolean createReserveSegments = false;
-
- private final Thread allocationThread;
- private volatile boolean run = true;
-
- public CommitLogAllocator()
- {
- // The run loop for the allocation thread
- Runnable runnable = new WrappedRunnable()
- {
- public void runMayThrow() throws Exception
- {
- while (run)
- {
- Runnable r = queue.poll(TICK_CYCLE_TIME, TimeUnit.MILLISECONDS);
-
- if (r != null)
- {
- r.run();
- }
- else
- {
- // no job, so we're clear to check to see if we're out of segments
- // and ready a new one if needed. has the effect of ensuring there's
- // almost always a segment available when it's needed.
- if (availableSegments.isEmpty() && (activeSegments.isEmpty() || createReserveSegments))
- {
- logger.debug("No segments in reserve; creating a fresh one");
- createFreshSegment();
- }
- }
- }
- }
- };
-
- allocationThread = new Thread(runnable, "COMMIT-LOG-ALLOCATOR");
- allocationThread.start();
- }
-
- /**
- * Fetches an empty segment file.
- *
- * @return the next writable segment
- */
- public CommitLogSegment fetchSegment()
- {
- CommitLogSegment next;
- try
- {
- next = availableSegments.take();
- }
- catch (InterruptedException e)
- {
- throw new AssertionError(e);
- }
-
- assert !activeSegments.contains(next);
- activeSegments.add(next);
- if (isCapExceeded())
- flushOldestKeyspaces();
-
- return next;
- }
-
- /**
- * Indicates that a segment is no longer in use and that it should be recycled.
- *
- * @param segment segment that is no longer in use
- */
- public void recycleSegment(final CommitLogSegment segment)
- {
- activeSegments.remove(segment);
- if (!CommitLog.instance.archiver.maybeWaitForArchiving(segment.getName()))
- {
- // if archiving (command) was not successful then leave the file alone. don't delete or recycle.
- discardSegment(segment, false);
- return;
- }
- if (isCapExceeded())
- {
- discardSegment(segment, true);
- return;
- }
-
- logger.debug("Recycling {}", segment);
- queue.add(new Runnable()
- {
- public void run()
- {
- CommitLogSegment recycled = segment.recycle();
- internalAddReadySegment(recycled);
- }
- });
- }
-
- /**
- * Differs from the above because it can work on any file instead of just existing
- * commit log segments managed by this allocator.
- *
- * @param file segment file that is no longer in use.
- */
- public void recycleSegment(final File file)
- {
- // check against SEGMENT_SIZE avoids recycling odd-sized or empty segments from old C* versions and unit tests
- if (isCapExceeded() || file.length() != DatabaseDescriptor.getCommitLogSegmentSize()
- || CommitLogDescriptor.fromFileName(file.getName()).getMessagingVersion() != MessagingService.current_version)
- {
- // (don't decrease managed size, since this was never a "live" segment)
- logger.debug("(Unopened) segment {} is no longer needed and will be deleted now", file);
- FileUtils.deleteWithConfirm(file);
- return;
- }
-
- logger.debug("Recycling {}", file);
- // this wasn't previously a live segment, so add it to the managed size when we make it live
- size.addAndGet(DatabaseDescriptor.getCommitLogSegmentSize());
- queue.add(new Runnable()
- {
- public void run()
- {
- CommitLogSegment segment = new CommitLogSegment(file.getPath());
- internalAddReadySegment(segment);
- }
- });
- }
-
- /**
- * Indicates that a segment file should be deleted.
- *
- * @param segment segment to be discarded
- */
- private void discardSegment(final CommitLogSegment segment, final boolean deleteFile)
- {
- logger.debug("Segment {} is no longer active and will be deleted {}", segment, deleteFile ? "now" : "by the archive script");
- size.addAndGet(-DatabaseDescriptor.getCommitLogSegmentSize());
-
- queue.add(new Runnable()
- {
- public void run()
- {
- segment.discard(deleteFile);
- }
- });
- }
-
- /**
- * @return the space (in bytes) used by all segment files.
- */
- public long bytesUsed()
- {
- return size.get();
- }
-
- /**
- * @param name the filename to check
- * @return true if file is managed by this allocator.
- */
- public boolean manages(String name)
- {
- for (CommitLogSegment segment : Iterables.concat(activeSegments, availableSegments))
- if (segment.getName().equals(name))
- return true;
-
- return false;
- }
-
- /**
- * Creates and readies a brand new segment.
- *
- * @return the newly minted segment
- */
- private CommitLogSegment createFreshSegment()
- {
- size.addAndGet(DatabaseDescriptor.getCommitLogSegmentSize());
- return internalAddReadySegment(CommitLogSegment.freshSegment());
- }
-
- /**
- * Adds a segment to our internal tracking list and makes it ready for consumption.
- *
- * @param segment the segment to add
- * @return the newly added segment
- */
- private CommitLogSegment internalAddReadySegment(CommitLogSegment segment)
- {
- assert !activeSegments.contains(segment);
- assert !availableSegments.contains(segment);
- availableSegments.add(segment);
- return segment;
- }
-
- /**
- * Check to see if the speculative current size exceeds the cap.
- *
- * @return true if cap is exceeded
- */
- private boolean isCapExceeded()
- {
- long currentSize = size.get();
- logger.debug("Total active commitlog segment space used is {}", currentSize);
- return currentSize > DatabaseDescriptor.getTotalCommitlogSpaceInMB() * 1024 * 1024;
- }
-
- /**
- * Throws a flag that enables the behavior of keeping at least one spare segment
- * available at all times.
- */
- public void enableReserveSegmentCreation()
- {
- createReserveSegments = true;
- }
-
- /**
- * Force a flush on all dirty CFs represented in the oldest commitlog segment
- */
- private void flushOldestKeyspaces()
- {
- CommitLogSegment oldestSegment = activeSegments.peek();
-
- if (oldestSegment != null)
- {
- for (UUID dirtyCFId : oldestSegment.getDirtyCFIDs())
- {
- Pair<String,String> pair = Schema.instance.getCF(dirtyCFId);
- if (pair == null)
- {
- // even though we remove the schema entry before a final flush when dropping a CF,
- // it's still possible for a writer to race and finish his append after the flush.
- logger.debug("Marking clean CF {} that doesn't exist anymore", dirtyCFId);
- oldestSegment.markClean(dirtyCFId, oldestSegment.getContext());
- }
- else
- {
- String keypace = pair.left;
- final ColumnFamilyStore cfs = Keyspace.open(keypace).getColumnFamilyStore(dirtyCFId);
- // flush shouldn't run on the commitlog executor, since it acquires Table.switchLock,
- // which may already be held by a thread waiting for the CL executor (via getContext),
- // causing deadlock
- Runnable runnable = new Runnable()
- {
- public void run()
- {
- cfs.forceFlush();
- }
- };
- StorageService.optionalTasks.execute(runnable);
- }
- }
- }
- }
-
- /**
- * Resets all the segments, for testing purposes. DO NOT USE THIS OUTSIDE OF TESTS.
- */
- public void resetUnsafe()
- {
- logger.debug("Closing and clearing existing commit log segments...");
-
- while (!queue.isEmpty())
- Thread.yield();
-
- for (CommitLogSegment segment : Iterables.concat(activeSegments, availableSegments))
- segment.close();
-
- activeSegments.clear();
- availableSegments.clear();
- }
-
- /**
- * Initiates the shutdown process for the allocator thread.
- */
- public void shutdown()
- {
- run = false;
- }
-
- /**
- * Returns when the allocator thread terminates.
- */
- public void awaitTermination() throws InterruptedException
- {
- allocationThread.join();
- }
-
- /**
- * @return a read-only collection of the active commit log segments
- */
- public Collection<CommitLogSegment> getActiveSegments()
- {
- return Collections.unmodifiableCollection(activeSegments);
- }
-}
-
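The allocator code removed above hands the flush work of flushOldestKeyspaces() to StorageService.optionalTasks rather than running it inline: the flush path acquires Table.switchLock, which may already be held by a thread that is itself blocked waiting on the commit log executor (via getContext), so running the flush on the commit log executor could deadlock. A minimal, hypothetical sketch of that hand-off pattern, using a plain ExecutorService and made-up names in place of Cassandra's classes:

    // Hypothetical illustration of handing a flush off to a separate executor to avoid a lock cycle.
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    final class FlushHandOff
    {
        // Stand-in for StorageService.optionalTasks: any executor that is not the commit log executor.
        private static final ExecutorService BACKGROUND = Executors.newSingleThreadExecutor();

        // Never run the flush inline on the commit log executor: the flush acquires a lock that may
        // already be held by a thread that is itself waiting on the commit log executor, so running
        // it here would complete the deadlock cycle. Handing it to another executor breaks the cycle.
        static void requestFlush(Runnable flush)
        {
            BACKGROUND.execute(flush);
        }
    }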
http://git-wip-us.apache.org/repos/asf/cassandra/blob/22e18f5a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
index f020182..1385ea4 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
@@ -137,7 +137,7 @@ public class CommitLogArchiver
{
if (e.getCause() instanceof IOException)
{
- logger.info("Looks like the archiving of file {} failed earlier, cassandra is going to ignore this segment for now.", name);
+ logger.error("Looks like the archiving of file {} failed earlier, cassandra is going to ignore this segment for now.", name);
return false;
}
throw new RuntimeException(e);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/22e18f5a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index ae091bf..c42ba9b 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -48,6 +48,7 @@ public class CommitLogReplayer
{
private static final Logger logger = LoggerFactory.getLogger(CommitLogReplayer.class);
private static final int MAX_OUTSTANDING_REPLAY_COUNT = 1024;
+ private static final int LEGACY_END_OF_SEGMENT_MARKER = 0;
private final Set<Keyspace> keyspacesRecovered;
private final List<Future<?>> futures;
@@ -114,6 +115,53 @@ public class CommitLogReplayer
return replayedCount.get();
}
+ private int readHeader(long segmentId, int offset, RandomAccessReader reader) throws IOException
+ {
+ if (offset > reader.length() - CommitLogSegment.SYNC_MARKER_SIZE)
+ {
+ if (offset != reader.length() && offset != Integer.MAX_VALUE)
+ logger.warn("Encountered bad header at position {} of Commit log {}; not enough room for a header");
+ // cannot possibly be a header here. if we're == length(), assume it's a correctly written final segment
+ return -1;
+ }
+ reader.seek(offset);
+ PureJavaCrc32 crc = new PureJavaCrc32();
+ crc.update((int) (segmentId & 0xFFFFFFFFL));
+ crc.update((int) (segmentId >>> 32));
+ crc.update((int) reader.getPosition());
+ int end = reader.readInt();
+ long filecrc = reader.readLong();
+ if (crc.getValue() != filecrc)
+ {
+ if (end != 0 || filecrc != 0)
+ {
+ logger.warn("Encountered bad header at position {} of commit log {}, with invalid CRC. The end of segment marker should be zero.", offset, reader.getPath());
+ }
+ return -1;
+ }
+ else if (end < offset || end > reader.length())
+ {
+ logger.warn("Encountered bad header at position {} of commit log {}, with bad position but valid CRC", offset, reader.getPath());
+ return -1;
+ }
+ return end;
+ }
+
+ private int getStartOffset(long segmentId, int version, File file)
+ {
+ if (globalPosition.segment < segmentId)
+ {
+ if (version >= CommitLogDescriptor.VERSION_21)
+ return CommitLogSegment.SYNC_MARKER_SIZE;
+ else
+ return 0;
+ }
+ else if (globalPosition.segment == segmentId)
+ return globalPosition.position;
+ else
+ return -1;
+ }
+
private abstract static class ReplayFilter
{
public abstract Iterable<ColumnFamily> filter(RowMutation rm);
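In the new segment format, each synced region begins with a SYNC_MARKER_SIZE header: an int giving the end of the region (equivalently, the position of the next marker, zeroed at the tail) followed by a long CRC computed over the segment id and the marker's own position, which is what readHeader() above verifies before trusting the pointer. A rough, self-contained sketch of that scan, assuming java.util.zip.CRC32 in place of PureJavaCrc32, a DataInputStream already positioned at the marker, and made-up names such as SyncMarkerWalker:

    // Hypothetical sketch of walking the chained sync markers; not the patch's own code.
    import java.io.DataInputStream;
    import java.io.IOException;
    import java.util.zip.CRC32;

    final class SyncMarkerWalker
    {
        static final int SYNC_MARKER_SIZE = 4 + 8; // int: end-of-region pointer + long: CRC

        // 'in' is assumed to be positioned at 'offset'; returns the end of the synced region
        // (i.e. the position of the next marker), or -1 when the chain ends.
        static int readMarker(DataInputStream in, long segmentId, int offset, long fileLength) throws IOException
        {
            if (offset > fileLength - SYNC_MARKER_SIZE)
                return -1; // no room left for another marker; treat as end of segment

            CRC32 crc = new CRC32(); // the patch itself uses PureJavaCrc32
            crc.update((int) (segmentId & 0xFFFFFFFFL));
            crc.update((int) (segmentId >>> 32));
            crc.update(offset); // the marker's own position is mixed into the checksum

            int end = in.readInt();        // end of the synced region / next marker position
            long claimedCrc = in.readLong();
            if (crc.getValue() != claimedCrc || end < offset || end > fileLength)
                return -1; // zeroed tail marker or torn write: stop replaying here
            return end;
        }
    }

The replayer then parses mutations only up to the end reported by a valid marker, so a torn or unsynced tail after the last valid marker is skipped rather than misread.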
@@ -181,182 +229,199 @@ public class CommitLogReplayer
final ReplayFilter replayFilter = ReplayFilter.create();
logger.info("Replaying {}", file.getPath());
CommitLogDescriptor desc = CommitLogDescriptor.fromFileName(file.getName());
- final long segment = desc.id;
+ final long segmentId = desc.id;
int version = desc.getMessagingVersion();
RandomAccessReader reader = RandomAccessReader.open(new File(file.getAbsolutePath()));
+
try
{
assert reader.length() <= Integer.MAX_VALUE;
- int replayPosition;
- if (globalPosition.segment < segment)
- {
- replayPosition = 0;
- }
- else if (globalPosition.segment == segment)
- {
- replayPosition = globalPosition.position;
- }
- else
+ int offset = getStartOffset(segmentId, version, file);
+ if (offset < 0)
{
logger.debug("skipping replay of fully-flushed {}", file);
return;
}
- if (logger.isDebugEnabled())
- logger.debug("Replaying {} starting at {}", file, replayPosition);
- reader.seek(replayPosition);
-
- /* read the logs populate RowMutation and apply */
- while (!reader.isEOF())
+ int prevEnd = 0;
+ main: while (true)
{
- if (logger.isDebugEnabled())
- logger.debug("Reading mutation at {}", reader.getFilePointer());
- long claimedCRC32;
- int serializedSize;
- try
+ int end = prevEnd;
+ if (version < CommitLogDescriptor.VERSION_21)
+ end = Integer.MAX_VALUE;
+ else
{
- // any of the reads may hit EOF
- serializedSize = reader.readInt();
- if (serializedSize == CommitLog.END_OF_SEGMENT_MARKER)
- {
- logger.debug("Encountered end of segment marker at {}", reader.getFilePointer());
- break;
- }
-
- // RowMutation must be at LEAST 10 bytes:
- // 3 each for a non-empty Keyspace and Key (including the
- // 2-byte length from writeUTF/writeWithShortLength) and 4 bytes for column count.
- // This prevents CRC by being fooled by special-case garbage in the file; see CASSANDRA-2128
- if (serializedSize < 10)
- break;
-
- long claimedSizeChecksum = reader.readLong();
- checksum.reset();
- if (version < CommitLogDescriptor.VERSION_20)
- checksum.update(serializedSize);
- else
- FBUtilities.updateChecksumInt(checksum, serializedSize);
-
- if (checksum.getValue() != claimedSizeChecksum)
- break; // entry wasn't synced correctly/fully. that's
- // ok.
-
- if (serializedSize > buffer.length)
- buffer = new byte[(int) (1.2 * serializedSize)];
- reader.readFully(buffer, 0, serializedSize);
- claimedCRC32 = reader.readLong();
- }
- catch (EOFException eof)
- {
- break; // last CL entry didn't get completely written. that's ok.
+ do { end = readHeader(segmentId, end, reader); }
+ while (end < offset && end > prevEnd);
}
- checksum.update(buffer, 0, serializedSize);
- if (claimedCRC32 != checksum.getValue())
- {
- // this entry must not have been fsynced. probably the rest is bad too,
- // but just in case there is no harm in trying them (since we still read on an entry boundary)
- continue;
- }
+ if (end < prevEnd)
+ break;
- /* deserialize the commit log entry */
- FastByteArrayInputStream bufIn = new FastByteArrayInputStream(buffer, 0, serializedSize);
- final RowMutation rm;
- try
- {
- // assuming version here. We've gone to lengths to make sure what gets written to the CL is in
- // the current version. so do make sure the CL is drained prior to upgrading a node.
- rm = RowMutation.serializer.deserialize(new DataInputStream(bufIn), version, ColumnSerializer.Flag.LOCAL);
- // doublecheck that what we read is [still] valid for the current schema
- for (ColumnFamily cf : rm.getColumnFamilies())
- for (Column cell : cf)
- cf.getComparator().validate(cell.name());
- }
- catch (UnknownColumnFamilyException ex)
- {
- if (ex.cfId == null)
- continue;
- AtomicInteger i = invalidMutations.get(ex.cfId);
- if (i == null)
- {
- i = new AtomicInteger(1);
- invalidMutations.put(ex.cfId, i);
- }
- else
- i.incrementAndGet();
- continue;
- }
- catch (Throwable t)
+ if (logger.isDebugEnabled())
+ logger.debug("Replaying {} between {} and {}", file, offset, prevEnd);
+
+ reader.seek(offset);
+
+ /* read the logs populate RowMutation and apply */
+ while (reader.getPosition() < end && !reader.isEOF())
{
- File f = File.createTempFile("mutation", "dat");
- DataOutputStream out = new DataOutputStream(new FileOutputStream(f));
+ if (logger.isDebugEnabled())
+ logger.debug("Reading mutation at {}", reader.getFilePointer());
+
+ long claimedCRC32;
+ int serializedSize;
try
{
- out.write(buffer, 0, serializedSize);
+ // any of the reads may hit EOF
+ serializedSize = reader.readInt();
+ if (serializedSize == LEGACY_END_OF_SEGMENT_MARKER)
+ {
+ logger.debug("Encountered end of segment marker at {}", reader.getFilePointer());
+ break main;
+ }
+
+ // RowMutation must be at LEAST 10 bytes:
+ // 3 each for a non-empty Keyspace and Key (including the
+ // 2-byte length from writeUTF/writeWithShortLength) and 4 bytes for column count.
+ // This prevents the CRC from being fooled by special-case garbage in the file; see CASSANDRA-2128
+ if (serializedSize < 10)
+ break main;
+
+ long claimedSizeChecksum = reader.readLong();
+ checksum.reset();
+ if (version < CommitLogDescriptor.VERSION_20)
+ checksum.update(serializedSize);
+ else
+ FBUtilities.updateChecksumInt(checksum, serializedSize);
+
+ if (checksum.getValue() != claimedSizeChecksum)
+ break main; // entry wasn't synced correctly/fully; that's ok.
+
+ if (serializedSize > buffer.length)
+ buffer = new byte[(int) (1.2 * serializedSize)];
+ reader.readFully(buffer, 0, serializedSize);
+ claimedCRC32 = reader.readLong();
}
- finally
+ catch (EOFException eof)
{
- out.close();
+ break main; // last CL entry didn't get completely written. that's ok.
}
- String st = String.format("Unexpected error deserializing mutation; saved to %s and ignored. This may be caused by replaying a mutation against a table with the same name but incompatible schema. Exception follows: ",
- f.getAbsolutePath());
- logger.error(st, t);
- continue;
- }
- if (logger.isDebugEnabled())
- logger.debug("replaying mutation for {}.{}: {}", rm.getKeyspaceName(), ByteBufferUtil.bytesToHex(rm.key()), "{" + StringUtils.join(rm.getColumnFamilies().iterator(), ", ") + "}");
+ checksum.update(buffer, 0, serializedSize);
+ if (claimedCRC32 != checksum.getValue())
+ {
+ // this entry must not have been fsynced. probably the rest is bad too,
+ // but just in case there is no harm in trying them (since we still read on an entry boundary)
+ continue;
+ }
- final long entryLocation = reader.getFilePointer();
- Runnable runnable = new WrappedRunnable()
- {
- public void runMayThrow() throws IOException
+ /* deserialize the commit log entry */
+ FastByteArrayInputStream bufIn = new FastByteArrayInputStream(buffer, 0, serializedSize);
+ final RowMutation rm;
+ try
{
- if (Schema.instance.getKSMetaData(rm.getKeyspaceName()) == null)
- return;
- if (pointInTimeExceeded(rm))
- return;
-
- final Keyspace keyspace = Keyspace.open(rm.getKeyspaceName());
-
- // Rebuild the row mutation, omitting column families that
- // a) the user has requested that we ignore,
- // b) have already been flushed,
- // or c) are part of a cf that was dropped.
- // Keep in mind that the cf.name() is suspect. do every thing based on the cfid instead.
- RowMutation newRm = null;
- for (ColumnFamily columnFamily : replayFilter.filter(rm))
+ // assuming version here. We've gone to lengths to make sure what gets written to the CL is in
+ // the current version. so do make sure the CL is drained prior to upgrading a node.
+ rm = RowMutation.serializer.deserialize(new DataInputStream(bufIn), version, ColumnSerializer.Flag.LOCAL);
+ // doublecheck that what we read is [still] valid for the current schema
+ for (ColumnFamily cf : rm.getColumnFamilies())
+ for (Column cell : cf)
+ cf.getComparator().validate(cell.name());
+ }
+ catch (UnknownColumnFamilyException ex)
+ {
+ if (ex.cfId == null)
+ continue;
+ AtomicInteger i = invalidMutations.get(ex.cfId);
+ if (i == null)
{
- if (Schema.instance.getCF(columnFamily.id()) == null)
- continue; // dropped
+ i = new AtomicInteger(1);
+ invalidMutations.put(ex.cfId, i);
+ }
+ else
+ i.incrementAndGet();
+ continue;
+ }
+ catch (Throwable t)
+ {
+ File f = File.createTempFile("mutation", "dat");
+ DataOutputStream out = new DataOutputStream(new FileOutputStream(f));
+ try
+ {
+ out.write(buffer, 0, serializedSize);
+ }
+ finally
+ {
+ out.close();
+ }
+ String st = String.format("Unexpected error deserializing mutation; saved to %s and ignored. This may be caused by replaying a mutation against a table with the same name but incompatible schema. Exception follows: ",
+ f.getAbsolutePath());
+ logger.error(st, t);
+ continue;
+ }
- ReplayPosition rp = cfPositions.get(columnFamily.id());
+ if (logger.isDebugEnabled())
+ logger.debug("replaying mutation for {}.{}: {}", rm.getKeyspaceName(), ByteBufferUtil.bytesToHex(rm.key()), "{" + StringUtils.join(rm.getColumnFamilies().iterator(), ", ") + "}");
- // replay if current segment is newer than last flushed one or,
- // if it is the last known segment, if we are after the replay position
- if (segment > rp.segment || (segment == rp.segment && entryLocation > rp.position))
+ final long entryLocation = reader.getFilePointer();
+ Runnable runnable = new WrappedRunnable()
+ {
+ public void runMayThrow() throws IOException
+ {
+ if (Schema.instance.getKSMetaData(rm.getKeyspaceName()) == null)
+ return;
+ if (pointInTimeExceeded(rm))
+ return;
+
+ final Keyspace keyspace = Keyspace.open(rm.getKeyspaceName());
+
+ // Rebuild the row mutation, omitting column families that
+ // a) the user has requested that we ignore,
+ // b) have already been flushed,
+ // or c) are part of a cf that was dropped.
+ // Keep in mind that cf.name() is suspect; do everything based on the cfid instead.
+ RowMutation newRm = null;
+ for (ColumnFamily columnFamily : replayFilter.filter(rm))
{
- if (newRm == null)
- newRm = new RowMutation(rm.getKeyspaceName(), rm.key());
- newRm.add(columnFamily);
- replayedCount.incrementAndGet();
+ if (Schema.instance.getCF(columnFamily.id()) == null)
+ continue; // dropped
+
+ ReplayPosition rp = cfPositions.get(columnFamily.id());
+
+ // replay if the current segment is newer than the last flushed one, or
+ // if it is the same segment and this entry is after the replay position
+ if (segmentId > rp.segment || (segmentId == rp.segment && entryLocation > rp.position))
+ {
+ if (newRm == null)
+ newRm = new RowMutation(rm.getKeyspaceName(), rm.key());
+ newRm.add(columnFamily);
+ replayedCount.incrementAndGet();
+ }
+ }
+ if (newRm != null)
+ {
+ assert !newRm.isEmpty();
+ Keyspace.open(newRm.getKeyspaceName()).apply(newRm, false);
+ keyspacesRecovered.add(keyspace);
}
}
- if (newRm != null)
- {
- assert !newRm.isEmpty();
- Keyspace.open(newRm.getKeyspaceName()).apply(newRm, false);
- keyspacesRecovered.add(keyspace);
- }
+ };
+ futures.add(StageManager.getStage(Stage.MUTATION).submit(runnable));
+ if (futures.size() > MAX_OUTSTANDING_REPLAY_COUNT)
+ {
+ FBUtilities.waitOnFutures(futures);
+ futures.clear();
}
- };
- futures.add(StageManager.getStage(Stage.MUTATION).submit(runnable));
- if (futures.size() > MAX_OUTSTANDING_REPLAY_COUNT)
- {
- FBUtilities.waitOnFutures(futures);
- futures.clear();
}
+
+ if (version < CommitLogDescriptor.VERSION_21)
+ break;
+
+ offset = end + CommitLogSegment.SYNC_MARKER_SIZE;
+ prevEnd = end;
}
}
finally