Posted to commits@cassandra.apache.org by jm...@apache.org on 2016/06/16 13:55:40 UTC
[4/5] cassandra git commit: Add Change Data Capture
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index 4a660ca..b1f48b2 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -22,34 +22,35 @@ import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.zip.CRC32;
-
import javax.management.MBeanServer;
import javax.management.ObjectName;
import com.google.common.annotations.VisibleForTesting;
-
+import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.commons.lang3.StringUtils;
-
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.ParameterizedClass;
import org.apache.cassandra.db.*;
+import org.apache.cassandra.exceptions.WriteTimeoutException;
import org.apache.cassandra.io.FSWriteError;
-import org.apache.cassandra.schema.CompressionParams;
import org.apache.cassandra.io.compress.ICompressor;
import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
import org.apache.cassandra.io.util.DataOutputBufferFixed;
+import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.metrics.CommitLogMetrics;
import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.CompressionParams;
import org.apache.cassandra.security.EncryptionContext;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.JVMStabilityInspector;
-import static org.apache.cassandra.db.commitlog.CommitLogSegment.*;
+import static org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation;
+import static org.apache.cassandra.db.commitlog.CommitLogSegment.CommitLogSegmentFileComparator;
+import static org.apache.cassandra.db.commitlog.CommitLogSegment.ENTRY_OVERHEAD_SIZE;
import static org.apache.cassandra.utils.FBUtilities.updateChecksum;
import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt;
@@ -65,19 +66,19 @@ public class CommitLog implements CommitLogMBean
// we only permit records HALF the size of a commit log, to ensure we don't spin allocating many mostly
// empty segments when writing large records
- private final long MAX_MUTATION_SIZE = DatabaseDescriptor.getMaxMutationSize();
+ final long MAX_MUTATION_SIZE = DatabaseDescriptor.getMaxMutationSize();
+
+ final public AbstractCommitLogSegmentManager segmentManager;
- public final CommitLogSegmentManager allocator;
public final CommitLogArchiver archiver;
final CommitLogMetrics metrics;
final AbstractCommitLogService executor;
volatile Configuration configuration;
- final public String location;
private static CommitLog construct()
{
- CommitLog log = new CommitLog(DatabaseDescriptor.getCommitLogLocation(), CommitLogArchiver.construct());
+ CommitLog log = new CommitLog(CommitLogArchiver.construct());
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
try
@@ -92,9 +93,8 @@ public class CommitLog implements CommitLogMBean
}
@VisibleForTesting
- CommitLog(String location, CommitLogArchiver archiver)
+ CommitLog(CommitLogArchiver archiver)
{
- this.location = location;
this.configuration = new Configuration(DatabaseDescriptor.getCommitLogCompression(),
DatabaseDescriptor.getEncryptionContext());
DatabaseDescriptor.createAllDirectories();
@@ -106,16 +106,17 @@ public class CommitLog implements CommitLogMBean
? new BatchCommitLogService(this)
: new PeriodicCommitLogService(this);
- allocator = new CommitLogSegmentManager(this);
-
+ segmentManager = DatabaseDescriptor.isCDCEnabled()
+ ? new CommitLogSegmentManagerCDC(this, DatabaseDescriptor.getCommitLogLocation())
+ : new CommitLogSegmentManagerStandard(this, DatabaseDescriptor.getCommitLogLocation());
// register metrics
- metrics.attach(executor, allocator);
+ metrics.attach(executor, segmentManager);
}
CommitLog start()
{
executor.start();
- allocator.start();
+ segmentManager.start();
return this;
}
@@ -123,11 +124,12 @@ public class CommitLog implements CommitLogMBean
* Perform recovery on commit logs located in the directory specified by the config file.
*
* @return the number of mutations replayed
+ * @throws IOException
*/
- public int recover() throws IOException
+ public int recoverSegmentsOnDisk() throws IOException
{
// If createReserveSegments is already flipped, the CLSM is running and recovery has already taken place.
- if (allocator.createReserveSegments)
+ if (segmentManager.createReserveSegments)
return 0;
FilenameFilter unmanagedFilesFilter = new FilenameFilter()
@@ -136,14 +138,13 @@ public class CommitLog implements CommitLogMBean
{
// we used to try to avoid instantiating commitlog (thus creating an empty segment ready for writes)
// until after recover was finished. this turns out to be fragile; it is less error-prone to go
- // ahead and allow writes before recover(), and just skip active segments when we do.
+ // ahead and allow writes before recover, and just skip active segments when we do.
return CommitLogDescriptor.isValid(name) && CommitLogSegment.shouldReplay(name);
}
};
- // submit all existing files in the commit log dir for archiving prior to recovery - CASSANDRA-6904
- File[] listFiles = new File(DatabaseDescriptor.getCommitLogLocation()).listFiles(unmanagedFilesFilter);
- for (File file : listFiles)
+ // submit all files for this segment manager for archiving prior to recovery - CASSANDRA-6904
+ for (File file : new File(segmentManager.storageDirectory).listFiles(unmanagedFilesFilter))
{
archiver.maybeArchive(file.getPath(), file.getName());
archiver.maybeWaitForArchiving(file.getName());
@@ -152,7 +153,7 @@ public class CommitLog implements CommitLogMBean
assert archiver.archivePending.isEmpty() : "Not all commit log archive tasks were completed before restore";
archiver.maybeRestoreArchive();
- File[] files = new File(DatabaseDescriptor.getCommitLogLocation()).listFiles(unmanagedFilesFilter);
+ File[] files = new File(segmentManager.storageDirectory).listFiles(unmanagedFilesFilter);
int replayed = 0;
if (files.length == 0)
{
@@ -162,14 +163,14 @@ public class CommitLog implements CommitLogMBean
{
Arrays.sort(files, new CommitLogSegmentFileComparator());
logger.info("Replaying {}", StringUtils.join(files, ", "));
- replayed = recover(files);
+ replayed = recoverFiles(files);
logger.info("Log replay complete, {} replayed mutations", replayed);
for (File f : files)
- allocator.recycleSegment(f);
+ segmentManager.handleReplayedSegment(f);
}
- allocator.enableReserveSegmentCreation();
+ segmentManager.enableReserveSegmentCreation();
return replayed;
}
@@ -179,30 +180,35 @@ public class CommitLog implements CommitLogMBean
* @param clogs the list of commit log files to replay
* @return the number of mutations replayed
*/
- public int recover(File... clogs) throws IOException
+ public int recoverFiles(File... clogs) throws IOException
+ {
+ CommitLogReplayer replayer = CommitLogReplayer.construct(this);
+ replayer.replayFiles(clogs);
+ return replayer.blockForWrites();
+ }
+
+ public void recoverPath(String path) throws IOException
{
- CommitLogReplayer recovery = CommitLogReplayer.construct(this);
- recovery.recover(clogs);
- return recovery.blockForWrites();
+ CommitLogReplayer replayer = CommitLogReplayer.construct(this);
+ replayer.replayPath(new File(path), false);
+ replayer.blockForWrites();
}
/**
- * Perform recovery on a single commit log.
+ * Perform recovery on a single commit log. Kept w/sub-optimal name due to coupling w/MBean / JMX
*/
public void recover(String path) throws IOException
{
- CommitLogReplayer recovery = CommitLogReplayer.construct(this);
- recovery.recover(new File(path), false);
- recovery.blockForWrites();
+ recoverPath(path);
}
/**
- * @return a ReplayPosition which, if {@code >= one} returned from add(), implies add() was started
+ * @return a CommitLogPosition which, if {@code >= one} returned from add(), implies add() was started
* (but not necessarily finished) prior to this call
*/
- public ReplayPosition getContext()
+ public CommitLogPosition getCurrentPosition()
{
- return allocator.allocatingFrom().getContext();
+ return segmentManager.getCurrentPosition();
}
/**
@@ -210,7 +216,7 @@ public class CommitLog implements CommitLogMBean
*/
public void forceRecycleAllSegments(Iterable<UUID> droppedCfs)
{
- allocator.forceRecycleAll(droppedCfs);
+ segmentManager.forceRecycleAll(droppedCfs);
}
/**
@@ -218,21 +224,15 @@ public class CommitLog implements CommitLogMBean
*/
public void forceRecycleAllSegments()
{
- allocator.forceRecycleAll(Collections.<UUID>emptyList());
+ segmentManager.forceRecycleAll(Collections.<UUID>emptyList());
}
/**
* Forces a disk flush on the commit log files that need it. Blocking.
*/
- public void sync(boolean syncAllSegments)
+ public void sync(boolean syncAllSegments) throws IOException
{
- CommitLogSegment current = allocator.allocatingFrom();
- for (CommitLogSegment segment : allocator.getActiveSegments())
- {
- if (!syncAllSegments && segment.id > current.id)
- return;
- segment.sync();
- }
+ segmentManager.sync(syncAllSegments);
}
/**
@@ -244,11 +244,12 @@ public class CommitLog implements CommitLogMBean
}
/**
- * Add a Mutation to the commit log.
+ * Add a Mutation to the commit log. If CDC is enabled, this can fail.
*
* @param mutation the Mutation to add to the log
+ * @throws WriteTimeoutException
*/
- public ReplayPosition add(Mutation mutation)
+ public CommitLogPosition add(Mutation mutation) throws WriteTimeoutException
{
assert mutation != null;
@@ -262,7 +263,8 @@ public class CommitLog implements CommitLogMBean
FBUtilities.prettyPrintMemory(MAX_MUTATION_SIZE)));
}
- Allocation alloc = allocator.allocate(mutation, (int) totalSize);
+ Allocation alloc = segmentManager.allocate(mutation, totalSize);
+
CRC32 checksum = new CRC32();
final ByteBuffer buffer = alloc.getBuffer();
try (BufferedDataOutputStreamPlus dos = new DataOutputBufferFixed(buffer))
@@ -287,7 +289,7 @@ public class CommitLog implements CommitLogMBean
}
executor.finishWriteFor(alloc);
- return alloc.getReplayPosition();
+ return alloc.getCommitLogPosition();
}
/**
@@ -295,17 +297,17 @@ public class CommitLog implements CommitLogMBean
* given. Discards any commit log segments that are no longer used.
*
* @param cfId the column family ID that was flushed
- * @param context the replay position of the flush
+ * @param context the commit log segment position of the flush
*/
- public void discardCompletedSegments(final UUID cfId, final ReplayPosition context)
+ public void discardCompletedSegments(final UUID cfId, final CommitLogPosition context)
{
logger.trace("discard completed log segments for {}, table {}", context, cfId);
// Go thru the active segment files, which are ordered oldest to newest, marking the
- // flushed CF as clean, until we reach the segment file containing the ReplayPosition passed
+ // flushed CF as clean, until we reach the segment file containing the CommitLogPosition passed
// in the arguments. Any segments that become unused after they are marked clean will be
// recycled or discarded.
- for (Iterator<CommitLogSegment> iter = allocator.getActiveSegments().iterator(); iter.hasNext();)
+ for (Iterator<CommitLogSegment> iter = segmentManager.getActiveSegments().iterator(); iter.hasNext();)
{
CommitLogSegment segment = iter.next();
segment.markClean(cfId, context);
@@ -313,7 +315,7 @@ public class CommitLog implements CommitLogMBean
if (segment.isUnused())
{
logger.trace("Commit log segment {} is unused", segment);
- allocator.recycleSegment(segment);
+ segmentManager.recycleSegment(segment);
}
else
{
@@ -361,8 +363,8 @@ public class CommitLog implements CommitLogMBean
public List<String> getActiveSegmentNames()
{
List<String> segmentNames = new ArrayList<>();
- for (CommitLogSegment segment : allocator.getActiveSegments())
- segmentNames.add(segment.getName());
+ for (CommitLogSegment seg : segmentManager.getActiveSegments())
+ segmentNames.add(seg.getName());
return segmentNames;
}
@@ -375,23 +377,23 @@ public class CommitLog implements CommitLogMBean
public long getActiveContentSize()
{
long size = 0;
- for (CommitLogSegment segment : allocator.getActiveSegments())
- size += segment.contentSize();
+ for (CommitLogSegment seg : segmentManager.getActiveSegments())
+ size += seg.contentSize();
return size;
}
@Override
public long getActiveOnDiskSize()
{
- return allocator.onDiskSize();
+ return segmentManager.onDiskSize();
}
@Override
public Map<String, Double> getActiveSegmentCompressionRatios()
{
Map<String, Double> segmentRatios = new TreeMap<>();
- for (CommitLogSegment segment : allocator.getActiveSegments())
- segmentRatios.put(segment.getName(), 1.0 * segment.onDiskSize() / segment.contentSize());
+ for (CommitLogSegment seg : segmentManager.getActiveSegments())
+ segmentRatios.put(seg.getName(), 1.0 * seg.onDiskSize() / seg.contentSize());
return segmentRatios;
}
@@ -402,12 +404,12 @@ public class CommitLog implements CommitLogMBean
{
executor.shutdown();
executor.awaitTermination();
- allocator.shutdown();
- allocator.awaitTermination();
+ segmentManager.shutdown();
+ segmentManager.awaitTermination();
}
/**
- * FOR TESTING PURPOSES. See CommitLogAllocator.
+ * FOR TESTING PURPOSES
* @return the number of files recovered
*/
public int resetUnsafe(boolean deleteSegments) throws IOException
@@ -427,7 +429,6 @@ public class CommitLog implements CommitLogMBean
}
/**
- * FOR TESTING PURPOSES. See CommitLogAllocator.
*/
public void stopUnsafe(boolean deleteSegments)
{
@@ -440,20 +441,24 @@ public class CommitLog implements CommitLogMBean
{
throw new RuntimeException(e);
}
- allocator.stopUnsafe(deleteSegments);
+ segmentManager.stopUnsafe(deleteSegments);
CommitLogSegment.resetReplayLimit();
+ if (DatabaseDescriptor.isCDCEnabled() && deleteSegments)
+ for (File f : new File(DatabaseDescriptor.getCDCLogLocation()).listFiles())
+ FileUtils.deleteWithConfirm(f);
+
}
/**
- * FOR TESTING PURPOSES. See CommitLogAllocator
+ * FOR TESTING PURPOSES
*/
public int restartUnsafe() throws IOException
{
- allocator.start();
+ segmentManager.start();
executor.restartUnsafe();
try
{
- return recover();
+ return recoverSegmentsOnDisk();
}
catch (FSWriteError e)
{
@@ -468,16 +473,6 @@ public class CommitLog implements CommitLogMBean
}
}
- /**
- * Used by tests.
- *
- * @return the number of active segments (segments with unflushed data in them)
- */
- public int activeSegments()
- {
- return allocator.getActiveSegments().size();
- }
-
@VisibleForTesting
public static boolean handleCommitError(String message, Throwable t)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/src/java/org/apache/cassandra/db/commitlog/CommitLogPosition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogPosition.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogPosition.java
new file mode 100644
index 0000000..84054a4
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogPosition.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.commitlog;
+
+import java.io.IOException;
+import java.util.Comparator;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.ISerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+
+/**
+ * Contains a segment id and a position for CommitLogSegment identification.
+ * Used for both replay and general CommitLog file reading.
+ */
+public class CommitLogPosition implements Comparable<CommitLogPosition>
+{
+ public static final CommitLogPositionSerializer serializer = new CommitLogPositionSerializer();
+
+ // NONE is used for SSTables that are streamed from other nodes and thus have no relationship
+ // with our local commitlog. The values satisfy the criteria that
+ // - no real commitlog segment will have the given id
+ // - it will sort before any real CommitLogPosition, so it will be effectively ignored by getCommitLogPosition
+ public static final CommitLogPosition NONE = new CommitLogPosition(-1, 0);
+
+ public final long segmentId;
+ public final int position;
+
+ public static final Comparator<CommitLogPosition> comparator = new Comparator<CommitLogPosition>()
+ {
+ public int compare(CommitLogPosition o1, CommitLogPosition o2)
+ {
+ if (o1.segmentId != o2.segmentId)
+ return Long.compare(o1.segmentId, o2.segmentId);
+
+ return Integer.compare(o1.position, o2.position);
+ }
+ };
+
+ public CommitLogPosition(long segmentId, int position)
+ {
+ this.segmentId = segmentId;
+ assert position >= 0;
+ this.position = position;
+ }
+
+ public int compareTo(CommitLogPosition other)
+ {
+ return comparator.compare(this, other);
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ CommitLogPosition that = (CommitLogPosition) o;
+
+ if (position != that.position) return false;
+ return segmentId == that.segmentId;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = (int) (segmentId ^ (segmentId >>> 32));
+ result = 31 * result + position;
+ return result;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "CommitLogPosition(" +
+ "segmentId=" + segmentId +
+ ", position=" + position +
+ ')';
+ }
+
+ public CommitLogPosition clone()
+ {
+ return new CommitLogPosition(segmentId, position);
+ }
+
+
+ public static class CommitLogPositionSerializer implements ISerializer<CommitLogPosition>
+ {
+ public void serialize(CommitLogPosition clsp, DataOutputPlus out) throws IOException
+ {
+ out.writeLong(clsp.segmentId);
+ out.writeInt(clsp.position);
+ }
+
+ public CommitLogPosition deserialize(DataInputPlus in) throws IOException
+ {
+ return new CommitLogPosition(in.readLong(), in.readInt());
+ }
+
+ public long serializedSize(CommitLogPosition clsp)
+ {
+ return TypeSizes.sizeof(clsp.segmentId) + TypeSizes.sizeof(clsp.position);
+ }
+ }
+}
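
For orientation, here is a minimal usage sketch of the ordering contract described above. It is not part of the patch and assumes only the CommitLogPosition class defined in this file:

    // Hypothetical sketch: positions order by segmentId first, then by position,
    // and the NONE sentinel sorts before any real position.
    import org.apache.cassandra.db.commitlog.CommitLogPosition;

    public class CommitLogPositionOrderingSketch
    {
        public static void main(String[] args)
        {
            CommitLogPosition older = new CommitLogPosition(10L, 128);
            CommitLogPosition newer = new CommitLogPosition(10L, 4096);
            CommitLogPosition nextSegment = new CommitLogPosition(11L, 0);

            assert older.compareTo(newer) < 0;                  // same segment, lower offset first
            assert newer.compareTo(nextSegment) < 0;            // higher segment id sorts later
            assert CommitLogPosition.NONE.compareTo(older) < 0; // NONE sorts before any real position
        }
    }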
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/src/java/org/apache/cassandra/db/commitlog/CommitLogReadHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReadHandler.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReadHandler.java
new file mode 100644
index 0000000..0602147
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReadHandler.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.commitlog;
+
+import java.io.IOException;
+
+import org.apache.cassandra.db.Mutation;
+
+public interface CommitLogReadHandler
+{
+ enum CommitLogReadErrorReason
+ {
+ RECOVERABLE_DESCRIPTOR_ERROR,
+ UNRECOVERABLE_DESCRIPTOR_ERROR,
+ MUTATION_ERROR,
+ UNRECOVERABLE_UNKNOWN_ERROR,
+ EOF
+ }
+
+ class CommitLogReadException extends IOException
+ {
+ public final CommitLogReadErrorReason reason;
+ public final boolean permissible;
+
+ CommitLogReadException(String message, CommitLogReadErrorReason reason, boolean permissible)
+ {
+ super(message);
+ this.reason = reason;
+ this.permissible = permissible;
+ }
+ }
+
+ /**
+ * Handle an error during segment read, signaling whether or not you want the reader to skip the remainder of the
+ * current segment on error.
+ *
+ * @param exception CommitLogReadException w/details on exception state
+ * @return boolean indicating whether to stop reading
+ * @throws IOException In the event the handler wants forceful termination of all processing, throw IOException.
+ */
+ boolean shouldSkipSegmentOnError(CommitLogReadException exception) throws IOException;
+
+ /**
+ * In instances where we cannot recover from a specific error and don't care what the reader thinks
+ *
+ * @param exception CommitLogReadException w/details on exception state
+ * @throws IOException
+ */
+ void handleUnrecoverableError(CommitLogReadException exception) throws IOException;
+
+ /**
+ * Process a deserialized mutation
+ *
+ * @param m deserialized mutation
+ * @param size serialized size of the mutation
+ * @param entryLocation filePointer offset inside the CommitLogSegment for the record
+ * @param desc CommitLogDescriptor for mutation being processed
+ */
+ void handleMutation(Mutation m, int size, int entryLocation, CommitLogDescriptor desc);
+}
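
The interface above leaves error policy to the caller. A minimal, hypothetical implementation (not part of this commit) that counts mutations, skips the remainder of a segment only on non-permissible errors, and aborts on unrecoverable ones might look like this:

    // Hypothetical CommitLogReadHandler implementation; the name and policy are illustrative only.
    import java.io.IOException;

    import org.apache.cassandra.db.Mutation;
    import org.apache.cassandra.db.commitlog.CommitLogDescriptor;
    import org.apache.cassandra.db.commitlog.CommitLogReadHandler;

    public class CountingReadHandler implements CommitLogReadHandler
    {
        public int mutationCount = 0;

        public boolean shouldSkipSegmentOnError(CommitLogReadException exception) throws IOException
        {
            // Keep reading past permissible errors; give up on the segment otherwise.
            return !exception.permissible;
        }

        public void handleUnrecoverableError(CommitLogReadException exception) throws IOException
        {
            // Rethrowing forces termination of all further processing.
            throw exception;
        }

        public void handleMutation(Mutation m, int size, int entryLocation, CommitLogDescriptor desc)
        {
            mutationCount++;
        }
    }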
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
new file mode 100644
index 0000000..73d70f3
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
@@ -0,0 +1,501 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.commitlog;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.zip.CRC32;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.UnknownColumnFamilyException;
+import org.apache.cassandra.db.commitlog.CommitLogReadHandler.CommitLogReadErrorReason;
+import org.apache.cassandra.db.commitlog.CommitLogReadHandler.CommitLogReadException;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.SerializationHelper;
+import org.apache.cassandra.io.util.ChannelProxy;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.io.util.RebufferingInputStream;
+import org.apache.cassandra.utils.JVMStabilityInspector;
+
+import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt;
+
+public class CommitLogReader
+{
+ private static final Logger logger = LoggerFactory.getLogger(CommitLogReader.class);
+
+ private static final int LEGACY_END_OF_SEGMENT_MARKER = 0;
+
+ @VisibleForTesting
+ public static final int ALL_MUTATIONS = -1;
+ private final CRC32 checksum;
+ private final Map<UUID, AtomicInteger> invalidMutations;
+
+ private byte[] buffer;
+
+ public CommitLogReader()
+ {
+ checksum = new CRC32();
+ invalidMutations = new HashMap<>();
+ buffer = new byte[4096];
+ }
+
+ public Set<Map.Entry<UUID, AtomicInteger>> getInvalidMutations()
+ {
+ return invalidMutations.entrySet();
+ }
+
+ /**
+ * Reads all passed in files with no minimum, no start, and no mutation limit.
+ */
+ public void readAllFiles(CommitLogReadHandler handler, File[] files) throws IOException
+ {
+ readAllFiles(handler, files, CommitLogPosition.NONE);
+ }
+
+ /**
+ * Reads all passed in files with minPosition, no start, and no mutation limit.
+ */
+ public void readAllFiles(CommitLogReadHandler handler, File[] files, CommitLogPosition minPosition) throws IOException
+ {
+ for (int i = 0; i < files.length; i++)
+ readCommitLogSegment(handler, files[i], minPosition, ALL_MUTATIONS, i + 1 == files.length);
+ }
+
+ /**
+ * Reads passed in file fully
+ */
+ public void readCommitLogSegment(CommitLogReadHandler handler, File file, boolean tolerateTruncation) throws IOException
+ {
+ readCommitLogSegment(handler, file, CommitLogPosition.NONE, ALL_MUTATIONS, tolerateTruncation);
+ }
+
+ /**
+ * Reads passed in file fully, up to mutationLimit count
+ */
+ @VisibleForTesting
+ public void readCommitLogSegment(CommitLogReadHandler handler, File file, int mutationLimit, boolean tolerateTruncation) throws IOException
+ {
+ readCommitLogSegment(handler, file, CommitLogPosition.NONE, mutationLimit, tolerateTruncation);
+ }
+
+ /**
+ * Reads mutations from file, handing them off to handler
+ * @param handler Handler that will take action based on deserialized Mutations
+ * @param file CommitLogSegment file to read
+ * @param minPosition Optional minimum CommitLogPosition - all segments with id > or matching w/greater position will be read
+ * @param mutationLimit Optional limit on # of mutations to replay. Local ALL_MUTATIONS serves as marker to play all.
+ * @param tolerateTruncation Whether or not we should allow truncation of this file or throw if EOF found
+ *
+ * @throws IOException
+ */
+ public void readCommitLogSegment(CommitLogReadHandler handler,
+ File file,
+ CommitLogPosition minPosition,
+ int mutationLimit,
+ boolean tolerateTruncation) throws IOException
+ {
+ // just transform from the file name (no reading of headers) to determine version
+ CommitLogDescriptor desc = CommitLogDescriptor.fromFileName(file.getName());
+
+ try(ChannelProxy channel = new ChannelProxy(file);
+ RandomAccessReader reader = RandomAccessReader.open(channel))
+ {
+ if (desc.version < CommitLogDescriptor.VERSION_21)
+ {
+ if (!shouldSkipSegmentId(file, desc, minPosition))
+ {
+ if (minPosition.segmentId == desc.id)
+ reader.seek(minPosition.position);
+ ReadStatusTracker statusTracker = new ReadStatusTracker(mutationLimit, tolerateTruncation);
+ statusTracker.errorContext = desc.fileName();
+ readSection(handler, reader, minPosition, (int) reader.length(), statusTracker, desc);
+ }
+ return;
+ }
+
+ final long segmentIdFromFilename = desc.id;
+ try
+ {
+ // The following call can either throw or legitimately return null. For either case, we need to check
+ // desc outside this block and set it to null in the exception case.
+ desc = CommitLogDescriptor.readHeader(reader, DatabaseDescriptor.getEncryptionContext());
+ }
+ catch (Exception e)
+ {
+ desc = null;
+ }
+ if (desc == null)
+ {
+ // don't care about whether or not the handler thinks we can continue. We can't w/out descriptor.
+ handler.handleUnrecoverableError(new CommitLogReadException(
+ String.format("Could not read commit log descriptor in file %s", file),
+ CommitLogReadErrorReason.UNRECOVERABLE_DESCRIPTOR_ERROR,
+ false));
+ return;
+ }
+
+ if (segmentIdFromFilename != desc.id)
+ {
+ if (handler.shouldSkipSegmentOnError(new CommitLogReadException(String.format(
+ "Segment id mismatch (filename %d, descriptor %d) in file %s", segmentIdFromFilename, desc.id, file),
+ CommitLogReadErrorReason.RECOVERABLE_DESCRIPTOR_ERROR,
+ false)))
+ {
+ return;
+ }
+ }
+
+ if (shouldSkipSegmentId(file, desc, minPosition))
+ return;
+
+ CommitLogSegmentReader segmentReader;
+ try
+ {
+ segmentReader = new CommitLogSegmentReader(handler, desc, reader, tolerateTruncation);
+ }
+ catch(Exception e)
+ {
+ handler.handleUnrecoverableError(new CommitLogReadException(
+ String.format("Unable to create segment reader for commit log file: %s", e),
+ CommitLogReadErrorReason.UNRECOVERABLE_UNKNOWN_ERROR,
+ tolerateTruncation));
+ return;
+ }
+
+ try
+ {
+ ReadStatusTracker statusTracker = new ReadStatusTracker(mutationLimit, tolerateTruncation);
+ for (CommitLogSegmentReader.SyncSegment syncSegment : segmentReader)
+ {
+ statusTracker.tolerateErrorsInSection &= syncSegment.toleratesErrorsInSection;
+
+ // Skip segments that are completely behind the desired minPosition
+ if (desc.id == minPosition.segmentId && syncSegment.endPosition < minPosition.position)
+ continue;
+
+ statusTracker.errorContext = String.format("Next section at %d in %s", syncSegment.fileStartPosition, desc.fileName());
+
+ // TODO: Since EncryptedFileSegmentInputStream doesn't implement seek(), we cannot pre-emptively seek
+ // to the desired offset in the syncSegment before reading the section and deserializing the mutations.
+ readSection(handler, syncSegment.input, minPosition, syncSegment.endPosition, statusTracker, desc);
+ if (!statusTracker.shouldContinue())
+ break;
+ }
+ }
+ // Unfortunately AbstractIterator cannot throw a checked exception, so we check to see if a RuntimeException
+ // is wrapping an IOException.
+ catch (RuntimeException re)
+ {
+ if (re.getCause() instanceof IOException)
+ throw (IOException) re.getCause();
+ throw re;
+ }
+ logger.debug("Finished reading {}", file);
+ }
+ }
+
+ /**
+ * Any segment with id >= minPosition.segmentId is a candidate for read.
+ */
+ private boolean shouldSkipSegmentId(File file, CommitLogDescriptor desc, CommitLogPosition minPosition)
+ {
+ logger.debug("Reading {} (CL version {}, messaging version {}, compression {})",
+ file.getPath(),
+ desc.version,
+ desc.getMessagingVersion(),
+ desc.compression);
+
+ if (minPosition.segmentId > desc.id)
+ {
+ logger.trace("Skipping read of fully-flushed {}", file);
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Reads a section of a file containing mutations
+ *
+ * @param handler Handler that will take action based on deserialized Mutations
+ * @param reader FileDataInput / logical buffer containing commitlog mutations
+ * @param minPosition CommitLogPosition indicating when we should start actively replaying mutations
+ * @param end logical numeric end of the segment being read
+ * @param statusTracker ReadStatusTracker with current state of mutation count, error state, etc
+ * @param desc Descriptor for CommitLog serialization
+ */
+ private void readSection(CommitLogReadHandler handler,
+ FileDataInput reader,
+ CommitLogPosition minPosition,
+ int end,
+ ReadStatusTracker statusTracker,
+ CommitLogDescriptor desc) throws IOException
+ {
+ while (statusTracker.shouldContinue() && reader.getFilePointer() < end && !reader.isEOF())
+ {
+ long mutationStart = reader.getFilePointer();
+ if (logger.isTraceEnabled())
+ logger.trace("Reading mutation at {}", mutationStart);
+
+ long claimedCRC32;
+ int serializedSize;
+ try
+ {
+ // any of the reads may hit EOF
+ serializedSize = reader.readInt();
+ if (serializedSize == LEGACY_END_OF_SEGMENT_MARKER)
+ {
+ logger.trace("Encountered end of segment marker at {}", reader.getFilePointer());
+ statusTracker.requestTermination();
+ return;
+ }
+
+ // Mutation must be at LEAST 10 bytes:
+ // 3 for a non-empty Keyspace
+ // 3 for a Key (including the 2-byte length from writeUTF/writeWithShortLength)
+ // 4 bytes for column count.
+ // This prevents the CRC from being fooled by special-case garbage in the file; see CASSANDRA-2128
+ if (serializedSize < 10)
+ {
+ if (handler.shouldSkipSegmentOnError(new CommitLogReadException(
+ String.format("Invalid mutation size %d at %d in %s", serializedSize, mutationStart, statusTracker.errorContext),
+ CommitLogReadErrorReason.MUTATION_ERROR,
+ statusTracker.tolerateErrorsInSection)))
+ {
+ statusTracker.requestTermination();
+ }
+ return;
+ }
+
+ long claimedSizeChecksum = CommitLogFormat.calculateClaimedChecksum(reader, desc.version);
+ checksum.reset();
+ CommitLogFormat.updateChecksum(checksum, serializedSize, desc.version);
+
+ if (checksum.getValue() != claimedSizeChecksum)
+ {
+ if (handler.shouldSkipSegmentOnError(new CommitLogReadException(
+ String.format("Mutation size checksum failure at %d in %s", mutationStart, statusTracker.errorContext),
+ CommitLogReadErrorReason.MUTATION_ERROR,
+ statusTracker.tolerateErrorsInSection)))
+ {
+ statusTracker.requestTermination();
+ }
+ return;
+ }
+
+ if (serializedSize > buffer.length)
+ buffer = new byte[(int) (1.2 * serializedSize)];
+ reader.readFully(buffer, 0, serializedSize);
+
+ claimedCRC32 = CommitLogFormat.calculateClaimedCRC32(reader, desc.version);
+ }
+ catch (EOFException eof)
+ {
+ if (handler.shouldSkipSegmentOnError(new CommitLogReadException(
+ String.format("Unexpected end of segment", mutationStart, statusTracker.errorContext),
+ CommitLogReadErrorReason.EOF,
+ statusTracker.tolerateErrorsInSection)))
+ {
+ statusTracker.requestTermination();
+ }
+ return;
+ }
+
+ checksum.update(buffer, 0, serializedSize);
+ if (claimedCRC32 != checksum.getValue())
+ {
+ if (handler.shouldSkipSegmentOnError(new CommitLogReadException(
+ String.format("Mutation checksum failure at %d in %s", mutationStart, statusTracker.errorContext),
+ CommitLogReadErrorReason.MUTATION_ERROR,
+ statusTracker.tolerateErrorsInSection)))
+ {
+ statusTracker.requestTermination();
+ }
+ continue;
+ }
+
+ long mutationPosition = reader.getFilePointer();
+ readMutation(handler, buffer, serializedSize, minPosition, (int)mutationPosition, desc);
+
+ // Only count this as a processed mutation if it is after our min as we suppress reading of mutations that
+ // are before this mark.
+ if (mutationPosition >= minPosition.position)
+ statusTracker.addProcessedMutation();
+ }
+ }
+
+ /**
+ * Deserializes and passes a Mutation to the ICommitLogReadHandler requested
+ *
+ * @param handler Handler that will take action based on deserialized Mutations
+ * @param inputBuffer raw byte array w/Mutation data
+ * @param size deserialized size of mutation
+ * @param minPosition We need to suppress replay of mutations that are before the required minPosition
+ * @param entryLocation filePointer offset of mutation within CommitLogSegment
+ * @param desc CommitLogDescriptor being worked on
+ */
+ @VisibleForTesting
+ protected void readMutation(CommitLogReadHandler handler,
+ byte[] inputBuffer,
+ int size,
+ CommitLogPosition minPosition,
+ final int entryLocation,
+ final CommitLogDescriptor desc) throws IOException
+ {
+ // For now, we need to go through the motions of deserializing the mutation to determine its size and move
+ // the file pointer forward accordingly, even if we're behind the requested minPosition within this SyncSegment.
+ boolean shouldReplay = entryLocation > minPosition.position;
+
+ final Mutation mutation;
+ try (RebufferingInputStream bufIn = new DataInputBuffer(inputBuffer, 0, size))
+ {
+ mutation = Mutation.serializer.deserialize(bufIn,
+ desc.getMessagingVersion(),
+ SerializationHelper.Flag.LOCAL);
+ // double-check that what we read is still valid for the current schema
+ for (PartitionUpdate upd : mutation.getPartitionUpdates())
+ upd.validate();
+ }
+ catch (UnknownColumnFamilyException ex)
+ {
+ if (ex.cfId == null)
+ return;
+ AtomicInteger i = invalidMutations.get(ex.cfId);
+ if (i == null)
+ {
+ i = new AtomicInteger(1);
+ invalidMutations.put(ex.cfId, i);
+ }
+ else
+ i.incrementAndGet();
+ return;
+ }
+ catch (Throwable t)
+ {
+ JVMStabilityInspector.inspectThrowable(t);
+ File f = File.createTempFile("mutation", "dat");
+
+ try (DataOutputStream out = new DataOutputStream(new FileOutputStream(f)))
+ {
+ out.write(inputBuffer, 0, size);
+ }
+
+ // Checksum passed so this error can't be permissible.
+ handler.handleUnrecoverableError(new CommitLogReadException(
+ String.format(
+ "Unexpected error deserializing mutation; saved to %s. " +
+ "This may be caused by replaying a mutation against a table with the same name but incompatible schema. " +
+ "Exception follows: %s", f.getAbsolutePath(), t),
+ CommitLogReadErrorReason.MUTATION_ERROR,
+ false));
+ return;
+ }
+
+ if (logger.isTraceEnabled())
+ logger.trace("Read mutation for {}.{}: {}", mutation.getKeyspaceName(), mutation.key(),
+ "{" + StringUtils.join(mutation.getPartitionUpdates().iterator(), ", ") + "}");
+
+ if (shouldReplay)
+ handler.handleMutation(mutation, size, entryLocation, desc);
+ }
+
+ /**
+ * Helper methods to deal with changing formats of internals of the CommitLog without polluting deserialization code.
+ */
+ private static class CommitLogFormat
+ {
+ public static long calculateClaimedChecksum(FileDataInput input, int commitLogVersion) throws IOException
+ {
+ switch (commitLogVersion)
+ {
+ case CommitLogDescriptor.VERSION_12:
+ case CommitLogDescriptor.VERSION_20:
+ return input.readLong();
+ // Changed format in 2.1
+ default:
+ return input.readInt() & 0xffffffffL;
+ }
+ }
+
+ public static void updateChecksum(CRC32 checksum, int serializedSize, int commitLogVersion)
+ {
+ switch (commitLogVersion)
+ {
+ case CommitLogDescriptor.VERSION_12:
+ checksum.update(serializedSize);
+ break;
+ // Changed format in 2.0
+ default:
+ updateChecksumInt(checksum, serializedSize);
+ break;
+ }
+ }
+
+ public static long calculateClaimedCRC32(FileDataInput input, int commitLogVersion) throws IOException
+ {
+ switch (commitLogVersion)
+ {
+ case CommitLogDescriptor.VERSION_12:
+ case CommitLogDescriptor.VERSION_20:
+ return input.readLong();
+ // Changed format in 2.1
+ default:
+ return input.readInt() & 0xffffffffL;
+ }
+ }
+ }
+
+ private class ReadStatusTracker
+ {
+ private int mutationsLeft;
+ public String errorContext = "";
+ public boolean tolerateErrorsInSection;
+ private boolean error;
+
+ public ReadStatusTracker(int mutationLimit, boolean tolerateErrorsInSection)
+ {
+ this.mutationsLeft = mutationLimit;
+ this.tolerateErrorsInSection = tolerateErrorsInSection;
+ }
+
+ public void addProcessedMutation()
+ {
+ if (mutationsLeft == ALL_MUTATIONS)
+ return;
+ --mutationsLeft;
+ }
+
+ public boolean shouldContinue()
+ {
+ return !error && (mutationsLeft != 0 || mutationsLeft == ALL_MUTATIONS);
+ }
+
+ public void requestTermination()
+ {
+ error = true;
+ }
+ }
+}
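
Tying the pieces together, a hypothetical caller of CommitLogReader (not part of the patch; the directory path and the CountingReadHandler sketched earlier are assumptions) could read every segment on disk like this:

    // Hypothetical usage sketch; only readAllFiles and getInvalidMutations from the class above are used.
    import java.io.File;
    import java.io.IOException;

    import org.apache.cassandra.db.commitlog.CommitLogReader;

    public class ReadAllSegmentsSketch
    {
        public static void main(String[] args) throws IOException
        {
            File dir = new File("/var/lib/cassandra/commitlog"); // hypothetical location
            File[] segments = dir.listFiles();
            if (segments == null || segments.length == 0)
                return;

            CommitLogReader reader = new CommitLogReader();
            CountingReadHandler handler = new CountingReadHandler(); // sketched after CommitLogReadHandler above

            // Reads each file from CommitLogPosition.NONE with no mutation limit,
            // invoking handler.handleMutation for every deserialized mutation.
            reader.readAllFiles(handler, segments);

            System.out.println("Mutations handled: " + handler.mutationCount);
            System.out.println("Tables with invalid mutations: " + reader.getInvalidMutations().size());
        }
    }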
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index 93c6c50..c8e597f 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -18,32 +18,17 @@
*/
package org.apache.cassandra.db.commitlog;
-import java.io.DataOutputStream;
-import java.io.EOFException;
import java.io.File;
-import java.io.FileOutputStream;
import java.io.IOException;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-import java.util.UUID;
+import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.zip.CRC32;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Multimap;
+import com.google.common.collect.*;
import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.commons.lang3.StringUtils;
import org.cliffc.high_scale_lib.NonBlockingHashSet;
@@ -53,28 +38,14 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.Mutation;
-import org.apache.cassandra.db.SystemKeyspace;
-import org.apache.cassandra.db.UnknownColumnFamilyException;
-import org.apache.cassandra.db.commitlog.SegmentReader.SyncSegment;
+import org.apache.cassandra.db.*;
import org.apache.cassandra.db.partitions.PartitionUpdate;
-import org.apache.cassandra.db.rows.SerializationHelper;
-import org.apache.cassandra.io.util.ChannelProxy;
-import org.apache.cassandra.io.util.DataInputBuffer;
-import org.apache.cassandra.io.util.FileDataInput;
-import org.apache.cassandra.io.util.RandomAccessReader;
-import org.apache.cassandra.io.util.RebufferingInputStream;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.WrappedRunnable;
-import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt;
-
-public class CommitLogReplayer
+public class CommitLogReplayer implements CommitLogReadHandler
{
@VisibleForTesting
public static long MAX_OUTSTANDING_REPLAY_BYTES = Long.getLong("cassandra.commitlog_max_outstanding_replay_bytes", 1024 * 1024 * 64);
@@ -83,117 +54,54 @@ public class CommitLogReplayer
static final String IGNORE_REPLAY_ERRORS_PROPERTY = "cassandra.commitlog.ignorereplayerrors";
private static final Logger logger = LoggerFactory.getLogger(CommitLogReplayer.class);
private static final int MAX_OUTSTANDING_REPLAY_COUNT = Integer.getInteger("cassandra.commitlog_max_outstanding_replay_count", 1024);
- private static final int LEGACY_END_OF_SEGMENT_MARKER = 0;
- private final Set<Keyspace> keyspacesRecovered;
+ private final Set<Keyspace> keyspacesReplayed;
private final Queue<Future<Integer>> futures;
- private final Map<UUID, AtomicInteger> invalidMutations;
+
private final AtomicInteger replayedCount;
- private final Map<UUID, ReplayPosition.ReplayFilter> cfPersisted;
- private final ReplayPosition globalPosition;
- private final CRC32 checksum;
- private byte[] buffer;
+ private final Map<UUID, ReplayPositionFilter> cfPersisted;
+ private final CommitLogPosition globalPosition;
+
+ // Used to throttle speed of replay of mutations if we pass the max outstanding count
private long pendingMutationBytes = 0;
private final ReplayFilter replayFilter;
private final CommitLogArchiver archiver;
- /*
- * Wrapper around initiating mutations read from the log to make it possible
- * to spy on initiated mutations for test
- */
@VisibleForTesting
- public static class MutationInitiator
- {
- protected Future<Integer> initiateMutation(final Mutation mutation,
- final long segmentId,
- final int serializedSize,
- final int entryLocation,
- final CommitLogReplayer clr)
- {
- Runnable runnable = new WrappedRunnable()
- {
- public void runMayThrow()
- {
- if (Schema.instance.getKSMetaData(mutation.getKeyspaceName()) == null)
- return;
- if (clr.pointInTimeExceeded(mutation))
- return;
-
- final Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName());
+ protected CommitLogReader commitLogReader;
- // Rebuild the mutation, omitting column families that
- // a) the user has requested that we ignore,
- // b) have already been flushed,
- // or c) are part of a cf that was dropped.
- // Keep in mind that the cf.name() is suspect. do every thing based on the cfid instead.
- Mutation newMutation = null;
- for (PartitionUpdate update : clr.replayFilter.filter(mutation))
- {
- if (Schema.instance.getCF(update.metadata().cfId) == null)
- continue; // dropped
-
- // replay if current segment is newer than last flushed one or,
- // if it is the last known segment, if we are after the replay position
- if (clr.shouldReplay(update.metadata().cfId, new ReplayPosition(segmentId, entryLocation)))
- {
- if (newMutation == null)
- newMutation = new Mutation(mutation.getKeyspaceName(), mutation.key());
- newMutation.add(update);
- clr.replayedCount.incrementAndGet();
- }
- }
- if (newMutation != null)
- {
- assert !newMutation.isEmpty();
-
- try
- {
- Uninterruptibles.getUninterruptibly(Keyspace.open(newMutation.getKeyspaceName()).applyFromCommitLog(newMutation));
- }
- catch (ExecutionException e)
- {
- throw Throwables.propagate(e.getCause());
- }
-
- clr.keyspacesRecovered.add(keyspace);
- }
- }
- };
- return StageManager.getStage(Stage.MUTATION).submit(runnable, serializedSize);
- }
- }
-
- CommitLogReplayer(CommitLog commitLog, ReplayPosition globalPosition, Map<UUID, ReplayPosition.ReplayFilter> cfPersisted, ReplayFilter replayFilter)
+ CommitLogReplayer(CommitLog commitLog,
+ CommitLogPosition globalPosition,
+ Map<UUID, ReplayPositionFilter> cfPersisted,
+ ReplayFilter replayFilter)
{
- this.keyspacesRecovered = new NonBlockingHashSet<Keyspace>();
+ this.keyspacesReplayed = new NonBlockingHashSet<Keyspace>();
this.futures = new ArrayDeque<Future<Integer>>();
- this.buffer = new byte[4096];
- this.invalidMutations = new HashMap<UUID, AtomicInteger>();
// count the number of replayed mutation. We don't really care about atomicity, but we need it to be a reference.
this.replayedCount = new AtomicInteger();
- this.checksum = new CRC32();
this.cfPersisted = cfPersisted;
this.globalPosition = globalPosition;
this.replayFilter = replayFilter;
this.archiver = commitLog.archiver;
+ this.commitLogReader = new CommitLogReader();
}
public static CommitLogReplayer construct(CommitLog commitLog)
{
- // compute per-CF and global replay positions
- Map<UUID, ReplayPosition.ReplayFilter> cfPersisted = new HashMap<>();
+ // compute per-CF and global commit log segment positions
+ Map<UUID, ReplayPositionFilter> cfPersisted = new HashMap<>();
ReplayFilter replayFilter = ReplayFilter.create();
- ReplayPosition globalPosition = null;
+ CommitLogPosition globalPosition = null;
for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
{
// but, if we've truncated the cf in question, then we need to need to start replay after the truncation
- ReplayPosition truncatedAt = SystemKeyspace.getTruncatedPosition(cfs.metadata.cfId);
+ CommitLogPosition truncatedAt = SystemKeyspace.getTruncatedPosition(cfs.metadata.cfId);
if (truncatedAt != null)
{
- // Point in time restore is taken to mean that the tables need to be recovered even if they were
+ // Point in time restore is taken to mean that the tables need to be replayed even if they were
// deleted at a later point in time. Any truncation record after that point must thus be cleared prior
- // to recovery (CASSANDRA-9195).
+ // to replay (CASSANDRA-9195).
long restoreTime = commitLog.archiver.restorePointInTime;
long truncatedTime = SystemKeyspace.getTruncatedAt(cfs.metadata.cfId);
if (truncatedTime > restoreTime)
@@ -209,28 +117,35 @@ public class CommitLogReplayer
}
}
- ReplayPosition.ReplayFilter filter = new ReplayPosition.ReplayFilter(cfs.getSSTables(), truncatedAt);
+ ReplayPositionFilter filter = new ReplayPositionFilter(cfs.getSSTables(), truncatedAt);
if (!filter.isEmpty())
cfPersisted.put(cfs.metadata.cfId, filter);
else
- globalPosition = ReplayPosition.NONE; // if we have no ranges for this CF, we must replay everything and filter
+ globalPosition = CommitLogPosition.NONE; // if we have no ranges for this CF, we must replay everything and filter
}
if (globalPosition == null)
- globalPosition = ReplayPosition.firstNotCovered(cfPersisted.values());
- logger.debug("Global replay position is {} from columnfamilies {}", globalPosition, FBUtilities.toString(cfPersisted));
+ globalPosition = firstNotCovered(cfPersisted.values());
+ logger.trace("Global commit log segment position is {} from columnfamilies {}", globalPosition, FBUtilities.toString(cfPersisted));
return new CommitLogReplayer(commitLog, globalPosition, cfPersisted, replayFilter);
}
- public void recover(File[] clogs) throws IOException
+ public void replayPath(File file, boolean tolerateTruncation) throws IOException
+ {
+ commitLogReader.readCommitLogSegment(this, file, globalPosition, CommitLogReader.ALL_MUTATIONS, tolerateTruncation);
+ }
+
+ public void replayFiles(File[] clogs) throws IOException
{
- int i;
- for (i = 0; i < clogs.length; ++i)
- recover(clogs[i], i + 1 == clogs.length);
+ commitLogReader.readAllFiles(this, clogs, globalPosition);
}
+ /**
+ * Flushes all keyspaces associated with this replayer in parallel, blocking until their flushes are complete.
+ * @return the number of mutations replayed
+ */
public int blockForWrites()
{
- for (Map.Entry<UUID, AtomicInteger> entry : invalidMutations.entrySet())
+ for (Map.Entry<UUID, AtomicInteger> entry : commitLogReader.getInvalidMutations())
logger.warn(String.format("Skipped %d mutations from unknown (probably removed) CF with id %s", entry.getValue().intValue(), entry.getKey()));
// wait for all the writes to finish on the mutation stage
@@ -242,7 +157,7 @@ public class CommitLogReplayer
boolean flushingSystem = false;
List<Future<?>> futures = new ArrayList<Future<?>>();
- for (Keyspace keyspace : keyspacesRecovered)
+ for (Keyspace keyspace : keyspacesReplayed)
{
if (keyspace.getName().equals(SystemKeyspace.NAME))
flushingSystem = true;
@@ -259,6 +174,137 @@ public class CommitLogReplayer
return replayedCount.get();
}
+ /*
+ * Wrapper around initiating mutations read from the log to make it possible
+ * to spy on initiated mutations for test
+ */
+ @VisibleForTesting
+ public static class MutationInitiator
+ {
+ protected Future<Integer> initiateMutation(final Mutation mutation,
+ final long segmentId,
+ final int serializedSize,
+ final int entryLocation,
+ final CommitLogReplayer commitLogReplayer)
+ {
+ Runnable runnable = new WrappedRunnable()
+ {
+ public void runMayThrow()
+ {
+ if (Schema.instance.getKSMetaData(mutation.getKeyspaceName()) == null)
+ return;
+ if (commitLogReplayer.pointInTimeExceeded(mutation))
+ return;
+
+ final Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName());
+
+ // Rebuild the mutation, omitting column families that
+ // a) the user has requested that we ignore,
+ // b) have already been flushed,
+ // or c) are part of a cf that was dropped.
+ // Keep in mind that the cf.name() is suspect. do every thing based on the cfid instead.
+ Mutation newMutation = null;
+ for (PartitionUpdate update : commitLogReplayer.replayFilter.filter(mutation))
+ {
+ if (Schema.instance.getCF(update.metadata().cfId) == null)
+ continue; // dropped
+
+ // replay if current segment is newer than last flushed one or,
+ // if it is the last known segment, if we are after the commit log segment position
+ if (commitLogReplayer.shouldReplay(update.metadata().cfId, new CommitLogPosition(segmentId, entryLocation)))
+ {
+ if (newMutation == null)
+ newMutation = new Mutation(mutation.getKeyspaceName(), mutation.key());
+ newMutation.add(update);
+ commitLogReplayer.replayedCount.incrementAndGet();
+ }
+ }
+ if (newMutation != null)
+ {
+ assert !newMutation.isEmpty();
+
+ try
+ {
+ Uninterruptibles.getUninterruptibly(Keyspace.open(newMutation.getKeyspaceName()).applyFromCommitLog(newMutation));
+ }
+ catch (ExecutionException e)
+ {
+ throw Throwables.propagate(e.getCause());
+ }
+
+ commitLogReplayer.keyspacesReplayed.add(keyspace);
+ }
+ }
+ };
+ return StageManager.getStage(Stage.MUTATION).submit(runnable, serializedSize);
+ }
+ }
+
+ /**
+ * A filter of known safe-to-discard commit log replay positions, based on
+ * the range covered by on disk sstables and those prior to the most recent truncation record
+ */
+ public static class ReplayPositionFilter
+ {
+ final NavigableMap<CommitLogPosition, CommitLogPosition> persisted = new TreeMap<>();
+ public ReplayPositionFilter(Iterable<SSTableReader> onDisk, CommitLogPosition truncatedAt)
+ {
+ for (SSTableReader reader : onDisk)
+ {
+ CommitLogPosition start = reader.getSSTableMetadata().commitLogLowerBound;
+ CommitLogPosition end = reader.getSSTableMetadata().commitLogUpperBound;
+ add(persisted, start, end);
+ }
+ if (truncatedAt != null)
+ add(persisted, CommitLogPosition.NONE, truncatedAt);
+ }
+
+ private static void add(NavigableMap<CommitLogPosition, CommitLogPosition> ranges, CommitLogPosition start, CommitLogPosition end)
+ {
+ // extend ourselves to cover any ranges we overlap
+ // record directly preceding our end may extend past us, so take the max of our end and its
+ Map.Entry<CommitLogPosition, CommitLogPosition> extend = ranges.floorEntry(end);
+ if (extend != null && extend.getValue().compareTo(end) > 0)
+ end = extend.getValue();
+
+ // record directly preceding our start may extend into us; if it does, we take it as our start
+ extend = ranges.lowerEntry(start);
+ if (extend != null && extend.getValue().compareTo(start) >= 0)
+ start = extend.getKey();
+
+ ranges.subMap(start, end).clear();
+ ranges.put(start, end);
+ }
+
+ public boolean shouldReplay(CommitLogPosition position)
+ {
+ // replay ranges are start exclusive, end inclusive
+ Map.Entry<CommitLogPosition, CommitLogPosition> range = persisted.lowerEntry(position);
+ return range == null || position.compareTo(range.getValue()) > 0;
+ }
+
+ public boolean isEmpty()
+ {
+ return persisted.isEmpty();
+ }
+ }
+
+ public static CommitLogPosition firstNotCovered(Iterable<ReplayPositionFilter> ranges)
+ {
+ CommitLogPosition min = null;
+ for (ReplayPositionFilter map : ranges)
+ {
+ CommitLogPosition first = map.persisted.firstEntry().getValue();
+ if (min == null)
+ min = first;
+ else
+ min = Ordering.natural().min(min, first);
+ }
+ if (min == null)
+ return CommitLogPosition.NONE;
+ return min;
+ }
+
abstract static class ReplayFilter
{
public abstract Iterable<PartitionUpdate> filter(Mutation mutation);
@@ -340,291 +386,65 @@ public class CommitLogReplayer
*
* @return true iff replay is necessary
*/
- private boolean shouldReplay(UUID cfId, ReplayPosition position)
+ private boolean shouldReplay(UUID cfId, CommitLogPosition position)
{
- ReplayPosition.ReplayFilter filter = cfPersisted.get(cfId);
+ ReplayPositionFilter filter = cfPersisted.get(cfId);
return filter == null || filter.shouldReplay(position);
}
- @SuppressWarnings("resource")
- public void recover(File file, boolean tolerateTruncation) throws IOException
- {
- // just transform from the file name (no reading of headers) to determine version
- CommitLogDescriptor desc = CommitLogDescriptor.fromFileName(file.getName());
-
- try(ChannelProxy channel = new ChannelProxy(file);
- RandomAccessReader reader = RandomAccessReader.open(channel))
- {
- if (desc.version < CommitLogDescriptor.VERSION_21)
- {
- if (logAndCheckIfShouldSkip(file, desc))
- return;
- if (globalPosition.segment == desc.id)
- reader.seek(globalPosition.position);
- replaySyncSection(reader, (int) reader.length(), desc, desc.fileName(), tolerateTruncation);
- return;
- }
- final long segmentId = desc.id;
- try
- {
- desc = CommitLogDescriptor.readHeader(reader, DatabaseDescriptor.getEncryptionContext());
- }
- catch (Exception e)
- {
- desc = null;
- }
-
- if (desc == null) {
- handleReplayError(false, "Could not read commit log descriptor in file %s", file);
- return;
- }
- if (segmentId != desc.id)
- {
- handleReplayError(false, "Segment id mismatch (filename %d, descriptor %d) in file %s", segmentId, desc.id, file);
- // continue processing if ignored.
- }
-
- if (logAndCheckIfShouldSkip(file, desc))
- return;
-
- SegmentReader segmentReader;
- try
- {
- segmentReader = new SegmentReader(desc, reader, tolerateTruncation);
- }
- catch(Exception e)
- {
- handleReplayError(false, "unable to create segment reader for commit log file: %s", e);
- return;
- }
-
- try
- {
- boolean tolerateErrorsInSection = tolerateTruncation;
- for (SyncSegment syncSegment : segmentReader)
- {
- tolerateErrorsInSection &= syncSegment.toleratesErrorsInSection;
-
- // Skip over flushed section.
- if (desc.id == globalPosition.segment && syncSegment.endPosition < globalPosition.position)
- continue;
- String errorContext = String.format("next section at %d in %s", syncSegment.fileStartPosition, desc.fileName());
- if (!replaySyncSection(syncSegment.input, syncSegment.endPosition, desc, errorContext, tolerateErrorsInSection))
- break;
- }
- }
- // unfortunately, AbstractIterator cannot throw a checked exception,
- // so check to see if a RuntimeException is wrapping an IOException
- catch (RuntimeException re)
- {
- if (re.getCause() instanceof IOException)
- throw (IOException) re.getCause();
- throw re;
- }
- logger.debug("Finished reading {}", file);
- }
- }
-
- public boolean logAndCheckIfShouldSkip(File file, CommitLogDescriptor desc)
+ protected boolean pointInTimeExceeded(Mutation fm)
{
- logger.debug("Replaying {} (CL version {}, messaging version {}, compression {})",
- file.getPath(),
- desc.version,
- desc.getMessagingVersion(),
- desc.compression);
+ long restoreTarget = archiver.restorePointInTime;
- if (globalPosition.segment > desc.id)
+ for (PartitionUpdate upd : fm.getPartitionUpdates())
{
- logger.trace("skipping replay of fully-flushed {}", file);
- return true;
+ if (archiver.precision.toMillis(upd.maxTimestamp()) > restoreTarget)
+ return true;
}
return false;
}
- /**
- * Replays a sync section containing a list of mutations.
- *
- * @return Whether replay should continue with the next section.
- */
- private boolean replaySyncSection(FileDataInput reader, int end, CommitLogDescriptor desc, String errorContext, boolean tolerateErrors) throws IOException
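+ /**
+ * Enqueues the deserialized mutation for asynchronous replay, then drains completed futures whenever
+ * outstanding replay work exceeds MAX_OUTSTANDING_REPLAY_COUNT or MAX_OUTSTANDING_REPLAY_BYTES.
+ */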
+ public void handleMutation(Mutation m, int size, int entryLocation, CommitLogDescriptor desc)
{
- /* read the logs populate Mutation and apply */
- while (reader.getFilePointer() < end && !reader.isEOF())
- {
- long mutationStart = reader.getFilePointer();
- if (logger.isTraceEnabled())
- logger.trace("Reading mutation at {}", mutationStart);
-
- long claimedCRC32;
- int serializedSize;
- try
- {
- // any of the reads may hit EOF
- serializedSize = reader.readInt();
- if (serializedSize == LEGACY_END_OF_SEGMENT_MARKER)
- {
- logger.trace("Encountered end of segment marker at {}", reader.getFilePointer());
- return false;
- }
-
- // Mutation must be at LEAST 10 bytes:
- // 3 each for a non-empty Keyspace and Key (including the
- // 2-byte length from writeUTF/writeWithShortLength) and 4 bytes for column count.
- // This prevents the CRC from being fooled by special-case garbage in the file; see CASSANDRA-2128
- if (serializedSize < 10)
- {
- handleReplayError(tolerateErrors,
- "Invalid mutation size %d at %d in %s",
- serializedSize, mutationStart, errorContext);
- return false;
- }
-
- long claimedSizeChecksum;
- if (desc.version < CommitLogDescriptor.VERSION_21)
- claimedSizeChecksum = reader.readLong();
- else
- claimedSizeChecksum = reader.readInt() & 0xffffffffL;
- checksum.reset();
- if (desc.version < CommitLogDescriptor.VERSION_20)
- checksum.update(serializedSize);
- else
- updateChecksumInt(checksum, serializedSize);
-
- if (checksum.getValue() != claimedSizeChecksum)
- {
- handleReplayError(tolerateErrors,
- "Mutation size checksum failure at %d in %s",
- mutationStart, errorContext);
- return false;
- }
- // ok.
-
- if (serializedSize > buffer.length)
- buffer = new byte[(int) (1.2 * serializedSize)];
- reader.readFully(buffer, 0, serializedSize);
- if (desc.version < CommitLogDescriptor.VERSION_21)
- claimedCRC32 = reader.readLong();
- else
- claimedCRC32 = reader.readInt() & 0xffffffffL;
- }
- catch (EOFException eof)
- {
- handleReplayError(tolerateErrors,
- "Unexpected end of segment",
- mutationStart, errorContext);
- return false; // last CL entry didn't get completely written. that's ok.
- }
-
- checksum.update(buffer, 0, serializedSize);
- if (claimedCRC32 != checksum.getValue())
- {
- handleReplayError(tolerateErrors,
- "Mutation checksum failure at %d in %s",
- mutationStart, errorContext);
- continue;
- }
- replayMutation(buffer, serializedSize, (int) reader.getFilePointer(), desc);
- }
- return true;
- }
-
- /**
- * Deserializes and replays a commit log entry.
- */
- void replayMutation(byte[] inputBuffer, int size,
- final int entryLocation, final CommitLogDescriptor desc) throws IOException
- {
-
- final Mutation mutation;
- try (RebufferingInputStream bufIn = new DataInputBuffer(inputBuffer, 0, size))
- {
- mutation = Mutation.serializer.deserialize(bufIn,
- desc.getMessagingVersion(),
- SerializationHelper.Flag.LOCAL);
- // doublecheck that what we read is [still] valid for the current schema
- for (PartitionUpdate upd : mutation.getPartitionUpdates())
- upd.validate();
- }
- catch (UnknownColumnFamilyException ex)
- {
- if (ex.cfId == null)
- return;
- AtomicInteger i = invalidMutations.get(ex.cfId);
- if (i == null)
- {
- i = new AtomicInteger(1);
- invalidMutations.put(ex.cfId, i);
- }
- else
- i.incrementAndGet();
- return;
- }
- catch (Throwable t)
- {
- JVMStabilityInspector.inspectThrowable(t);
- File f = File.createTempFile("mutation", "dat");
-
- try (DataOutputStream out = new DataOutputStream(new FileOutputStream(f)))
- {
- out.write(inputBuffer, 0, size);
- }
-
- // Checksum passed so this error can't be permissible.
- handleReplayError(false,
- "Unexpected error deserializing mutation; saved to %s. " +
- "This may be caused by replaying a mutation against a table with the same name but incompatible schema. " +
- "Exception follows: %s",
- f.getAbsolutePath(),
- t);
- return;
- }
-
- if (logger.isTraceEnabled())
- logger.trace("replaying mutation for {}.{}: {}", mutation.getKeyspaceName(), mutation.key(), "{" + StringUtils.join(mutation.getPartitionUpdates().iterator(), ", ") + "}");
-
pendingMutationBytes += size;
- futures.offer(mutationInitiator.initiateMutation(mutation,
+ futures.offer(mutationInitiator.initiateMutation(m,
desc.id,
size,
entryLocation,
this));
- //If there are finished mutations, or too many outstanding bytes/mutations
- //drain the futures in the queue
+ // If there are finished mutations, or too many outstanding bytes/mutations
+ // drain the futures in the queue
while (futures.size() > MAX_OUTSTANDING_REPLAY_COUNT
- || pendingMutationBytes > MAX_OUTSTANDING_REPLAY_BYTES
- || (!futures.isEmpty() && futures.peek().isDone()))
+ || pendingMutationBytes > MAX_OUTSTANDING_REPLAY_BYTES
+ || (!futures.isEmpty() && futures.peek().isDone()))
{
pendingMutationBytes -= FBUtilities.waitOnFuture(futures.poll());
}
}
- protected boolean pointInTimeExceeded(Mutation fm)
- {
- long restoreTarget = archiver.restorePointInTime;
-
- for (PartitionUpdate upd : fm.getPartitionUpdates())
- {
- if (archiver.precision.toMillis(upd.maxTimestamp()) > restoreTarget)
- return true;
- }
- return false;
- }
-
- static void handleReplayError(boolean permissible, String message, Object... messageArgs) throws IOException
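+ /**
+ * Logs permissible errors (or any error when the ignore-replay-errors system property is set),
+ * delegates the rest to CommitLog.handleCommitError and rethrows as a CommitLogReplayException if
+ * that rejects the error. Always returns false, so the segment itself is never skipped.
+ */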
+ public boolean shouldSkipSegmentOnError(CommitLogReadException exception) throws IOException
{
- String msg = String.format(message, messageArgs);
- IOException e = new CommitLogReplayException(msg);
- if (permissible)
- logger.error("Ignoring commit log replay error likely due to incomplete flush to disk", e);
+ if (exception.permissible)
+ logger.error("Ignoring commit log replay error likely due to incomplete flush to disk", exception);
else if (Boolean.getBoolean(IGNORE_REPLAY_ERRORS_PROPERTY))
- logger.error("Ignoring commit log replay error", e);
- else if (!CommitLog.handleCommitError("Failed commit log replay", e))
+ logger.error("Ignoring commit log replay error", exception);
+ else if (!CommitLog.handleCommitError("Failed commit log replay", exception))
{
logger.error("Replay stopped. If you wish to override this error and continue starting the node ignoring " +
"commit log replay problems, specify -D" + IGNORE_REPLAY_ERRORS_PROPERTY + "=true " +
"on the command line");
- throw e;
+ throw new CommitLogReplayException(exception.getMessage(), exception);
}
+ return false;
+ }
+
+ /**
+ * The logic for whether or not we throw on an error is identical for the replayer for both recoverable and unrecoverable errors.
+ */
+ public void handleUnrecoverableError(CommitLogReadException exception) throws IOException
+ {
+ // Don't care about return value, use this simply to throw exception as appropriate.
+ shouldSkipSegmentOnError(exception);
}
@SuppressWarnings("serial")