You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jm...@apache.org on 2016/06/16 13:55:39 UTC
[3/5] cassandra git commit: Add Change Data Capture
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
index 2045c35..2e97fd5 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
@@ -22,34 +22,22 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
+import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.CRC32;
-import com.codahale.metrics.Timer;
-
import org.cliffc.high_scale_lib.NonBlockingHashMap;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.Schema;
+import com.codahale.metrics.Timer;
+import org.apache.cassandra.config.*;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.commitlog.CommitLog.Configuration;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.io.FSWriteError;
-import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.utils.CLibrary;
import org.apache.cassandra.utils.concurrent.OpOrder;
import org.apache.cassandra.utils.concurrent.WaitQueue;
@@ -66,6 +54,14 @@ public abstract class CommitLogSegment
private static final Logger logger = LoggerFactory.getLogger(CommitLogSegment.class);
private final static long idBase;
+
+ private CDCState cdcState = CDCState.PERMITTED;
+ public enum CDCState {
+ PERMITTED,
+ FORBIDDEN,
+ CONTAINS
+ }
+
private final static AtomicInteger nextId = new AtomicInteger(1);
private static long replayLimitId;
static
@@ -115,18 +111,20 @@ public abstract class CommitLogSegment
final FileChannel channel;
final int fd;
+ protected final AbstractCommitLogSegmentManager manager;
+
ByteBuffer buffer;
private volatile boolean headerWritten;
final CommitLog commitLog;
public final CommitLogDescriptor descriptor;
- static CommitLogSegment createSegment(CommitLog commitLog, Runnable onClose)
+ static CommitLogSegment createSegment(CommitLog commitLog, AbstractCommitLogSegmentManager manager, Runnable onClose)
{
Configuration config = commitLog.configuration;
- CommitLogSegment segment = config.useEncryption() ? new EncryptedSegment(commitLog, onClose)
- : config.useCompression() ? new CompressedSegment(commitLog, onClose)
- : new MemoryMappedSegment(commitLog);
+ CommitLogSegment segment = config.useEncryption() ? new EncryptedSegment(commitLog, manager, onClose)
+ : config.useCompression() ? new CompressedSegment(commitLog, manager, onClose)
+ : new MemoryMappedSegment(commitLog, manager);
segment.writeLogHeader();
return segment;
}
@@ -151,14 +149,16 @@ public abstract class CommitLogSegment
/**
* Constructs a new segment file.
*/
- CommitLogSegment(CommitLog commitLog)
+ CommitLogSegment(CommitLog commitLog, AbstractCommitLogSegmentManager manager)
{
this.commitLog = commitLog;
+ this.manager = manager;
+
id = getNextId();
descriptor = new CommitLogDescriptor(id,
commitLog.configuration.getCompressorClass(),
commitLog.configuration.getEncryptionContext());
- logFile = new File(commitLog.location, descriptor.fileName());
+ logFile = new File(manager.storageDirectory, descriptor.fileName());
try
{
@@ -369,22 +369,11 @@ public abstract class CommitLogSegment
}
/**
- * Completely discards a segment file by deleting it. (Potentially blocking operation)
- */
- void discard(boolean deleteFile)
- {
- close();
- if (deleteFile)
- FileUtils.deleteWithConfirm(logFile);
- commitLog.allocator.addSize(-onDiskSize());
- }
-
- /**
- * @return the current ReplayPosition for this log segment
+ * @return the current CommitLogPosition for this log segment
*/
- public ReplayPosition getContext()
+ public CommitLogPosition getCurrentCommitLogPosition()
{
- return new ReplayPosition(id, allocatePosition.get());
+ return new CommitLogPosition(id, allocatePosition.get());
}
/**
@@ -474,13 +463,13 @@ public abstract class CommitLogSegment
* @param cfId the column family ID that is now clean
* @param context the optional clean offset
*/
- public synchronized void markClean(UUID cfId, ReplayPosition context)
+ public synchronized void markClean(UUID cfId, CommitLogPosition context)
{
if (!cfDirty.containsKey(cfId))
return;
- if (context.segment == id)
+ if (context.segmentId == id)
markClean(cfId, context.position);
- else if (context.segment > id)
+ else if (context.segmentId > id)
markClean(cfId, Integer.MAX_VALUE);
}
@@ -565,14 +554,14 @@ public abstract class CommitLogSegment
}
/**
- * Check to see if a certain ReplayPosition is contained by this segment file.
+ * Check to see if a certain CommitLogPosition is contained by this segment file.
*
- * @param context the replay position to be checked
- * @return true if the replay position is contained by this segment file.
+ * @param context the commit log segment position to be checked
+ * @return true if the commit log segment position is contained by this segment file.
*/
- public boolean contains(ReplayPosition context)
+ public boolean contains(CommitLogPosition context)
{
- return context.segment == id;
+ return context.segmentId == id;
}
// For debugging, not fast
@@ -610,12 +599,37 @@ public abstract class CommitLogSegment
}
}
+ public CDCState getCDCState()
+ {
+ return cdcState;
+ }
+
+ /**
+ * Change the current cdcState on this CommitLogSegment. There are some restrictions on state transitions and this
+ * method is idempotent.
+ */
+ public void setCDCState(CDCState newState)
+ {
+ if (newState == cdcState)
+ return;
+
+ // Also synchronized in CDCSizeTracker.processNewSegment and .processDiscardedSegment
+ synchronized(this)
+ {
+ if (cdcState == CDCState.CONTAINS && newState != CDCState.CONTAINS)
+ throw new IllegalArgumentException("Cannot transition from CONTAINS to any other state.");
+
+ if (cdcState == CDCState.FORBIDDEN && newState != CDCState.PERMITTED)
+ throw new IllegalArgumentException("Only transition from FORBIDDEN to PERMITTED is allowed.");
+
+ cdcState = newState;
+ }
+ }
+
/**
* A simple class for tracking information about the portion of a segment that has been allocated to a log write.
- * The constructor leaves the fields uninitialized for population by CommitlogManager, so that it can be
- * stack-allocated by escape analysis in CommitLog.add.
*/
- static class Allocation
+ protected static class Allocation
{
private final CommitLogSegment segment;
private final OpOrder.Group appendOp;
@@ -652,9 +666,9 @@ public abstract class CommitLogSegment
segment.waitForSync(position, waitingOnCommit);
}
- public ReplayPosition getReplayPosition()
+ public CommitLogPosition getCommitLogPosition()
{
- return new ReplayPosition(segment.id, buffer.limit());
+ return new CommitLogPosition(segment.id, buffer.limit());
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
deleted file mode 100644
index 4f1166b..0000000
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
+++ /dev/null
@@ -1,567 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.commitlog;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Iterables;
-import com.google.common.util.concurrent.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.Mutation;
-import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.utils.Pair;
-import org.apache.cassandra.utils.concurrent.WaitQueue;
-import org.apache.cassandra.utils.JVMStabilityInspector;
-import org.apache.cassandra.utils.WrappedRunnable;
-
-import static org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation;
-
-/**
- * Performs eager-creation of commit log segments in a background thread. All the
- * public methods are thread safe.
- */
-public class CommitLogSegmentManager
-{
- static final Logger logger = LoggerFactory.getLogger(CommitLogSegmentManager.class);
-
- /**
- * Queue of work to be done by the manager thread, also used to wake the thread to perform segment allocation.
- */
- private final BlockingQueue<Runnable> segmentManagementTasks = new LinkedBlockingQueue<>();
-
- /** Segments that are ready to be used. Head of the queue is the one we allocate writes to */
- private final ConcurrentLinkedQueue<CommitLogSegment> availableSegments = new ConcurrentLinkedQueue<>();
-
- /** Active segments, containing unflushed data */
- private final ConcurrentLinkedQueue<CommitLogSegment> activeSegments = new ConcurrentLinkedQueue<>();
-
- /** The segment we are currently allocating commit log records to */
- private volatile CommitLogSegment allocatingFrom = null;
-
- private final WaitQueue hasAvailableSegments = new WaitQueue();
-
- /**
- * Tracks commitlog size, in multiples of the segment size. We need to do this so we can "promise" size
- * adjustments ahead of actually adding/freeing segments on disk, so that the "evict oldest segment" logic
- * can see the effect of recycling segments immediately (even though they're really happening asynchronously
- * on the manager thread, which will take a ms or two).
- */
- private final AtomicLong size = new AtomicLong();
-
- /**
- * New segment creation is initially disabled because we'll typically get some "free" segments
- * recycled after log replay.
- */
- volatile boolean createReserveSegments = false;
-
- private Thread managerThread;
- private volatile boolean run = true;
- private final CommitLog commitLog;
-
- CommitLogSegmentManager(final CommitLog commitLog)
- {
- this.commitLog = commitLog;
- }
-
- void start()
- {
- // The run loop for the manager thread
- Runnable runnable = new WrappedRunnable()
- {
- public void runMayThrow() throws Exception
- {
- while (run)
- {
- try
- {
- Runnable task = segmentManagementTasks.poll();
- if (task == null)
- {
- // if we have no more work to do, check if we should create a new segment
- if (!atSegmentLimit() && availableSegments.isEmpty() && (activeSegments.isEmpty() || createReserveSegments))
- {
- logger.trace("No segments in reserve; creating a fresh one");
- // TODO : some error handling in case we fail to create a new segment
- availableSegments.add(CommitLogSegment.createSegment(commitLog, () -> wakeManager()));
- hasAvailableSegments.signalAll();
- }
-
- // flush old Cfs if we're full
- long unused = unusedCapacity();
- if (unused < 0)
- {
- List<CommitLogSegment> segmentsToRecycle = new ArrayList<>();
- long spaceToReclaim = 0;
- for (CommitLogSegment segment : activeSegments)
- {
- if (segment == allocatingFrom)
- break;
- segmentsToRecycle.add(segment);
- spaceToReclaim += DatabaseDescriptor.getCommitLogSegmentSize();
- if (spaceToReclaim + unused >= 0)
- break;
- }
- flushDataFrom(segmentsToRecycle, false);
- }
-
- try
- {
- // wait for new work to be provided
- task = segmentManagementTasks.take();
- }
- catch (InterruptedException e)
- {
- throw new AssertionError();
- }
- }
-
- task.run();
- }
- catch (Throwable t)
- {
- JVMStabilityInspector.inspectThrowable(t);
- if (!CommitLog.handleCommitError("Failed managing commit log segments", t))
- return;
- // sleep some arbitrary period to avoid spamming CL
- Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
- }
- }
- }
-
- private boolean atSegmentLimit()
- {
- return CommitLogSegment.usesBufferPool(commitLog) && CompressedSegment.hasReachedPoolLimit();
- }
-
- };
-
- run = true;
-
- managerThread = new Thread(runnable, "COMMIT-LOG-ALLOCATOR");
- managerThread.start();
- }
-
- /**
- * Reserve space in the current segment for the provided mutation or, if there isn't space available,
- * create a new segment.
- *
- * @return the provided Allocation object
- */
- public Allocation allocate(Mutation mutation, int size)
- {
- CommitLogSegment segment = allocatingFrom();
-
- Allocation alloc;
- while ( null == (alloc = segment.allocate(mutation, size)) )
- {
- // failed to allocate, so move to a new segment with enough room
- advanceAllocatingFrom(segment);
- segment = allocatingFrom;
- }
-
- return alloc;
- }
-
- // simple wrapper to ensure non-null value for allocatingFrom; only necessary on first call
- CommitLogSegment allocatingFrom()
- {
- CommitLogSegment r = allocatingFrom;
- if (r == null)
- {
- advanceAllocatingFrom(null);
- r = allocatingFrom;
- }
- return r;
- }
-
- /**
- * Fetches a new segment from the queue, creating a new one if necessary, and activates it
- */
- private void advanceAllocatingFrom(CommitLogSegment old)
- {
- while (true)
- {
- CommitLogSegment next;
- synchronized (this)
- {
- // do this in a critical section so we can atomically remove from availableSegments and add to allocatingFrom/activeSegments
- // see https://issues.apache.org/jira/browse/CASSANDRA-6557?focusedCommentId=13874432&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13874432
- if (allocatingFrom != old)
- return;
- next = availableSegments.poll();
- if (next != null)
- {
- allocatingFrom = next;
- activeSegments.add(next);
- }
- }
-
- if (next != null)
- {
- if (old != null)
- {
- // Now we can run the user defined command just after switching to the new commit log.
- // (Do this here instead of in the recycle call so we can get a head start on the archive.)
- commitLog.archiver.maybeArchive(old);
-
- // ensure we don't continue to use the old file; not strictly necessary, but cleaner to enforce it
- old.discardUnusedTail();
- }
-
- // request that the CL be synced out-of-band, as we've finished a segment
- commitLog.requestExtraSync();
- return;
- }
-
- // no more segments, so register to receive a signal when not empty
- WaitQueue.Signal signal = hasAvailableSegments.register(commitLog.metrics.waitingOnSegmentAllocation.time());
-
- // trigger the management thread; this must occur after registering
- // the signal to ensure we are woken by any new segment creation
- wakeManager();
-
- // check if the queue has already been added to before waiting on the signal, to catch modifications
- // that happened prior to registering the signal; *then* check to see if we've been beaten to making the change
- if (!availableSegments.isEmpty() || allocatingFrom != old)
- {
- signal.cancel();
- // if we've been beaten, just stop immediately
- if (allocatingFrom != old)
- return;
- // otherwise try again, as there should be an available segment
- continue;
- }
-
- // can only reach here if the queue hasn't been inserted into
- // before we registered the signal, as we only remove items from the queue
- // after updating allocatingFrom. Can safely block until we are signalled
- // by the allocator that new segments have been published
- signal.awaitUninterruptibly();
- }
- }
-
- private void wakeManager()
- {
- // put a NO-OP on the queue, to trigger management thread (and create a new segment if necessary)
- segmentManagementTasks.add(Runnables.doNothing());
- }
-
- /**
- * Switch to a new segment, regardless of how much is left in the current one.
- *
- * Flushes any dirty CFs for this segment and any older segments, and then recycles
- * the segments
- */
- void forceRecycleAll(Iterable<UUID> droppedCfs)
- {
- List<CommitLogSegment> segmentsToRecycle = new ArrayList<>(activeSegments);
- CommitLogSegment last = segmentsToRecycle.get(segmentsToRecycle.size() - 1);
- advanceAllocatingFrom(last);
-
- // wait for the commit log modifications
- last.waitForModifications();
-
- // make sure the writes have materialized inside of the memtables by waiting for all outstanding writes
- // on the relevant keyspaces to complete
- Keyspace.writeOrder.awaitNewBarrier();
-
- // flush and wait for all CFs that are dirty in segments up-to and including 'last'
- Future<?> future = flushDataFrom(segmentsToRecycle, true);
- try
- {
- future.get();
-
- for (CommitLogSegment segment : activeSegments)
- for (UUID cfId : droppedCfs)
- segment.markClean(cfId, segment.getContext());
-
- // now recycle segments that are unused, as we may not have triggered a discardCompletedSegments()
- // if the previous active segment was the only one to recycle (since an active segment isn't
- // necessarily dirty, and we only call dCS after a flush).
- for (CommitLogSegment segment : activeSegments)
- if (segment.isUnused())
- recycleSegment(segment);
-
- CommitLogSegment first;
- if ((first = activeSegments.peek()) != null && first.id <= last.id)
- logger.error("Failed to force-recycle all segments; at least one segment is still in use with dirty CFs.");
- }
- catch (Throwable t)
- {
- // for now just log the error and return false, indicating that we failed
- logger.error("Failed waiting for a forced recycle of in-use commit log segments", t);
- }
- }
-
- /**
- * Indicates that a segment is no longer in use and that it should be recycled.
- *
- * @param segment segment that is no longer in use
- */
- void recycleSegment(final CommitLogSegment segment)
- {
- boolean archiveSuccess = commitLog.archiver.maybeWaitForArchiving(segment.getName());
- if (activeSegments.remove(segment))
- {
- // if archiving (command) was not successful then leave the file alone. don't delete or recycle.
- discardSegment(segment, archiveSuccess);
- }
- else
- {
- logger.warn("segment {} not found in activeSegments queue", segment);
- }
- }
-
- /**
- * Differs from the above because it can work on any file instead of just existing
- * commit log segments managed by this manager.
- *
- * @param file segment file that is no longer in use.
- */
- void recycleSegment(final File file)
- {
- // (don't decrease managed size, since this was never a "live" segment)
- logger.trace("(Unopened) segment {} is no longer needed and will be deleted now", file);
- FileUtils.deleteWithConfirm(file);
- }
-
- /**
- * Indicates that a segment file should be deleted.
- *
- * @param segment segment to be discarded
- */
- private void discardSegment(final CommitLogSegment segment, final boolean deleteFile)
- {
- logger.trace("Segment {} is no longer active and will be deleted {}", segment, deleteFile ? "now" : "by the archive script");
-
- segmentManagementTasks.add(new Runnable()
- {
- public void run()
- {
- segment.discard(deleteFile);
- }
- });
- }
-
- /**
- * Adjust the tracked on-disk size. Called by individual segments to reflect writes, allocations and discards.
- * @param addedSize
- */
- void addSize(long addedSize)
- {
- size.addAndGet(addedSize);
- }
-
- /**
- * @return the space (in bytes) used by all segment files.
- */
- public long onDiskSize()
- {
- return size.get();
- }
-
- private long unusedCapacity()
- {
- long total = DatabaseDescriptor.getTotalCommitlogSpaceInMB() * 1024 * 1024;
- long currentSize = size.get();
- logger.trace("Total active commitlog segment space used is {} out of {}", currentSize, total);
- return total - currentSize;
- }
-
- /**
- * @param name the filename to check
- * @return true if file is managed by this manager.
- */
- public boolean manages(String name)
- {
- for (CommitLogSegment segment : Iterables.concat(activeSegments, availableSegments))
- if (segment.getName().equals(name))
- return true;
- return false;
- }
-
- /**
- * Throws a flag that enables the behavior of keeping at least one spare segment
- * available at all times.
- */
- void enableReserveSegmentCreation()
- {
- createReserveSegments = true;
- wakeManager();
- }
-
- /**
- * Force a flush on all CFs that are still dirty in @param segments.
- *
- * @return a Future that will finish when all the flushes are complete.
- */
- private Future<?> flushDataFrom(List<CommitLogSegment> segments, boolean force)
- {
- if (segments.isEmpty())
- return Futures.immediateFuture(null);
- final ReplayPosition maxReplayPosition = segments.get(segments.size() - 1).getContext();
-
- // a map of CfId -> forceFlush() to ensure we only queue one flush per cf
- final Map<UUID, ListenableFuture<?>> flushes = new LinkedHashMap<>();
-
- for (CommitLogSegment segment : segments)
- {
- for (UUID dirtyCFId : segment.getDirtyCFIDs())
- {
- Pair<String,String> pair = Schema.instance.getCF(dirtyCFId);
- if (pair == null)
- {
- // even though we remove the schema entry before a final flush when dropping a CF,
- // it's still possible for a writer to race and finish his append after the flush.
- logger.trace("Marking clean CF {} that doesn't exist anymore", dirtyCFId);
- segment.markClean(dirtyCFId, segment.getContext());
- }
- else if (!flushes.containsKey(dirtyCFId))
- {
- String keyspace = pair.left;
- final ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(dirtyCFId);
- // can safely call forceFlush here as we will only ever block (briefly) for other attempts to flush,
- // no deadlock possibility since switchLock removal
- flushes.put(dirtyCFId, force ? cfs.forceFlush() : cfs.forceFlush(maxReplayPosition));
- }
- }
- }
-
- return Futures.allAsList(flushes.values());
- }
-
- /**
- * Stops CL, for testing purposes. DO NOT USE THIS OUTSIDE OF TESTS.
- * Only call this after the AbstractCommitLogService is shut down.
- */
- public void stopUnsafe(boolean deleteSegments)
- {
- logger.trace("CLSM closing and clearing existing commit log segments...");
- createReserveSegments = false;
-
- awaitManagementTasksCompletion();
-
- shutdown();
- try
- {
- awaitTermination();
- }
- catch (InterruptedException e)
- {
- throw new RuntimeException(e);
- }
-
- synchronized (this)
- {
- for (CommitLogSegment segment : activeSegments)
- closeAndDeleteSegmentUnsafe(segment, deleteSegments);
- activeSegments.clear();
-
- for (CommitLogSegment segment : availableSegments)
- closeAndDeleteSegmentUnsafe(segment, deleteSegments);
- availableSegments.clear();
- }
-
- allocatingFrom = null;
-
- segmentManagementTasks.clear();
-
- size.set(0L);
-
- logger.trace("CLSM done with closing and clearing existing commit log segments.");
- }
-
- // Used by tests only.
- void awaitManagementTasksCompletion()
- {
- while (!segmentManagementTasks.isEmpty())
- Thread.yield();
- // The last management task is not yet complete. Wait a while for it.
- Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
- // TODO: If this functionality is required by anything other than tests, signalling must be used to ensure
- // waiting completes correctly.
- }
-
- private static void closeAndDeleteSegmentUnsafe(CommitLogSegment segment, boolean delete)
- {
- try
- {
- segment.discard(delete);
- }
- catch (AssertionError ignored)
- {
- // segment file does not exist
- }
- }
-
- /**
- * Initiates the shutdown process for the management thread.
- */
- public void shutdown()
- {
- run = false;
- wakeManager();
- }
-
- /**
- * Returns when the management thread terminates.
- */
- public void awaitTermination() throws InterruptedException
- {
- managerThread.join();
-
- for (CommitLogSegment segment : activeSegments)
- segment.close();
-
- for (CommitLogSegment segment : availableSegments)
- segment.close();
-
- FileDirectSegment.shutdown();
- }
-
- /**
- * @return a read-only collection of the active commit log segments
- */
- @VisibleForTesting
- public Collection<CommitLogSegment> getActiveSegments()
- {
- return Collections.unmodifiableCollection(activeSegments);
- }
-
-}
-
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java
new file mode 100644
index 0000000..5c6fd3f
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.commitlog;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.RateLimiter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.commitlog.CommitLogSegment.CDCState;
+import org.apache.cassandra.exceptions.WriteTimeoutException;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.DirectorySizeCalculator;
+
+public class CommitLogSegmentManagerCDC extends AbstractCommitLogSegmentManager
+{
+ static final Logger logger = LoggerFactory.getLogger(CommitLogSegmentManagerCDC.class);
+ private final CDCSizeTracker cdcSizeTracker;
+
+ public CommitLogSegmentManagerCDC(final CommitLog commitLog, String storageDirectory)
+ {
+ super(commitLog, storageDirectory);
+ cdcSizeTracker = new CDCSizeTracker(this, new File(DatabaseDescriptor.getCDCLogLocation()));
+ }
+
+ @Override
+ void start()
+ {
+ super.start();
+ cdcSizeTracker.start();
+ }
+
+ public void discard(CommitLogSegment segment, boolean delete)
+ {
+ segment.close();
+ addSize(-segment.onDiskSize());
+
+ cdcSizeTracker.processDiscardedSegment(segment);
+
+ if (segment.getCDCState() == CDCState.CONTAINS)
+ FileUtils.renameWithConfirm(segment.logFile.getAbsolutePath(), DatabaseDescriptor.getCDCLogLocation() + File.separator + segment.logFile.getName());
+ else
+ {
+ if (delete)
+ FileUtils.deleteWithConfirm(segment.logFile);
+ }
+ }
+
+ /**
+ * Initiates the shutdown process for the management thread. Also stops the cdc on-disk size calculator executor.
+ */
+ public void shutdown()
+ {
+ run = false;
+ cdcSizeTracker.shutdown();
+ wakeManager();
+ }
+
+ /**
+ * Reserve space in the current segment for the provided mutation or, if there isn't space available,
+ * create a new segment. For CDC mutations, allocation is expected to throw WTE if the segment disallows CDC mutations.
+ *
+ * @param mutation Mutation to allocate in segment manager
+ * @param size total size (overhead + serialized) of mutation
+ * @return the created Allocation object
+ * @throws WriteTimeoutException If segment disallows CDC mutations, we throw WTE
+ */
+ @Override
+ public CommitLogSegment.Allocation allocate(Mutation mutation, int size) throws WriteTimeoutException
+ {
+ CommitLogSegment segment = allocatingFrom();
+ CommitLogSegment.Allocation alloc;
+
+ throwIfForbidden(mutation, segment);
+ while ( null == (alloc = segment.allocate(mutation, size)) )
+ {
+ // Failed to allocate, so move to a new segment with enough room if possible.
+ advanceAllocatingFrom(segment);
+ segment = allocatingFrom;
+
+ throwIfForbidden(mutation, segment);
+ }
+
+ if (mutation.trackedByCDC())
+ segment.setCDCState(CDCState.CONTAINS);
+
+ return alloc;
+ }
+
+ private void throwIfForbidden(Mutation mutation, CommitLogSegment segment) throws WriteTimeoutException
+ {
+ if (mutation.trackedByCDC() && segment.getCDCState() == CDCState.FORBIDDEN)
+ {
+ cdcSizeTracker.submitOverflowSizeRecalculation();
+ throw new WriteTimeoutException(WriteType.CDC, ConsistencyLevel.LOCAL_ONE, 0, 1);
+ }
+ }
+
+ /**
+ * Move files to cdc_raw after replay, since recovery will flush to SSTable and these mutations won't be available
+ * in the CL subsystem otherwise.
+ */
+ void handleReplayedSegment(final File file)
+ {
+ logger.trace("Moving (Unopened) segment {} to cdc_raw directory after replay", file);
+ FileUtils.renameWithConfirm(file.getAbsolutePath(), DatabaseDescriptor.getCDCLogLocation() + File.separator + file.getName());
+ cdcSizeTracker.addFlushedSize(file.length());
+ }
+
+ /**
+ * On segment creation, flag whether the segment should accept CDC mutations or not based on the total currently
+ * allocated unflushed CDC segments and the contents of cdc_raw
+ */
+ public CommitLogSegment createSegment()
+ {
+ CommitLogSegment segment = CommitLogSegment.createSegment(commitLog, this, () -> wakeManager());
+ cdcSizeTracker.processNewSegment(segment);
+ return segment;
+ }
+
+ /**
+ * Tracks total disk usage of CDC subsystem, defined by the summation of all unflushed CommitLogSegments with CDC
+ * data in them and all segments archived into cdc_raw.
+ *
+ * Allows atomic increment/decrement of the unflushed size; the flushed size, however, can only be incremented, and a
+ * full directory walk is required to detect any deletions made by the CDC consumer.
+ *
+ * TODO: Linux performs approximately 25% better with the following one-liner instead of this walker:
+ * Arrays.stream(path.listFiles()).mapToLong(File::length).sum();
+ * However, this solution is 375% slower on Windows. Revisit this and split the logic per OS.
+ */
+ private class CDCSizeTracker extends DirectorySizeCalculator
+ {
+ private final RateLimiter rateLimiter = RateLimiter.create(1000.0 / DatabaseDescriptor.getCDCDiskCheckInterval()); // floating-point division: an interval above 1000ms must yield a fractional (not zero) permits-per-second rate
+ private ExecutorService cdcSizeCalculationExecutor;
+ private CommitLogSegmentManagerCDC segmentManager;
+ private AtomicLong unflushedCDCSize = new AtomicLong(0);
+
+ CDCSizeTracker(CommitLogSegmentManagerCDC segmentManager, File path)
+ {
+ super(path);
+ this.segmentManager = segmentManager;
+ }
+
+ /**
+ * Needed for stop/restart during unit tests
+ */
+ public void start()
+ {
+ cdcSizeCalculationExecutor = new ThreadPoolExecutor(1, 1, 1000, TimeUnit.SECONDS, new SynchronousQueue<>(), new ThreadPoolExecutor.DiscardPolicy());
+ }
+
+ /**
+ * Synchronous size recalculation on each segment creation/deletion call could lead to very long delays in new
+ * segment allocation, thus long delays in thread signaling to wake waiting allocation / writer threads.
+ *
+ * This can be reached either from the segment management thread in AbstractCommitLogSegmentManager or from the
+ * size recalculation executor, so we synchronize on this object to reduce the race overlap window available for
+ * size to get off.
+ *
+ * Reference DirectorySizerBench for more information about performance of the directory size recalc.
+ */
+ void processNewSegment(CommitLogSegment segment)
+ {
+ // See synchronization in CommitLogSegment.setCDCState
+ synchronized(segment)
+ {
+ segment.setCDCState(defaultSegmentSize() + totalCDCSizeOnDisk() > allowableCDCBytes()
+ ? CDCState.FORBIDDEN
+ : CDCState.PERMITTED);
+ if (segment.getCDCState() == CDCState.PERMITTED)
+ unflushedCDCSize.addAndGet(defaultSegmentSize());
+ }
+
+ // Take this opportunity to kick off a recalc to pick up any consumer file deletion.
+ submitOverflowSizeRecalculation();
+ }
+
+ void processDiscardedSegment(CommitLogSegment segment)
+ {
+ // See synchronization in CommitLogSegment.setCDCState
+ synchronized(segment)
+ {
+ // Add to flushed size before decrementing unflushed so we don't have a window of false generosity
+ if (segment.getCDCState() == CDCState.CONTAINS)
+ size.addAndGet(segment.onDiskSize());
+ if (segment.getCDCState() != CDCState.FORBIDDEN)
+ unflushedCDCSize.addAndGet(-defaultSegmentSize());
+ }
+
+ // Take this opportunity to kick off a recalc to pick up any consumer file deletion.
+ submitOverflowSizeRecalculation();
+ }
+
+ private long allowableCDCBytes()
+ {
+ return (long)DatabaseDescriptor.getCDCSpaceInMB() * 1024 * 1024;
+ }
+
+ public void submitOverflowSizeRecalculation()
+ {
+ try
+ {
+ cdcSizeCalculationExecutor.submit(() -> recalculateOverflowSize());
+ }
+ catch (RejectedExecutionException e)
+ {
+ // Do nothing. Means we have one in flight so this req. should be satisfied when it completes.
+ }
+ }
+
+ private void recalculateOverflowSize()
+ {
+ rateLimiter.acquire();
+ calculateSize();
+ CommitLogSegment allocatingFrom = segmentManager.allocatingFrom;
+ if (allocatingFrom.getCDCState() == CDCState.FORBIDDEN)
+ processNewSegment(allocatingFrom);
+ }
+
+ private int defaultSegmentSize()
+ {
+ return DatabaseDescriptor.getCommitLogSegmentSize();
+ }
+
+ private void calculateSize()
+ {
+ try
+ {
+ // Since we don't synchronize around either rebuilding our file list or walking the tree and adding to
+ // size, it's possible we could have changes take place underneath us and end up with a slightly incorrect
+ // view of our flushed size by the time this walking completes. Given that there's a linear growth in
+ // runtime on both rebuildFileList and walkFileTree (about 50% for each one on runtime), and that the
+ // window for this race should be very small, this is an acceptable trade-off since it will be resolved
+ // on the next segment creation / deletion with a subsequent call to submitOverflowSizeRecalculation.
+ rebuildFileList();
+ Files.walkFileTree(path.toPath(), this);
+ }
+ catch (IOException ie)
+ {
+ CommitLog.instance.handleCommitError("Failed CDC Size Calculation", ie);
+ }
+ }
+
+ private long addFlushedSize(long toAdd)
+ {
+ return size.addAndGet(toAdd);
+ }
+
+ private long totalCDCSizeOnDisk()
+ {
+ return unflushedCDCSize.get() + size.get();
+ }
+
+ public void shutdown()
+ {
+ cdcSizeCalculationExecutor.shutdown();
+ }
+ }
+
+ /**
+ * Only use for testing / validation that size tracker is working. Not for production use.
+ */
+ @VisibleForTesting
+ public long updateCDCTotalSize()
+ {
+ cdcSizeTracker.submitOverflowSizeRecalculation();
+
+ // Give the update time to run
+ try
+ {
+ Thread.sleep(DatabaseDescriptor.getCDCDiskCheckInterval() + 10);
+ }
+ catch (InterruptedException e) { Thread.currentThread().interrupt(); } // keep the interrupt visible to the caller; the size returned below may then predate the recalculation
+
+ return cdcSizeTracker.totalCDCSizeOnDisk();
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerStandard.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerStandard.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerStandard.java
new file mode 100644
index 0000000..333077c
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerStandard.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.commitlog;
+
+import java.io.File;
+
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.io.util.FileUtils;
+
+public class CommitLogSegmentManagerStandard extends AbstractCommitLogSegmentManager
+{
+ public CommitLogSegmentManagerStandard(final CommitLog commitLog, String storageDirectory)
+ {
+ super(commitLog, storageDirectory);
+ }
+
+ public void discard(CommitLogSegment segment, boolean delete)
+ {
+ segment.close();
+ if (delete)
+ FileUtils.deleteWithConfirm(segment.logFile);
+ addSize(-segment.onDiskSize());
+ }
+
+ /**
+ * Initiates the shutdown process for the management thread.
+ */
+ public void shutdown()
+ {
+ run = false;
+ wakeManager();
+ }
+
+ /**
+ * Reserve space in the current segment for the provided mutation or, if there isn't space available,
+ * create a new segment. allocate() is blocking until allocation succeeds as it waits on a signal in advanceAllocatingFrom
+ *
+ * @param mutation mutation to allocate space for
+ * @param size total size of mutation (overhead + serialized size)
+ * @return the provided Allocation object
+ */
+ public CommitLogSegment.Allocation allocate(Mutation mutation, int size)
+ {
+ CommitLogSegment segment = allocatingFrom();
+
+ CommitLogSegment.Allocation alloc;
+ while ( null == (alloc = segment.allocate(mutation, size)) )
+ {
+ // failed to allocate, so move to a new segment with enough room
+ advanceAllocatingFrom(segment);
+ segment = allocatingFrom;
+ }
+
+ return alloc;
+ }
+
+ /**
+ * Simply delete untracked segment files w/standard, as it'll be flushed to sstables during recovery
+ *
+ * @param file segment file that is no longer in use.
+ */
+ void handleReplayedSegment(final File file)
+ {
+ // (don't decrease managed size, since this was never a "live" segment)
+ logger.trace("(Unopened) segment {} is no longer needed and will be deleted now", file);
+ FileUtils.deleteWithConfirm(file);
+ }
+
+ public CommitLogSegment createSegment()
+ {
+ return CommitLogSegment.createSegment(commitLog, this, () -> wakeManager());
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentReader.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentReader.java
new file mode 100644
index 0000000..b547131
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentReader.java
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.commitlog;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.zip.CRC32;
+import javax.crypto.Cipher;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.AbstractIterator;
+
+import org.apache.cassandra.db.commitlog.EncryptedFileSegmentInputStream.ChunkProvider;
+import org.apache.cassandra.db.commitlog.CommitLogReadHandler.*;
+import org.apache.cassandra.io.FSReadError;
+import org.apache.cassandra.io.compress.ICompressor;
+import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.io.util.FileSegmentInputStream;
+import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.schema.CompressionParams;
+import org.apache.cassandra.security.EncryptionUtils;
+import org.apache.cassandra.security.EncryptionContext;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+import static org.apache.cassandra.db.commitlog.CommitLogSegment.SYNC_MARKER_SIZE;
+import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt;
+
+/**
+ * Read each sync section of a commit log, iteratively.
+ */
+public class CommitLogSegmentReader implements Iterable<CommitLogSegmentReader.SyncSegment>
+{
+ private final CommitLogReadHandler handler;
+ private final CommitLogDescriptor descriptor;
+ private final RandomAccessReader reader;
+ private final Segmenter segmenter;
+ private final boolean tolerateTruncation;
+
+ /**
+ * ending position of the current sync section.
+ */
+ protected int end;
+
+ protected CommitLogSegmentReader(CommitLogReadHandler handler,
+ CommitLogDescriptor descriptor,
+ RandomAccessReader reader,
+ boolean tolerateTruncation)
+ {
+ this.handler = handler;
+ this.descriptor = descriptor;
+ this.reader = reader;
+ this.tolerateTruncation = tolerateTruncation;
+
+ end = (int) reader.getFilePointer();
+ if (descriptor.getEncryptionContext().isEnabled())
+ segmenter = new EncryptedSegmenter(descriptor, reader);
+ else if (descriptor.compression != null)
+ segmenter = new CompressedSegmenter(descriptor, reader);
+ else
+ segmenter = new NoOpSegmenter(reader);
+ }
+
+ public Iterator<SyncSegment> iterator()
+ {
+ return new SegmentIterator();
+ }
+
+ protected class SegmentIterator extends AbstractIterator<CommitLogSegmentReader.SyncSegment>
+ {
+ protected SyncSegment computeNext()
+ {
+ while (true)
+ {
+ try
+ {
+ final int currentStart = end;
+ end = readSyncMarker(descriptor, currentStart, reader);
+ if (end == -1)
+ {
+ return endOfData();
+ }
+ if (end > reader.length())
+ {
+ // the CRC was good (meaning it was good when it was written and still looks legit), but the file is truncated now.
+ // try to grab and use as much of the file as possible, which might be nothing if the end of the file truly is corrupt
+ end = (int) reader.length();
+ }
+ return segmenter.nextSegment(currentStart + SYNC_MARKER_SIZE, end);
+ }
+ catch(CommitLogSegmentReader.SegmentReadException e)
+ {
+ try
+ {
+ handler.handleUnrecoverableError(new CommitLogReadException(
+ e.getMessage(),
+ CommitLogReadErrorReason.UNRECOVERABLE_DESCRIPTOR_ERROR,
+ !e.invalidCrc && tolerateTruncation));
+ }
+ catch (IOException ioe)
+ {
+ throw new RuntimeException(ioe);
+ }
+ }
+ catch (IOException e)
+ {
+ try
+ {
+ boolean tolerateErrorsInSection = tolerateTruncation & segmenter.tolerateSegmentErrors(end, reader.length());
+ // if no exception is thrown, the while loop will continue
+ handler.handleUnrecoverableError(new CommitLogReadException(
+ e.getMessage(),
+ CommitLogReadErrorReason.UNRECOVERABLE_DESCRIPTOR_ERROR,
+ tolerateErrorsInSection));
+ }
+ catch (IOException ioe)
+ {
+ throw new RuntimeException(ioe);
+ }
+ }
+ }
+ }
+ }
+
+ private int readSyncMarker(CommitLogDescriptor descriptor, int offset, RandomAccessReader reader) throws IOException
+ {
+ if (offset > reader.length() - SYNC_MARKER_SIZE)
+ {
+ // There was no room in the segment to write a final header. No data could be present here.
+ return -1;
+ }
+ reader.seek(offset);
+ CRC32 crc = new CRC32();
+ updateChecksumInt(crc, (int) (descriptor.id & 0xFFFFFFFFL));
+ updateChecksumInt(crc, (int) (descriptor.id >>> 32));
+ updateChecksumInt(crc, (int) reader.getPosition());
+ final int end = reader.readInt();
+ long filecrc = reader.readInt() & 0xffffffffL;
+ if (crc.getValue() != filecrc)
+ {
+ if (end != 0 || filecrc != 0)
+ {
+ String msg = String.format("Encountered bad header at position %d of commit log %s, with invalid CRC. " +
+ "The end of segment marker should be zero.", offset, reader.getPath());
+ throw new SegmentReadException(msg, true);
+ }
+ return -1;
+ }
+ else if (end < offset || end > reader.length())
+ {
+ String msg = String.format("Encountered bad header at position %d of commit log %s, with bad position but valid CRC", offset, reader.getPath());
+ throw new SegmentReadException(msg, false);
+ }
+ return end;
+ }
+
+ public static class SegmentReadException extends IOException
+ {
+ public final boolean invalidCrc;
+
+ public SegmentReadException(String msg, boolean invalidCrc)
+ {
+ super(msg);
+ this.invalidCrc = invalidCrc;
+ }
+ }
+
+ public static class SyncSegment
+ {
+ /** the 'buffer' to replay commit log data from */
+ public final FileDataInput input;
+
+ /** offset in file where this section begins. */
+ public final int fileStartPosition;
+
+ /** offset in file where this section ends. */
+ public final int fileEndPosition;
+
+ /** the logical ending position of the buffer */
+ public final int endPosition;
+
+ public final boolean toleratesErrorsInSection;
+
+ public SyncSegment(FileDataInput input, int fileStartPosition, int fileEndPosition, int endPosition, boolean toleratesErrorsInSection)
+ {
+ this.input = input;
+ this.fileStartPosition = fileStartPosition;
+ this.fileEndPosition = fileEndPosition;
+ this.endPosition = endPosition;
+ this.toleratesErrorsInSection = toleratesErrorsInSection;
+ }
+ }
+
+ /**
+ * Derives the next section of the commit log to be replayed. Section boundaries are derived from the commit log sync markers.
+ */
+ interface Segmenter
+ {
+ /**
+ * Get the next section of the commit log to replay.
+ *
+ * @param startPosition the position in the file to begin reading at
+ * @param nextSectionStartPosition the file position of the beginning of the next section
+ * @return the buffer and its logical end position
+ * @throws IOException
+ */
+ SyncSegment nextSegment(int startPosition, int nextSectionStartPosition) throws IOException;
+
+ /**
+ * Determine if we tolerate errors in the current segment.
+ */
+ default boolean tolerateSegmentErrors(int segmentEndPosition, long fileLength)
+ {
+ return segmentEndPosition >= fileLength || segmentEndPosition < 0;
+ }
+ }
+
+ static class NoOpSegmenter implements Segmenter
+ {
+ private final RandomAccessReader reader;
+
+ public NoOpSegmenter(RandomAccessReader reader)
+ {
+ this.reader = reader;
+ }
+
+ public SyncSegment nextSegment(int startPosition, int nextSectionStartPosition)
+ {
+ reader.seek(startPosition);
+ return new SyncSegment(reader, startPosition, nextSectionStartPosition, nextSectionStartPosition, true);
+ }
+
+ public boolean tolerateSegmentErrors(int end, long length)
+ {
+ return true;
+ }
+ }
+
+ static class CompressedSegmenter implements Segmenter
+ {
+ private final ICompressor compressor;
+ private final RandomAccessReader reader;
+ private byte[] compressedBuffer;
+ private byte[] uncompressedBuffer;
+ private long nextLogicalStart;
+
+ public CompressedSegmenter(CommitLogDescriptor desc, RandomAccessReader reader)
+ {
+ this(CompressionParams.createCompressor(desc.compression), reader);
+ }
+
+ public CompressedSegmenter(ICompressor compressor, RandomAccessReader reader)
+ {
+ this.compressor = compressor;
+ this.reader = reader;
+ compressedBuffer = new byte[0];
+ uncompressedBuffer = new byte[0];
+ nextLogicalStart = reader.getFilePointer();
+ }
+
+ public SyncSegment nextSegment(final int startPosition, final int nextSectionStartPosition) throws IOException
+ {
+ reader.seek(startPosition);
+ int uncompressedLength = reader.readInt();
+
+ int compressedLength = nextSectionStartPosition - (int)reader.getPosition();
+ if (compressedLength > compressedBuffer.length)
+ compressedBuffer = new byte[(int) (1.2 * compressedLength)];
+ reader.readFully(compressedBuffer, 0, compressedLength);
+
+ if (uncompressedLength > uncompressedBuffer.length)
+ uncompressedBuffer = new byte[(int) (1.2 * uncompressedLength)];
+ int count = compressor.uncompress(compressedBuffer, 0, compressedLength, uncompressedBuffer, 0);
+ nextLogicalStart += SYNC_MARKER_SIZE;
+ FileDataInput input = new FileSegmentInputStream(ByteBuffer.wrap(uncompressedBuffer, 0, count), reader.getPath(), nextLogicalStart);
+ nextLogicalStart += uncompressedLength;
+ return new SyncSegment(input, startPosition, nextSectionStartPosition, (int)nextLogicalStart, tolerateSegmentErrors(nextSectionStartPosition, reader.length()));
+ }
+ }
+
+ static class EncryptedSegmenter implements Segmenter
+ {
+ private final RandomAccessReader reader;
+ private final ICompressor compressor;
+ private final Cipher cipher;
+
+ /**
+ * the result of the decryption is written into this buffer.
+ */
+ private ByteBuffer decryptedBuffer;
+
+ /**
+ * the result of the decompression is written into this buffer.
+ */
+ private ByteBuffer uncompressedBuffer;
+
+ private final ChunkProvider chunkProvider;
+
+ private long currentSegmentEndPosition;
+ private long nextLogicalStart;
+
+ public EncryptedSegmenter(CommitLogDescriptor descriptor, RandomAccessReader reader)
+ {
+ this(reader, descriptor.getEncryptionContext());
+ }
+
+ @VisibleForTesting
+ EncryptedSegmenter(final RandomAccessReader reader, EncryptionContext encryptionContext)
+ {
+ this.reader = reader;
+ decryptedBuffer = ByteBuffer.allocate(0);
+ compressor = encryptionContext.getCompressor();
+ nextLogicalStart = reader.getFilePointer();
+
+ try
+ {
+ cipher = encryptionContext.getDecryptor();
+ }
+ catch (IOException ioe)
+ {
+ throw new FSReadError(ioe, reader.getPath());
+ }
+
+ chunkProvider = () -> {
+ if (reader.getFilePointer() >= currentSegmentEndPosition)
+ return ByteBufferUtil.EMPTY_BYTE_BUFFER;
+ try
+ {
+ decryptedBuffer = EncryptionUtils.decrypt(reader, decryptedBuffer, true, cipher);
+ uncompressedBuffer = EncryptionUtils.uncompress(decryptedBuffer, uncompressedBuffer, true, compressor);
+ return uncompressedBuffer;
+ }
+ catch (IOException e)
+ {
+ throw new FSReadError(e, reader.getPath());
+ }
+ };
+ }
+
+ public SyncSegment nextSegment(int startPosition, int nextSectionStartPosition) throws IOException
+ {
+ int totalPlainTextLength = reader.readInt();
+ currentSegmentEndPosition = nextSectionStartPosition - 1;
+
+ nextLogicalStart += SYNC_MARKER_SIZE;
+ FileDataInput input = new EncryptedFileSegmentInputStream(reader.getPath(), nextLogicalStart, 0, totalPlainTextLength, chunkProvider);
+ nextLogicalStart += totalPlainTextLength;
+ return new SyncSegment(input, startPosition, nextSectionStartPosition, (int)nextLogicalStart, tolerateSegmentErrors(nextSectionStartPosition, reader.length()));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
index 684fc2c..e44dfdf 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
@@ -44,9 +44,9 @@ public class CompressedSegment extends FileDirectSegment
/**
* Constructs a new segment file.
*/
- CompressedSegment(CommitLog commitLog, Runnable onClose)
+ CompressedSegment(CommitLog commitLog, AbstractCommitLogSegmentManager manager, Runnable onClose)
{
- super(commitLog, onClose);
+ super(commitLog, manager, onClose);
this.compressor = commitLog.configuration.getCompressor();
}
@@ -57,7 +57,7 @@ public class CompressedSegment extends FileDirectSegment
ByteBuffer createBuffer(CommitLog commitLog)
{
- return createBuffer(commitLog.configuration.getCompressor().preferredBufferType());
+ return manager.getBufferPool().createBuffer(commitLog.configuration.getCompressor().preferredBufferType());
}
@Override
@@ -71,13 +71,13 @@ public class CompressedSegment extends FileDirectSegment
try
{
int neededBufferSize = compressor.initialCompressedBufferLength(length) + COMPRESSED_MARKER_SIZE;
- ByteBuffer compressedBuffer = reusableBufferHolder.get();
+ ByteBuffer compressedBuffer = manager.getBufferPool().getThreadLocalReusableBuffer();
if (compressor.preferredBufferType() != BufferType.typeOf(compressedBuffer) ||
compressedBuffer.capacity() < neededBufferSize)
{
FileUtils.clean(compressedBuffer);
compressedBuffer = allocate(neededBufferSize);
- reusableBufferHolder.set(compressedBuffer);
+ manager.getBufferPool().setThreadLocalReusableBuffer(compressedBuffer);
}
ByteBuffer inputBuffer = buffer.duplicate();
@@ -91,7 +91,7 @@ public class CompressedSegment extends FileDirectSegment
// Only one thread can be here at a given time.
// Protected by synchronization on CommitLogSegment.sync().
writeSyncMarker(compressedBuffer, 0, (int) channel.position(), (int) channel.position() + compressedBuffer.remaining());
- commitLog.allocator.addSize(compressedBuffer.limit());
+ manager.addSize(compressedBuffer.limit());
channel.write(compressedBuffer);
assert channel.position() - lastWrittenPos == compressedBuffer.limit();
lastWrittenPos = channel.position();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java b/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java
index c34a365..e13b20a 100644
--- a/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java
@@ -65,9 +65,9 @@ public class EncryptedSegment extends FileDirectSegment
private final EncryptionContext encryptionContext;
private final Cipher cipher;
- public EncryptedSegment(CommitLog commitLog, Runnable onClose)
+ public EncryptedSegment(CommitLog commitLog, AbstractCommitLogSegmentManager manager, Runnable onClose)
{
- super(commitLog, onClose);
+ super(commitLog, manager, onClose);
this.encryptionContext = commitLog.configuration.getEncryptionContext();
try
@@ -90,9 +90,9 @@ public class EncryptedSegment extends FileDirectSegment
ByteBuffer createBuffer(CommitLog commitLog)
{
- //Note: we want to keep the compression buffers on-heap as we need those bytes for encryption,
+ // Note: we want to keep the compression buffers on-heap as we need those bytes for encryption,
// and we want to avoid copying from off-heap (compression buffer) to on-heap encryption APIs
- return createBuffer(BufferType.ON_HEAP);
+ return manager.getBufferPool().createBuffer(BufferType.ON_HEAP);
}
void write(int startMarker, int nextMarker)
@@ -108,7 +108,7 @@ public class EncryptedSegment extends FileDirectSegment
{
ByteBuffer inputBuffer = buffer.duplicate();
inputBuffer.limit(contentStart + length).position(contentStart);
- ByteBuffer buffer = reusableBufferHolder.get();
+ ByteBuffer buffer = manager.getBufferPool().getThreadLocalReusableBuffer();
// save space for the sync marker at the beginning of this section
final long syncMarkerPosition = lastWrittenPos;
@@ -127,7 +127,7 @@ public class EncryptedSegment extends FileDirectSegment
buffer = EncryptionUtils.encryptAndWrite(buffer, channel, true, cipher);
contentStart += nextBlockSize;
- commitLog.allocator.addSize(buffer.limit() + ENCRYPTED_BLOCK_HEADER_SIZE);
+ manager.addSize(buffer.limit() + ENCRYPTED_BLOCK_HEADER_SIZE);
}
lastWrittenPos = channel.position();
@@ -138,15 +138,15 @@ public class EncryptedSegment extends FileDirectSegment
writeSyncMarker(buffer, 0, (int) syncMarkerPosition, (int) lastWrittenPos);
buffer.putInt(SYNC_MARKER_SIZE, length);
buffer.position(0).limit(ENCRYPTED_SECTION_HEADER_SIZE);
- commitLog.allocator.addSize(buffer.limit());
+ manager.addSize(buffer.limit());
channel.position(syncMarkerPosition);
channel.write(buffer);
SyncUtil.force(channel, true);
- if (reusableBufferHolder.get().capacity() < buffer.capacity())
- reusableBufferHolder.set(buffer);
+ if (manager.getBufferPool().getThreadLocalReusableBuffer().capacity() < buffer.capacity())
+ manager.getBufferPool().setThreadLocalReusableBuffer(buffer);
}
catch (Exception e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java b/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java
index 50f9efd..d4160e4 100644
--- a/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java
@@ -19,15 +19,8 @@ package org.apache.cassandra.db.commitlog;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicInteger;
-import io.netty.util.concurrent.FastThreadLocal;
-import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.FSWriteError;
-import org.apache.cassandra.io.compress.BufferType;
-import org.apache.cassandra.io.util.FileUtils;
/**
* Writes to the backing commit log file only on sync, allowing transformations of the mutations,
@@ -35,45 +28,23 @@ import org.apache.cassandra.io.util.FileUtils;
*/
public abstract class FileDirectSegment extends CommitLogSegment
{
- protected static final FastThreadLocal<ByteBuffer> reusableBufferHolder = new FastThreadLocal<ByteBuffer>()
- {
- protected ByteBuffer initialValue()
- {
- return ByteBuffer.allocate(0);
- }
- };
-
- static Queue<ByteBuffer> bufferPool = new ConcurrentLinkedQueue<>();
-
- /**
- * Maximum number of buffers in the compression pool. The default value is 3, it should not be set lower than that
- * (one segment in compression, one written to, one in reserve); delays in compression may cause the log to use
- * more, depending on how soon the sync policy stops all writing threads.
- */
- static final int MAX_BUFFERPOOL_SIZE = DatabaseDescriptor.getCommitLogMaxCompressionBuffersInPool();
-
- /**
- * The number of buffers in use
- */
- private static AtomicInteger usedBuffers = new AtomicInteger(0);
-
volatile long lastWrittenPos = 0;
-
private final Runnable onClose;
- FileDirectSegment(CommitLog commitLog, Runnable onClose)
+ FileDirectSegment(CommitLog commitLog, AbstractCommitLogSegmentManager manager, Runnable onClose)
{
- super(commitLog);
+ super(commitLog, manager);
this.onClose = onClose;
}
+ @Override
void writeLogHeader()
{
super.writeLogHeader();
try
{
channel.write((ByteBuffer) buffer.duplicate().flip());
- commitLog.allocator.addSize(lastWrittenPos = buffer.position());
+ manager.addSize(lastWrittenPos = buffer.position());
}
catch (IOException e)
{
@@ -81,30 +52,12 @@ public abstract class FileDirectSegment extends CommitLogSegment
}
}
- ByteBuffer createBuffer(BufferType bufferType)
- {
- usedBuffers.incrementAndGet();
- ByteBuffer buf = bufferPool.poll();
- if (buf != null)
- {
- buf.clear();
- return buf;
- }
-
- return bufferType.allocate(DatabaseDescriptor.getCommitLogSegmentSize());
- }
-
@Override
protected void internalClose()
{
- usedBuffers.decrementAndGet();
-
try
{
- if (bufferPool.size() < MAX_BUFFERPOOL_SIZE)
- bufferPool.add(buffer);
- else
- FileUtils.clean(buffer);
+ manager.getBufferPool().releaseBuffer(buffer);
super.internalClose();
}
finally
@@ -112,20 +65,4 @@ public abstract class FileDirectSegment extends CommitLogSegment
onClose.run();
}
}
-
- static void shutdown()
- {
- bufferPool.clear();
- }
-
- /**
- * Checks if the number of buffers in use is greater or equals to the maximum number of buffers allowed in the pool.
- *
- * @return <code>true</code> if the number of buffers in use is greater or equals to the maximum number of buffers
- * allowed in the pool, <code>false</code> otherwise.
- */
- static boolean hasReachedPoolLimit()
- {
- return usedBuffers.get() >= MAX_BUFFERPOOL_SIZE;
- }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java b/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java
index 3fdf886..2bbd12d 100644
--- a/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java
@@ -41,9 +41,9 @@ public class MemoryMappedSegment extends CommitLogSegment
*
* @param commitLog the commit log it will be used with.
*/
- MemoryMappedSegment(CommitLog commitLog)
+ MemoryMappedSegment(CommitLog commitLog, AbstractCommitLogSegmentManager manager)
{
- super(commitLog);
+ super(commitLog, manager);
// mark the initial sync marker as uninitialised
int firstSync = buffer.position();
buffer.putInt(firstSync + 0, 0);
@@ -66,7 +66,7 @@ public class MemoryMappedSegment extends CommitLogSegment
{
throw new FSWriteError(e, logFile);
}
- commitLog.allocator.addSize(DatabaseDescriptor.getCommitLogSegmentSize());
+ manager.addSize(DatabaseDescriptor.getCommitLogSegmentSize());
return channel.map(FileChannel.MapMode.READ_WRITE, 0, DatabaseDescriptor.getCommitLogSegmentSize());
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java b/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java
deleted file mode 100644
index 0b21763..0000000
--- a/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.commitlog;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.TreeMap;
-
-import com.google.common.collect.Ordering;
-
-import org.apache.cassandra.db.TypeSizes;
-import org.apache.cassandra.io.ISerializer;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.io.util.DataOutputPlus;
-
-public class ReplayPosition implements Comparable<ReplayPosition>
-{
- public static final ReplayPositionSerializer serializer = new ReplayPositionSerializer();
-
- // NONE is used for SSTables that are streamed from other nodes and thus have no relationship
- // with our local commitlog. The values satisfy the criteria that
- // - no real commitlog segment will have the given id
- // - it will sort before any real replayposition, so it will be effectively ignored by getReplayPosition
- public static final ReplayPosition NONE = new ReplayPosition(-1, 0);
-
- public final long segment;
- public final int position;
-
- /**
- * A filter of known safe-to-discard commit log replay positions, based on
- * the range covered by on disk sstables and those prior to the most recent truncation record
- */
- public static class ReplayFilter
- {
- final NavigableMap<ReplayPosition, ReplayPosition> persisted = new TreeMap<>();
- public ReplayFilter(Iterable<SSTableReader> onDisk, ReplayPosition truncatedAt)
- {
- for (SSTableReader reader : onDisk)
- {
- ReplayPosition start = reader.getSSTableMetadata().commitLogLowerBound;
- ReplayPosition end = reader.getSSTableMetadata().commitLogUpperBound;
- add(persisted, start, end);
- }
- if (truncatedAt != null)
- add(persisted, ReplayPosition.NONE, truncatedAt);
- }
-
- private static void add(NavigableMap<ReplayPosition, ReplayPosition> ranges, ReplayPosition start, ReplayPosition end)
- {
- // extend ourselves to cover any ranges we overlap
- // record directly preceding our end may extend past us, so take the max of our end and its
- Map.Entry<ReplayPosition, ReplayPosition> extend = ranges.floorEntry(end);
- if (extend != null && extend.getValue().compareTo(end) > 0)
- end = extend.getValue();
-
- // record directly preceding our start may extend into us; if it does, we take it as our start
- extend = ranges.lowerEntry(start);
- if (extend != null && extend.getValue().compareTo(start) >= 0)
- start = extend.getKey();
-
- ranges.subMap(start, end).clear();
- ranges.put(start, end);
- }
-
- public boolean shouldReplay(ReplayPosition position)
- {
- // replay ranges are start exclusive, end inclusive
- Map.Entry<ReplayPosition, ReplayPosition> range = persisted.lowerEntry(position);
- return range == null || position.compareTo(range.getValue()) > 0;
- }
-
- public boolean isEmpty()
- {
- return persisted.isEmpty();
- }
- }
-
- public static ReplayPosition firstNotCovered(Iterable<ReplayFilter> ranges)
- {
- ReplayPosition min = null;
- for (ReplayFilter map : ranges)
- {
- ReplayPosition first = map.persisted.firstEntry().getValue();
- if (min == null)
- min = first;
- else
- min = Ordering.natural().min(min, first);
- }
- if (min == null)
- return NONE;
- return min;
- }
-
- public ReplayPosition(long segment, int position)
- {
- this.segment = segment;
- assert position >= 0;
- this.position = position;
- }
-
- public int compareTo(ReplayPosition that)
- {
- if (this.segment != that.segment)
- return Long.compare(this.segment, that.segment);
-
- return Integer.compare(this.position, that.position);
- }
-
- @Override
- public boolean equals(Object o)
- {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- ReplayPosition that = (ReplayPosition) o;
-
- if (position != that.position) return false;
- return segment == that.segment;
- }
-
- @Override
- public int hashCode()
- {
- int result = (int) (segment ^ (segment >>> 32));
- result = 31 * result + position;
- return result;
- }
-
- @Override
- public String toString()
- {
- return "ReplayPosition(" +
- "segmentId=" + segment +
- ", position=" + position +
- ')';
- }
-
- public ReplayPosition clone()
- {
- return new ReplayPosition(segment, position);
- }
-
- public static class ReplayPositionSerializer implements ISerializer<ReplayPosition>
- {
- public void serialize(ReplayPosition rp, DataOutputPlus out) throws IOException
- {
- out.writeLong(rp.segment);
- out.writeInt(rp.position);
- }
-
- public ReplayPosition deserialize(DataInputPlus in) throws IOException
- {
- return new ReplayPosition(in.readLong(), in.readInt());
- }
-
- public long serializedSize(ReplayPosition rp)
- {
- return TypeSizes.sizeof(rp.segment) + TypeSizes.sizeof(rp.position);
- }
- }
-}