You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jj...@apache.org on 2017/03/13 05:00:19 UTC
[01/10] cassandra git commit: Commitlog replay may fail if last
mutation is within 4 bytes of end of segment
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.2 5ef8a8b40 -> beb9658dd
refs/heads/cassandra-3.0 aeca1d2bd -> 44f79bf2f
refs/heads/cassandra-3.11 2c111d15b -> a2399d4d3
refs/heads/trunk 0c5faef66 -> dd5251c46
Commitlog replay may fail if last mutation is within 4 bytes of end of segment
Patch by Jeff Jirsa; Reviewed by Branimir Lambov for CASSANDRA-13282
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/beb9658d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/beb9658d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/beb9658d
Branch: refs/heads/cassandra-2.2
Commit: beb9658dd5e18e3a6a4e8431b6549ae4c33365a9
Parents: 5ef8a8b
Author: Jeff Jirsa <je...@jeffjirsa.net>
Authored: Sun Mar 12 21:54:04 2017 -0700
Committer: Jeff Jirsa <je...@jeffjirsa.net>
Committed: Sun Mar 12 21:54:04 2017 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../db/commitlog/CommitLogReplayer.java | 11 +++++++++++
.../cassandra/db/commitlog/CommitLogTest.java | 20 ++++++++++++++++----
3 files changed, 28 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/beb9658d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 09e4039..2839291 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@
* Fix failing COPY TO STDOUT (CASSANDRA-12497)
* Fix ColumnCounter::countAll behaviour for reverse queries (CASSANDRA-13222)
* Exceptions encountered calling getSeeds() breaks OTC thread (CASSANDRA-13018)
+ * Commitlog replay may fail if last mutation is within 4 bytes of end of segment (CASSANDRA-13282)
Merged from 2.1:
* Remove unused repositories (CASSANDRA-13278)
* Log stacktrace of uncaught exceptions (CASSANDRA-13108)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/beb9658d/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index a58aeb4..3cf4d0f 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -439,6 +439,17 @@ public class CommitLogReplayer
int serializedSize;
try
{
+ // We rely on reading serialized size == 0 (LEGACY_END_OF_SEGMENT_MARKER) to identify the end
+ // of a segment, which happens naturally due to the 0 padding of the empty segment on creation.
+ // However, with 2.1 era commitlogs it's possible that the last mutation ended less than 4 bytes
+ // from the end of the file, which means that we'll be unable to read an a full int and instead
+ // read an EOF here
+ if(end - reader.getFilePointer() < 4)
+ {
+ logger.trace("Not enough bytes left for another mutation in this CommitLog segment, continuing");
+ return false;
+ }
+
// any of the reads may hit EOF
serializedSize = reader.readInt();
if (serializedSize == LEGACY_END_OF_SEGMENT_MARKER)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/beb9658d/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
index 9999b42..9b63885 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
@@ -137,12 +137,24 @@ public class CommitLogTest
}
@Test
+ public void testRecoveryWithShortPadding() throws Exception
+ {
+ // If only 1-3 bytes remain in the segment, replay must succeed: there is not enough
+ // room left for the 4-byte legacy end-of-segment size marker, so the replayer has to
+ // stop rather than fail with an EOF while trying to read that size.
+ // (The 0-bytes-remaining case is not exercised by the calls below.)
+ // NOTE(review): assumes testRecovery(data, null) writes `data` as the segment body using
+ // the default commit log version — confirm against the helper.
+ testRecovery(new byte[1], null);
+ testRecovery(new byte[2], null);
+ testRecovery(new byte[3], null);
+ }
+
+ @Test
public void testRecoveryWithShortSize() throws Exception
{
- runExpecting(new WrappedRunnable() {
- public void runMayThrow() throws Exception
- {
- testRecovery(new byte[2], CommitLogDescriptor.VERSION_20);
+ runExpecting(new WrappedRunnable() {
+ public void runMayThrow() throws Exception {
+ // Use 5 bytes so that at least 4 remain and the size int is actually read (a 1-3 byte
+ // tail is treated as short padding and replays cleanly; see testRecoveryWithShortPadding).
+ // With a big-endian readInt() (java.io.DataInput contract), bytes {0,0,0,1} decode to a
+ // size of 1: not the 0 end-of-segment marker, but a bogus size that replay must reject.
+ byte[] data = new byte[5];
+ data[3] = 1; // Not a legacy marker, give it a fake (short) size
+ testRecovery(data, CommitLogDescriptor.VERSION_20);
}
}, CommitLogReplayException.class);
}
[09/10] cassandra git commit: Merge branch 'cassandra-3.0' into
cassandra-3.11
Posted by jj...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.11
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a2399d4d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a2399d4d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a2399d4d
Branch: refs/heads/trunk
Commit: a2399d4d309ac6b60a150ea20af8dc6f006d51ff
Parents: 2c111d1 44f79bf
Author: Jeff Jirsa <je...@jeffjirsa.net>
Authored: Sun Mar 12 21:56:11 2017 -0700
Committer: Jeff Jirsa <je...@jeffjirsa.net>
Committed: Sun Mar 12 21:57:25 2017 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/db/commitlog/CommitLogReader.java | 12 ++++++++++++
.../apache/cassandra/db/commitlog/CommitLogTest.java | 15 ++++++++++++++-
3 files changed, 27 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a2399d4d/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 302a028,140c860..ab28dd4
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -33,140 -43,6 +33,141 @@@ Merged from 3.0
live rows in sstabledump (CASSANDRA-13177)
* Provide user workaround when system_schema.columns does not contain entries
for a table that's in system_schema.tables (CASSANDRA-13180)
+Merged from 2.2:
++ * Commitlog replay may fail if last mutation is within 4 bytes of end of segment (CASSANDRA-13282)
+ * Fix queries updating multiple time the same list (CASSANDRA-13130)
+ * Fix GRANT/REVOKE when keyspace isn't specified (CASSANDRA-13053)
+ * Fix flaky LongLeveledCompactionStrategyTest (CASSANDRA-12202)
+ * Fix failing COPY TO STDOUT (CASSANDRA-12497)
+ * Fix ColumnCounter::countAll behaviour for reverse queries (CASSANDRA-13222)
+ * Exceptions encountered calling getSeeds() breaks OTC thread (CASSANDRA-13018)
+ * Fix negative mean latency metric (CASSANDRA-12876)
+ * Use only one file pointer when creating commitlog segments (CASSANDRA-12539)
+Merged from 2.1:
+ * Remove unused repositories (CASSANDRA-13278)
+ * Log stacktrace of uncaught exceptions (CASSANDRA-13108)
+ * Use portable stderr for java error in startup (CASSANDRA-13211)
+ * Fix Thread Leak in OutboundTcpConnection (CASSANDRA-13204)
+ * Coalescing strategy can enter infinite loop (CASSANDRA-13159)
+
+
+3.10
+ * Fix secondary index queries regression (CASSANDRA-13013)
+ * Add duration type to the protocol V5 (CASSANDRA-12850)
+ * Fix duration type validation (CASSANDRA-13143)
+ * Fix flaky GcCompactionTest (CASSANDRA-12664)
+ * Fix TestHintedHandoff.hintedhandoff_decom_test (CASSANDRA-13058)
+ * Fixed query monitoring for range queries (CASSANDRA-13050)
+ * Remove outboundBindAny configuration property (CASSANDRA-12673)
+ * Use correct bounds for all-data range when filtering (CASSANDRA-12666)
+ * Remove timing window in test case (CASSANDRA-12875)
+ * Resolve unit testing without JCE security libraries installed (CASSANDRA-12945)
+ * Fix inconsistencies in cassandra-stress load balancing policy (CASSANDRA-12919)
+ * Fix validation of non-frozen UDT cells (CASSANDRA-12916)
+ * Don't shut down socket input/output on StreamSession (CASSANDRA-12903)
+ * Fix Murmur3PartitionerTest (CASSANDRA-12858)
+ * Move cqlsh syntax rules into separate module and allow easier customization (CASSANDRA-12897)
+ * Fix CommitLogSegmentManagerTest (CASSANDRA-12283)
+ * Fix cassandra-stress truncate option (CASSANDRA-12695)
+ * Fix crossNode value when receiving messages (CASSANDRA-12791)
+ * Don't load MX4J beans twice (CASSANDRA-12869)
+ * Extend native protocol request flags, add versions to SUPPORTED, and introduce ProtocolVersion enum (CASSANDRA-12838)
+ * Set JOINING mode when running pre-join tasks (CASSANDRA-12836)
+ * remove net.mintern.primitive library due to license issue (CASSANDRA-12845)
+ * Properly format IPv6 addresses when logging JMX service URL (CASSANDRA-12454)
+ * Optimize the vnode allocation for single replica per DC (CASSANDRA-12777)
+ * Use non-token restrictions for bounds when token restrictions are overridden (CASSANDRA-12419)
+ * Fix CQLSH auto completion for PER PARTITION LIMIT (CASSANDRA-12803)
+ * Use different build directories for Eclipse and Ant (CASSANDRA-12466)
+ * Avoid potential AttributeError in cqlsh due to no table metadata (CASSANDRA-12815)
+ * Fix RandomReplicationAwareTokenAllocatorTest.testExistingCluster (CASSANDRA-12812)
+ * Upgrade commons-codec to 1.9 (CASSANDRA-12790)
+ * Make the fanout size for LeveledCompactionStrategy to be configurable (CASSANDRA-11550)
+ * Add duration data type (CASSANDRA-11873)
+ * Fix timeout in ReplicationAwareTokenAllocatorTest (CASSANDRA-12784)
+ * Improve sum aggregate functions (CASSANDRA-12417)
+ * Make cassandra.yaml docs for batch_size_*_threshold_in_kb reflect changes in CASSANDRA-10876 (CASSANDRA-12761)
+ * cqlsh fails to format collections when using aliases (CASSANDRA-11534)
+ * Check for hash conflicts in prepared statements (CASSANDRA-12733)
+ * Exit query parsing upon first error (CASSANDRA-12598)
+ * Fix cassandra-stress to use single seed in UUID generation (CASSANDRA-12729)
+ * CQLSSTableWriter does not allow Update statement (CASSANDRA-12450)
+ * Config class uses boxed types but DD exposes primitive types (CASSANDRA-12199)
+ * Add pre- and post-shutdown hooks to Storage Service (CASSANDRA-12461)
+ * Add hint delivery metrics (CASSANDRA-12693)
+ * Remove IndexInfo cache from FileIndexInfoRetriever (CASSANDRA-12731)
+ * ColumnIndex does not reuse buffer (CASSANDRA-12502)
+ * cdc column addition still breaks schema migration tasks (CASSANDRA-12697)
+ * Upgrade metrics-reporter dependencies (CASSANDRA-12089)
+ * Tune compaction thread count via nodetool (CASSANDRA-12248)
+ * Add +=/-= shortcut syntax for update queries (CASSANDRA-12232)
+ * Include repair session IDs in repair start message (CASSANDRA-12532)
+ * Add a blocking task to Index, run before joining the ring (CASSANDRA-12039)
+ * Fix NPE when using CQLSSTableWriter (CASSANDRA-12667)
+ * Support optional backpressure strategies at the coordinator (CASSANDRA-9318)
+ * Make randompartitioner work with new vnode allocation (CASSANDRA-12647)
+ * Fix cassandra-stress graphing (CASSANDRA-12237)
+ * Allow filtering on partition key columns for queries without secondary indexes (CASSANDRA-11031)
+ * Fix Cassandra Stress reporting thread model and precision (CASSANDRA-12585)
+ * Add JMH benchmarks.jar (CASSANDRA-12586)
+ * Cleanup uses of AlterTableStatementColumn (CASSANDRA-12567)
+ * Add keep-alive to streaming (CASSANDRA-11841)
+ * Tracing payload is passed through newSession(..) (CASSANDRA-11706)
+ * avoid deleting non existing sstable files and improve related log messages (CASSANDRA-12261)
+ * json/yaml output format for nodetool compactionhistory (CASSANDRA-12486)
+ * Retry all internode messages once after a connection is
+ closed and reopened (CASSANDRA-12192)
+ * Add support to rebuild from targeted replica (CASSANDRA-9875)
+ * Add sequence distribution type to cassandra stress (CASSANDRA-12490)
+ * "SELECT * FROM foo LIMIT ;" does not error out (CASSANDRA-12154)
+ * Define executeLocally() at the ReadQuery Level (CASSANDRA-12474)
+ * Extend read/write failure messages with a map of replica addresses
+ to error codes in the v5 native protocol (CASSANDRA-12311)
+ * Fix rebuild of SASI indexes with existing index files (CASSANDRA-12374)
+ * Let DatabaseDescriptor not implicitly startup services (CASSANDRA-9054, 12550)
+ * Fix clustering indexes in presence of static columns in SASI (CASSANDRA-12378)
+ * Fix queries on columns with reversed type on SASI indexes (CASSANDRA-12223)
+ * Added slow query log (CASSANDRA-12403)
+ * Count full coordinated request against timeout (CASSANDRA-12256)
+ * Allow TTL with null value on insert and update (CASSANDRA-12216)
+ * Make decommission operation resumable (CASSANDRA-12008)
+ * Add support to one-way targeted repair (CASSANDRA-9876)
+ * Remove clientutil jar (CASSANDRA-11635)
+ * Fix compaction throughput throttle (CASSANDRA-12366, CASSANDRA-12717)
+ * Delay releasing Memtable memory on flush until PostFlush has finished running (CASSANDRA-12358)
+ * Cassandra stress should dump all setting on startup (CASSANDRA-11914)
+ * Make it possible to compact a given token range (CASSANDRA-10643)
+ * Allow updating DynamicEndpointSnitch properties via JMX (CASSANDRA-12179)
+ * Collect metrics on queries by consistency level (CASSANDRA-7384)
+ * Add support for GROUP BY to SELECT statement (CASSANDRA-10707)
+ * Deprecate memtable_cleanup_threshold and update default for memtable_flush_writers (CASSANDRA-12228)
+ * Upgrade to OHC 0.4.4 (CASSANDRA-12133)
+ * Add version command to cassandra-stress (CASSANDRA-12258)
+ * Create compaction-stress tool (CASSANDRA-11844)
+ * Garbage-collecting compaction operation and schema option (CASSANDRA-7019)
+ * Add beta protocol flag for v5 native protocol (CASSANDRA-12142)
+ * Support filtering on non-PRIMARY KEY columns in the CREATE
+ MATERIALIZED VIEW statement's WHERE clause (CASSANDRA-10368)
+ * Unify STDOUT and SYSTEMLOG logback format (CASSANDRA-12004)
+ * COPY FROM should raise error for non-existing input files (CASSANDRA-12174)
+ * Faster write path (CASSANDRA-12269)
+ * Option to leave omitted columns in INSERT JSON unset (CASSANDRA-11424)
+ * Support json/yaml output in nodetool tpstats (CASSANDRA-12035)
+ * Expose metrics for successful/failed authentication attempts (CASSANDRA-10635)
+ * Prepend snapshot name with "truncated" or "dropped" when a snapshot
+ is taken before truncating or dropping a table (CASSANDRA-12178)
+ * Optimize RestrictionSet (CASSANDRA-12153)
+ * cqlsh does not automatically downgrade CQL version (CASSANDRA-12150)
+ * Omit (de)serialization of state variable in UDAs (CASSANDRA-9613)
+ * Create a system table to expose prepared statements (CASSANDRA-8831)
+ * Reuse DataOutputBuffer from ColumnIndex (CASSANDRA-11970)
+ * Remove DatabaseDescriptor dependency from SegmentedFile (CASSANDRA-11580)
+ * Add supplied username to authentication error messages (CASSANDRA-12076)
+ * Remove pre-startup check for open JMX port (CASSANDRA-12074)
+ * Remove compaction Severity from DynamicEndpointSnitch (CASSANDRA-11738)
+ * Restore resumable hints delivery (CASSANDRA-11960)
+ * Properly report LWT contention (CASSANDRA-12626)
+Merged from 3.0:
* Dump threads when unit tests time out (CASSANDRA-13117)
* Better error when modifying function permissions without explicit keyspace (CASSANDRA-12925)
* Indexer is not correctly invoked when building indexes over sstables (CASSANDRA-13075)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a2399d4d/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
index e6e2e1a,0000000..d1cb8d6
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
@@@ -1,502 -1,0 +1,514 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.commitlog;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.zip.CRC32;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.UnknownColumnFamilyException;
+import org.apache.cassandra.db.commitlog.CommitLogReadHandler.CommitLogReadErrorReason;
+import org.apache.cassandra.db.commitlog.CommitLogReadHandler.CommitLogReadException;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.SerializationHelper;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.io.util.RebufferingInputStream;
+import org.apache.cassandra.utils.JVMStabilityInspector;
+
+import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt;
+
+/**
+ * Reads commit log segment files and hands each deserialized {@link Mutation} to a {@link CommitLogReadHandler}.
+ * Problems found while reading are reported to the handler as {@link CommitLogReadException}s; for recoverable
+ * ones, the handler's {@code shouldSkipSegmentOnError} answer decides whether reading of the segment stops.
+ *
+ * NOTE(review): instances hold mutable state (checksum, scratch buffer, invalid-mutation counts) with no
+ * synchronization visible here, so presumably a reader is meant to be used by one thread at a time — confirm.
+ */
+public class CommitLogReader
+{
+    private static final Logger logger = LoggerFactory.getLogger(CommitLogReader.class);
+
+    /** Serialized-size value that ends a segment's data; zero-filled (never written) space reads as this value. */
+    private static final int LEGACY_END_OF_SEGMENT_MARKER = 0;
+
+    /** Mutation-limit sentinel meaning "read every mutation". */
+    @VisibleForTesting
+    public static final int ALL_MUTATIONS = -1;
+    private final CRC32 checksum;
+    /** Count, per table id, of mutations skipped because the table was unknown when they were read. */
+    private final Map<UUID, AtomicInteger> invalidMutations;
+
+    /** Reusable scratch buffer for serialized mutations; grown on demand. */
+    private byte[] buffer;
+
+    public CommitLogReader()
+    {
+        checksum = new CRC32();
+        invalidMutations = new HashMap<>();
+        buffer = new byte[4096];
+    }
+
+    /**
+     * @return per-table counts of mutations that were skipped because their table was unknown
+     */
+    public Set<Map.Entry<UUID, AtomicInteger>> getInvalidMutations()
+    {
+        return invalidMutations.entrySet();
+    }
+
+    /**
+     * Reads all passed in files with no minimum, no start, and no mutation limit.
+     */
+    public void readAllFiles(CommitLogReadHandler handler, File[] files) throws IOException
+    {
+        readAllFiles(handler, files, CommitLogPosition.NONE);
+    }
+
+    /**
+     * Reads all passed in files with minPosition, no start, and no mutation limit.
+     * Truncation is tolerated only in the last file of the array.
+     */
+    public void readAllFiles(CommitLogReadHandler handler, File[] files, CommitLogPosition minPosition) throws IOException
+    {
+        for (int i = 0; i < files.length; i++)
+            readCommitLogSegment(handler, files[i], minPosition, ALL_MUTATIONS, i + 1 == files.length);
+    }
+
+    /**
+     * Reads passed in file fully
+     */
+    public void readCommitLogSegment(CommitLogReadHandler handler, File file, boolean tolerateTruncation) throws IOException
+    {
+        readCommitLogSegment(handler, file, CommitLogPosition.NONE, ALL_MUTATIONS, tolerateTruncation);
+    }
+
+    /**
+     * Reads passed in file fully, up to mutationLimit count
+     */
+    @VisibleForTesting
+    public void readCommitLogSegment(CommitLogReadHandler handler, File file, int mutationLimit, boolean tolerateTruncation) throws IOException
+    {
+        readCommitLogSegment(handler, file, CommitLogPosition.NONE, mutationLimit, tolerateTruncation);
+    }
+
+    /**
+     * Reads mutations from file, handing them off to handler
+     * @param handler Handler that will take action based on deserialized Mutations
+     * @param file CommitLogSegment file to read
+     * @param minPosition Optional minimum CommitLogPosition: segments with id < minPosition.segmentId are skipped,
+     *                    the segment whose id matches only replays mutations after minPosition.position, and
+     *                    segments with a greater id are replayed in full.
+     * @param mutationLimit Optional limit on # of mutations to replay. Local ALL_MUTATIONS serves as marker to play all.
+     * @param tolerateTruncation Whether or not we should allow truncation of this file or throw if EOF found
+     *
+     * @throws IOException
+     */
+    public void readCommitLogSegment(CommitLogReadHandler handler,
+                                     File file,
+                                     CommitLogPosition minPosition,
+                                     int mutationLimit,
+                                     boolean tolerateTruncation) throws IOException
+    {
+        // just transform from the file name (no reading of headers) to determine version
+        CommitLogDescriptor desc = CommitLogDescriptor.fromFileName(file.getName());
+
+        try (RandomAccessReader reader = RandomAccessReader.open(file))
+        {
+            // Pre-2.1 segments are read without parsing a header or sync sections: the whole file is one section.
+            if (desc.version < CommitLogDescriptor.VERSION_21)
+            {
+                if (!shouldSkipSegmentId(file, desc, minPosition))
+                {
+                    if (minPosition.segmentId == desc.id)
+                        reader.seek(minPosition.position);
+                    ReadStatusTracker statusTracker = new ReadStatusTracker(mutationLimit, tolerateTruncation);
+                    statusTracker.errorContext = desc.fileName();
+                    readSection(handler, reader, minPosition, (int) reader.length(), statusTracker, desc);
+                }
+                return;
+            }
+
+            final long segmentIdFromFilename = desc.id;
+            try
+            {
+                // The following call can either throw or legitimately return null. For either case, we need to check
+                // desc outside this block and set it to null in the exception case.
+                desc = CommitLogDescriptor.readHeader(reader, DatabaseDescriptor.getEncryptionContext());
+            }
+            catch (Exception e)
+            {
+                desc = null;
+            }
+            if (desc == null)
+            {
+                // don't care about whether or not the handler thinks we can continue. We can't w/out descriptor.
+                handler.handleUnrecoverableError(new CommitLogReadException(
+                    String.format("Could not read commit log descriptor in file %s", file),
+                    CommitLogReadErrorReason.UNRECOVERABLE_DESCRIPTOR_ERROR,
+                    false));
+                return;
+            }
+
+            if (segmentIdFromFilename != desc.id)
+            {
+                if (handler.shouldSkipSegmentOnError(new CommitLogReadException(String.format(
+                    "Segment id mismatch (filename %d, descriptor %d) in file %s", segmentIdFromFilename, desc.id, file),
+                                                                                CommitLogReadErrorReason.RECOVERABLE_DESCRIPTOR_ERROR,
+                                                                                false)))
+                {
+                    return;
+                }
+            }
+
+            if (shouldSkipSegmentId(file, desc, minPosition))
+                return;
+
+            CommitLogSegmentReader segmentReader;
+            try
+            {
+                segmentReader = new CommitLogSegmentReader(handler, desc, reader, tolerateTruncation);
+            }
+            catch (Exception e)
+            {
+                handler.handleUnrecoverableError(new CommitLogReadException(
+                    String.format("Unable to create segment reader for commit log file: %s", e),
+                    CommitLogReadErrorReason.UNRECOVERABLE_UNKNOWN_ERROR,
+                    tolerateTruncation));
+                return;
+            }
+
+            try
+            {
+                ReadStatusTracker statusTracker = new ReadStatusTracker(mutationLimit, tolerateTruncation);
+                for (CommitLogSegmentReader.SyncSegment syncSegment : segmentReader)
+                {
+                    // Only tolerate truncation if we allow in both global and segment
+                    statusTracker.tolerateErrorsInSection = tolerateTruncation && syncSegment.toleratesErrorsInSection;
+
+                    // Skip sync sections of minPosition's segment that end before minPosition
+                    if (desc.id == minPosition.segmentId && syncSegment.endPosition < minPosition.position)
+                        continue;
+
+                    statusTracker.errorContext = String.format("Next section at %d in %s", syncSegment.fileStartPosition, desc.fileName());
+
+                    readSection(handler, syncSegment.input, minPosition, syncSegment.endPosition, statusTracker, desc);
+                    if (!statusTracker.shouldContinue())
+                        break;
+                }
+            }
+            // Unfortunately AbstractIterator cannot throw a checked exception, so we check to see if a RuntimeException
+            // is wrapping an IOException.
+            catch (RuntimeException re)
+            {
+                if (re.getCause() instanceof IOException)
+                    throw (IOException) re.getCause();
+                throw re;
+            }
+            logger.debug("Finished reading {}", file);
+        }
+    }
+
+    /**
+     * Any segment with id >= minPosition.segmentId is a candidate for read.
+     * Also logs (at debug) that the segment is being read.
+     */
+    private boolean shouldSkipSegmentId(File file, CommitLogDescriptor desc, CommitLogPosition minPosition)
+    {
+        logger.debug("Reading {} (CL version {}, messaging version {}, compression {})",
+            file.getPath(),
+            desc.version,
+            desc.getMessagingVersion(),
+            desc.compression);
+
+        if (minPosition.segmentId > desc.id)
+        {
+            logger.trace("Skipping read of fully-flushed {}", file);
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Returns whether a mutation ending at {@code entryLocation} in the segment described by {@code desc} lies after
+     * {@code minPosition}. Every mutation in a segment newer than {@code minPosition.segmentId} does; within
+     * {@code minPosition}'s own segment, only mutations ending strictly after {@code minPosition.position} do.
+     */
+    private static boolean isAfterMinPosition(CommitLogDescriptor desc, long entryLocation, CommitLogPosition minPosition)
+    {
+        return desc.id > minPosition.segmentId || entryLocation > minPosition.position;
+    }
+
+    /**
+     * Reads a section of a file containing mutations
+     *
+     * @param handler Handler that will take action based on deserialized Mutations
+     * @param reader FileDataInput / logical buffer containing commitlog mutations
+     * @param minPosition CommitLogPosition indicating when we should start actively replaying mutations
+     * @param end logical numeric end of the segment being read
+     * @param statusTracker ReadStatusTracker with current state of mutation count, error state, etc
+     * @param desc Descriptor for CommitLog serialization
+     */
+    private void readSection(CommitLogReadHandler handler,
+                             FileDataInput reader,
+                             CommitLogPosition minPosition,
+                             int end,
+                             ReadStatusTracker statusTracker,
+                             CommitLogDescriptor desc) throws IOException
+    {
+        // seek rather than deserializing mutation-by-mutation to reach the desired minPosition in this SyncSegment
+        if (desc.id == minPosition.segmentId && reader.getFilePointer() < minPosition.position)
+            reader.seek(minPosition.position);
+
+        while (statusTracker.shouldContinue() && reader.getFilePointer() < end && !reader.isEOF())
+        {
+            long mutationStart = reader.getFilePointer();
+            if (logger.isTraceEnabled())
+                logger.trace("Reading mutation at {}", mutationStart);
+
+            long claimedCRC32;
+            int serializedSize;
+            try
+            {
++                // We rely on reading serialized size == 0 (LEGACY_END_OF_SEGMENT_MARKER) to identify the end
++                // of a segment, which happens naturally due to the 0 padding of the empty segment on creation.
++                // However, it's possible with 2.1 era commitlogs that the last mutation ended less than 4 bytes
++                // from the end of the file, which means that we'll be unable to read a full int and instead
++                // read an EOF here
++                if (end - reader.getFilePointer() < 4)
++                {
++                    logger.trace("Not enough bytes left for another mutation in this CommitLog segment, continuing");
++                    statusTracker.requestTermination();
++                    return;
++                }
++
+                // any of the reads may hit EOF
+                serializedSize = reader.readInt();
+                if (serializedSize == LEGACY_END_OF_SEGMENT_MARKER)
+                {
+                    logger.trace("Encountered end of segment marker at {}", reader.getFilePointer());
+                    statusTracker.requestTermination();
+                    return;
+                }
+
+                // Mutation must be at LEAST 10 bytes:
+                // 3 for a non-empty Keyspace
+                // 3 for a Key (including the 2-byte length from writeUTF/writeWithShortLength)
+                // 4 bytes for column count.
+                // This prevents CRC by being fooled by special-case garbage in the file; see CASSANDRA-2128
+                if (serializedSize < 10)
+                {
+                    if (handler.shouldSkipSegmentOnError(new CommitLogReadException(
+                                                    String.format("Invalid mutation size %d at %d in %s", serializedSize, mutationStart, statusTracker.errorContext),
+                                                    CommitLogReadErrorReason.MUTATION_ERROR,
+                                                    statusTracker.tolerateErrorsInSection)))
+                    {
+                        statusTracker.requestTermination();
+                    }
+                    return;
+                }
+
+                long claimedSizeChecksum = CommitLogFormat.calculateClaimedChecksum(reader, desc.version);
+                checksum.reset();
+                CommitLogFormat.updateChecksum(checksum, serializedSize, desc.version);
+
+                if (checksum.getValue() != claimedSizeChecksum)
+                {
+                    if (handler.shouldSkipSegmentOnError(new CommitLogReadException(
+                                                    String.format("Mutation size checksum failure at %d in %s", mutationStart, statusTracker.errorContext),
+                                                    CommitLogReadErrorReason.MUTATION_ERROR,
+                                                    statusTracker.tolerateErrorsInSection)))
+                    {
+                        statusTracker.requestTermination();
+                    }
+                    return;
+                }
+
+                // Grow the scratch buffer with 20% headroom to limit reallocations.
+                if (serializedSize > buffer.length)
+                    buffer = new byte[(int) (1.2 * serializedSize)];
+                reader.readFully(buffer, 0, serializedSize);
+
+                claimedCRC32 = CommitLogFormat.calculateClaimedCRC32(reader, desc.version);
+            }
+            catch (EOFException eof)
+            {
+                if (handler.shouldSkipSegmentOnError(new CommitLogReadException(
+                                                String.format("Unexpected end of segment at %d in %s", mutationStart, statusTracker.errorContext),
+                                                CommitLogReadErrorReason.EOF,
+                                                statusTracker.tolerateErrorsInSection)))
+                {
+                    statusTracker.requestTermination();
+                }
+                return;
+            }
+
+            // The data CRC continues from the size checksum computed above.
+            checksum.update(buffer, 0, serializedSize);
+            if (claimedCRC32 != checksum.getValue())
+            {
+                if (handler.shouldSkipSegmentOnError(new CommitLogReadException(
+                                                String.format("Mutation checksum failure at %d in %s", mutationStart, statusTracker.errorContext),
+                                                CommitLogReadErrorReason.MUTATION_ERROR,
+                                                statusTracker.tolerateErrorsInSection)))
+                {
+                    statusTracker.requestTermination();
+                }
+                continue;
+            }
+
+            long mutationPosition = reader.getFilePointer();
+            readMutation(handler, buffer, serializedSize, minPosition, (int) mutationPosition, desc);
+
+            // Only count this as a processed mutation if it is after our min, as we suppress replay of mutations that
+            // are before this mark. Uses the same segment-aware test readMutation uses to decide whether to replay.
+            if (isAfterMinPosition(desc, mutationPosition, minPosition))
+                statusTracker.addProcessedMutation();
+        }
+    }
+
+    /**
+     * Deserializes and passes a Mutation to the ICommitLogReadHandler requested
+     *
+     * @param handler Handler that will take action based on deserialized Mutations
+     * @param inputBuffer raw byte array w/Mutation data
+     * @param size deserialized size of mutation
+     * @param minPosition We need to suppress replay of mutations that are before the required minPosition
+     * @param entryLocation filePointer offset of mutation within CommitLogSegment
+     * @param desc CommitLogDescriptor being worked on
+     */
+    @VisibleForTesting
+    protected void readMutation(CommitLogReadHandler handler,
+                                byte[] inputBuffer,
+                                int size,
+                                CommitLogPosition minPosition,
+                                final int entryLocation,
+                                final CommitLogDescriptor desc) throws IOException
+    {
+        // Mutations at or before minPosition are still deserialized and validated, but are not handed to the handler.
+        // Segments newer than minPosition's segment lie entirely after it, so the position comparison only applies
+        // within minPosition's own segment.
+        boolean shouldReplay = isAfterMinPosition(desc, entryLocation, minPosition);
+
+        final Mutation mutation;
+        try (RebufferingInputStream bufIn = new DataInputBuffer(inputBuffer, 0, size))
+        {
+            mutation = Mutation.serializer.deserialize(bufIn,
+                                                       desc.getMessagingVersion(),
+                                                       SerializationHelper.Flag.LOCAL);
+            // doublecheck that what we read is still valid for the current schema
+            for (PartitionUpdate upd : mutation.getPartitionUpdates())
+                upd.validate();
+        }
+        catch (UnknownColumnFamilyException ex)
+        {
+            // The mutation targets a table we don't know: tally it per table id and skip it.
+            if (ex.cfId == null)
+                return;
+            invalidMutations.computeIfAbsent(ex.cfId, id -> new AtomicInteger()).incrementAndGet();
+            return;
+        }
+        catch (Throwable t)
+        {
+            JVMStabilityInspector.inspectThrowable(t);
+            // Save the raw bytes so the failing mutation can be inspected later.
+            File f = File.createTempFile("mutation", "dat");
+
+            try (DataOutputStream out = new DataOutputStream(new FileOutputStream(f)))
+            {
+                out.write(inputBuffer, 0, size);
+            }
+
+            // Checksum passed so this error can't be permissible.
+            handler.handleUnrecoverableError(new CommitLogReadException(
+                String.format(
+                    "Unexpected error deserializing mutation; saved to %s. " +
+                    "This may be caused by replaying a mutation against a table with the same name but incompatible schema. " +
+                    "Exception follows: %s", f.getAbsolutePath(), t),
+                CommitLogReadErrorReason.MUTATION_ERROR,
+                false));
+            return;
+        }
+
+        if (logger.isTraceEnabled())
+            logger.trace("Read mutation for {}.{}: {}", mutation.getKeyspaceName(), mutation.key(),
+                         "{" + StringUtils.join(mutation.getPartitionUpdates().iterator(), ", ") + "}");
+
+        if (shouldReplay)
+            handler.handleMutation(mutation, size, entryLocation, desc);
+    }
+
+    /**
+     * Helper methods to deal with changing formats of internals of the CommitLog without polluting deserialization code.
+     */
+    private static class CommitLogFormat
+    {
+        /** Reads the stored checksum of the size field: a long for VERSION_12/VERSION_20, an unsigned 32-bit int otherwise. */
+        public static long calculateClaimedChecksum(FileDataInput input, int commitLogVersion) throws IOException
+        {
+            switch (commitLogVersion)
+            {
+                case CommitLogDescriptor.VERSION_12:
+                case CommitLogDescriptor.VERSION_20:
+                    return input.readLong();
+                // Changed format in 2.1
+                default:
+                    return input.readInt() & 0xffffffffL;
+            }
+        }
+
+        /** Feeds the serialized-size field into {@code checksum} in the form used by the given commit log version. */
+        public static void updateChecksum(CRC32 checksum, int serializedSize, int commitLogVersion)
+        {
+            switch (commitLogVersion)
+            {
+                case CommitLogDescriptor.VERSION_12:
+                    // CRC32.update(int) consumes only the low-order byte; presumably kept to match the 1.2 writer.
+                    checksum.update(serializedSize);
+                    break;
+                // Changed format in 2.0
+                default:
+                    updateChecksumInt(checksum, serializedSize);
+                    break;
+            }
+        }
+
+        /** Reads the stored CRC of the mutation data: a long for VERSION_12/VERSION_20, an unsigned 32-bit int otherwise. */
+        public static long calculateClaimedCRC32(FileDataInput input, int commitLogVersion) throws IOException
+        {
+            switch (commitLogVersion)
+            {
+                case CommitLogDescriptor.VERSION_12:
+                case CommitLogDescriptor.VERSION_20:
+                    return input.readLong();
+                // Changed format in 2.1
+                default:
+                    return input.readInt() & 0xffffffffL;
+            }
+        }
+    }
+
+    /**
+     * Mutable state for one read: remaining mutation budget, error context and tolerance for the current section,
+     * and a flag that stops further reading once set.
+     */
+    private static class ReadStatusTracker
+    {
+        private int mutationsLeft;
+        public String errorContext = "";
+        public boolean tolerateErrorsInSection;
+        private boolean error;
+
+        public ReadStatusTracker(int mutationLimit, boolean tolerateErrorsInSection)
+        {
+            this.mutationsLeft = mutationLimit;
+            this.tolerateErrorsInSection = tolerateErrorsInSection;
+        }
+
+        public void addProcessedMutation()
+        {
+            if (mutationsLeft == ALL_MUTATIONS)
+                return;
+            --mutationsLeft;
+        }
+
+        public boolean shouldContinue()
+        {
+            // ALL_MUTATIONS (-1) is never decremented and is non-zero, so an unlimited read never exhausts its budget.
+            return !error && mutationsLeft != 0;
+        }
+
+        public void requestTermination()
+        {
+            error = true;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a2399d4d/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
index 5476d03,90dc258..4000fbf
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
@@@ -171,10 -143,23 +171,23 @@@ public class CommitLogTes
}
@Test
+ public void testRecoveryWithShortPadding() throws Exception
+ {
- // If we have 0-3 bytes remaining, commitlog replayer
- // should pass, because there's insufficient room
- // left in the segment for the legacy size marker.
- testRecovery(new byte[1], null);
- testRecovery(new byte[2], null);
- testRecovery(new byte[3], null);
++ // If we have 0-3 bytes remaining, commitlog replayer
++ // should pass, because there's insufficient room
++ // left in the segment for the legacy size marker.
++ testRecovery(new byte[1], null);
++ testRecovery(new byte[2], null);
++ testRecovery(new byte[3], null);
+ }
+
+ @Test
public void testRecoveryWithShortSize() throws Exception
{
+ byte[] data = new byte[5];
+ data[3] = 1; // Not a legacy marker, give it a fake (short) size
runExpecting(() -> {
- testRecovery(new byte[2], CommitLogDescriptor.VERSION_20);
+ testRecovery(data, CommitLogDescriptor.VERSION_20);
return null;
}, CommitLogReplayException.class);
}
[06/10] cassandra git commit: Merge branch 'cassandra-2.2' into
cassandra-3.0
Posted by jj...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/44f79bf2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/44f79bf2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/44f79bf2
Branch: refs/heads/cassandra-3.0
Commit: 44f79bf2f7a3a05f802014492ecbec67c49c02d0
Parents: aeca1d2 beb9658
Author: Jeff Jirsa <je...@jeffjirsa.net>
Authored: Sun Mar 12 21:54:42 2017 -0700
Committer: Jeff Jirsa <je...@jeffjirsa.net>
Committed: Sun Mar 12 21:56:00 2017 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/db/commitlog/CommitLogReplayer.java | 11 +++++++++++
.../apache/cassandra/db/commitlog/CommitLogTest.java | 15 ++++++++++++++-
3 files changed, 26 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44f79bf2/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 52a794b,2839291..140c860
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,21 -1,6 +1,22 @@@
-2.2.10
+3.0.13
+ * Slice.isEmpty() returns false for some empty slices (CASSANDRA-13305)
+ * Add formatted row output to assertEmpty in CQL Tester (CASSANDRA-13238)
+Merged from 2.2:
++ * Commitlog replay may fail if last mutation is within 4 bytes of end of segment (CASSANDRA-13282)
* Fix queries updating multiple time the same list (CASSANDRA-13130)
* Fix GRANT/REVOKE when keyspace isn't specified (CASSANDRA-13053)
+
+
+3.0.12
+ * Prevent data loss on upgrade 2.1 - 3.0 by adding component separator to LogRecord absolute path (CASSANDRA-13294)
+ * Improve testing on macOS by eliminating sigar logging (CASSANDRA-13233)
+ * Cqlsh copy-from should error out when csv contains invalid data for collections (CASSANDRA-13071)
+ * Update c.yaml doc for offheap memtables (CASSANDRA-13179)
+ * Faster StreamingHistogram (CASSANDRA-13038)
+ * Legacy deserializer can create unexpected boundary range tombstones (CASSANDRA-13237)
+ * Remove unnecessary assertion from AntiCompactionTest (CASSANDRA-13070)
+ * Fix cqlsh COPY for dates before 1900 (CASSANDRA-13185)
+Merged from 2.2:
* Avoid race on receiver by starting streaming sender thread after sending init message (CASSANDRA-12886)
* Fix "multiple versions of ant detected..." when running ant test (CASSANDRA-13232)
* Coalescing strategy sleeps too much (CASSANDRA-13090)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44f79bf2/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index d53f0f8,3cf4d0f..205c36a
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@@ -483,6 -439,17 +483,17 @@@ public class CommitLogReplaye
int serializedSize;
try
{
+ // We rely on reading serialized size == 0 (LEGACY_END_OF_SEGMENT_MARKER) to identify the end
+ // of a segment, which happens naturally due to the 0 padding of the empty segment on creation.
- // However, with 2.1 era commitlogs it's possible that the last mutation ended less than 4 bytes
++ // However, it's possible with 2.1 era commitlogs that the last mutation ended less than 4 bytes
+ // from the end of the file, which means that we'll be unable to read an a full int and instead
+ // read an EOF here
+ if(end - reader.getFilePointer() < 4)
+ {
+ logger.trace("Not enough bytes left for another mutation in this CommitLog segment, continuing");
+ return false;
+ }
+
// any of the reads may hit EOF
serializedSize = reader.readInt();
if (serializedSize == LEGACY_END_OF_SEGMENT_MARKER)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44f79bf2/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
index c4ab6ab,9b63885..90dc258
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
@@@ -143,11 -137,25 +143,24 @@@ public class CommitLogTes
}
@Test
+ public void testRecoveryWithShortPadding() throws Exception
+ {
+ // If we have 0-3 bytes remaining, commitlog replayer
+ // should pass, because there's insufficient room
+ // left in the segment for the legacy size marker.
+ testRecovery(new byte[1], null);
+ testRecovery(new byte[2], null);
+ testRecovery(new byte[3], null);
+ }
+
+ @Test
public void testRecoveryWithShortSize() throws Exception
{
- runExpecting(new WrappedRunnable() {
- public void runMayThrow() throws Exception {
- byte[] data = new byte[5];
- data[3] = 1; // Not a legacy marker, give it a fake (short) size
- testRecovery(data, CommitLogDescriptor.VERSION_20);
- }
++ byte[] data = new byte[5];
++ data[3] = 1; // Not a legacy marker, give it a fake (short) size
+ runExpecting(() -> {
- testRecovery(new byte[2], CommitLogDescriptor.VERSION_20);
++ testRecovery(data, CommitLogDescriptor.VERSION_20);
+ return null;
}, CommitLogReplayException.class);
}
[04/10] cassandra git commit: Commitlog replay may fail if last
mutation is within 4 bytes of end of segment
Posted by jj...@apache.org.
Commitlog replay may fail if last mutation is within 4 bytes of end of segment
Patch by Jeff Jirsa; Reviewed by Branimir Lambov for CASSANDRA-13282
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/beb9658d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/beb9658d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/beb9658d
Branch: refs/heads/trunk
Commit: beb9658dd5e18e3a6a4e8431b6549ae4c33365a9
Parents: 5ef8a8b
Author: Jeff Jirsa <je...@jeffjirsa.net>
Authored: Sun Mar 12 21:54:04 2017 -0700
Committer: Jeff Jirsa <je...@jeffjirsa.net>
Committed: Sun Mar 12 21:54:04 2017 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../db/commitlog/CommitLogReplayer.java | 11 +++++++++++
.../cassandra/db/commitlog/CommitLogTest.java | 20 ++++++++++++++++----
3 files changed, 28 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/beb9658d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 09e4039..2839291 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@
* Fix failing COPY TO STDOUT (CASSANDRA-12497)
* Fix ColumnCounter::countAll behaviour for reverse queries (CASSANDRA-13222)
* Exceptions encountered calling getSeeds() breaks OTC thread (CASSANDRA-13018)
+ * Commitlog replay may fail if last mutation is within 4 bytes of end of segment (CASSANDRA-13282)
Merged from 2.1:
* Remove unused repositories (CASSANDRA-13278)
* Log stacktrace of uncaught exceptions (CASSANDRA-13108)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/beb9658d/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index a58aeb4..3cf4d0f 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -439,6 +439,17 @@ public class CommitLogReplayer
int serializedSize;
try
{
+ // We rely on reading serialized size == 0 (LEGACY_END_OF_SEGMENT_MARKER) to identify the end
+ // of a segment, which happens naturally due to the 0 padding of the empty segment on creation.
+ // However, with 2.1 era commitlogs it's possible that the last mutation ended less than 4 bytes
+ // from the end of the file, which means that we'll be unable to read an a full int and instead
+ // read an EOF here
+ if(end - reader.getFilePointer() < 4)
+ {
+ logger.trace("Not enough bytes left for another mutation in this CommitLog segment, continuing");
+ return false;
+ }
+
// any of the reads may hit EOF
serializedSize = reader.readInt();
if (serializedSize == LEGACY_END_OF_SEGMENT_MARKER)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/beb9658d/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
index 9999b42..9b63885 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
@@ -137,12 +137,24 @@ public class CommitLogTest
}
@Test
+ public void testRecoveryWithShortPadding() throws Exception
+ {
+ // If we have 0-3 bytes remaining, commitlog replayer
+ // should pass, because there's insufficient room
+ // left in the segment for the legacy size marker.
+ testRecovery(new byte[1], null);
+ testRecovery(new byte[2], null);
+ testRecovery(new byte[3], null);
+ }
+
+ @Test
public void testRecoveryWithShortSize() throws Exception
{
- runExpecting(new WrappedRunnable() {
- public void runMayThrow() throws Exception
- {
- testRecovery(new byte[2], CommitLogDescriptor.VERSION_20);
+ runExpecting(new WrappedRunnable() {
+ public void runMayThrow() throws Exception {
+ byte[] data = new byte[5];
+ data[3] = 1; // Not a legacy marker, give it a fake (short) size
+ testRecovery(data, CommitLogDescriptor.VERSION_20);
}
}, CommitLogReplayException.class);
}
[05/10] cassandra git commit: Merge branch 'cassandra-2.2' into
cassandra-3.0
Posted by jj...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/44f79bf2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/44f79bf2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/44f79bf2
Branch: refs/heads/cassandra-3.11
Commit: 44f79bf2f7a3a05f802014492ecbec67c49c02d0
Parents: aeca1d2 beb9658
Author: Jeff Jirsa <je...@jeffjirsa.net>
Authored: Sun Mar 12 21:54:42 2017 -0700
Committer: Jeff Jirsa <je...@jeffjirsa.net>
Committed: Sun Mar 12 21:56:00 2017 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/db/commitlog/CommitLogReplayer.java | 11 +++++++++++
.../apache/cassandra/db/commitlog/CommitLogTest.java | 15 ++++++++++++++-
3 files changed, 26 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44f79bf2/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 52a794b,2839291..140c860
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,21 -1,6 +1,22 @@@
-2.2.10
+3.0.13
+ * Slice.isEmpty() returns false for some empty slices (CASSANDRA-13305)
+ * Add formatted row output to assertEmpty in CQL Tester (CASSANDRA-13238)
+Merged from 2.2:
++ * Commitlog replay may fail if last mutation is within 4 bytes of end of segment (CASSANDRA-13282)
* Fix queries updating multiple time the same list (CASSANDRA-13130)
* Fix GRANT/REVOKE when keyspace isn't specified (CASSANDRA-13053)
+
+
+3.0.12
+ * Prevent data loss on upgrade 2.1 - 3.0 by adding component separator to LogRecord absolute path (CASSANDRA-13294)
+ * Improve testing on macOS by eliminating sigar logging (CASSANDRA-13233)
+ * Cqlsh copy-from should error out when csv contains invalid data for collections (CASSANDRA-13071)
+ * Update c.yaml doc for offheap memtables (CASSANDRA-13179)
+ * Faster StreamingHistogram (CASSANDRA-13038)
+ * Legacy deserializer can create unexpected boundary range tombstones (CASSANDRA-13237)
+ * Remove unnecessary assertion from AntiCompactionTest (CASSANDRA-13070)
+ * Fix cqlsh COPY for dates before 1900 (CASSANDRA-13185)
+Merged from 2.2:
* Avoid race on receiver by starting streaming sender thread after sending init message (CASSANDRA-12886)
* Fix "multiple versions of ant detected..." when running ant test (CASSANDRA-13232)
* Coalescing strategy sleeps too much (CASSANDRA-13090)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44f79bf2/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index d53f0f8,3cf4d0f..205c36a
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@@ -483,6 -439,17 +483,17 @@@ public class CommitLogReplaye
int serializedSize;
try
{
+ // We rely on reading serialized size == 0 (LEGACY_END_OF_SEGMENT_MARKER) to identify the end
+ // of a segment, which happens naturally due to the 0 padding of the empty segment on creation.
- // However, with 2.1 era commitlogs it's possible that the last mutation ended less than 4 bytes
++ // However, it's possible with 2.1 era commitlogs that the last mutation ended less than 4 bytes
+ // from the end of the file, which means that we'll be unable to read an a full int and instead
+ // read an EOF here
+ if(end - reader.getFilePointer() < 4)
+ {
+ logger.trace("Not enough bytes left for another mutation in this CommitLog segment, continuing");
+ return false;
+ }
+
// any of the reads may hit EOF
serializedSize = reader.readInt();
if (serializedSize == LEGACY_END_OF_SEGMENT_MARKER)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44f79bf2/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
index c4ab6ab,9b63885..90dc258
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
@@@ -143,11 -137,25 +143,24 @@@ public class CommitLogTes
}
@Test
+ public void testRecoveryWithShortPadding() throws Exception
+ {
+ // If we have 0-3 bytes remaining, commitlog replayer
+ // should pass, because there's insufficient room
+ // left in the segment for the legacy size marker.
+ testRecovery(new byte[1], null);
+ testRecovery(new byte[2], null);
+ testRecovery(new byte[3], null);
+ }
+
+ @Test
public void testRecoveryWithShortSize() throws Exception
{
- runExpecting(new WrappedRunnable() {
- public void runMayThrow() throws Exception {
- byte[] data = new byte[5];
- data[3] = 1; // Not a legacy marker, give it a fake (short) size
- testRecovery(data, CommitLogDescriptor.VERSION_20);
- }
++ byte[] data = new byte[5];
++ data[3] = 1; // Not a legacy marker, give it a fake (short) size
+ runExpecting(() -> {
- testRecovery(new byte[2], CommitLogDescriptor.VERSION_20);
++ testRecovery(data, CommitLogDescriptor.VERSION_20);
+ return null;
}, CommitLogReplayException.class);
}
[10/10] cassandra git commit: Merge branch 'cassandra-3.11' into trunk
Posted by jj...@apache.org.
Merge branch 'cassandra-3.11' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/dd5251c4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/dd5251c4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/dd5251c4
Branch: refs/heads/trunk
Commit: dd5251c46073d75c09e47f645b1d0ebc3a135411
Parents: 0c5faef a2399d4
Author: Jeff Jirsa <je...@jeffjirsa.net>
Authored: Sun Mar 12 21:57:37 2017 -0700
Committer: Jeff Jirsa <je...@jeffjirsa.net>
Committed: Sun Mar 12 21:58:36 2017 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/db/commitlog/CommitLogReader.java | 12 ++++++++++++
2 files changed, 13 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dd5251c4/CHANGES.txt
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dd5251c4/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
index 1da0cee,d1cb8d6..9eec477
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
@@@ -253,6 -265,18 +253,18 @@@ public class CommitLogReade
int serializedSize;
try
{
+ // We rely on reading serialized size == 0 (LEGACY_END_OF_SEGMENT_MARKER) to identify the end
+ // of a segment, which happens naturally due to the 0 padding of the empty segment on creation.
+ // However, it's possible with 2.1 era commitlogs that the last mutation ended less than 4 bytes
+ // from the end of the file, which means that we'll be unable to read an a full int and instead
+ // read an EOF here
+ if(end - reader.getFilePointer() < 4)
+ {
- logger.trace("Not enough bytes left for another mutation in this CommitLog segment, continuing");
++ logger.trace("Not enough bytes left for another mutation in this CommitLog section, continuing");
+ statusTracker.requestTermination();
+ return;
+ }
+
// any of the reads may hit EOF
serializedSize = reader.readInt();
if (serializedSize == LEGACY_END_OF_SEGMENT_MARKER)
[08/10] cassandra git commit: Merge branch 'cassandra-3.0' into
cassandra-3.11
Posted by jj...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.11
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a2399d4d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a2399d4d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a2399d4d
Branch: refs/heads/cassandra-3.11
Commit: a2399d4d309ac6b60a150ea20af8dc6f006d51ff
Parents: 2c111d1 44f79bf
Author: Jeff Jirsa <je...@jeffjirsa.net>
Authored: Sun Mar 12 21:56:11 2017 -0700
Committer: Jeff Jirsa <je...@jeffjirsa.net>
Committed: Sun Mar 12 21:57:25 2017 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/db/commitlog/CommitLogReader.java | 12 ++++++++++++
.../apache/cassandra/db/commitlog/CommitLogTest.java | 15 ++++++++++++++-
3 files changed, 27 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a2399d4d/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 302a028,140c860..ab28dd4
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -33,140 -43,6 +33,141 @@@ Merged from 3.0
live rows in sstabledump (CASSANDRA-13177)
* Provide user workaround when system_schema.columns does not contain entries
for a table that's in system_schema.tables (CASSANDRA-13180)
+Merged from 2.2:
++ * Commitlog replay may fail if last mutation is within 4 bytes of end of segment (CASSANDRA-13282)
+ * Fix queries updating multiple time the same list (CASSANDRA-13130)
+ * Fix GRANT/REVOKE when keyspace isn't specified (CASSANDRA-13053)
+ * Fix flaky LongLeveledCompactionStrategyTest (CASSANDRA-12202)
+ * Fix failing COPY TO STDOUT (CASSANDRA-12497)
+ * Fix ColumnCounter::countAll behaviour for reverse queries (CASSANDRA-13222)
+ * Exceptions encountered calling getSeeds() breaks OTC thread (CASSANDRA-13018)
+ * Fix negative mean latency metric (CASSANDRA-12876)
+ * Use only one file pointer when creating commitlog segments (CASSANDRA-12539)
+Merged from 2.1:
+ * Remove unused repositories (CASSANDRA-13278)
+ * Log stacktrace of uncaught exceptions (CASSANDRA-13108)
+ * Use portable stderr for java error in startup (CASSANDRA-13211)
+ * Fix Thread Leak in OutboundTcpConnection (CASSANDRA-13204)
+ * Coalescing strategy can enter infinite loop (CASSANDRA-13159)
+
+
+3.10
+ * Fix secondary index queries regression (CASSANDRA-13013)
+ * Add duration type to the protocol V5 (CASSANDRA-12850)
+ * Fix duration type validation (CASSANDRA-13143)
+ * Fix flaky GcCompactionTest (CASSANDRA-12664)
+ * Fix TestHintedHandoff.hintedhandoff_decom_test (CASSANDRA-13058)
+ * Fixed query monitoring for range queries (CASSANDRA-13050)
+ * Remove outboundBindAny configuration property (CASSANDRA-12673)
+ * Use correct bounds for all-data range when filtering (CASSANDRA-12666)
+ * Remove timing window in test case (CASSANDRA-12875)
+ * Resolve unit testing without JCE security libraries installed (CASSANDRA-12945)
+ * Fix inconsistencies in cassandra-stress load balancing policy (CASSANDRA-12919)
+ * Fix validation of non-frozen UDT cells (CASSANDRA-12916)
+ * Don't shut down socket input/output on StreamSession (CASSANDRA-12903)
+ * Fix Murmur3PartitionerTest (CASSANDRA-12858)
+ * Move cqlsh syntax rules into separate module and allow easier customization (CASSANDRA-12897)
+ * Fix CommitLogSegmentManagerTest (CASSANDRA-12283)
+ * Fix cassandra-stress truncate option (CASSANDRA-12695)
+ * Fix crossNode value when receiving messages (CASSANDRA-12791)
+ * Don't load MX4J beans twice (CASSANDRA-12869)
+ * Extend native protocol request flags, add versions to SUPPORTED, and introduce ProtocolVersion enum (CASSANDRA-12838)
+ * Set JOINING mode when running pre-join tasks (CASSANDRA-12836)
+ * remove net.mintern.primitive library due to license issue (CASSANDRA-12845)
+ * Properly format IPv6 addresses when logging JMX service URL (CASSANDRA-12454)
+ * Optimize the vnode allocation for single replica per DC (CASSANDRA-12777)
+ * Use non-token restrictions for bounds when token restrictions are overridden (CASSANDRA-12419)
+ * Fix CQLSH auto completion for PER PARTITION LIMIT (CASSANDRA-12803)
+ * Use different build directories for Eclipse and Ant (CASSANDRA-12466)
+ * Avoid potential AttributeError in cqlsh due to no table metadata (CASSANDRA-12815)
+ * Fix RandomReplicationAwareTokenAllocatorTest.testExistingCluster (CASSANDRA-12812)
+ * Upgrade commons-codec to 1.9 (CASSANDRA-12790)
+ * Make the fanout size for LeveledCompactionStrategy to be configurable (CASSANDRA-11550)
+ * Add duration data type (CASSANDRA-11873)
+ * Fix timeout in ReplicationAwareTokenAllocatorTest (CASSANDRA-12784)
+ * Improve sum aggregate functions (CASSANDRA-12417)
+ * Make cassandra.yaml docs for batch_size_*_threshold_in_kb reflect changes in CASSANDRA-10876 (CASSANDRA-12761)
+ * cqlsh fails to format collections when using aliases (CASSANDRA-11534)
+ * Check for hash conflicts in prepared statements (CASSANDRA-12733)
+ * Exit query parsing upon first error (CASSANDRA-12598)
+ * Fix cassandra-stress to use single seed in UUID generation (CASSANDRA-12729)
+ * CQLSSTableWriter does not allow Update statement (CASSANDRA-12450)
+ * Config class uses boxed types but DD exposes primitive types (CASSANDRA-12199)
+ * Add pre- and post-shutdown hooks to Storage Service (CASSANDRA-12461)
+ * Add hint delivery metrics (CASSANDRA-12693)
+ * Remove IndexInfo cache from FileIndexInfoRetriever (CASSANDRA-12731)
+ * ColumnIndex does not reuse buffer (CASSANDRA-12502)
+ * cdc column addition still breaks schema migration tasks (CASSANDRA-12697)
+ * Upgrade metrics-reporter dependencies (CASSANDRA-12089)
+ * Tune compaction thread count via nodetool (CASSANDRA-12248)
+ * Add +=/-= shortcut syntax for update queries (CASSANDRA-12232)
+ * Include repair session IDs in repair start message (CASSANDRA-12532)
+ * Add a blocking task to Index, run before joining the ring (CASSANDRA-12039)
+ * Fix NPE when using CQLSSTableWriter (CASSANDRA-12667)
+ * Support optional backpressure strategies at the coordinator (CASSANDRA-9318)
+ * Make randompartitioner work with new vnode allocation (CASSANDRA-12647)
+ * Fix cassandra-stress graphing (CASSANDRA-12237)
+ * Allow filtering on partition key columns for queries without secondary indexes (CASSANDRA-11031)
+ * Fix Cassandra Stress reporting thread model and precision (CASSANDRA-12585)
+ * Add JMH benchmarks.jar (CASSANDRA-12586)
+ * Cleanup uses of AlterTableStatementColumn (CASSANDRA-12567)
+ * Add keep-alive to streaming (CASSANDRA-11841)
+ * Tracing payload is passed through newSession(..) (CASSANDRA-11706)
+ * avoid deleting non existing sstable files and improve related log messages (CASSANDRA-12261)
+ * json/yaml output format for nodetool compactionhistory (CASSANDRA-12486)
+ * Retry all internode messages once after a connection is
+ closed and reopened (CASSANDRA-12192)
+ * Add support to rebuild from targeted replica (CASSANDRA-9875)
+ * Add sequence distribution type to cassandra stress (CASSANDRA-12490)
+ * "SELECT * FROM foo LIMIT ;" does not error out (CASSANDRA-12154)
+ * Define executeLocally() at the ReadQuery Level (CASSANDRA-12474)
+ * Extend read/write failure messages with a map of replica addresses
+ to error codes in the v5 native protocol (CASSANDRA-12311)
+ * Fix rebuild of SASI indexes with existing index files (CASSANDRA-12374)
+ * Let DatabaseDescriptor not implicitly startup services (CASSANDRA-9054, 12550)
+ * Fix clustering indexes in presence of static columns in SASI (CASSANDRA-12378)
+ * Fix queries on columns with reversed type on SASI indexes (CASSANDRA-12223)
+ * Added slow query log (CASSANDRA-12403)
+ * Count full coordinated request against timeout (CASSANDRA-12256)
+ * Allow TTL with null value on insert and update (CASSANDRA-12216)
+ * Make decommission operation resumable (CASSANDRA-12008)
+ * Add support to one-way targeted repair (CASSANDRA-9876)
+ * Remove clientutil jar (CASSANDRA-11635)
+ * Fix compaction throughput throttle (CASSANDRA-12366, CASSANDRA-12717)
+ * Delay releasing Memtable memory on flush until PostFlush has finished running (CASSANDRA-12358)
+ * Cassandra stress should dump all setting on startup (CASSANDRA-11914)
+ * Make it possible to compact a given token range (CASSANDRA-10643)
+ * Allow updating DynamicEndpointSnitch properties via JMX (CASSANDRA-12179)
+ * Collect metrics on queries by consistency level (CASSANDRA-7384)
+ * Add support for GROUP BY to SELECT statement (CASSANDRA-10707)
+ * Deprecate memtable_cleanup_threshold and update default for memtable_flush_writers (CASSANDRA-12228)
+ * Upgrade to OHC 0.4.4 (CASSANDRA-12133)
+ * Add version command to cassandra-stress (CASSANDRA-12258)
+ * Create compaction-stress tool (CASSANDRA-11844)
+ * Garbage-collecting compaction operation and schema option (CASSANDRA-7019)
+ * Add beta protocol flag for v5 native protocol (CASSANDRA-12142)
+ * Support filtering on non-PRIMARY KEY columns in the CREATE
+ MATERIALIZED VIEW statement's WHERE clause (CASSANDRA-10368)
+ * Unify STDOUT and SYSTEMLOG logback format (CASSANDRA-12004)
+ * COPY FROM should raise error for non-existing input files (CASSANDRA-12174)
+ * Faster write path (CASSANDRA-12269)
+ * Option to leave omitted columns in INSERT JSON unset (CASSANDRA-11424)
+ * Support json/yaml output in nodetool tpstats (CASSANDRA-12035)
+ * Expose metrics for successful/failed authentication attempts (CASSANDRA-10635)
+ * Prepend snapshot name with "truncated" or "dropped" when a snapshot
+ is taken before truncating or dropping a table (CASSANDRA-12178)
+ * Optimize RestrictionSet (CASSANDRA-12153)
+ * cqlsh does not automatically downgrade CQL version (CASSANDRA-12150)
+ * Omit (de)serialization of state variable in UDAs (CASSANDRA-9613)
+ * Create a system table to expose prepared statements (CASSANDRA-8831)
+ * Reuse DataOutputBuffer from ColumnIndex (CASSANDRA-11970)
+ * Remove DatabaseDescriptor dependency from SegmentedFile (CASSANDRA-11580)
+ * Add supplied username to authentication error messages (CASSANDRA-12076)
+ * Remove pre-startup check for open JMX port (CASSANDRA-12074)
+ * Remove compaction Severity from DynamicEndpointSnitch (CASSANDRA-11738)
+ * Restore resumable hints delivery (CASSANDRA-11960)
+ * Properly report LWT contention (CASSANDRA-12626)
+Merged from 3.0:
* Dump threads when unit tests time out (CASSANDRA-13117)
* Better error when modifying function permissions without explicit keyspace (CASSANDRA-12925)
* Indexer is not correctly invoked when building indexes over sstables (CASSANDRA-13075)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a2399d4d/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
index e6e2e1a,0000000..d1cb8d6
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
@@@ -1,502 -1,0 +1,514 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.commitlog;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.zip.CRC32;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.UnknownColumnFamilyException;
+import org.apache.cassandra.db.commitlog.CommitLogReadHandler.CommitLogReadErrorReason;
+import org.apache.cassandra.db.commitlog.CommitLogReadHandler.CommitLogReadException;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.SerializationHelper;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.io.util.RebufferingInputStream;
+import org.apache.cassandra.utils.JVMStabilityInspector;
+
+import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt;
+
+/**
+ * Reads commit log segment files, verifies each entry's size checksum and CRC, and hands every
+ * successfully deserialized {@link Mutation} to a {@link CommitLogReadHandler}.
+ * Recoverable problems are reported through CommitLogReadHandler#shouldSkipSegmentOnError, whose
+ * answer decides whether the rest of the segment is skipped.
+ *
+ * NOTE(review): instances keep mutable scratch state (checksum, buffer, invalidMutations) with no
+ * synchronization; presumably a reader is not meant to be shared between threads.
+ */
+public class CommitLogReader
+{
+ private static final Logger logger = LoggerFactory.getLogger(CommitLogReader.class);
+
+ // A serialized entry size of 0 marks the end of the written data in a segment (see readSection).
+ private static final int LEGACY_END_OF_SEGMENT_MARKER = 0;
+
+ // Sentinel mutation limit meaning "no limit": every mutation is read.
+ @VisibleForTesting
+ public static final int ALL_MUTATIONS = -1;
+ // Reused for every entry: reset before the size checksum, then extended over the entry data for the entry CRC.
+ private final CRC32 checksum;
+ // Per-table (cfId) count of mutations skipped because their table was unknown.
+ private final Map<UUID, AtomicInteger> invalidMutations;
+
+ // Scratch buffer for entry data; reallocated to 1.2x an entry's size when that entry does not fit.
+ private byte[] buffer;
+
+ public CommitLogReader()
+ {
+ checksum = new CRC32();
+ invalidMutations = new HashMap<>();
+ buffer = new byte[4096];
+ }
+
+ /**
+ * @return per-table counts of mutations skipped because their table (cfId) was unknown. This is a live
+ * view of the internal map, not a copy.
+ */
+ public Set<Map.Entry<UUID, AtomicInteger>> getInvalidMutations()
+ {
+ return invalidMutations.entrySet();
+ }
+
+ /**
+ * Reads all passed in files with no minimum, no start, and no mutation limit.
+ * Only the last file is read with truncation tolerated.
+ */
+ public void readAllFiles(CommitLogReadHandler handler, File[] files) throws IOException
+ {
+ readAllFiles(handler, files, CommitLogPosition.NONE);
+ }
+
+ /**
+ * Reads all passed in files with minPosition, no start, and no mutation limit.
+ * Only the last file is read with truncation tolerated.
+ */
+ public void readAllFiles(CommitLogReadHandler handler, File[] files, CommitLogPosition minPosition) throws IOException
+ {
+ // Only the final file gets tolerateTruncation = true; presumably only the newest segment can legitimately be cut short.
+ for (int i = 0; i < files.length; i++)
+ readCommitLogSegment(handler, files[i], minPosition, ALL_MUTATIONS, i + 1 == files.length);
+ }
+
+ /**
+ * Reads the passed in file fully (no minimum position, no mutation limit).
+ */
+ public void readCommitLogSegment(CommitLogReadHandler handler, File file, boolean tolerateTruncation) throws IOException
+ {
+ readCommitLogSegment(handler, file, CommitLogPosition.NONE, ALL_MUTATIONS, tolerateTruncation);
+ }
+
+ /**
+ * Reads the passed in file fully (no minimum position), stopping after mutationLimit mutations.
+ */
+ @VisibleForTesting
+ public void readCommitLogSegment(CommitLogReadHandler handler, File file, int mutationLimit, boolean tolerateTruncation) throws IOException
+ {
+ readCommitLogSegment(handler, file, CommitLogPosition.NONE, mutationLimit, tolerateTruncation);
+ }
+
+ /**
+ * Reads mutations from file, handing them off to handler
+ * @param handler Handler that will take action based on deserialized Mutations
+ * @param file CommitLogSegment file to read
+ * @param minPosition Optional minimum CommitLogPosition - all segments with id > or matching w/greater position will be read
+ * @param mutationLimit Optional limit on # of mutations to replay. Local ALL_MUTATIONS serves as marker to play all.
+ * @param tolerateTruncation Whether or not we should allow truncation of this file or throw if EOF found
+ *
+ * @throws IOException if reading the file fails, including IOExceptions that the segment iterator had to
+ * wrap in a RuntimeException
+ */
+ public void readCommitLogSegment(CommitLogReadHandler handler,
+ File file,
+ CommitLogPosition minPosition,
+ int mutationLimit,
+ boolean tolerateTruncation) throws IOException
+ {
+ // just transform from the file name (no reading of headers) to determine version
+ CommitLogDescriptor desc = CommitLogDescriptor.fromFileName(file.getName());
+
+ try(RandomAccessReader reader = RandomAccessReader.open(file))
+ {
+ // Pre-2.1 segments are read as one section spanning the whole file, without parsing a header.
+ if (desc.version < CommitLogDescriptor.VERSION_21)
+ {
+ if (!shouldSkipSegmentId(file, desc, minPosition))
+ {
+ // NOTE(review): readSection performs this same seek itself, so this one looks redundant.
+ if (minPosition.segmentId == desc.id)
+ reader.seek(minPosition.position);
+ ReadStatusTracker statusTracker = new ReadStatusTracker(mutationLimit, tolerateTruncation);
+ statusTracker.errorContext = desc.fileName();
+ readSection(handler, reader, minPosition, (int) reader.length(), statusTracker, desc);
+ }
+ return;
+ }
+
+ // Keep the filename-derived id so it can be cross-checked against the id stored in the header.
+ final long segmentIdFromFilename = desc.id;
+ try
+ {
+ // The following call can either throw or legitimately return null. For either case, we need to check
+ // desc outside this block and set it to null in the exception case.
+ desc = CommitLogDescriptor.readHeader(reader, DatabaseDescriptor.getEncryptionContext());
+ }
+ catch (Exception e)
+ {
+ // NOTE(review): the exception is discarded, so its cause is not part of the error reported below.
+ desc = null;
+ }
+ if (desc == null)
+ {
+ // don't care about whether or not the handler thinks we can continue. We can't w/out descriptor.
+ handler.handleUnrecoverableError(new CommitLogReadException(
+ String.format("Could not read commit log descriptor in file %s", file),
+ CommitLogReadErrorReason.UNRECOVERABLE_DESCRIPTOR_ERROR,
+ false));
+ return;
+ }
+
+ // An id mismatch is recoverable: unless the handler asks to skip the segment, reading continues with
+ // the descriptor read from the header.
+ if (segmentIdFromFilename != desc.id)
+ {
+ if (handler.shouldSkipSegmentOnError(new CommitLogReadException(String.format(
+ "Segment id mismatch (filename %d, descriptor %d) in file %s", segmentIdFromFilename, desc.id, file),
+ CommitLogReadErrorReason.RECOVERABLE_DESCRIPTOR_ERROR,
+ false)))
+ {
+ return;
+ }
+ }
+
+ if (shouldSkipSegmentId(file, desc, minPosition))
+ return;
+
+ CommitLogSegmentReader segmentReader;
+ try
+ {
+ segmentReader = new CommitLogSegmentReader(handler, desc, reader, tolerateTruncation);
+ }
+ catch(Exception e)
+ {
+ // NOTE(review): unlike the descriptor error above, this passes tolerateTruncation (not false) as the
+ // exception's third argument, and the message omits the file name; confirm both are intended.
+ handler.handleUnrecoverableError(new CommitLogReadException(
+ String.format("Unable to create segment reader for commit log file: %s", e),
+ CommitLogReadErrorReason.UNRECOVERABLE_UNKNOWN_ERROR,
+ tolerateTruncation));
+ return;
+ }
+
+ try
+ {
+ // One tracker for the whole file, so the mutation limit and stop requests span all sync segments.
+ ReadStatusTracker statusTracker = new ReadStatusTracker(mutationLimit, tolerateTruncation);
+ for (CommitLogSegmentReader.SyncSegment syncSegment : segmentReader)
+ {
+ // Only tolerate truncation if we allow in both global and segment
+ statusTracker.tolerateErrorsInSection = tolerateTruncation & syncSegment.toleratesErrorsInSection;
+
+ // Skip segments that are completely behind the desired minPosition
+ if (desc.id == minPosition.segmentId && syncSegment.endPosition < minPosition.position)
+ continue;
+
+ statusTracker.errorContext = String.format("Next section at %d in %s", syncSegment.fileStartPosition, desc.fileName());
+
+ readSection(handler, syncSegment.input, minPosition, syncSegment.endPosition, statusTracker, desc);
+ if (!statusTracker.shouldContinue())
+ break;
+ }
+ }
+ // Unfortunately AbstractIterator cannot throw a checked exception, so we check to see if a RuntimeException
+ // is wrapping an IOException.
+ catch (RuntimeException re)
+ {
+ if (re.getCause() instanceof IOException)
+ throw (IOException) re.getCause();
+ throw re;
+ }
+ logger.debug("Finished reading {}", file);
+ }
+ }
+
+ /**
+ * Any segment with id >= minPosition.segmentId is a candidate for read.
+ * Also logs (at debug) that the segment is being read, before making the decision.
+ *
+ * @return true if the segment's id is lower than minPosition.segmentId, so none of it needs to be read
+ */
+ private boolean shouldSkipSegmentId(File file, CommitLogDescriptor desc, CommitLogPosition minPosition)
+ {
+ logger.debug("Reading {} (CL version {}, messaging version {}, compression {})",
+ file.getPath(),
+ desc.version,
+ desc.getMessagingVersion(),
+ desc.compression);
+
+ if (minPosition.segmentId > desc.id)
+ {
+ logger.trace("Skipping read of fully-flushed {}", file);
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Reads a section of a file containing mutations
+ *
+ * Reading stops at {@code end}, at EOF, when the tracker's mutation limit is reached, or when
+ * statusTracker.requestTermination() is called (end-of-segment marker, too few bytes left for a size,
+ * or an error for which the handler asks to skip the rest of the segment). For an invalid size, a
+ * size-checksum failure or an EOF, the rest of this section is always abandoned; the handler's answer
+ * only decides whether later sections are read too.
+ *
+ * @param handler Handler that will take action based on deserialized Mutations
+ * @param reader FileDataInput / logical buffer containing commitlog mutations
+ * @param minPosition CommitLogPosition indicating when we should start actively replaying mutations
+ * @param end logical numeric end of the segment being read
+ * @param statusTracker ReadStatusTracker with current state of mutation count, error state, etc
+ * @param desc Descriptor for CommitLog serialization
+ */
+ private void readSection(CommitLogReadHandler handler,
+ FileDataInput reader,
+ CommitLogPosition minPosition,
+ int end,
+ ReadStatusTracker statusTracker,
+ CommitLogDescriptor desc) throws IOException
+ {
+ // seek rather than deserializing mutation-by-mutation to reach the desired minPosition in this SyncSegment
+ if (desc.id == minPosition.segmentId && reader.getFilePointer() < minPosition.position)
+ reader.seek(minPosition.position);
+
+ while (statusTracker.shouldContinue() && reader.getFilePointer() < end && !reader.isEOF())
+ {
+ long mutationStart = reader.getFilePointer();
+ if (logger.isTraceEnabled())
+ logger.trace("Reading mutation at {}", mutationStart);
+
+ long claimedCRC32;
+ int serializedSize;
+ try
+ {
++ // We rely on reading serialized size == 0 (LEGACY_END_OF_SEGMENT_MARKER) to identify the end
++ // of a segment, which happens naturally due to the 0 padding of the empty segment on creation.
++ // However, it's possible with 2.1 era commitlogs that the last mutation ended less than 4 bytes
++ // from the end of the file, which means that we'd be unable to read a full int and would instead
++ // hit an EOF here
++ if(end - reader.getFilePointer() < 4)
++ {
++ logger.trace("Not enough bytes left for another mutation in this CommitLog segment, continuing");
++ statusTracker.requestTermination();
++ return;
++ }
++
+ // any of the reads may hit EOF
+ serializedSize = reader.readInt();
+ if (serializedSize == LEGACY_END_OF_SEGMENT_MARKER)
+ {
+ logger.trace("Encountered end of segment marker at {}", reader.getFilePointer());
+ statusTracker.requestTermination();
+ return;
+ }
+
+ // Mutation must be at LEAST 10 bytes:
+ // 3 for a non-empty Keyspace
+ // 3 for a Key (including the 2-byte length from writeUTF/writeWithShortLength)
+ // 4 bytes for column count.
+ // This prevents the CRC check from being fooled by special-case garbage in the file; see CASSANDRA-2128
+ if (serializedSize < 10)
+ {
+ if (handler.shouldSkipSegmentOnError(new CommitLogReadException(
+ String.format("Invalid mutation size %d at %d in %s", serializedSize, mutationStart, statusTracker.errorContext),
+ CommitLogReadErrorReason.MUTATION_ERROR,
+ statusTracker.tolerateErrorsInSection)))
+ {
+ statusTracker.requestTermination();
+ }
+ return;
+ }
+
+ // The size has its own checksum, verified before the size is trusted to read the entry data.
+ long claimedSizeChecksum = CommitLogFormat.calculateClaimedChecksum(reader, desc.version);
+ checksum.reset();
+ CommitLogFormat.updateChecksum(checksum, serializedSize, desc.version);
+
+ if (checksum.getValue() != claimedSizeChecksum)
+ {
+ if (handler.shouldSkipSegmentOnError(new CommitLogReadException(
+ String.format("Mutation size checksum failure at %d in %s", mutationStart, statusTracker.errorContext),
+ CommitLogReadErrorReason.MUTATION_ERROR,
+ statusTracker.tolerateErrorsInSection)))
+ {
+ statusTracker.requestTermination();
+ }
+ return;
+ }
+
+ // Grow with 20% headroom (presumably to avoid reallocating for entries only slightly larger).
+ if (serializedSize > buffer.length)
+ buffer = new byte[(int) (1.2 * serializedSize)];
+ reader.readFully(buffer, 0, serializedSize);
+
+ claimedCRC32 = CommitLogFormat.calculateClaimedCRC32(reader, desc.version);
+ }
+ catch (EOFException eof)
+ {
+ if (handler.shouldSkipSegmentOnError(new CommitLogReadException(
+ String.format("Unexpected end of segment at %d in %s", mutationStart, statusTracker.errorContext),
+ CommitLogReadErrorReason.EOF,
+ statusTracker.tolerateErrorsInSection)))
+ {
+ statusTracker.requestTermination();
+ }
+ return;
+ }
+
+ // checksum already includes the size, so the entry CRC covers the size followed by the data.
+ checksum.update(buffer, 0, serializedSize);
+ if (claimedCRC32 != checksum.getValue())
+ {
+ if (handler.shouldSkipSegmentOnError(new CommitLogReadException(
+ String.format("Mutation checksum failure at %d in %s", mutationStart, statusTracker.errorContext),
+ CommitLogReadErrorReason.MUTATION_ERROR,
+ statusTracker.tolerateErrorsInSection)))
+ {
+ statusTracker.requestTermination();
+ }
+ // Skip this entry (the reader is already past it) unless termination was requested.
+ continue;
+ }
+
+ // File pointer just past this entry; readMutation uses it as the entry's location.
+ long mutationPosition = reader.getFilePointer();
+ readMutation(handler, buffer, serializedSize, minPosition, (int)mutationPosition, desc);
+
+ // Only count this as a processed mutation if it is after our min as we suppress reading of mutations that
+ // are before this mark.
+ // NOTE(review): readMutation only replays when this position is strictly > minPosition.position, so an
+ // entry ending exactly at minPosition is counted here but not replayed; confirm this is intended.
+ if (mutationPosition >= minPosition.position)
+ statusTracker.addProcessedMutation();
+ }
+ }
+
+ /**
+ * Deserializes and passes a Mutation to the CommitLogReadHandler requested
+ *
+ * Mutations whose entryLocation is not past minPosition.position are still deserialized but are not handed
+ * to the handler. Mutations for unknown tables are counted in invalidMutations and skipped. Any other
+ * deserialization failure saves the raw bytes to a temp file and is reported as an unrecoverable error.
+ *
+ * @param handler Handler that will take action based on deserialized Mutations
+ * @param inputBuffer raw byte array w/Mutation data
+ * @param size serialized size of the mutation, i.e. the number of valid bytes in inputBuffer
+ * @param minPosition We need to suppress replay of mutations that are before the required minPosition
+ * @param entryLocation as passed by readSection, the file pointer just past the end of this entry within the CommitLogSegment
+ * @param desc CommitLogDescriptor being worked on
+ * @throws IOException if the temp file for an undeserializable mutation cannot be created or written
+ */
+ @VisibleForTesting
+ protected void readMutation(CommitLogReadHandler handler,
+ byte[] inputBuffer,
+ int size,
+ CommitLogPosition minPosition,
+ final int entryLocation,
+ final CommitLogDescriptor desc) throws IOException
+ {
+ // For now, we need to go through the motions of deserializing the mutation to determine its size and move
+ // the file pointer forward accordingly, even if we're behind the requested minPosition within this SyncSegment.
+ boolean shouldReplay = entryLocation > minPosition.position;
+
+ final Mutation mutation;
+ try (RebufferingInputStream bufIn = new DataInputBuffer(inputBuffer, 0, size))
+ {
+ mutation = Mutation.serializer.deserialize(bufIn,
+ desc.getMessagingVersion(),
+ SerializationHelper.Flag.LOCAL);
+ // doublecheck that what we read is still valid for the current schema
+ for (PartitionUpdate upd : mutation.getPartitionUpdates())
+ upd.validate();
+ }
+ catch (UnknownColumnFamilyException ex)
+ {
+ // Mutations for unknown tables are counted per table and skipped rather than reported as errors.
+ if (ex.cfId == null)
+ return;
+ AtomicInteger i = invalidMutations.get(ex.cfId);
+ if (i == null)
+ {
+ i = new AtomicInteger(1);
+ invalidMutations.put(ex.cfId, i);
+ }
+ else
+ i.incrementAndGet();
+ return;
+ }
+ catch (Throwable t)
+ {
+ JVMStabilityInspector.inspectThrowable(t);
+ // Save the raw bytes so the failing mutation can be inspected later.
+ // NOTE(review): the suffix has no leading '.', so the file name ends in "dat" rather than ".dat".
+ File f = File.createTempFile("mutation", "dat");
+
+ try (DataOutputStream out = new DataOutputStream(new FileOutputStream(f)))
+ {
+ out.write(inputBuffer, 0, size);
+ }
+
+ // Checksum passed so this error can't be permissible.
+ handler.handleUnrecoverableError(new CommitLogReadException(
+ String.format(
+ "Unexpected error deserializing mutation; saved to %s. " +
+ "This may be caused by replaying a mutation against a table with the same name but incompatible schema. " +
+ "Exception follows: %s", f.getAbsolutePath(), t),
+ CommitLogReadErrorReason.MUTATION_ERROR,
+ false));
+ return;
+ }
+
+ if (logger.isTraceEnabled())
+ logger.trace("Read mutation for {}.{}: {}", mutation.getKeyspaceName(), mutation.key(),
+ "{" + StringUtils.join(mutation.getPartitionUpdates().iterator(), ", ") + "}");
+
+ if (shouldReplay)
+ handler.handleMutation(mutation, size, entryLocation, desc);
+ }
+
+ /**
+ * Helper methods to deal with changing formats of internals of the CommitLog without polluting deserialization code.
+ * VERSION_12 and VERSION_20 store checksums as longs; later versions store them as 32-bit ints, read as unsigned.
+ */
+ private static class CommitLogFormat
+ {
+ public static long calculateClaimedChecksum(FileDataInput input, int commitLogVersion) throws IOException
+ {
+ switch (commitLogVersion)
+ {
+ case CommitLogDescriptor.VERSION_12:
+ case CommitLogDescriptor.VERSION_20:
+ return input.readLong();
+ // Changed format in 2.1
+ default:
+ return input.readInt() & 0xffffffffL;
+ }
+ }
+
+ public static void updateChecksum(CRC32 checksum, int serializedSize, int commitLogVersion)
+ {
+ switch (commitLogVersion)
+ {
+ case CommitLogDescriptor.VERSION_12:
+ // CRC32.update(int) consumes only the low-order byte, so VERSION_12 checksums just that byte of the size.
+ checksum.update(serializedSize);
+ break;
+ // Changed format in 2.0
+ default:
+ updateChecksumInt(checksum, serializedSize);
+ break;
+ }
+ }
+
+ public static long calculateClaimedCRC32(FileDataInput input, int commitLogVersion) throws IOException
+ {
+ switch (commitLogVersion)
+ {
+ case CommitLogDescriptor.VERSION_12:
+ case CommitLogDescriptor.VERSION_20:
+ return input.readLong();
+ // Changed format in 2.1
+ default:
+ return input.readInt() & 0xffffffffL;
+ }
+ }
+ }
+
+ /**
+ * Read state shared across all sections of one segment file: the remaining mutation budget, whether
+ * reading should stop, and context for error messages.
+ */
+ private static class ReadStatusTracker
+ {
+ // Mutations still allowed to be read, or ALL_MUTATIONS for no limit.
+ private int mutationsLeft;
+ // Location description included in error messages.
+ public String errorContext = "";
+ // Passed as the third CommitLogReadException argument (presumably: whether errors in this section are permissible).
+ public boolean tolerateErrorsInSection;
+ // Set by requestTermination(); also used for normal stops such as the end-of-segment marker.
+ private boolean error;
+
+ public ReadStatusTracker(int mutationLimit, boolean tolerateErrorsInSection)
+ {
+ this.mutationsLeft = mutationLimit;
+ this.tolerateErrorsInSection = tolerateErrorsInSection;
+ }
+
+ public void addProcessedMutation()
+ {
+ if (mutationsLeft == ALL_MUTATIONS)
+ return;
+ --mutationsLeft;
+ }
+
+ public boolean shouldContinue()
+ {
+ // The ALL_MUTATIONS check is redundant (ALL_MUTATIONS is -1, which is != 0) but documents the intent.
+ return !error && (mutationsLeft != 0 || mutationsLeft == ALL_MUTATIONS);
+ }
+
+ public void requestTermination()
+ {
+ error = true;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a2399d4d/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
index 5476d03,90dc258..4000fbf
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
@@@ -171,10 -143,23 +171,23 @@@ public class CommitLogTes
}
@Test
+ public void testRecoveryWithShortPadding() throws Exception
+ {
- // If we have 0-3 bytes remaining, commitlog replayer
- // should pass, because there's insufficient room
- // left in the segment for the legacy size marker.
- testRecovery(new byte[1], null);
- testRecovery(new byte[2], null);
- testRecovery(new byte[3], null);
++ // If we have 0-3 bytes remaining, commitlog replayer
++ // should pass, because there's insufficient room
++ // left in the segment for the legacy size marker.
++ testRecovery(new byte[1], null);
++ testRecovery(new byte[2], null);
++ testRecovery(new byte[3], null);
+ }
+
+ @Test
public void testRecoveryWithShortSize() throws Exception
{
+ byte[] data = new byte[5];
+ data[3] = 1; // Not a legacy marker, give it a fake (short) size
runExpecting(() -> {
- testRecovery(new byte[2], CommitLogDescriptor.VERSION_20);
+ testRecovery(data, CommitLogDescriptor.VERSION_20);
return null;
}, CommitLogReplayException.class);
}
[03/10] cassandra git commit: Commitlog replay may fail if last
mutation is within 4 bytes of end of segment
Posted by jj...@apache.org.
Commitlog replay may fail if last mutation is within 4 bytes of end of segment
Patch by Jeff Jirsa; Reviewed by Branimir Lambov for CASSANDRA-13282
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/beb9658d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/beb9658d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/beb9658d
Branch: refs/heads/cassandra-3.11
Commit: beb9658dd5e18e3a6a4e8431b6549ae4c33365a9
Parents: 5ef8a8b
Author: Jeff Jirsa <je...@jeffjirsa.net>
Authored: Sun Mar 12 21:54:04 2017 -0700
Committer: Jeff Jirsa <je...@jeffjirsa.net>
Committed: Sun Mar 12 21:54:04 2017 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../db/commitlog/CommitLogReplayer.java | 11 +++++++++++
.../cassandra/db/commitlog/CommitLogTest.java | 20 ++++++++++++++++----
3 files changed, 28 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/beb9658d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 09e4039..2839291 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@
* Fix failing COPY TO STDOUT (CASSANDRA-12497)
* Fix ColumnCounter::countAll behaviour for reverse queries (CASSANDRA-13222)
* Exceptions encountered calling getSeeds() breaks OTC thread (CASSANDRA-13018)
+ * Commitlog replay may fail if last mutation is within 4 bytes of end of segment (CASSANDRA-13282)
Merged from 2.1:
* Remove unused repositories (CASSANDRA-13278)
* Log stacktrace of uncaught exceptions (CASSANDRA-13108)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/beb9658d/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index a58aeb4..3cf4d0f 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -439,6 +439,17 @@ public class CommitLogReplayer
int serializedSize;
try
{
+ // We rely on reading serialized size == 0 (LEGACY_END_OF_SEGMENT_MARKER) to identify the end
+ // of a segment, which happens naturally due to the 0 padding of the empty segment on creation.
+ // However, with 2.1 era commitlogs it's possible that the last mutation ended less than 4 bytes
+ // from the end of the file, which means that we'd be unable to read a full int and would instead
+ // hit an EOF here
+ if(end - reader.getFilePointer() < 4)
+ {
+ logger.trace("Not enough bytes left for another mutation in this CommitLog segment, continuing");
+ return false;
+ }
+
// any of the reads may hit EOF
serializedSize = reader.readInt();
if (serializedSize == LEGACY_END_OF_SEGMENT_MARKER)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/beb9658d/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
index 9999b42..9b63885 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
@@ -137,12 +137,24 @@ public class CommitLogTest
}
@Test
+ public void testRecoveryWithShortPadding() throws Exception
+ {
+ // If we have 0-3 bytes remaining, commitlog replayer
+ // should pass, because there's insufficient room
+ // left in the segment for the legacy size marker.
+ testRecovery(new byte[1], null);
+ testRecovery(new byte[2], null);
+ testRecovery(new byte[3], null);
+ }
+
+ @Test
public void testRecoveryWithShortSize() throws Exception
{
- runExpecting(new WrappedRunnable() {
- public void runMayThrow() throws Exception
- {
- testRecovery(new byte[2], CommitLogDescriptor.VERSION_20);
+ runExpecting(new WrappedRunnable() {
+ public void runMayThrow() throws Exception {
+ byte[] data = new byte[5];
+ data[3] = 1; // Not a legacy marker, give it a fake (short) size
+ testRecovery(data, CommitLogDescriptor.VERSION_20);
}
}, CommitLogReplayException.class);
}
[02/10] cassandra git commit: Commitlog replay may fail if last
mutation is within 4 bytes of end of segment
Posted by jj...@apache.org.
Commitlog replay may fail if last mutation is within 4 bytes of end of segment
Patch by Jeff Jirsa; Reviewed by Branimir Lambov for CASSANDRA-13282
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/beb9658d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/beb9658d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/beb9658d
Branch: refs/heads/cassandra-3.0
Commit: beb9658dd5e18e3a6a4e8431b6549ae4c33365a9
Parents: 5ef8a8b
Author: Jeff Jirsa <je...@jeffjirsa.net>
Authored: Sun Mar 12 21:54:04 2017 -0700
Committer: Jeff Jirsa <je...@jeffjirsa.net>
Committed: Sun Mar 12 21:54:04 2017 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../db/commitlog/CommitLogReplayer.java | 11 +++++++++++
.../cassandra/db/commitlog/CommitLogTest.java | 20 ++++++++++++++++----
3 files changed, 28 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/beb9658d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 09e4039..2839291 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@
* Fix failing COPY TO STDOUT (CASSANDRA-12497)
* Fix ColumnCounter::countAll behaviour for reverse queries (CASSANDRA-13222)
* Exceptions encountered calling getSeeds() breaks OTC thread (CASSANDRA-13018)
+ * Commitlog replay may fail if last mutation is within 4 bytes of end of segment (CASSANDRA-13282)
Merged from 2.1:
* Remove unused repositories (CASSANDRA-13278)
* Log stacktrace of uncaught exceptions (CASSANDRA-13108)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/beb9658d/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index a58aeb4..3cf4d0f 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -439,6 +439,17 @@ public class CommitLogReplayer
int serializedSize;
try
{
+ // We rely on reading serialized size == 0 (LEGACY_END_OF_SEGMENT_MARKER) to identify the end
+ // of a segment, which happens naturally due to the 0 padding of the empty segment on creation.
+ // However, with 2.1 era commitlogs it's possible that the last mutation ended less than 4 bytes
+ // from the end of the file, which means that we'd be unable to read a full int and would instead
+ // hit an EOF here
+ if(end - reader.getFilePointer() < 4)
+ {
+ logger.trace("Not enough bytes left for another mutation in this CommitLog segment, continuing");
+ return false;
+ }
+
// any of the reads may hit EOF
serializedSize = reader.readInt();
if (serializedSize == LEGACY_END_OF_SEGMENT_MARKER)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/beb9658d/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
index 9999b42..9b63885 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
@@ -137,12 +137,24 @@ public class CommitLogTest
}
@Test
+ public void testRecoveryWithShortPadding() throws Exception
+ {
+ // If we have 0-3 bytes remaining, commitlog replayer
+ // should pass, because there's insufficient room
+ // left in the segment for the legacy size marker.
+ testRecovery(new byte[1], null);
+ testRecovery(new byte[2], null);
+ testRecovery(new byte[3], null);
+ }
+
+ @Test
public void testRecoveryWithShortSize() throws Exception
{
- runExpecting(new WrappedRunnable() {
- public void runMayThrow() throws Exception
- {
- testRecovery(new byte[2], CommitLogDescriptor.VERSION_20);
+ runExpecting(new WrappedRunnable() {
+ public void runMayThrow() throws Exception {
+ byte[] data = new byte[5];
+ data[3] = 1; // Not a legacy marker, give it a fake (short) size
+ testRecovery(data, CommitLogDescriptor.VERSION_20);
}
}, CommitLogReplayException.class);
}
[07/10] cassandra git commit: Merge branch 'cassandra-2.2' into
cassandra-3.0
Posted by jj...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/44f79bf2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/44f79bf2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/44f79bf2
Branch: refs/heads/trunk
Commit: 44f79bf2f7a3a05f802014492ecbec67c49c02d0
Parents: aeca1d2 beb9658
Author: Jeff Jirsa <je...@jeffjirsa.net>
Authored: Sun Mar 12 21:54:42 2017 -0700
Committer: Jeff Jirsa <je...@jeffjirsa.net>
Committed: Sun Mar 12 21:56:00 2017 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/db/commitlog/CommitLogReplayer.java | 11 +++++++++++
.../apache/cassandra/db/commitlog/CommitLogTest.java | 15 ++++++++++++++-
3 files changed, 26 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44f79bf2/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 52a794b,2839291..140c860
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,21 -1,6 +1,22 @@@
-2.2.10
+3.0.13
+ * Slice.isEmpty() returns false for some empty slices (CASSANDRA-13305)
+ * Add formatted row output to assertEmpty in CQL Tester (CASSANDRA-13238)
+Merged from 2.2:
++ * Commitlog replay may fail if last mutation is within 4 bytes of end of segment (CASSANDRA-13282)
* Fix queries updating multiple time the same list (CASSANDRA-13130)
* Fix GRANT/REVOKE when keyspace isn't specified (CASSANDRA-13053)
+
+
+3.0.12
+ * Prevent data loss on upgrade 2.1 - 3.0 by adding component separator to LogRecord absolute path (CASSANDRA-13294)
+ * Improve testing on macOS by eliminating sigar logging (CASSANDRA-13233)
+ * Cqlsh copy-from should error out when csv contains invalid data for collections (CASSANDRA-13071)
+ * Update c.yaml doc for offheap memtables (CASSANDRA-13179)
+ * Faster StreamingHistogram (CASSANDRA-13038)
+ * Legacy deserializer can create unexpected boundary range tombstones (CASSANDRA-13237)
+ * Remove unnecessary assertion from AntiCompactionTest (CASSANDRA-13070)
+ * Fix cqlsh COPY for dates before 1900 (CASSANDRA-13185)
+Merged from 2.2:
* Avoid race on receiver by starting streaming sender thread after sending init message (CASSANDRA-12886)
* Fix "multiple versions of ant detected..." when running ant test (CASSANDRA-13232)
* Coalescing strategy sleeps too much (CASSANDRA-13090)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44f79bf2/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index d53f0f8,3cf4d0f..205c36a
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@@ -483,6 -439,17 +483,17 @@@ public class CommitLogReplaye
int serializedSize;
try
{
+ // We rely on reading serialized size == 0 (LEGACY_END_OF_SEGMENT_MARKER) to identify the end
+ // of a segment, which happens naturally due to the 0 padding of the empty segment on creation.
- // However, with 2.1 era commitlogs it's possible that the last mutation ended less than 4 bytes
++ // However, it's possible with 2.1 era commitlogs that the last mutation ended less than 4 bytes
+ // from the end of the file, which means that we'd be unable to read a full int and would instead
+ // hit an EOF here
+ if(end - reader.getFilePointer() < 4)
+ {
+ logger.trace("Not enough bytes left for another mutation in this CommitLog segment, continuing");
+ return false;
+ }
+
// any of the reads may hit EOF
serializedSize = reader.readInt();
if (serializedSize == LEGACY_END_OF_SEGMENT_MARKER)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44f79bf2/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
index c4ab6ab,9b63885..90dc258
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
@@@ -143,11 -137,25 +143,24 @@@ public class CommitLogTes
}
@Test
+ public void testRecoveryWithShortPadding() throws Exception
+ {
+ // If we have 0-3 bytes remaining, commitlog replayer
+ // should pass, because there's insufficient room
+ // left in the segment for the legacy size marker.
+ testRecovery(new byte[1], null);
+ testRecovery(new byte[2], null);
+ testRecovery(new byte[3], null);
+ }
+
+ @Test
public void testRecoveryWithShortSize() throws Exception
{
- runExpecting(new WrappedRunnable() {
- public void runMayThrow() throws Exception {
- byte[] data = new byte[5];
- data[3] = 1; // Not a legacy marker, give it a fake (short) size
- testRecovery(data, CommitLogDescriptor.VERSION_20);
- }
++ byte[] data = new byte[5];
++ data[3] = 1; // Not a legacy marker, give it a fake (short) size
+ runExpecting(() -> {
- testRecovery(new byte[2], CommitLogDescriptor.VERSION_20);
++ testRecovery(data, CommitLogDescriptor.VERSION_20);
+ return null;
}, CommitLogReplayException.class);
}