Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/12/02 20:09:25 UTC
[2/2] git commit: Multithreaded commitlog patch by Benedict Elliot Smith; reviewed by jbellis for CASSANDRA-3578
Multithreaded commitlog
patch by Benedict Elliot Smith; reviewed by jbellis for CASSANDRA-3578
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/22e18f5a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/22e18f5a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/22e18f5a
Branch: refs/heads/trunk
Commit: 22e18f5a348a911f89deed9f9984950de451d28a
Parents: 679ec7e
Author: Jonathan Ellis <jb...@apache.org>
Authored: Mon Dec 2 13:07:28 2013 -0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Mon Dec 2 13:09:19 2013 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/config/CFMetaData.java | 11 +
.../org/apache/cassandra/config/Schema.java | 1 +
.../org/apache/cassandra/db/DefsTables.java | 7 +
.../AbstractCommitLogExecutorService.java | 55 --
.../db/commitlog/AbstractCommitLogService.java | 153 ++++++
.../BatchCommitLogExecutorService.java | 176 ------
.../db/commitlog/BatchCommitLogService.java | 36 ++
.../cassandra/db/commitlog/CommitLog.java | 278 +++++-----
.../db/commitlog/CommitLogAllocator.java | 369 -------------
.../db/commitlog/CommitLogArchiver.java | 2 +-
.../db/commitlog/CommitLogReplayer.java | 351 +++++++-----
.../db/commitlog/CommitLogSegment.java | 487 ++++++++++++-----
.../db/commitlog/CommitLogSegmentManager.java | 533 +++++++++++++++++++
.../db/commitlog/ICommitLogExecutorService.java | 51 --
.../PeriodicCommitLogExecutorService.java | 135 -----
.../db/commitlog/PeriodicCommitLogService.java | 57 ++
.../cassandra/io/util/RandomAccessReader.java | 5 +
.../cassandra/metrics/CommitLogMetrics.java | 10 +-
.../cassandra/service/StorageService.java | 6 +-
.../cassandra/utils/AtomicLongArrayUpdater.java | 74 +++
.../apache/cassandra/utils/StatusLogger.java | 6 -
.../org/apache/cassandra/utils/WaitQueue.java | 264 +++++++++
.../cassandra/db/commitlog/ComitLogStress.java | 72 +++
.../cassandra/concurrent/WaitQueueTest.java | 137 +++++
.../org/apache/cassandra/db/CommitLogTest.java | 9 +-
26 files changed, 2059 insertions(+), 1227 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/22e18f5a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6efbfbc..bb3a98a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1
+ * Multithreaded commitlog (CASSANDRA-3578)
* allocate fixed index summary memory pool and resample cold index summaries
to use less memory (CASSANDRA-5519)
* Removed multithreaded compaction (CASSANDRA-6142)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/22e18f5a/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index d04dc25..0a33c20 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -405,6 +405,7 @@ public final class CFMetaData
private volatile boolean populateIoCacheOnFlush = DEFAULT_POPULATE_IO_CACHE_ON_FLUSH;
private volatile Map<ColumnIdentifier, Long> droppedColumns = new HashMap<>();
private volatile Map<String, TriggerDefinition> triggers = new HashMap<>();
+ private volatile boolean isPurged = false;
/*
* All CQL3 columns definition are stored in the columnMetadata map.
@@ -1546,6 +1547,16 @@ public final class CFMetaData
return rm;
}
+ public boolean isPurged()
+ {
+ return isPurged;
+ }
+
+ void markPurged()
+ {
+ isPurged = true;
+ }
+
public void toSchema(RowMutation rm, long timestamp)
{
toSchemaNoColumnsNoTriggers(rm, timestamp);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/22e18f5a/src/java/org/apache/cassandra/config/Schema.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Schema.java b/src/java/org/apache/cassandra/config/Schema.java
index 146b82b..a38c097 100644
--- a/src/java/org/apache/cassandra/config/Schema.java
+++ b/src/java/org/apache/cassandra/config/Schema.java
@@ -348,6 +348,7 @@ public class Schema
public void purge(CFMetaData cfm)
{
cfIdMap.remove(Pair.create(cfm.ksName, cfm.cfName));
+ cfm.markPurged();
}
/* Version control */
http://git-wip-us.apache.org/repos/asf/cassandra/blob/22e18f5a/src/java/org/apache/cassandra/db/DefsTables.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DefsTables.java b/src/java/org/apache/cassandra/db/DefsTables.java
index 3cd5156..828981e 100644
--- a/src/java/org/apache/cassandra/db/DefsTables.java
+++ b/src/java/org/apache/cassandra/db/DefsTables.java
@@ -24,6 +24,7 @@ import java.util.*;
import com.google.common.collect.Iterables;
import com.google.common.collect.MapDifference;
import com.google.common.collect.Maps;
+import org.apache.cassandra.db.commitlog.CommitLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -461,6 +462,10 @@ public class DefsTables
// remove the keyspace from the static instances.
Keyspace.clear(ksm.name);
Schema.instance.clearKeyspaceDefinition(ksm);
+
+ // force a new segment in the CL
+ CommitLog.instance.forceRecycleAllSegments();
+
if (!StorageService.instance.isClientMode())
{
MigrationManager.instance.notifyDropKeyspace(ksm);
@@ -482,6 +487,8 @@ public class DefsTables
CompactionManager.instance.interruptCompactionFor(Arrays.asList(cfm), true);
+ CommitLog.instance.forceRecycleAllSegments();
+
if (!StorageService.instance.isClientMode())
{
if (DatabaseDescriptor.isAutoSnapshot())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/22e18f5a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogExecutorService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogExecutorService.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogExecutorService.java
deleted file mode 100644
index ec43114..0000000
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogExecutorService.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.commitlog;
-
-import java.util.List;
-import java.util.concurrent.AbstractExecutorService;
-import java.util.concurrent.TimeUnit;
-
-public abstract class AbstractCommitLogExecutorService extends AbstractExecutorService implements ICommitLogExecutorService
-{
- protected volatile long completedTaskCount = 0;
-
- /**
- * Get the number of completed tasks
- */
- public long getCompletedTasks()
- {
- return completedTaskCount;
- }
-
- public boolean isTerminated()
- {
- throw new UnsupportedOperationException();
- }
-
- public boolean isShutdown()
- {
- throw new UnsupportedOperationException();
- }
-
- public List<Runnable> shutdownNow()
- {
- throw new UnsupportedOperationException();
- }
-
- public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException
- {
- throw new UnsupportedOperationException();
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/22e18f5a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
new file mode 100644
index 0000000..2f9b236
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.commitlog;
+
+import org.apache.cassandra.utils.WaitQueue;
+import org.slf4j.*;
+
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation;
+
+public abstract class AbstractCommitLogService
+{
+ private final Thread thread;
+ private volatile boolean shutdown = false;
+
+ // all Allocations written before this time will be synced
+ protected volatile long lastSyncedAt = System.currentTimeMillis();
+
+ // counts of total written, and pending, log messages
+ private final AtomicLong written = new AtomicLong(0);
+ protected final AtomicLong pending = new AtomicLong(0);
+
+ // signal that writers can wait on to be notified of a completed sync
+ protected final WaitQueue syncComplete = new WaitQueue();
+ private final Semaphore haveWork = new Semaphore(1);
+
+ private static final Logger logger = LoggerFactory.getLogger(AbstractCommitLogService.class);
+
+ /**
+ * CommitLogService provides a fsync service for Allocations, fulfilling either the
+ * Batch or Periodic contract.
+ *
+ * Subclasses may be notified when a sync finishes by using the syncComplete WaitQueue.
+ */
+ AbstractCommitLogService(final CommitLog commitLog, final String name, final long pollIntervalMillis)
+ {
+ if (pollIntervalMillis < 1)
+ throw new IllegalArgumentException(String.format("Commit log flush interval must be positive: %dms", pollIntervalMillis));
+
+ Runnable runnable = new Runnable()
+ {
+ public void run()
+ {
+ boolean run = true;
+ while (run)
+ {
+ try
+ {
+ // always run once after shutdown signalled
+ run = !shutdown;
+
+ // sync and signal
+ long syncStarted = System.currentTimeMillis();
+ commitLog.sync(shutdown);
+ lastSyncedAt = syncStarted;
+ syncComplete.signalAll();
+
+ // sleep any time we have left before the next one is due
+ long sleep = syncStarted + pollIntervalMillis - System.currentTimeMillis();
+ if (sleep < 0)
+ {
+ logger.warn(String.format("Commit log sync took longer than sync interval (by %.2fs), indicating it is a bottleneck", sleep / -1000d));
+ // don't sleep, as we probably have work to do
+ continue;
+ }
+ try
+ {
+ haveWork.tryAcquire(sleep, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError();
+ }
+ }
+ catch (Throwable t)
+ {
+ logger.error("Commit log sync failed", t);
+ // sleep for full poll-interval after an error, so we don't spam the log file
+ try
+ {
+ haveWork.tryAcquire(pollIntervalMillis, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError();
+ }
+ }
+ }
+ }
+ };
+
+ thread = new Thread(runnable, name);
+ thread.start();
+ }
+
+ /**
+ * Block for the given Allocation to be sync'd as necessary, and handle bookkeeping
+ */
+ public void finishWriteFor(Allocation alloc)
+ {
+ maybeWaitForSync(alloc);
+ written.incrementAndGet();
+ }
+
+ protected abstract void maybeWaitForSync(Allocation alloc);
+
+ /**
+ * Sync immediately, but don't block for the sync to complete
+ */
+ public void requestExtraSync()
+ {
+ haveWork.release();
+ }
+
+ public void shutdown()
+ {
+ shutdown = true;
+ haveWork.release(1);
+ }
+
+ public void awaitTermination() throws InterruptedException
+ {
+ thread.join();
+ }
+
+ public long getCompletedTasks()
+ {
+ return written.get();
+ }
+
+ public long getPendingTasks()
+ {
+ return pending.get();
+ }
+}
\ No newline at end of file
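For context, a minimal self-contained sketch of the sync-loop pattern introduced above, using only JDK types (the patch itself relies on Cassandra's internal WaitQueue; the class and method names below are illustrative, not part of the patch):

    import java.util.concurrent.Semaphore;
    import java.util.concurrent.TimeUnit;

    class PeriodicSyncLoop
    {
        // one initial permit, so the first sleep returns immediately
        private final Semaphore haveWork = new Semaphore(1);
        private final Object syncComplete = new Object();
        private volatile boolean shutdown = false;
        private volatile long lastSyncedAt = System.currentTimeMillis();
        private final long pollIntervalMillis;

        PeriodicSyncLoop(long pollIntervalMillis)
        {
            this.pollIntervalMillis = pollIntervalMillis;
            new Thread(new Runnable()
            {
                public void run() { loop(); }
            }, "SYNC-LOOP").start();
        }

        private void loop()
        {
            boolean run = true;
            while (run)
            {
                run = !shutdown;                    // always run once more after shutdown
                long syncStarted = System.currentTimeMillis();
                sync();                             // stand-in for commitLog.sync(...)
                lastSyncedAt = syncStarted;
                synchronized (syncComplete)         // wake writers blocked in awaitSyncAfter
                {
                    syncComplete.notifyAll();
                }
                long sleep = syncStarted + pollIntervalMillis - System.currentTimeMillis();
                if (sleep <= 0)
                    continue;                       // behind schedule: sync again at once
                try
                {
                    // sleep out the interval, but wake early on requestExtraSync()
                    haveWork.tryAcquire(sleep, TimeUnit.MILLISECONDS);
                }
                catch (InterruptedException e)
                {
                    throw new AssertionError(e);
                }
            }
        }

        void sync() { /* fsync the active segments here */ }

        // writer side: block until a sync covering writeTime has completed
        void awaitSyncAfter(long writeTime) throws InterruptedException
        {
            synchronized (syncComplete)
            {
                while (lastSyncedAt < writeTime)
                    syncComplete.wait();
            }
        }

        void requestExtraSync() { haveWork.release(); }

        void shutdown() { shutdown = true; haveWork.release(); }
    }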
http://git-wip-us.apache.org/repos/asf/cassandra/blob/22e18f5a/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorService.java b/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorService.java
deleted file mode 100644
index d985f1f..0000000
--- a/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorService.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.commitlog;
-
-import java.util.ArrayList;
-import java.util.concurrent.*;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.WrappedRunnable;
-
-class BatchCommitLogExecutorService extends AbstractCommitLogExecutorService
-{
- private final BlockingQueue<CheaterFutureTask> queue;
- private final Thread appendingThread;
- private volatile boolean run = true;
-
- public BatchCommitLogExecutorService()
- {
- this(DatabaseDescriptor.getConcurrentWriters());
- }
-
- public BatchCommitLogExecutorService(int queueSize)
- {
- queue = new LinkedBlockingQueue<CheaterFutureTask>(queueSize);
- Runnable runnable = new WrappedRunnable()
- {
- public void runMayThrow() throws Exception
- {
- while (run)
- {
- if (processWithSyncBatch())
- completedTaskCount++;
- }
- }
- };
- appendingThread = new Thread(runnable, "COMMIT-LOG-WRITER");
- appendingThread.start();
-
- }
-
- public long getPendingTasks()
- {
- return queue.size();
- }
-
- private final ArrayList<CheaterFutureTask> incompleteTasks = new ArrayList<CheaterFutureTask>();
- private final ArrayList taskValues = new ArrayList(); // TODO not sure how to generify this
- private boolean processWithSyncBatch() throws Exception
- {
- CheaterFutureTask firstTask = queue.poll(100, TimeUnit.MILLISECONDS);
- if (firstTask == null)
- return false;
- if (!(firstTask.getRawCallable() instanceof CommitLog.LogRecordAdder))
- {
- firstTask.run();
- return true;
- }
-
- // attempt to do a bunch of LogRecordAdder ops before syncing
- // (this is a little clunky since there is no blocking peek method,
- // so we have to break it into firstTask / extra tasks)
- incompleteTasks.clear();
- taskValues.clear();
- long start = System.nanoTime();
- long window = (long)(1000000 * DatabaseDescriptor.getCommitLogSyncBatchWindow());
-
- // it doesn't seem worth bothering future-izing the exception
- // since if a commitlog op throws, we're probably screwed anyway
- incompleteTasks.add(firstTask);
- taskValues.add(firstTask.getRawCallable().call());
- while (!queue.isEmpty()
- && queue.peek().getRawCallable() instanceof CommitLog.LogRecordAdder
- && System.nanoTime() - start < window)
- {
- CheaterFutureTask task = queue.remove();
- incompleteTasks.add(task);
- taskValues.add(task.getRawCallable().call());
- }
-
- // now sync and set the tasks' values (which allows thread calling get() to proceed)
- CommitLog.instance.sync();
- for (int i = 0; i < incompleteTasks.size(); i++)
- {
- incompleteTasks.get(i).set(taskValues.get(i));
- }
- return true;
- }
-
-
- @Override
- protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value)
- {
- return newTaskFor(Executors.callable(runnable, value));
- }
-
- @Override
- protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable)
- {
- return new CheaterFutureTask(callable);
- }
-
- public void execute(Runnable command)
- {
- try
- {
- queue.put((CheaterFutureTask)command);
- }
- catch (InterruptedException e)
- {
- throw new RuntimeException(e);
- }
- }
-
- public void add(CommitLog.LogRecordAdder adder)
- {
- FBUtilities.waitOnFuture(submit((Callable)adder));
- }
-
- public void shutdown()
- {
- new Thread(new WrappedRunnable()
- {
- public void runMayThrow() throws InterruptedException
- {
- while (!queue.isEmpty())
- Thread.sleep(100);
- run = false;
- appendingThread.join();
- }
- }, "Commitlog Shutdown").start();
- }
-
- public void awaitTermination() throws InterruptedException
- {
- appendingThread.join();
- }
-
- private static class CheaterFutureTask<V> extends FutureTask<V>
- {
- private final Callable rawCallable;
-
- public CheaterFutureTask(Callable<V> callable)
- {
- super(callable);
- rawCallable = callable;
- }
-
- public Callable getRawCallable()
- {
- return rawCallable;
- }
-
- @Override
- public void set(V v)
- {
- super.set(v);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/22e18f5a/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogService.java
new file mode 100644
index 0000000..65bee40
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogService.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.commitlog;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+
+class BatchCommitLogService extends AbstractCommitLogService
+{
+ public BatchCommitLogService(CommitLog commitLog)
+ {
+ super(commitLog, "COMMIT-LOG-WRITER", (int) DatabaseDescriptor.getCommitLogSyncBatchWindow());
+ }
+
+ protected void maybeWaitForSync(CommitLogSegment.Allocation alloc)
+ {
+ // wait until record has been safely persisted to disk
+ pending.incrementAndGet();
+ alloc.awaitDiskSync();
+ pending.decrementAndGet();
+ }
+}
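The batch service above blocks every write until its segment has been fsynced; the periodic variant (diffed elsewhere in this commit) acknowledges immediately unless syncing has fallen behind the configured window. A hedged sketch of the two contracts, with illustrative names:

    abstract class SyncService
    {
        protected long lastSyncedAt = System.currentTimeMillis();

        // called by the sync thread after each fsync completes
        synchronized void syncedUpTo(long t) { lastSyncedAt = t; notifyAll(); }

        abstract void maybeWaitForSync(long writeTime) throws InterruptedException;
    }

    // batch contract: a write is not acknowledged until it is on disk
    class BatchSync extends SyncService
    {
        synchronized void maybeWaitForSync(long writeTime) throws InterruptedException
        {
            while (lastSyncedAt < writeTime)
                wait();
        }
    }

    // periodic contract: acknowledge at once, unless syncing has fallen more
    // than the configured window behind this write
    class PeriodicSync extends SyncService
    {
        final long windowMillis;
        PeriodicSync(long windowMillis) { this.windowMillis = windowMillis; }

        synchronized void maybeWaitForSync(long writeTime) throws InterruptedException
        {
            while (lastSyncedAt < writeTime - windowMillis)
                wait();
        }
    }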
http://git-wip-us.apache.org/repos/asf/cassandra/blob/22e18f5a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index 706df37..7240aee 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -19,23 +19,30 @@ package org.apache.cassandra.db.commitlog;
import java.io.*;
import java.lang.management.ManagementFactory;
+import java.nio.ByteBuffer;
import java.util.*;
-import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
+
import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.*;
import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.util.ByteBufferOutputStream;
+import org.apache.cassandra.io.util.ChecksummedOutputStream;
import org.apache.cassandra.metrics.CommitLogMetrics;
import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.PureJavaCrc32;
+
+import com.google.common.util.concurrent.Futures;
+
+import static org.apache.cassandra.db.commitlog.CommitLogSegment.*;
/*
* Commit Log tracks every write operation into the system. The aim of the commit log is to be able to
@@ -47,29 +54,24 @@ public class CommitLog implements CommitLogMBean
public static final CommitLog instance = new CommitLog();
- private final ICommitLogExecutorService executor;
-
- public final CommitLogAllocator allocator;
+ // we only permit records HALF the size of a commit log segment, to ensure we don't spin allocating many mostly
+ // empty segments when writing large records
+ private static final long MAX_MUTATION_SIZE = DatabaseDescriptor.getCommitLogSegmentSize() >> 1;
+ public final CommitLogSegmentManager allocator;
public final CommitLogArchiver archiver = new CommitLogArchiver();
-
- public static final int END_OF_SEGMENT_MARKER = 0; // this is written out at the end of a segment
- public static final int END_OF_SEGMENT_MARKER_SIZE = 4; // number of bytes of ^^^
-
- public CommitLogSegment activeSegment;
-
private final CommitLogMetrics metrics;
+ final AbstractCommitLogService executor;
private CommitLog()
{
DatabaseDescriptor.createAllDirectories();
- allocator = new CommitLogAllocator();
- activateNextSegment();
+ allocator = new CommitLogSegmentManager();
executor = DatabaseDescriptor.getCommitLogSync() == Config.CommitLogSync.batch
- ? new BatchCommitLogExecutorService()
- : new PeriodicCommitLogExecutorService(this);
+ ? new BatchCommitLogService(this)
+ : new PeriodicCommitLogService(this);
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
try
@@ -86,15 +88,6 @@ public class CommitLog implements CommitLogMBean
}
/**
- * FOR TESTING PURPOSES. See CommitLogAllocator.
- */
- public void resetUnsafe()
- {
- allocator.resetUnsafe();
- activateNextSegment();
- }
-
- /**
* Perform recovery on commit logs located in the directory specified by the config file.
*
* @return the number of mutations replayed
@@ -121,7 +114,7 @@ public class CommitLog implements CommitLogMBean
}
else
{
- Arrays.sort(files, new CommitLogSegment.CommitLogSegmentFileComparator());
+ Arrays.sort(files, new CommitLogSegmentFileComparator());
logger.info("Replaying {}", StringUtils.join(files, ", "));
replayed = recover(files);
logger.info("Log replay complete, {} replayed mutations", replayed);
@@ -157,38 +150,84 @@ public class CommitLog implements CommitLogMBean
/**
* @return a Future representing a ReplayPosition such that when it is ready,
- * all commitlog tasks enqueued prior to the getContext call will be complete (i.e., appended to the log)
+ * all Allocations created prior to the getContext call will be written to the log
*/
public Future<ReplayPosition> getContext()
{
- Callable<ReplayPosition> task = new Callable<ReplayPosition>()
+ return Futures.immediateFuture(allocator.allocatingFrom().getContext());
+ }
+
+ /**
+ * Flushes all dirty CFs, waiting for them to free and recycle any segments they were retaining
+ */
+ public void forceRecycleAllSegments()
+ {
+ allocator.forceRecycleAll();
+ }
+
+ /**
+ * Forces a disk flush on the commit log files that need it. Blocking.
+ */
+ public void sync(boolean syncAllSegments)
+ {
+ CommitLogSegment current = allocator.allocatingFrom();
+ for (CommitLogSegment segment : allocator.getActiveSegments())
{
- public ReplayPosition call()
- {
- return activeSegment.getContext();
- }
- };
- return executor.submit(task);
+ if (!syncAllSegments && segment.id > current.id)
+ return;
+ segment.sync();
+ }
}
/**
- * Used by tests.
- *
- * @return the number of active segments (segments with unflushed data in them)
+ * Preempts the CLExecutor, telling it to sync immediately
*/
- public int activeSegments()
+ public void requestExtraSync()
{
- return allocator.getActiveSegments().size();
+ executor.requestExtraSync();
}
/**
* Add a RowMutation to the commit log.
*
- * @param rm the RowMutation to add to the log
+ * @param rowMutation the RowMutation to add to the log
*/
- public void add(RowMutation rm)
+ public void add(RowMutation rowMutation)
{
- executor.add(new LogRecordAdder(rm));
+ long size = RowMutation.serializer.serializedSize(rowMutation, MessagingService.current_version);
+
+ long totalSize = size + ENTRY_OVERHEAD_SIZE;
+ if (totalSize > MAX_MUTATION_SIZE)
+ {
+ logger.warn("Skipping commitlog append of extremely large mutation ({} bytes)", totalSize);
+ return;
+ }
+
+ Allocation alloc = allocator.allocate(rowMutation, (int) totalSize, new Allocation());
+ try
+ {
+ PureJavaCrc32 checksum = new PureJavaCrc32();
+ final ByteBuffer buffer = alloc.getBuffer();
+ DataOutputStream dos = new DataOutputStream(new ChecksummedOutputStream(new ByteBufferOutputStream(buffer), checksum));
+
+ // checksummed length
+ dos.writeInt((int) size);
+ buffer.putLong(checksum.getValue());
+
+ // checksummed mutation
+ RowMutation.serializer.serialize(rowMutation, dos, MessagingService.current_version);
+ buffer.putLong(checksum.getValue());
+ }
+ catch (IOException e)
+ {
+ throw new FSWriteError(e, alloc.getSegment().getPath());
+ }
+ finally
+ {
+ alloc.markWritten();
+ }
+
+ executor.finishWriteFor(alloc);
}
/**
@@ -200,104 +239,58 @@ public class CommitLog implements CommitLogMBean
*/
public void discardCompletedSegments(final UUID cfId, final ReplayPosition context)
{
- Callable task = new Callable()
+ logger.debug("discard completed log segments for {}, column family {}", context, cfId);
+
+ // Go thru the active segment files, which are ordered oldest to newest, marking the
+ // flushed CF as clean, until we reach the segment file containing the ReplayPosition passed
+ // in the arguments. Any segments that become unused after they are marked clean will be
+ // recycled or discarded.
+ for (Iterator<CommitLogSegment> iter = allocator.getActiveSegments().iterator(); iter.hasNext();)
{
- public Object call()
+ CommitLogSegment segment = iter.next();
+ segment.markClean(cfId, context);
+
+ if (segment.isUnused())
{
- logger.debug("discard completed log segments for {}, column family {}", context, cfId);
-
- // Go thru the active segment files, which are ordered oldest to newest, marking the
- // flushed CF as clean, until we reach the segment file containing the ReplayPosition passed
- // in the arguments. Any segments that become unused after they are marked clean will be
- // recycled or discarded.
- for (Iterator<CommitLogSegment> iter = allocator.getActiveSegments().iterator(); iter.hasNext();)
- {
- CommitLogSegment segment = iter.next();
- segment.markClean(cfId, context);
-
- // If the segment is no longer needed, and we have another spare segment in the hopper
- // (to keep the last segment from getting discarded), pursue either recycling or deleting
- // this segment file.
- if (iter.hasNext())
- {
- if (segment.isUnused())
- {
- logger.debug("Commit log segment {} is unused", segment);
- allocator.recycleSegment(segment);
- }
- else
- {
- logger.debug("Not safe to delete commit log segment {}; dirty is {}",
- segment, segment.dirtyString());
- }
- }
- else
- {
- logger.debug("Not deleting active commitlog segment {}", segment);
- }
-
- // Don't mark or try to delete any newer segments once we've reached the one containing the
- // position of the flush.
- if (segment.contains(context))
- break;
- }
-
- return null;
+ logger.debug("Commit log segment {} is unused", segment);
+ allocator.recycleSegment(segment);
+ }
+ else
+ {
+ logger.debug("Not safe to delete{} commit log segment {}; dirty is {}",
+ (iter.hasNext() ? "" : " active"), segment, segment.dirtyString());
}
- };
-
- FBUtilities.waitOnFuture(executor.submit(task));
- }
- /**
- * Forces a disk flush on the commit log files that need it.
- */
- public void sync()
- {
- for (CommitLogSegment segment : allocator.getActiveSegments())
- {
- segment.sync();
+ // Don't mark or try to delete any newer segments once we've reached the one containing the
+ // position of the flush.
+ if (segment.contains(context))
+ break;
}
}
- /**
- * @return the number of tasks completed by the commit log executor
- */
+ @Override
public long getCompletedTasks()
{
return metrics.completedTasks.value();
}
- /**
- * @return the depth of pending commit log executor queue
- */
+ @Override
public long getPendingTasks()
{
return metrics.pendingTasks.value();
}
/**
- * @return the total size occupied by commitlo segments expressed in bytes. (used by MBean)
+ * @return the total size occupied by commitlog segments expressed in bytes. (used by MBean)
*/
public long getTotalCommitlogSize()
{
return metrics.totalCommitLogSize.value();
}
- /**
- * Fetches a new segment file from the allocator and activates it.
- *
- * @return the newly activated segment
- */
- private void activateNextSegment()
- {
- activeSegment = allocator.fetchSegment();
- logger.debug("Active segment is now {}", activeSegment);
- }
-
public List<String> getActiveSegmentNames()
{
- List<String> segmentNames = new ArrayList<String>();
+ List<String> segmentNames = new ArrayList<>();
for (CommitLogSegment segment : allocator.getActiveSegments())
segmentNames.add(segment.getName());
return segmentNames;
@@ -305,7 +298,7 @@ public class CommitLog implements CommitLogMBean
public List<String> getArchivingSegmentNames()
{
- return new ArrayList<String>(archiver.archivePending.keySet());
+ return new ArrayList<>(archiver.archivePending.keySet());
}
/**
@@ -319,48 +312,21 @@ public class CommitLog implements CommitLogMBean
allocator.awaitTermination();
}
- // TODO this should be a Runnable since it doesn't actually return anything, but it's difficult to do that
- // without breaking the fragile CheaterFutureTask in BatchCLES.
- class LogRecordAdder implements Callable, Runnable
+ /**
+ * FOR TESTING PURPOSES. See CommitLogSegmentManager.
+ */
+ public void resetUnsafe()
{
- final RowMutation rowMutation;
-
- LogRecordAdder(RowMutation rm)
- {
- this.rowMutation = rm;
- }
-
- public void run()
- {
- long totalSize = RowMutation.serializer.serializedSize(rowMutation, MessagingService.current_version) + CommitLogSegment.ENTRY_OVERHEAD_SIZE;
- if (totalSize > DatabaseDescriptor.getCommitLogSegmentSize())
- {
- logger.warn("Skipping commitlog append of extremely large mutation ({} bytes)", totalSize);
- return;
- }
-
- if (!activeSegment.hasCapacityFor(totalSize))
- {
- CommitLogSegment oldSegment = activeSegment;
- activateNextSegment();
- // Now we can run the user defined command just before switching to the new commit log.
- // (Do this here instead of in the recycle call so we can get a head start on the archive.)
- archiver.maybeArchive(oldSegment.getPath(), oldSegment.getName());
- }
- try
- {
- activeSegment.write(rowMutation);
- }
- catch (IOException e)
- {
- throw new FSWriteError(e, activeSegment.getPath());
- }
- }
+ allocator.resetUnsafe();
+ }
- public Object call()
- {
- run();
- return null;
- }
+ /**
+ * Used by tests.
+ *
+ * @return the number of active segments (segments with unflushed data in them)
+ */
+ public int activeSegments()
+ {
+ return allocator.getActiveSegments().size();
}
}
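The rewritten add() frames each record as [int length][long CRC][mutation bytes][long CRC], where the second checksum is cumulative over both the length and the payload (the checksum is never reset). A standalone sketch of that framing, substituting java.util.zip.CRC32 for Cassandra's PureJavaCrc32 and a plain ByteBuffer for ChecksummedOutputStream/ByteBufferOutputStream:

    import java.nio.ByteBuffer;
    import java.util.zip.CRC32;

    class RecordFraming
    {
        // layout: [int length][long crc(length)][payload][long crc(length + payload)]
        static void writeRecord(ByteBuffer buffer, byte[] payload)
        {
            CRC32 checksum = new CRC32();

            // checksummed length
            buffer.putInt(payload.length);
            checksum.update(intToBytes(payload.length));
            buffer.putLong(checksum.getValue());

            // checksummed payload; the running checksum also still covers the length
            buffer.put(payload);
            checksum.update(payload);
            buffer.putLong(checksum.getValue());
        }

        // DataOutputStream writes ints big-endian; mirror that byte order here
        static byte[] intToBytes(int v)
        {
            return new byte[] { (byte) (v >>> 24), (byte) (v >>> 16), (byte) (v >>> 8), (byte) v };
        }

        public static void main(String[] args)
        {
            ByteBuffer buf = ByteBuffer.allocate(1024);
            writeRecord(buf, "example mutation bytes".getBytes());
            System.out.println("record occupies " + buf.position() + " bytes");
        }
    }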
http://git-wip-us.apache.org/repos/asf/cassandra/blob/22e18f5a/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java
deleted file mode 100644
index 706cf9e..0000000
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java
+++ /dev/null
@@ -1,369 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.commitlog;
-
-import java.io.File;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.UUID;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicLong;
-
-import com.google.common.collect.Iterables;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.Pair;
-import org.apache.cassandra.utils.WrappedRunnable;
-
-/**
- * Performs the pre-allocation of commit log segments in a background thread. All the
- * public methods are thread safe.
- */
-public class CommitLogAllocator
-{
- static final Logger logger = LoggerFactory.getLogger(CommitLogAllocator.class);
-
- /** The (theoretical) max milliseconds between loop runs to perform janitorial tasks */
- public final static int TICK_CYCLE_TIME = 100;
-
- /** Segments that are ready to be used */
- private final BlockingQueue<CommitLogSegment> availableSegments = new LinkedBlockingQueue<CommitLogSegment>();
-
- /** Allocations to be run by the thread */
- private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
-
- /** Active segments, containing unflushed data */
- private final ConcurrentLinkedQueue<CommitLogSegment> activeSegments = new ConcurrentLinkedQueue<CommitLogSegment>();
-
- /**
- * Tracks commitlog size, in multiples of the segment size. We need to do this so we can "promise" size
- * adjustments ahead of actually adding/freeing segments on disk, so that the "evict oldest segment" logic
- * can see the effect of recycling segments immediately (even though they're really happening asynchronously
- * on the allocator thread, which will take a ms or two).
- */
- private final AtomicLong size = new AtomicLong();
-
- /**
- * New segment creation is initially disabled because we'll typically get some "free" segments
- * recycled after log replay.
- */
- private volatile boolean createReserveSegments = false;
-
- private final Thread allocationThread;
- private volatile boolean run = true;
-
- public CommitLogAllocator()
- {
- // The run loop for the allocation thread
- Runnable runnable = new WrappedRunnable()
- {
- public void runMayThrow() throws Exception
- {
- while (run)
- {
- Runnable r = queue.poll(TICK_CYCLE_TIME, TimeUnit.MILLISECONDS);
-
- if (r != null)
- {
- r.run();
- }
- else
- {
- // no job, so we're clear to check to see if we're out of segments
- // and ready a new one if needed. has the effect of ensuring there's
- // almost always a segment available when it's needed.
- if (availableSegments.isEmpty() && (activeSegments.isEmpty() || createReserveSegments))
- {
- logger.debug("No segments in reserve; creating a fresh one");
- createFreshSegment();
- }
- }
- }
- }
- };
-
- allocationThread = new Thread(runnable, "COMMIT-LOG-ALLOCATOR");
- allocationThread.start();
- }
-
- /**
- * Fetches an empty segment file.
- *
- * @return the next writable segment
- */
- public CommitLogSegment fetchSegment()
- {
- CommitLogSegment next;
- try
- {
- next = availableSegments.take();
- }
- catch (InterruptedException e)
- {
- throw new AssertionError(e);
- }
-
- assert !activeSegments.contains(next);
- activeSegments.add(next);
- if (isCapExceeded())
- flushOldestKeyspaces();
-
- return next;
- }
-
- /**
- * Indicates that a segment is no longer in use and that it should be recycled.
- *
- * @param segment segment that is no longer in use
- */
- public void recycleSegment(final CommitLogSegment segment)
- {
- activeSegments.remove(segment);
- if (!CommitLog.instance.archiver.maybeWaitForArchiving(segment.getName()))
- {
- // if archiving (command) was not successful then leave the file alone. don't delete or recycle.
- discardSegment(segment, false);
- return;
- }
- if (isCapExceeded())
- {
- discardSegment(segment, true);
- return;
- }
-
- logger.debug("Recycling {}", segment);
- queue.add(new Runnable()
- {
- public void run()
- {
- CommitLogSegment recycled = segment.recycle();
- internalAddReadySegment(recycled);
- }
- });
- }
-
- /**
- * Differs from the above because it can work on any file instead of just existing
- * commit log segments managed by this allocator.
- *
- * @param file segment file that is no longer in use.
- */
- public void recycleSegment(final File file)
- {
- // check against SEGMENT_SIZE avoids recycling odd-sized or empty segments from old C* versions and unit tests
- if (isCapExceeded() || file.length() != DatabaseDescriptor.getCommitLogSegmentSize()
- || CommitLogDescriptor.fromFileName(file.getName()).getMessagingVersion() != MessagingService.current_version)
- {
- // (don't decrease managed size, since this was never a "live" segment)
- logger.debug("(Unopened) segment {} is no longer needed and will be deleted now", file);
- FileUtils.deleteWithConfirm(file);
- return;
- }
-
- logger.debug("Recycling {}", file);
- // this wasn't previously a live segment, so add it to the managed size when we make it live
- size.addAndGet(DatabaseDescriptor.getCommitLogSegmentSize());
- queue.add(new Runnable()
- {
- public void run()
- {
- CommitLogSegment segment = new CommitLogSegment(file.getPath());
- internalAddReadySegment(segment);
- }
- });
- }
-
- /**
- * Indicates that a segment file should be deleted.
- *
- * @param segment segment to be discarded
- */
- private void discardSegment(final CommitLogSegment segment, final boolean deleteFile)
- {
- logger.debug("Segment {} is no longer active and will be deleted {}", segment, deleteFile ? "now" : "by the archive script");
- size.addAndGet(-DatabaseDescriptor.getCommitLogSegmentSize());
-
- queue.add(new Runnable()
- {
- public void run()
- {
- segment.discard(deleteFile);
- }
- });
- }
-
- /**
- * @return the space (in bytes) used by all segment files.
- */
- public long bytesUsed()
- {
- return size.get();
- }
-
- /**
- * @param name the filename to check
- * @return true if file is managed by this allocator.
- */
- public boolean manages(String name)
- {
- for (CommitLogSegment segment : Iterables.concat(activeSegments, availableSegments))
- if (segment.getName().equals(name))
- return true;
-
- return false;
- }
-
- /**
- * Creates and readies a brand new segment.
- *
- * @return the newly minted segment
- */
- private CommitLogSegment createFreshSegment()
- {
- size.addAndGet(DatabaseDescriptor.getCommitLogSegmentSize());
- return internalAddReadySegment(CommitLogSegment.freshSegment());
- }
-
- /**
- * Adds a segment to our internal tracking list and makes it ready for consumption.
- *
- * @param segment the segment to add
- * @return the newly added segment
- */
- private CommitLogSegment internalAddReadySegment(CommitLogSegment segment)
- {
- assert !activeSegments.contains(segment);
- assert !availableSegments.contains(segment);
- availableSegments.add(segment);
- return segment;
- }
-
- /**
- * Check to see if the speculative current size exceeds the cap.
- *
- * @return true if cap is exceeded
- */
- private boolean isCapExceeded()
- {
- long currentSize = size.get();
- logger.debug("Total active commitlog segment space used is {}", currentSize);
- return currentSize > DatabaseDescriptor.getTotalCommitlogSpaceInMB() * 1024 * 1024;
- }
-
- /**
- * Throws a flag that enables the behavior of keeping at least one spare segment
- * available at all times.
- */
- public void enableReserveSegmentCreation()
- {
- createReserveSegments = true;
- }
-
- /**
- * Force a flush on all dirty CFs represented in the oldest commitlog segment
- */
- private void flushOldestKeyspaces()
- {
- CommitLogSegment oldestSegment = activeSegments.peek();
-
- if (oldestSegment != null)
- {
- for (UUID dirtyCFId : oldestSegment.getDirtyCFIDs())
- {
- Pair<String,String> pair = Schema.instance.getCF(dirtyCFId);
- if (pair == null)
- {
- // even though we remove the schema entry before a final flush when dropping a CF,
- // it's still possible for a writer to race and finish his append after the flush.
- logger.debug("Marking clean CF {} that doesn't exist anymore", dirtyCFId);
- oldestSegment.markClean(dirtyCFId, oldestSegment.getContext());
- }
- else
- {
- String keypace = pair.left;
- final ColumnFamilyStore cfs = Keyspace.open(keypace).getColumnFamilyStore(dirtyCFId);
- // flush shouldn't run on the commitlog executor, since it acquires Table.switchLock,
- // which may already be held by a thread waiting for the CL executor (via getContext),
- // causing deadlock
- Runnable runnable = new Runnable()
- {
- public void run()
- {
- cfs.forceFlush();
- }
- };
- StorageService.optionalTasks.execute(runnable);
- }
- }
- }
- }
-
- /**
- * Resets all the segments, for testing purposes. DO NOT USE THIS OUTSIDE OF TESTS.
- */
- public void resetUnsafe()
- {
- logger.debug("Closing and clearing existing commit log segments...");
-
- while (!queue.isEmpty())
- Thread.yield();
-
- for (CommitLogSegment segment : Iterables.concat(activeSegments, availableSegments))
- segment.close();
-
- activeSegments.clear();
- availableSegments.clear();
- }
-
- /**
- * Initiates the shutdown process for the allocator thread.
- */
- public void shutdown()
- {
- run = false;
- }
-
- /**
- * Returns when the allocator thread terminates.
- */
- public void awaitTermination() throws InterruptedException
- {
- allocationThread.join();
- }
-
- /**
- * @return a read-only collection of the active commit log segments
- */
- public Collection<CommitLogSegment> getActiveSegments()
- {
- return Collections.unmodifiableCollection(activeSegments);
- }
-}
-
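The deleted allocator's core idea — pre-allocate segments on a background thread so writers rarely block on file creation — carries over to the new CommitLogSegmentManager. A minimal sketch of that pattern with JDK types only (names illustrative):

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;

    interface SegmentFactory<T> { T create(); }

    class SegmentPreallocator<T>
    {
        private final BlockingQueue<T> availableSegments = new LinkedBlockingQueue<T>();
        private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
        private volatile boolean run = true;

        public SegmentPreallocator(final SegmentFactory<T> factory)
        {
            Runnable runnable = new Runnable()
            {
                public void run()
                {
                    while (run)
                    {
                        try
                        {
                            // run a queued task if there is one; otherwise top up the
                            // reserve so a segment is ready when a writer needs it
                            Runnable r = queue.poll(100, TimeUnit.MILLISECONDS);
                            if (r != null)
                                r.run();
                            else if (availableSegments.isEmpty())
                                availableSegments.add(factory.create());
                        }
                        catch (InterruptedException e)
                        {
                            return;
                        }
                    }
                }
            };
            new Thread(runnable, "PREALLOCATOR").start();
        }

        public T fetchSegment() throws InterruptedException
        {
            return availableSegments.take();
        }

        public void recycleSegment(final T segment)
        {
            queue.add(new Runnable()
            {
                public void run() { availableSegments.add(segment); }
            });
        }

        public void shutdown() { run = false; }
    }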
http://git-wip-us.apache.org/repos/asf/cassandra/blob/22e18f5a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
index f020182..1385ea4 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
@@ -137,7 +137,7 @@ public class CommitLogArchiver
{
if (e.getCause() instanceof IOException)
{
- logger.info("Looks like the archiving of file {} failed earlier, cassandra is going to ignore this segment for now.", name);
+ logger.error("Looks like the archiving of file {} failed earlier, cassandra is going to ignore this segment for now.", name);
return false;
}
throw new RuntimeException(e);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/22e18f5a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index ae091bf..c42ba9b 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -48,6 +48,7 @@ public class CommitLogReplayer
{
private static final Logger logger = LoggerFactory.getLogger(CommitLogReplayer.class);
private static final int MAX_OUTSTANDING_REPLAY_COUNT = 1024;
+ private static final int LEGACY_END_OF_SEGMENT_MARKER = 0;
private final Set<Keyspace> keyspacesRecovered;
private final List<Future<?>> futures;
@@ -114,6 +115,53 @@ public class CommitLogReplayer
return replayedCount.get();
}
+ private int readHeader(long segmentId, int offset, RandomAccessReader reader) throws IOException
+ {
+ if (offset > reader.length() - CommitLogSegment.SYNC_MARKER_SIZE)
+ {
+ if (offset != reader.length() && offset != Integer.MAX_VALUE)
+ logger.warn("Encountered bad header at position {} of Commit log {}; not enough room for a header");
+ // cannot possibly be a header here. if we're == length(), assume it's a correctly written final segment
+ return -1;
+ }
+ reader.seek(offset);
+ PureJavaCrc32 crc = new PureJavaCrc32();
+ crc.update((int) (segmentId & 0xFFFFFFFFL));
+ crc.update((int) (segmentId >>> 32));
+ crc.update((int) reader.getPosition());
+ int end = reader.readInt();
+ long filecrc = reader.readLong();
+ if (crc.getValue() != filecrc)
+ {
+ if (end != 0 || filecrc != 0)
+ {
+ logger.warn("Encountered bad header at position {} of commit log {}, with invalid CRC. The end of segment marker should be zero.", offset, reader.getPath());
+ }
+ return -1;
+ }
+ else if (end < offset || end > reader.length())
+ {
+ logger.warn("Encountered bad header at position {} of commit log {}, with bad position but valid CRC", offset, reader.getPath());
+ return -1;
+ }
+ return end;
+ }
+
+ private int getStartOffset(long segmentId, int version, File file)
+ {
+ if (globalPosition.segment < segmentId)
+ {
+ if (version >= CommitLogDescriptor.VERSION_21)
+ return CommitLogSegment.SYNC_MARKER_SIZE;
+ else
+ return 0;
+ }
+ else if (globalPosition.segment == segmentId)
+ return globalPosition.position;
+ else
+ return -1;
+ }
+
private abstract static class ReplayFilter
{
public abstract Iterable<ColumnFamily> filter(RowMutation rm);
@@ -181,182 +229,199 @@ public class CommitLogReplayer
final ReplayFilter replayFilter = ReplayFilter.create();
logger.info("Replaying {}", file.getPath());
CommitLogDescriptor desc = CommitLogDescriptor.fromFileName(file.getName());
- final long segment = desc.id;
+ final long segmentId = desc.id;
int version = desc.getMessagingVersion();
RandomAccessReader reader = RandomAccessReader.open(new File(file.getAbsolutePath()));
+
try
{
assert reader.length() <= Integer.MAX_VALUE;
- int replayPosition;
- if (globalPosition.segment < segment)
- {
- replayPosition = 0;
- }
- else if (globalPosition.segment == segment)
- {
- replayPosition = globalPosition.position;
- }
- else
+ int offset = getStartOffset(segmentId, version, file);
+ if (offset < 0)
{
logger.debug("skipping replay of fully-flushed {}", file);
return;
}
- if (logger.isDebugEnabled())
- logger.debug("Replaying {} starting at {}", file, replayPosition);
- reader.seek(replayPosition);
-
- /* read the logs populate RowMutation and apply */
- while (!reader.isEOF())
+ int prevEnd = 0;
+ main: while (true)
{
- if (logger.isDebugEnabled())
- logger.debug("Reading mutation at {}", reader.getFilePointer());
- long claimedCRC32;
- int serializedSize;
- try
+ int end = prevEnd;
+ if (version < CommitLogDescriptor.VERSION_21)
+ end = Integer.MAX_VALUE;
+ else
{
- // any of the reads may hit EOF
- serializedSize = reader.readInt();
- if (serializedSize == CommitLog.END_OF_SEGMENT_MARKER)
- {
- logger.debug("Encountered end of segment marker at {}", reader.getFilePointer());
- break;
- }
-
- // RowMutation must be at LEAST 10 bytes:
- // 3 each for a non-empty Keyspace and Key (including the
- // 2-byte length from writeUTF/writeWithShortLength) and 4 bytes for column count.
- // This prevents CRC by being fooled by special-case garbage in the file; see CASSANDRA-2128
- if (serializedSize < 10)
- break;
-
- long claimedSizeChecksum = reader.readLong();
- checksum.reset();
- if (version < CommitLogDescriptor.VERSION_20)
- checksum.update(serializedSize);
- else
- FBUtilities.updateChecksumInt(checksum, serializedSize);
-
- if (checksum.getValue() != claimedSizeChecksum)
- break; // entry wasn't synced correctly/fully. that's
- // ok.
-
- if (serializedSize > buffer.length)
- buffer = new byte[(int) (1.2 * serializedSize)];
- reader.readFully(buffer, 0, serializedSize);
- claimedCRC32 = reader.readLong();
- }
- catch (EOFException eof)
- {
- break; // last CL entry didn't get completely written. that's ok.
+ do { end = readHeader(segmentId, end, reader); }
+ while (end < offset && end > prevEnd);
}
- checksum.update(buffer, 0, serializedSize);
- if (claimedCRC32 != checksum.getValue())
- {
- // this entry must not have been fsynced. probably the rest is bad too,
- // but just in case there is no harm in trying them (since we still read on an entry boundary)
- continue;
- }
+ if (end < prevEnd)
+ break;
- /* deserialize the commit log entry */
- FastByteArrayInputStream bufIn = new FastByteArrayInputStream(buffer, 0, serializedSize);
- final RowMutation rm;
- try
- {
- // assuming version here. We've gone to lengths to make sure what gets written to the CL is in
- // the current version. so do make sure the CL is drained prior to upgrading a node.
- rm = RowMutation.serializer.deserialize(new DataInputStream(bufIn), version, ColumnSerializer.Flag.LOCAL);
- // doublecheck that what we read is [still] valid for the current schema
- for (ColumnFamily cf : rm.getColumnFamilies())
- for (Column cell : cf)
- cf.getComparator().validate(cell.name());
- }
- catch (UnknownColumnFamilyException ex)
- {
- if (ex.cfId == null)
- continue;
- AtomicInteger i = invalidMutations.get(ex.cfId);
- if (i == null)
- {
- i = new AtomicInteger(1);
- invalidMutations.put(ex.cfId, i);
- }
- else
- i.incrementAndGet();
- continue;
- }
- catch (Throwable t)
+ if (logger.isDebugEnabled())
+ logger.debug("Replaying {} between {} and {}", file, offset, prevEnd);
+
+ reader.seek(offset);
+
+ /* read the logs populate RowMutation and apply */
+ while (reader.getPosition() < end && !reader.isEOF())
{
- File f = File.createTempFile("mutation", "dat");
- DataOutputStream out = new DataOutputStream(new FileOutputStream(f));
+ if (logger.isDebugEnabled())
+ logger.debug("Reading mutation at {}", reader.getFilePointer());
+
+ long claimedCRC32;
+ int serializedSize;
try
{
- out.write(buffer, 0, serializedSize);
+ // any of the reads may hit EOF
+ serializedSize = reader.readInt();
+ if (serializedSize == LEGACY_END_OF_SEGMENT_MARKER)
+ {
+ logger.debug("Encountered end of segment marker at {}", reader.getFilePointer());
+ break main;
+ }
+
+ // RowMutation must be at LEAST 10 bytes:
+ // 3 each for a non-empty Keyspace and Key (including the
+ // 2-byte length from writeUTF/writeWithShortLength) and 4 bytes for column count.
+ // This prevents the CRC from being fooled by special-case garbage in the file; see CASSANDRA-2128
+ if (serializedSize < 10)
+ break main;
+
+ long claimedSizeChecksum = reader.readLong();
+ checksum.reset();
+ if (version < CommitLogDescriptor.VERSION_20)
+ checksum.update(serializedSize);
+ else
+ FBUtilities.updateChecksumInt(checksum, serializedSize);
+
+ if (checksum.getValue() != claimedSizeChecksum)
+ break main; // entry wasn't synced correctly/fully. that's
+ // ok.
+
+ if (serializedSize > buffer.length)
+ buffer = new byte[(int) (1.2 * serializedSize)];
+ reader.readFully(buffer, 0, serializedSize);
+ claimedCRC32 = reader.readLong();
}
- finally
+ catch (EOFException eof)
{
- out.close();
+ break main; // last CL entry didn't get completely written. that's ok.
}
- String st = String.format("Unexpected error deserializing mutation; saved to %s and ignored. This may be caused by replaying a mutation against a table with the same name but incompatible schema. Exception follows: ",
- f.getAbsolutePath());
- logger.error(st, t);
- continue;
- }
- if (logger.isDebugEnabled())
- logger.debug("replaying mutation for {}.{}: {}", rm.getKeyspaceName(), ByteBufferUtil.bytesToHex(rm.key()), "{" + StringUtils.join(rm.getColumnFamilies().iterator(), ", ") + "}");
+ checksum.update(buffer, 0, serializedSize);
+ if (claimedCRC32 != checksum.getValue())
+ {
+ // this entry must not have been fsynced. probably the rest is bad too,
+ // but just in case there is no harm in trying them (since we still read on an entry boundary)
+ continue;
+ }
- final long entryLocation = reader.getFilePointer();
- Runnable runnable = new WrappedRunnable()
- {
- public void runMayThrow() throws IOException
+ /* deserialize the commit log entry */
+ FastByteArrayInputStream bufIn = new FastByteArrayInputStream(buffer, 0, serializedSize);
+ final RowMutation rm;
+ try
{
- if (Schema.instance.getKSMetaData(rm.getKeyspaceName()) == null)
- return;
- if (pointInTimeExceeded(rm))
- return;
-
- final Keyspace keyspace = Keyspace.open(rm.getKeyspaceName());
-
- // Rebuild the row mutation, omitting column families that
- // a) the user has requested that we ignore,
- // b) have already been flushed,
- // or c) are part of a cf that was dropped.
- // Keep in mind that the cf.name() is suspect. do every thing based on the cfid instead.
- RowMutation newRm = null;
- for (ColumnFamily columnFamily : replayFilter.filter(rm))
+ // assuming version here. We've gone to lengths to make sure what gets written to the CL is in
+ // the current version. so do make sure the CL is drained prior to upgrading a node.
+ rm = RowMutation.serializer.deserialize(new DataInputStream(bufIn), version, ColumnSerializer.Flag.LOCAL);
+ // doublecheck that what we read is [still] valid for the current schema
+ for (ColumnFamily cf : rm.getColumnFamilies())
+ for (Column cell : cf)
+ cf.getComparator().validate(cell.name());
+ }
+ catch (UnknownColumnFamilyException ex)
+ {
+ if (ex.cfId == null)
+ continue;
+ AtomicInteger i = invalidMutations.get(ex.cfId);
+ if (i == null)
{
- if (Schema.instance.getCF(columnFamily.id()) == null)
- continue; // dropped
+ i = new AtomicInteger(1);
+ invalidMutations.put(ex.cfId, i);
+ }
+ else
+ i.incrementAndGet();
+ continue;
+ }
+ catch (Throwable t)
+ {
+ File f = File.createTempFile("mutation", "dat");
+ DataOutputStream out = new DataOutputStream(new FileOutputStream(f));
+ try
+ {
+ out.write(buffer, 0, serializedSize);
+ }
+ finally
+ {
+ out.close();
+ }
+ String st = String.format("Unexpected error deserializing mutation; saved to %s and ignored. This may be caused by replaying a mutation against a table with the same name but incompatible schema. Exception follows: ",
+ f.getAbsolutePath());
+ logger.error(st, t);
+ continue;
+ }
- ReplayPosition rp = cfPositions.get(columnFamily.id());
+ if (logger.isDebugEnabled())
+ logger.debug("replaying mutation for {}.{}: {}", rm.getKeyspaceName(), ByteBufferUtil.bytesToHex(rm.key()), "{" + StringUtils.join(rm.getColumnFamilies().iterator(), ", ") + "}");
- // replay if current segment is newer than last flushed one or,
- // if it is the last known segment, if we are after the replay position
- if (segment > rp.segment || (segment == rp.segment && entryLocation > rp.position))
+ final long entryLocation = reader.getFilePointer();
+ Runnable runnable = new WrappedRunnable()
+ {
+ public void runMayThrow() throws IOException
+ {
+ if (Schema.instance.getKSMetaData(rm.getKeyspaceName()) == null)
+ return;
+ if (pointInTimeExceeded(rm))
+ return;
+
+ final Keyspace keyspace = Keyspace.open(rm.getKeyspaceName());
+
+ // Rebuild the row mutation, omitting column families that
+ // a) the user has requested that we ignore,
+ // b) have already been flushed,
+ // or c) are part of a cf that was dropped.
+ // Keep in mind that the cf.name() is suspect. do every thing based on the cfid instead.
+ RowMutation newRm = null;
+ for (ColumnFamily columnFamily : replayFilter.filter(rm))
{
- if (newRm == null)
- newRm = new RowMutation(rm.getKeyspaceName(), rm.key());
- newRm.add(columnFamily);
- replayedCount.incrementAndGet();
+ if (Schema.instance.getCF(columnFamily.id()) == null)
+ continue; // dropped
+
+ ReplayPosition rp = cfPositions.get(columnFamily.id());
+
+ // replay if current segment is newer than last flushed one or,
+ // if it is the last known segment, if we are after the replay position
+ if (segmentId > rp.segment || (segmentId == rp.segment && entryLocation > rp.position))
+ {
+ if (newRm == null)
+ newRm = new RowMutation(rm.getKeyspaceName(), rm.key());
+ newRm.add(columnFamily);
+ replayedCount.incrementAndGet();
+ }
+ }
+ if (newRm != null)
+ {
+ assert !newRm.isEmpty();
+ Keyspace.open(newRm.getKeyspaceName()).apply(newRm, false);
+ keyspacesRecovered.add(keyspace);
}
}
- if (newRm != null)
- {
- assert !newRm.isEmpty();
- Keyspace.open(newRm.getKeyspaceName()).apply(newRm, false);
- keyspacesRecovered.add(keyspace);
- }
+ };
+ futures.add(StageManager.getStage(Stage.MUTATION).submit(runnable));
+ if (futures.size() > MAX_OUTSTANDING_REPLAY_COUNT)
+ {
+ FBUtilities.waitOnFutures(futures);
+ futures.clear();
}
- };
- futures.add(StageManager.getStage(Stage.MUTATION).submit(runnable));
- if (futures.size() > MAX_OUTSTANDING_REPLAY_COUNT)
- {
- FBUtilities.waitOnFutures(futures);
- futures.clear();
}
+
+ if (version < CommitLogDescriptor.VERSION_21)
+ break;
+
+ offset = end + CommitLogSegment.SYNC_MARKER_SIZE;
+ prevEnd = end;
}
}
finally
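A standalone sketch of the sync-marker check that readHeader() above performs: the marker at a given offset is [int end][long crc], where the CRC covers the segment id and the marker's own position. CRC32 stands in for PureJavaCrc32, and feeding big-endian int bytes to the checksum is an assumption made for illustration:

    import java.io.DataInput;
    import java.io.EOFException;
    import java.io.IOException;
    import java.util.zip.CRC32;

    class SyncMarker
    {
        // returns the section's end offset, or -1 if the marker is absent or corrupt
        // (i.e. the end of the replayable portion of the segment)
        static int validate(DataInput in, long segmentId, int offset, long fileLength) throws IOException
        {
            CRC32 crc = new CRC32();
            updateInt(crc, (int) (segmentId & 0xFFFFFFFFL));
            updateInt(crc, (int) (segmentId >>> 32));
            updateInt(crc, offset);
            try
            {
                int end = in.readInt();
                long storedCrc = in.readLong();
                // reject a bad checksum, or a "valid" one pointing outside the file
                if (crc.getValue() != storedCrc || end < offset || end > fileLength)
                    return -1;
                return end;
            }
            catch (EOFException e)
            {
                return -1;
            }
        }

        static void updateInt(CRC32 crc, int v)
        {
            crc.update(new byte[] { (byte) (v >>> 24), (byte) (v >>> 16), (byte) (v >>> 8), (byte) v });
        }
    }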