You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jj...@apache.org on 2017/03/13 05:00:19 UTC

[01/10] cassandra git commit: Commitlog replay may fail if last mutation is within 4 bytes of end of segment

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.2 5ef8a8b40 -> beb9658dd
  refs/heads/cassandra-3.0 aeca1d2bd -> 44f79bf2f
  refs/heads/cassandra-3.11 2c111d15b -> a2399d4d3
  refs/heads/trunk 0c5faef66 -> dd5251c46


Commitlog replay may fail if last mutation is within 4 bytes of end of segment

Patch by Jeff Jirsa; Reviewed by Branimir Lambov for CASSANDRA-13282


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/beb9658d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/beb9658d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/beb9658d

Branch: refs/heads/cassandra-2.2
Commit: beb9658dd5e18e3a6a4e8431b6549ae4c33365a9
Parents: 5ef8a8b
Author: Jeff Jirsa <je...@jeffjirsa.net>
Authored: Sun Mar 12 21:54:04 2017 -0700
Committer: Jeff Jirsa <je...@jeffjirsa.net>
Committed: Sun Mar 12 21:54:04 2017 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../db/commitlog/CommitLogReplayer.java         | 11 +++++++++++
 .../cassandra/db/commitlog/CommitLogTest.java   | 20 ++++++++++++++++----
 3 files changed, 28 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/beb9658d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 09e4039..2839291 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@
  * Fix failing COPY TO STDOUT (CASSANDRA-12497)
  * Fix ColumnCounter::countAll behaviour for reverse queries (CASSANDRA-13222)
  * Exceptions encountered calling getSeeds() breaks OTC thread (CASSANDRA-13018)
+ * Commitlog replay may fail if last mutation is within 4 bytes of end of segment (CASSANDRA-13282)
 Merged from 2.1:
  * Remove unused repositories (CASSANDRA-13278)
  * Log stacktrace of uncaught exceptions (CASSANDRA-13108)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/beb9658d/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index a58aeb4..3cf4d0f 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -439,6 +439,17 @@ public class CommitLogReplayer
             int serializedSize;
             try
             {
+                // We rely on reading serialized size == 0 (LEGACY_END_OF_SEGMENT_MARKER) to identify the end
+                // of a segment, which happens naturally due to the 0 padding of the empty segment on creation.
+                // However, with 2.1 era commitlogs it's possible that the last mutation ended less than 4 bytes 
+                // from the end of the file, which means that we'll be unable to read a full int and instead
+                // read an EOF here
+                if(end - reader.getFilePointer() < 4)
+                {
+                    logger.trace("Not enough bytes left for another mutation in this CommitLog segment, continuing");
+                    return false;
+                }
+
                 // any of the reads may hit EOF
                 serializedSize = reader.readInt();
                 if (serializedSize == LEGACY_END_OF_SEGMENT_MARKER)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/beb9658d/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
index 9999b42..9b63885 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
@@ -137,12 +137,24 @@ public class CommitLogTest
     }
 
     @Test
+    public void testRecoveryWithShortPadding() throws Exception
+    {
+        // If we have 0-3 bytes remaining, commitlog replayer
+        // should pass, because there's insufficient room
+        // left in the segment for the legacy size marker.
+        testRecovery(new byte[1], null);
+        testRecovery(new byte[2], null);
+        testRecovery(new byte[3], null);
+    }
+
+    @Test
     public void testRecoveryWithShortSize() throws Exception
     {
-        runExpecting(new WrappedRunnable() {
-            public void runMayThrow() throws Exception
-            {
-                testRecovery(new byte[2], CommitLogDescriptor.VERSION_20);
+        runExpecting(new WrappedRunnable()  {
+            public void runMayThrow() throws Exception {
+                byte[] data = new byte[5];
+                data[3] = 1; // Not a legacy marker, give it a fake (short) size
+                testRecovery(data, CommitLogDescriptor.VERSION_20);
             }
         }, CommitLogReplayException.class);
     }


[09/10] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.11

Posted by jj...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.11


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a2399d4d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a2399d4d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a2399d4d

Branch: refs/heads/trunk
Commit: a2399d4d309ac6b60a150ea20af8dc6f006d51ff
Parents: 2c111d1 44f79bf
Author: Jeff Jirsa <je...@jeffjirsa.net>
Authored: Sun Mar 12 21:56:11 2017 -0700
Committer: Jeff Jirsa <je...@jeffjirsa.net>
Committed: Sun Mar 12 21:57:25 2017 -0700

----------------------------------------------------------------------
 CHANGES.txt                                          |  1 +
 .../cassandra/db/commitlog/CommitLogReader.java      | 12 ++++++++++++
 .../apache/cassandra/db/commitlog/CommitLogTest.java | 15 ++++++++++++++-
 3 files changed, 27 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a2399d4d/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 302a028,140c860..ab28dd4
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -33,140 -43,6 +33,141 @@@ Merged from 3.0
     live rows in sstabledump (CASSANDRA-13177)
   * Provide user workaround when system_schema.columns does not contain entries
     for a table that's in system_schema.tables (CASSANDRA-13180)
 +Merged from 2.2:
++ * Commitlog replay may fail if last mutation is within 4 bytes of end of segment (CASSANDRA-13282)
 + * Fix queries updating multiple time the same list (CASSANDRA-13130)
 + * Fix GRANT/REVOKE when keyspace isn't specified (CASSANDRA-13053)
 + * Fix flaky LongLeveledCompactionStrategyTest (CASSANDRA-12202)
 + * Fix failing COPY TO STDOUT (CASSANDRA-12497)
 + * Fix ColumnCounter::countAll behaviour for reverse queries (CASSANDRA-13222)
 + * Exceptions encountered calling getSeeds() breaks OTC thread (CASSANDRA-13018)
 + * Fix negative mean latency metric (CASSANDRA-12876)
 + * Use only one file pointer when creating commitlog segments (CASSANDRA-12539)
 +Merged from 2.1:
 + * Remove unused repositories (CASSANDRA-13278)
 + * Log stacktrace of uncaught exceptions (CASSANDRA-13108)
 + * Use portable stderr for java error in startup (CASSANDRA-13211)
 + * Fix Thread Leak in OutboundTcpConnection (CASSANDRA-13204)
 + * Coalescing strategy can enter infinite loop (CASSANDRA-13159)
 +
 +
 +3.10
 + * Fix secondary index queries regression (CASSANDRA-13013)
 + * Add duration type to the protocol V5 (CASSANDRA-12850)
 + * Fix duration type validation (CASSANDRA-13143)
 + * Fix flaky GcCompactionTest (CASSANDRA-12664)
 + * Fix TestHintedHandoff.hintedhandoff_decom_test (CASSANDRA-13058)
 + * Fixed query monitoring for range queries (CASSANDRA-13050)
 + * Remove outboundBindAny configuration property (CASSANDRA-12673)
 + * Use correct bounds for all-data range when filtering (CASSANDRA-12666)
 + * Remove timing window in test case (CASSANDRA-12875)
 + * Resolve unit testing without JCE security libraries installed (CASSANDRA-12945)
 + * Fix inconsistencies in cassandra-stress load balancing policy (CASSANDRA-12919)
 + * Fix validation of non-frozen UDT cells (CASSANDRA-12916)
 + * Don't shut down socket input/output on StreamSession (CASSANDRA-12903)
 + * Fix Murmur3PartitionerTest (CASSANDRA-12858)
 + * Move cqlsh syntax rules into separate module and allow easier customization (CASSANDRA-12897)
 + * Fix CommitLogSegmentManagerTest (CASSANDRA-12283)
 + * Fix cassandra-stress truncate option (CASSANDRA-12695)
 + * Fix crossNode value when receiving messages (CASSANDRA-12791)
 + * Don't load MX4J beans twice (CASSANDRA-12869)
 + * Extend native protocol request flags, add versions to SUPPORTED, and introduce ProtocolVersion enum (CASSANDRA-12838)
 + * Set JOINING mode when running pre-join tasks (CASSANDRA-12836)
 + * remove net.mintern.primitive library due to license issue (CASSANDRA-12845)
 + * Properly format IPv6 addresses when logging JMX service URL (CASSANDRA-12454)
 + * Optimize the vnode allocation for single replica per DC (CASSANDRA-12777)
 + * Use non-token restrictions for bounds when token restrictions are overridden (CASSANDRA-12419)
 + * Fix CQLSH auto completion for PER PARTITION LIMIT (CASSANDRA-12803)
 + * Use different build directories for Eclipse and Ant (CASSANDRA-12466)
 + * Avoid potential AttributeError in cqlsh due to no table metadata (CASSANDRA-12815)
 + * Fix RandomReplicationAwareTokenAllocatorTest.testExistingCluster (CASSANDRA-12812)
 + * Upgrade commons-codec to 1.9 (CASSANDRA-12790)
 + * Make the fanout size for LeveledCompactionStrategy to be configurable (CASSANDRA-11550)
 + * Add duration data type (CASSANDRA-11873)
 + * Fix timeout in ReplicationAwareTokenAllocatorTest (CASSANDRA-12784)
 + * Improve sum aggregate functions (CASSANDRA-12417)
 + * Make cassandra.yaml docs for batch_size_*_threshold_in_kb reflect changes in CASSANDRA-10876 (CASSANDRA-12761)
 + * cqlsh fails to format collections when using aliases (CASSANDRA-11534)
 + * Check for hash conflicts in prepared statements (CASSANDRA-12733)
 + * Exit query parsing upon first error (CASSANDRA-12598)
 + * Fix cassandra-stress to use single seed in UUID generation (CASSANDRA-12729)
 + * CQLSSTableWriter does not allow Update statement (CASSANDRA-12450)
 + * Config class uses boxed types but DD exposes primitive types (CASSANDRA-12199)
 + * Add pre- and post-shutdown hooks to Storage Service (CASSANDRA-12461)
 + * Add hint delivery metrics (CASSANDRA-12693)
 + * Remove IndexInfo cache from FileIndexInfoRetriever (CASSANDRA-12731)
 + * ColumnIndex does not reuse buffer (CASSANDRA-12502)
 + * cdc column addition still breaks schema migration tasks (CASSANDRA-12697)
 + * Upgrade metrics-reporter dependencies (CASSANDRA-12089)
 + * Tune compaction thread count via nodetool (CASSANDRA-12248)
 + * Add +=/-= shortcut syntax for update queries (CASSANDRA-12232)
 + * Include repair session IDs in repair start message (CASSANDRA-12532)
 + * Add a blocking task to Index, run before joining the ring (CASSANDRA-12039)
 + * Fix NPE when using CQLSSTableWriter (CASSANDRA-12667)
 + * Support optional backpressure strategies at the coordinator (CASSANDRA-9318)
 + * Make randompartitioner work with new vnode allocation (CASSANDRA-12647)
 + * Fix cassandra-stress graphing (CASSANDRA-12237)
 + * Allow filtering on partition key columns for queries without secondary indexes (CASSANDRA-11031)
 + * Fix Cassandra Stress reporting thread model and precision (CASSANDRA-12585)
 + * Add JMH benchmarks.jar (CASSANDRA-12586)
 + * Cleanup uses of AlterTableStatementColumn (CASSANDRA-12567)
 + * Add keep-alive to streaming (CASSANDRA-11841)
 + * Tracing payload is passed through newSession(..) (CASSANDRA-11706)
 + * avoid deleting non existing sstable files and improve related log messages (CASSANDRA-12261)
 + * json/yaml output format for nodetool compactionhistory (CASSANDRA-12486)
 + * Retry all internode messages once after a connection is
 +   closed and reopened (CASSANDRA-12192)
 + * Add support to rebuild from targeted replica (CASSANDRA-9875)
 + * Add sequence distribution type to cassandra stress (CASSANDRA-12490)
 + * "SELECT * FROM foo LIMIT ;" does not error out (CASSANDRA-12154)
 + * Define executeLocally() at the ReadQuery Level (CASSANDRA-12474)
 + * Extend read/write failure messages with a map of replica addresses
 +   to error codes in the v5 native protocol (CASSANDRA-12311)
 + * Fix rebuild of SASI indexes with existing index files (CASSANDRA-12374)
 + * Let DatabaseDescriptor not implicitly startup services (CASSANDRA-9054, 12550)
 + * Fix clustering indexes in presence of static columns in SASI (CASSANDRA-12378)
 + * Fix queries on columns with reversed type on SASI indexes (CASSANDRA-12223)
 + * Added slow query log (CASSANDRA-12403)
 + * Count full coordinated request against timeout (CASSANDRA-12256)
 + * Allow TTL with null value on insert and update (CASSANDRA-12216)
 + * Make decommission operation resumable (CASSANDRA-12008)
 + * Add support to one-way targeted repair (CASSANDRA-9876)
 + * Remove clientutil jar (CASSANDRA-11635)
 + * Fix compaction throughput throttle (CASSANDRA-12366, CASSANDRA-12717)
 + * Delay releasing Memtable memory on flush until PostFlush has finished running (CASSANDRA-12358)
 + * Cassandra stress should dump all setting on startup (CASSANDRA-11914)
 + * Make it possible to compact a given token range (CASSANDRA-10643)
 + * Allow updating DynamicEndpointSnitch properties via JMX (CASSANDRA-12179)
 + * Collect metrics on queries by consistency level (CASSANDRA-7384)
 + * Add support for GROUP BY to SELECT statement (CASSANDRA-10707)
 + * Deprecate memtable_cleanup_threshold and update default for memtable_flush_writers (CASSANDRA-12228)
 + * Upgrade to OHC 0.4.4 (CASSANDRA-12133)
 + * Add version command to cassandra-stress (CASSANDRA-12258)
 + * Create compaction-stress tool (CASSANDRA-11844)
 + * Garbage-collecting compaction operation and schema option (CASSANDRA-7019)
 + * Add beta protocol flag for v5 native protocol (CASSANDRA-12142)
 + * Support filtering on non-PRIMARY KEY columns in the CREATE
 +   MATERIALIZED VIEW statement's WHERE clause (CASSANDRA-10368)
 + * Unify STDOUT and SYSTEMLOG logback format (CASSANDRA-12004)
 + * COPY FROM should raise error for non-existing input files (CASSANDRA-12174)
 + * Faster write path (CASSANDRA-12269)
 + * Option to leave omitted columns in INSERT JSON unset (CASSANDRA-11424)
 + * Support json/yaml output in nodetool tpstats (CASSANDRA-12035)
 + * Expose metrics for successful/failed authentication attempts (CASSANDRA-10635)
 + * Prepend snapshot name with "truncated" or "dropped" when a snapshot
 +   is taken before truncating or dropping a table (CASSANDRA-12178)
 + * Optimize RestrictionSet (CASSANDRA-12153)
 + * cqlsh does not automatically downgrade CQL version (CASSANDRA-12150)
 + * Omit (de)serialization of state variable in UDAs (CASSANDRA-9613)
 + * Create a system table to expose prepared statements (CASSANDRA-8831)
 + * Reuse DataOutputBuffer from ColumnIndex (CASSANDRA-11970)
 + * Remove DatabaseDescriptor dependency from SegmentedFile (CASSANDRA-11580)
 + * Add supplied username to authentication error messages (CASSANDRA-12076)
 + * Remove pre-startup check for open JMX port (CASSANDRA-12074)
 + * Remove compaction Severity from DynamicEndpointSnitch (CASSANDRA-11738)
 + * Restore resumable hints delivery (CASSANDRA-11960)
 + * Properly report LWT contention (CASSANDRA-12626)
 +Merged from 3.0:
   * Dump threads when unit tests time out (CASSANDRA-13117)
   * Better error when modifying function permissions without explicit keyspace (CASSANDRA-12925)
   * Indexer is not correctly invoked when building indexes over sstables (CASSANDRA-13075)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a2399d4d/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
index e6e2e1a,0000000..d1cb8d6
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
@@@ -1,502 -1,0 +1,514 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.db.commitlog;
 +
 +import java.io.*;
 +import java.util.*;
 +import java.util.concurrent.atomic.AtomicInteger;
 +import java.util.zip.CRC32;
 +
 +import com.google.common.annotations.VisibleForTesting;
 +import org.apache.commons.lang3.StringUtils;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.db.Mutation;
 +import org.apache.cassandra.db.UnknownColumnFamilyException;
 +import org.apache.cassandra.db.commitlog.CommitLogReadHandler.CommitLogReadErrorReason;
 +import org.apache.cassandra.db.commitlog.CommitLogReadHandler.CommitLogReadException;
 +import org.apache.cassandra.db.partitions.PartitionUpdate;
 +import org.apache.cassandra.db.rows.SerializationHelper;
 +import org.apache.cassandra.io.util.DataInputBuffer;
 +import org.apache.cassandra.io.util.FileDataInput;
 +import org.apache.cassandra.io.util.RandomAccessReader;
 +import org.apache.cassandra.io.util.RebufferingInputStream;
 +import org.apache.cassandra.utils.JVMStabilityInspector;
 +
 +import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt;
 +
 +public class CommitLogReader
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(CommitLogReader.class);
 +
 +    private static final int LEGACY_END_OF_SEGMENT_MARKER = 0;
 +
 +    @VisibleForTesting
 +    public static final int ALL_MUTATIONS = -1;
 +    private final CRC32 checksum;
 +    private final Map<UUID, AtomicInteger> invalidMutations;
 +
 +    private byte[] buffer;
 +
 +    public CommitLogReader()
 +    {
 +        checksum = new CRC32();
 +        invalidMutations = new HashMap<>();
 +        buffer = new byte[4096];
 +    }
 +
 +    public Set<Map.Entry<UUID, AtomicInteger>> getInvalidMutations()
 +    {
 +        return invalidMutations.entrySet();
 +    }
 +
 +    /**
 +     * Reads all passed in files with no minimum, no start, and no mutation limit.
 +     */
 +    public void readAllFiles(CommitLogReadHandler handler, File[] files) throws IOException
 +    {
 +        readAllFiles(handler, files, CommitLogPosition.NONE);
 +    }
 +
 +    /**
 +     * Reads all passed in files with minPosition, no start, and no mutation limit.
 +     */
 +    public void readAllFiles(CommitLogReadHandler handler, File[] files, CommitLogPosition minPosition) throws IOException
 +    {
 +        for (int i = 0; i < files.length; i++)
 +            readCommitLogSegment(handler, files[i], minPosition, ALL_MUTATIONS, i + 1 == files.length);
 +    }
 +
 +    /**
 +     * Reads passed in file fully
 +     */
 +    public void readCommitLogSegment(CommitLogReadHandler handler, File file, boolean tolerateTruncation) throws IOException
 +    {
 +        readCommitLogSegment(handler, file, CommitLogPosition.NONE, ALL_MUTATIONS, tolerateTruncation);
 +    }
 +
 +    /**
 +     * Reads passed in file fully, up to mutationLimit count
 +     */
 +    @VisibleForTesting
 +    public void readCommitLogSegment(CommitLogReadHandler handler, File file, int mutationLimit, boolean tolerateTruncation) throws IOException
 +    {
 +        readCommitLogSegment(handler, file, CommitLogPosition.NONE, mutationLimit, tolerateTruncation);
 +    }
 +
 +    /**
 +     * Reads mutations from file, handing them off to handler
 +     * @param handler Handler that will take action based on deserialized Mutations
 +     * @param file CommitLogSegment file to read
 +     * @param minPosition Optional minimum CommitLogPosition - all segments with a greater id, or with a matching id at a greater position, will be read
 +     * @param mutationLimit Optional limit on # of mutations to replay. Local ALL_MUTATIONS serves as marker to play all.
 +     * @param tolerateTruncation Whether or not we should allow truncation of this file or throw if EOF found
 +     *
 +     * @throws IOException
 +     */
 +    public void readCommitLogSegment(CommitLogReadHandler handler,
 +                                     File file,
 +                                     CommitLogPosition minPosition,
 +                                     int mutationLimit,
 +                                     boolean tolerateTruncation) throws IOException
 +    {
 +        // just transform from the file name (no reading of headers) to determine version
 +        CommitLogDescriptor desc = CommitLogDescriptor.fromFileName(file.getName());
 +
 +        try(RandomAccessReader reader = RandomAccessReader.open(file))
 +        {
 +            if (desc.version < CommitLogDescriptor.VERSION_21)
 +            {
 +                if (!shouldSkipSegmentId(file, desc, minPosition))
 +                {
 +                    if (minPosition.segmentId == desc.id)
 +                        reader.seek(minPosition.position);
 +                    ReadStatusTracker statusTracker = new ReadStatusTracker(mutationLimit, tolerateTruncation);
 +                    statusTracker.errorContext = desc.fileName();
 +                    readSection(handler, reader, minPosition, (int) reader.length(), statusTracker, desc);
 +                }
 +                return;
 +            }
 +
 +            final long segmentIdFromFilename = desc.id;
 +            try
 +            {
 +                // The following call can either throw or legitimately return null. For either case, we need to check
 +                // desc outside this block and set it to null in the exception case.
 +                desc = CommitLogDescriptor.readHeader(reader, DatabaseDescriptor.getEncryptionContext());
 +            }
 +            catch (Exception e)
 +            {
 +                desc = null;
 +            }
 +            if (desc == null)
 +            {
 +                // don't care about whether or not the handler thinks we can continue. We can't w/out descriptor.
 +                handler.handleUnrecoverableError(new CommitLogReadException(
 +                    String.format("Could not read commit log descriptor in file %s", file),
 +                    CommitLogReadErrorReason.UNRECOVERABLE_DESCRIPTOR_ERROR,
 +                    false));
 +                return;
 +            }
 +
 +            if (segmentIdFromFilename != desc.id)
 +            {
 +                if (handler.shouldSkipSegmentOnError(new CommitLogReadException(String.format(
 +                    "Segment id mismatch (filename %d, descriptor %d) in file %s", segmentIdFromFilename, desc.id, file),
 +                                                                                CommitLogReadErrorReason.RECOVERABLE_DESCRIPTOR_ERROR,
 +                                                                                false)))
 +                {
 +                    return;
 +                }
 +            }
 +
 +            if (shouldSkipSegmentId(file, desc, minPosition))
 +                return;
 +
 +            CommitLogSegmentReader segmentReader;
 +            try
 +            {
 +                segmentReader = new CommitLogSegmentReader(handler, desc, reader, tolerateTruncation);
 +            }
 +            catch(Exception e)
 +            {
 +                handler.handleUnrecoverableError(new CommitLogReadException(
 +                    String.format("Unable to create segment reader for commit log file: %s", e),
 +                    CommitLogReadErrorReason.UNRECOVERABLE_UNKNOWN_ERROR,
 +                    tolerateTruncation));
 +                return;
 +            }
 +
 +            try
 +            {
 +                ReadStatusTracker statusTracker = new ReadStatusTracker(mutationLimit, tolerateTruncation);
 +                for (CommitLogSegmentReader.SyncSegment syncSegment : segmentReader)
 +                {
 +                    // Only tolerate truncation if we allow in both global and segment
 +                    statusTracker.tolerateErrorsInSection = tolerateTruncation & syncSegment.toleratesErrorsInSection;
 +
 +                    // Skip segments that are completely behind the desired minPosition
 +                    if (desc.id == minPosition.segmentId && syncSegment.endPosition < minPosition.position)
 +                        continue;
 +
 +                    statusTracker.errorContext = String.format("Next section at %d in %s", syncSegment.fileStartPosition, desc.fileName());
 +
 +                    readSection(handler, syncSegment.input, minPosition, syncSegment.endPosition, statusTracker, desc);
 +                    if (!statusTracker.shouldContinue())
 +                        break;
 +                }
 +            }
 +            // Unfortunately AbstractIterator cannot throw a checked exception, so we check to see if a RuntimeException
 +            // is wrapping an IOException.
 +            catch (RuntimeException re)
 +            {
 +                if (re.getCause() instanceof IOException)
 +                    throw (IOException) re.getCause();
 +                throw re;
 +            }
 +            logger.debug("Finished reading {}", file);
 +        }
 +    }
 +
 +    /**
 +     * Any segment with id >= minPosition.segmentId is a candidate for read.
 +     */
 +    private boolean shouldSkipSegmentId(File file, CommitLogDescriptor desc, CommitLogPosition minPosition)
 +    {
 +        logger.debug("Reading {} (CL version {}, messaging version {}, compression {})",
 +            file.getPath(),
 +            desc.version,
 +            desc.getMessagingVersion(),
 +            desc.compression);
 +
 +        if (minPosition.segmentId > desc.id)
 +        {
 +            logger.trace("Skipping read of fully-flushed {}", file);
 +            return true;
 +        }
 +        return false;
 +    }
 +
 +    /**
 +     * Reads a section of a file containing mutations
 +     *
 +     * @param handler Handler that will take action based on deserialized Mutations
 +     * @param reader FileDataInput / logical buffer containing commitlog mutations
 +     * @param minPosition CommitLogPosition indicating when we should start actively replaying mutations
 +     * @param end logical numeric end of the segment being read
 +     * @param statusTracker ReadStatusTracker with current state of mutation count, error state, etc
 +     * @param desc Descriptor for CommitLog serialization
 +     */
 +    private void readSection(CommitLogReadHandler handler,
 +                             FileDataInput reader,
 +                             CommitLogPosition minPosition,
 +                             int end,
 +                             ReadStatusTracker statusTracker,
 +                             CommitLogDescriptor desc) throws IOException
 +    {
 +        // seek rather than deserializing mutation-by-mutation to reach the desired minPosition in this SyncSegment
 +        if (desc.id == minPosition.segmentId && reader.getFilePointer() < minPosition.position)
 +            reader.seek(minPosition.position);
 +
 +        while (statusTracker.shouldContinue() && reader.getFilePointer() < end && !reader.isEOF())
 +        {
 +            long mutationStart = reader.getFilePointer();
 +            if (logger.isTraceEnabled())
 +                logger.trace("Reading mutation at {}", mutationStart);
 +
 +            long claimedCRC32;
 +            int serializedSize;
 +            try
 +            {
++                // We rely on reading serialized size == 0 (LEGACY_END_OF_SEGMENT_MARKER) to identify the end
++                // of a segment, which happens naturally due to the 0 padding of the empty segment on creation.
++                // However, it's possible with 2.1 era commitlogs that the last mutation ended less than 4 bytes
++                // from the end of the file, which means that we'll be unable to read a full int and instead
++                // read an EOF here
++                if(end - reader.getFilePointer() < 4)
++                {
++                    logger.trace("Not enough bytes left for another mutation in this CommitLog segment, continuing");
++                    statusTracker.requestTermination();
++                    return;
++                }
++
 +                // any of the reads may hit EOF
 +                serializedSize = reader.readInt();
 +                if (serializedSize == LEGACY_END_OF_SEGMENT_MARKER)
 +                {
 +                    logger.trace("Encountered end of segment marker at {}", reader.getFilePointer());
 +                    statusTracker.requestTermination();
 +                    return;
 +                }
 +
 +                // Mutation must be at LEAST 10 bytes:
 +                //    3 for a non-empty Keyspace
 +                //    3 for a Key (including the 2-byte length from writeUTF/writeWithShortLength)
 +                //    4 bytes for column count.
 +                // This prevents CRC from being fooled by special-case garbage in the file; see CASSANDRA-2128
 +                if (serializedSize < 10)
 +                {
 +                    if (handler.shouldSkipSegmentOnError(new CommitLogReadException(
 +                                                    String.format("Invalid mutation size %d at %d in %s", serializedSize, mutationStart, statusTracker.errorContext),
 +                                                    CommitLogReadErrorReason.MUTATION_ERROR,
 +                                                    statusTracker.tolerateErrorsInSection)))
 +                    {
 +                        statusTracker.requestTermination();
 +                    }
 +                    return;
 +                }
 +
 +                long claimedSizeChecksum = CommitLogFormat.calculateClaimedChecksum(reader, desc.version);
 +                checksum.reset();
 +                CommitLogFormat.updateChecksum(checksum, serializedSize, desc.version);
 +
 +                if (checksum.getValue() != claimedSizeChecksum)
 +                {
 +                    if (handler.shouldSkipSegmentOnError(new CommitLogReadException(
 +                                                    String.format("Mutation size checksum failure at %d in %s", mutationStart, statusTracker.errorContext),
 +                                                    CommitLogReadErrorReason.MUTATION_ERROR,
 +                                                    statusTracker.tolerateErrorsInSection)))
 +                    {
 +                        statusTracker.requestTermination();
 +                    }
 +                    return;
 +                }
 +
 +                if (serializedSize > buffer.length)
 +                    buffer = new byte[(int) (1.2 * serializedSize)];
 +                reader.readFully(buffer, 0, serializedSize);
 +
 +                claimedCRC32 = CommitLogFormat.calculateClaimedCRC32(reader, desc.version);
 +            }
 +            catch (EOFException eof)
 +            {
 +                if (handler.shouldSkipSegmentOnError(new CommitLogReadException(
 +                                                String.format("Unexpected end of segment at %d in %s", mutationStart, statusTracker.errorContext),
 +                                                CommitLogReadErrorReason.EOF,
 +                                                statusTracker.tolerateErrorsInSection)))
 +                {
 +                    statusTracker.requestTermination();
 +                }
 +                return;
 +            }
 +
 +            checksum.update(buffer, 0, serializedSize);
 +            if (claimedCRC32 != checksum.getValue())
 +            {
 +                if (handler.shouldSkipSegmentOnError(new CommitLogReadException(
 +                                                String.format("Mutation checksum failure at %d in %s", mutationStart, statusTracker.errorContext),
 +                                                CommitLogReadErrorReason.MUTATION_ERROR,
 +                                                statusTracker.tolerateErrorsInSection)))
 +                {
 +                    statusTracker.requestTermination();
 +                }
 +                continue;
 +            }
 +
 +            long mutationPosition = reader.getFilePointer();
 +            readMutation(handler, buffer, serializedSize, minPosition, (int)mutationPosition, desc);
 +
 +            // Only count this as a processed mutation if it is after our min as we suppress reading of mutations that
 +            // are before this mark.
 +            if (mutationPosition >= minPosition.position)
 +                statusTracker.addProcessedMutation();
 +        }
 +    }
 +
 +    /**
 +     * Deserializes and passes a Mutation to the ICommitLogReadHandler requested
 +     *
 +     * @param handler Handler that will take action based on deserialized Mutations
 +     * @param inputBuffer raw byte array w/Mutation data
 +     * @param size deserialized size of mutation
 +     * @param minPosition We need to suppress replay of mutations that are before the required minPosition
 +     * @param entryLocation filePointer offset of mutation within CommitLogSegment
 +     * @param desc CommitLogDescriptor being worked on
 +     */
 +    @VisibleForTesting
 +    protected void readMutation(CommitLogReadHandler handler,
 +                                byte[] inputBuffer,
 +                                int size,
 +                                CommitLogPosition minPosition,
 +                                final int entryLocation,
 +                                final CommitLogDescriptor desc) throws IOException
 +    {
 +        // For now, we need to go through the motions of deserializing the mutation to determine its size and move
 +        // the file pointer forward accordingly, even if we're behind the requested minPosition within this SyncSegment.
 +        boolean shouldReplay = entryLocation > minPosition.position;
 +
 +        final Mutation mutation;
 +        try (RebufferingInputStream bufIn = new DataInputBuffer(inputBuffer, 0, size))
 +        {
 +            mutation = Mutation.serializer.deserialize(bufIn,
 +                                                       desc.getMessagingVersion(),
 +                                                       SerializationHelper.Flag.LOCAL);
 +            // doublecheck that what we read is still valid for the current schema
 +            for (PartitionUpdate upd : mutation.getPartitionUpdates())
 +                upd.validate();
 +        }
 +        catch (UnknownColumnFamilyException ex)
 +        {
 +            if (ex.cfId == null)
 +                return;
 +            AtomicInteger i = invalidMutations.get(ex.cfId);
 +            if (i == null)
 +            {
 +                i = new AtomicInteger(1);
 +                invalidMutations.put(ex.cfId, i);
 +            }
 +            else
 +                i.incrementAndGet();
 +            return;
 +        }
 +        catch (Throwable t)
 +        {
 +            JVMStabilityInspector.inspectThrowable(t);
 +            File f = File.createTempFile("mutation", "dat");
 +
 +            try (DataOutputStream out = new DataOutputStream(new FileOutputStream(f)))
 +            {
 +                out.write(inputBuffer, 0, size);
 +            }
 +
 +            // Checksum passed so this error can't be permissible.
 +            handler.handleUnrecoverableError(new CommitLogReadException(
 +                String.format(
 +                    "Unexpected error deserializing mutation; saved to %s.  " +
 +                    "This may be caused by replaying a mutation against a table with the same name but incompatible schema.  " +
 +                    "Exception follows: %s", f.getAbsolutePath(), t),
 +                CommitLogReadErrorReason.MUTATION_ERROR,
 +                false));
 +            return;
 +        }
 +
 +        if (logger.isTraceEnabled())
 +            logger.trace("Read mutation for {}.{}: {}", mutation.getKeyspaceName(), mutation.key(),
 +                         "{" + StringUtils.join(mutation.getPartitionUpdates().iterator(), ", ") + "}");
 +
 +        if (shouldReplay)
 +            handler.handleMutation(mutation, size, entryLocation, desc);
 +    }
 +
 +    /**
 +     * Helper methods to deal with changing formats of internals of the CommitLog without polluting deserialization code.
 +     */
 +    private static class CommitLogFormat
 +    {
 +        public static long calculateClaimedChecksum(FileDataInput input, int commitLogVersion) throws IOException
 +        {
 +            switch (commitLogVersion)
 +            {
 +                case CommitLogDescriptor.VERSION_12:
 +                case CommitLogDescriptor.VERSION_20:
 +                    return input.readLong();
 +                // Changed format in 2.1
 +                default:
 +                    return input.readInt() & 0xffffffffL;
 +            }
 +        }
 +
 +        public static void updateChecksum(CRC32 checksum, int serializedSize, int commitLogVersion)
 +        {
 +            switch (commitLogVersion)
 +            {
 +                case CommitLogDescriptor.VERSION_12:
 +                    checksum.update(serializedSize);
 +                    break;
 +                // Changed format in 2.0
 +                default:
 +                    updateChecksumInt(checksum, serializedSize);
 +                    break;
 +            }
 +        }
 +
 +        public static long calculateClaimedCRC32(FileDataInput input, int commitLogVersion) throws IOException
 +        {
 +            switch (commitLogVersion)
 +            {
 +                case CommitLogDescriptor.VERSION_12:
 +                case CommitLogDescriptor.VERSION_20:
 +                    return input.readLong();
 +                // Changed format in 2.1
 +                default:
 +                    return input.readInt() & 0xffffffffL;
 +            }
 +        }
 +    }
 +
 +    private static class ReadStatusTracker
 +    {
 +        private int mutationsLeft;
 +        public String errorContext = "";
 +        public boolean tolerateErrorsInSection;
 +        private boolean error;
 +
 +        public ReadStatusTracker(int mutationLimit, boolean tolerateErrorsInSection)
 +        {
 +            this.mutationsLeft = mutationLimit;
 +            this.tolerateErrorsInSection = tolerateErrorsInSection;
 +        }
 +
 +        public void addProcessedMutation()
 +        {
 +            if (mutationsLeft == ALL_MUTATIONS)
 +                return;
 +            --mutationsLeft;
 +        }
 +
 +        public boolean shouldContinue()
 +        {
 +            return !error && (mutationsLeft != 0 || mutationsLeft == ALL_MUTATIONS);
 +        }
 +
 +        public void requestTermination()
 +        {
 +            error = true;
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a2399d4d/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
index 5476d03,90dc258..4000fbf
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
@@@ -171,10 -143,23 +171,23 @@@ public class CommitLogTes
      }
  
      @Test
+     public void testRecoveryWithShortPadding() throws Exception
+     {
 -        // If we have 0-3 bytes remaining, commitlog replayer
 -        // should pass, because there's insufficient room
 -        // left in the segment for the legacy size marker.
 -        testRecovery(new byte[1], null);
 -        testRecovery(new byte[2], null);
 -        testRecovery(new byte[3], null);
++            // If we have 0-3 bytes remaining, commitlog replayer
++            // should pass, because there's insufficient room
++            // left in the segment for the legacy size marker.
++            testRecovery(new byte[1], null);
++            testRecovery(new byte[2], null);
++            testRecovery(new byte[3], null);
+     }
+ 
+     @Test
      public void testRecoveryWithShortSize() throws Exception
      {
+         byte[] data = new byte[5];
+         data[3] = 1; // Not a legacy marker, give it a fake (short) size
          runExpecting(() -> {
-             testRecovery(new byte[2], CommitLogDescriptor.VERSION_20);
+             testRecovery(data, CommitLogDescriptor.VERSION_20);
              return null;
          }, CommitLogReplayException.class);
      }


[06/10] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0

Posted by jj...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/44f79bf2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/44f79bf2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/44f79bf2

Branch: refs/heads/cassandra-3.0
Commit: 44f79bf2f7a3a05f802014492ecbec67c49c02d0
Parents: aeca1d2 beb9658
Author: Jeff Jirsa <je...@jeffjirsa.net>
Authored: Sun Mar 12 21:54:42 2017 -0700
Committer: Jeff Jirsa <je...@jeffjirsa.net>
Committed: Sun Mar 12 21:56:00 2017 -0700

----------------------------------------------------------------------
 CHANGES.txt                                          |  1 +
 .../cassandra/db/commitlog/CommitLogReplayer.java    | 11 +++++++++++
 .../apache/cassandra/db/commitlog/CommitLogTest.java | 15 ++++++++++++++-
 3 files changed, 26 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/44f79bf2/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 52a794b,2839291..140c860
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,21 -1,6 +1,22 @@@
 -2.2.10
 +3.0.13
 + * Slice.isEmpty() returns false for some empty slices (CASSANDRA-13305)
 + * Add formatted row output to assertEmpty in CQL Tester (CASSANDRA-13238)
 +Merged from 2.2:
++ * Commitlog replay may fail if last mutation is within 4 bytes of end of segment (CASSANDRA-13282)
   * Fix queries updating multiple time the same list (CASSANDRA-13130)
   * Fix GRANT/REVOKE when keyspace isn't specified (CASSANDRA-13053)
 +
 +
 +3.0.12
 + * Prevent data loss on upgrade 2.1 - 3.0 by adding component separator to LogRecord absolute path (CASSANDRA-13294)
 + * Improve testing on macOS by eliminating sigar logging (CASSANDRA-13233)
 + * Cqlsh copy-from should error out when csv contains invalid data for collections (CASSANDRA-13071)
 + * Update c.yaml doc for offheap memtables (CASSANDRA-13179)
 + * Faster StreamingHistogram (CASSANDRA-13038)
 + * Legacy deserializer can create unexpected boundary range tombstones (CASSANDRA-13237)
 + * Remove unnecessary assertion from AntiCompactionTest (CASSANDRA-13070)
 + * Fix cqlsh COPY for dates before 1900 (CASSANDRA-13185)
 +Merged from 2.2:
   * Avoid race on receiver by starting streaming sender thread after sending init message (CASSANDRA-12886)
   * Fix "multiple versions of ant detected..." when running ant test (CASSANDRA-13232)
   * Coalescing strategy sleeps too much (CASSANDRA-13090)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/44f79bf2/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index d53f0f8,3cf4d0f..205c36a
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@@ -483,6 -439,17 +483,17 @@@ public class CommitLogReplaye
              int serializedSize;
              try
              {
+                 // We rely on reading serialized size == 0 (LEGACY_END_OF_SEGMENT_MARKER) to identify the end
+                 // of a segment, which happens naturally due to the 0 padding of the empty segment on creation.
 -                // However, with 2.1 era commitlogs it's possible that the last mutation ended less than 4 bytes 
++                // However, it's possible with 2.1 era commitlogs that the last mutation ended less than 4 bytes 
+                 // from the end of the file, which means that we'll be unable to read a full int and instead 
+                 // read an EOF here
+                 if(end - reader.getFilePointer() < 4)
+                 {
+                     logger.trace("Not enough bytes left for another mutation in this CommitLog segment, continuing");
+                     return false;
+                 }
+ 
                  // any of the reads may hit EOF
                  serializedSize = reader.readInt();
                  if (serializedSize == LEGACY_END_OF_SEGMENT_MARKER)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/44f79bf2/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
index c4ab6ab,9b63885..90dc258
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
@@@ -143,11 -137,25 +143,24 @@@ public class CommitLogTes
      }
  
      @Test
+     public void testRecoveryWithShortPadding() throws Exception
+     {
+         // If we have 0-3 bytes remaining, commitlog replayer
+         // should pass, because there's insufficient room
+         // left in the segment for the legacy size marker.
+         testRecovery(new byte[1], null);
+         testRecovery(new byte[2], null);
+         testRecovery(new byte[3], null);
+     }
+ 
+     @Test
      public void testRecoveryWithShortSize() throws Exception
      {
 -        runExpecting(new WrappedRunnable()  {
 -            public void runMayThrow() throws Exception {
 -                byte[] data = new byte[5];
 -                data[3] = 1; // Not a legacy marker, give it a fake (short) size
 -                testRecovery(data, CommitLogDescriptor.VERSION_20);
 -            }
++        byte[] data = new byte[5];
++        data[3] = 1; // Not a legacy marker, give it a fake (short) size
 +        runExpecting(() -> {
-             testRecovery(new byte[2], CommitLogDescriptor.VERSION_20);
++            testRecovery(data, CommitLogDescriptor.VERSION_20);
 +            return null;
          }, CommitLogReplayException.class);
      }
  


[04/10] cassandra git commit: Commitlog replay may fail if last mutation is within 4 bytes of end of segment

Posted by jj...@apache.org.
Commitlog replay may fail if last mutation is within 4 bytes of end of segment

Patch by Jeff Jirsa; Reviewed by Branimir Lambov for CASSANDRA-13282


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/beb9658d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/beb9658d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/beb9658d

Branch: refs/heads/trunk
Commit: beb9658dd5e18e3a6a4e8431b6549ae4c33365a9
Parents: 5ef8a8b
Author: Jeff Jirsa <je...@jeffjirsa.net>
Authored: Sun Mar 12 21:54:04 2017 -0700
Committer: Jeff Jirsa <je...@jeffjirsa.net>
Committed: Sun Mar 12 21:54:04 2017 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../db/commitlog/CommitLogReplayer.java         | 11 +++++++++++
 .../cassandra/db/commitlog/CommitLogTest.java   | 20 ++++++++++++++++----
 3 files changed, 28 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/beb9658d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 09e4039..2839291 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@
  * Fix failing COPY TO STDOUT (CASSANDRA-12497)
  * Fix ColumnCounter::countAll behaviour for reverse queries (CASSANDRA-13222)
  * Exceptions encountered calling getSeeds() breaks OTC thread (CASSANDRA-13018)
+ * Commitlog replay may fail if last mutation is within 4 bytes of end of segment (CASSANDRA-13282)
 Merged from 2.1:
  * Remove unused repositories (CASSANDRA-13278)
  * Log stacktrace of uncaught exceptions (CASSANDRA-13108)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/beb9658d/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index a58aeb4..3cf4d0f 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -439,6 +439,17 @@ public class CommitLogReplayer
             int serializedSize;
             try
             {
+                // We rely on reading serialized size == 0 (LEGACY_END_OF_SEGMENT_MARKER) to identify the end
+                // of a segment, which happens naturally due to the 0 padding of the empty segment on creation.
+                // However, with 2.1 era commitlogs it's possible that the last mutation ended less than 4 bytes 
+                // from the end of the file, which means that we'll be unable to read a full int and instead 
+                // read an EOF here
+                if(end - reader.getFilePointer() < 4)
+                {
+                    logger.trace("Not enough bytes left for another mutation in this CommitLog segment, continuing");
+                    return false;
+                }
+
                 // any of the reads may hit EOF
                 serializedSize = reader.readInt();
                 if (serializedSize == LEGACY_END_OF_SEGMENT_MARKER)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/beb9658d/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
index 9999b42..9b63885 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
@@ -137,12 +137,24 @@ public class CommitLogTest
     }
 
     @Test
+    public void testRecoveryWithShortPadding() throws Exception
+    {
+        // If we have 0-3 bytes remaining, commitlog replayer
+        // should pass, because there's insufficient room
+        // left in the segment for the legacy size marker.
+        testRecovery(new byte[1], null);
+        testRecovery(new byte[2], null);
+        testRecovery(new byte[3], null);
+    }
+
+    @Test
     public void testRecoveryWithShortSize() throws Exception
     {
-        runExpecting(new WrappedRunnable() {
-            public void runMayThrow() throws Exception
-            {
-                testRecovery(new byte[2], CommitLogDescriptor.VERSION_20);
+        runExpecting(new WrappedRunnable()  {
+            public void runMayThrow() throws Exception {
+                byte[] data = new byte[5];
+                data[3] = 1; // Not a legacy marker, give it a fake (short) size
+                testRecovery(data, CommitLogDescriptor.VERSION_20);
             }
         }, CommitLogReplayException.class);
     }


[05/10] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0

Posted by jj...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/44f79bf2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/44f79bf2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/44f79bf2

Branch: refs/heads/cassandra-3.11
Commit: 44f79bf2f7a3a05f802014492ecbec67c49c02d0
Parents: aeca1d2 beb9658
Author: Jeff Jirsa <je...@jeffjirsa.net>
Authored: Sun Mar 12 21:54:42 2017 -0700
Committer: Jeff Jirsa <je...@jeffjirsa.net>
Committed: Sun Mar 12 21:56:00 2017 -0700

----------------------------------------------------------------------
 CHANGES.txt                                          |  1 +
 .../cassandra/db/commitlog/CommitLogReplayer.java    | 11 +++++++++++
 .../apache/cassandra/db/commitlog/CommitLogTest.java | 15 ++++++++++++++-
 3 files changed, 26 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/44f79bf2/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 52a794b,2839291..140c860
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,21 -1,6 +1,22 @@@
 -2.2.10
 +3.0.13
 + * Slice.isEmpty() returns false for some empty slices (CASSANDRA-13305)
 + * Add formatted row output to assertEmpty in CQL Tester (CASSANDRA-13238)
 +Merged from 2.2:
++ * Commitlog replay may fail if last mutation is within 4 bytes of end of segment (CASSANDRA-13282)
   * Fix queries updating multiple time the same list (CASSANDRA-13130)
   * Fix GRANT/REVOKE when keyspace isn't specified (CASSANDRA-13053)
 +
 +
 +3.0.12
 + * Prevent data loss on upgrade 2.1 - 3.0 by adding component separator to LogRecord absolute path (CASSANDRA-13294)
 + * Improve testing on macOS by eliminating sigar logging (CASSANDRA-13233)
 + * Cqlsh copy-from should error out when csv contains invalid data for collections (CASSANDRA-13071)
 + * Update c.yaml doc for offheap memtables (CASSANDRA-13179)
 + * Faster StreamingHistogram (CASSANDRA-13038)
 + * Legacy deserializer can create unexpected boundary range tombstones (CASSANDRA-13237)
 + * Remove unnecessary assertion from AntiCompactionTest (CASSANDRA-13070)
 + * Fix cqlsh COPY for dates before 1900 (CASSANDRA-13185)
 +Merged from 2.2:
   * Avoid race on receiver by starting streaming sender thread after sending init message (CASSANDRA-12886)
   * Fix "multiple versions of ant detected..." when running ant test (CASSANDRA-13232)
   * Coalescing strategy sleeps too much (CASSANDRA-13090)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/44f79bf2/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index d53f0f8,3cf4d0f..205c36a
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@@ -483,6 -439,17 +483,17 @@@ public class CommitLogReplaye
              int serializedSize;
              try
              {
+                 // We rely on reading serialized size == 0 (LEGACY_END_OF_SEGMENT_MARKER) to identify the end
+                 // of a segment, which happens naturally due to the 0 padding of the empty segment on creation.
 -                // However, with 2.1 era commitlogs it's possible that the last mutation ended less than 4 bytes 
++                // However, it's possible with 2.1 era commitlogs that the last mutation ended less than 4 bytes 
+                 // from the end of the file, which means that we'll be unable to read a full int and instead 
+                 // read an EOF here
+                 if(end - reader.getFilePointer() < 4)
+                 {
+                     logger.trace("Not enough bytes left for another mutation in this CommitLog segment, continuing");
+                     return false;
+                 }
+ 
                  // any of the reads may hit EOF
                  serializedSize = reader.readInt();
                  if (serializedSize == LEGACY_END_OF_SEGMENT_MARKER)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/44f79bf2/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
index c4ab6ab,9b63885..90dc258
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
@@@ -143,11 -137,25 +143,24 @@@ public class CommitLogTes
      }
  
      @Test
+     public void testRecoveryWithShortPadding() throws Exception
+     {
+         // If we have 0-3 bytes remaining, commitlog replayer
+         // should pass, because there's insufficient room
+         // left in the segment for the legacy size marker.
+         testRecovery(new byte[1], null);
+         testRecovery(new byte[2], null);
+         testRecovery(new byte[3], null);
+     }
+ 
+     @Test
      public void testRecoveryWithShortSize() throws Exception
      {
 -        runExpecting(new WrappedRunnable()  {
 -            public void runMayThrow() throws Exception {
 -                byte[] data = new byte[5];
 -                data[3] = 1; // Not a legacy marker, give it a fake (short) size
 -                testRecovery(data, CommitLogDescriptor.VERSION_20);
 -            }
++        byte[] data = new byte[5];
++        data[3] = 1; // Not a legacy marker, give it a fake (short) size
 +        runExpecting(() -> {
-             testRecovery(new byte[2], CommitLogDescriptor.VERSION_20);
++            testRecovery(data, CommitLogDescriptor.VERSION_20);
 +            return null;
          }, CommitLogReplayException.class);
      }
  


[10/10] cassandra git commit: Merge branch 'cassandra-3.11' into trunk

Posted by jj...@apache.org.
Merge branch 'cassandra-3.11' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/dd5251c4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/dd5251c4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/dd5251c4

Branch: refs/heads/trunk
Commit: dd5251c46073d75c09e47f645b1d0ebc3a135411
Parents: 0c5faef a2399d4
Author: Jeff Jirsa <je...@jeffjirsa.net>
Authored: Sun Mar 12 21:57:37 2017 -0700
Committer: Jeff Jirsa <je...@jeffjirsa.net>
Committed: Sun Mar 12 21:58:36 2017 -0700

----------------------------------------------------------------------
 CHANGES.txt                                             |  1 +
 .../apache/cassandra/db/commitlog/CommitLogReader.java  | 12 ++++++++++++
 2 files changed, 13 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/dd5251c4/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dd5251c4/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
index 1da0cee,d1cb8d6..9eec477
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
@@@ -253,6 -265,18 +253,18 @@@ public class CommitLogReade
              int serializedSize;
              try
              {
+                 // We rely on reading serialized size == 0 (LEGACY_END_OF_SEGMENT_MARKER) to identify the end
+                 // of a segment, which happens naturally due to the 0 padding of the empty segment on creation.
+                 // However, it's possible with 2.1 era commitlogs that the last mutation ended less than 4 bytes
+                 // from the end of the file, which means that we'll be unable to read a full int and instead
+                 // read an EOF here
+                 if(end - reader.getFilePointer() < 4)
+                 {
 -                    logger.trace("Not enough bytes left for another mutation in this CommitLog segment, continuing");
++                    logger.trace("Not enough bytes left for another mutation in this CommitLog section, continuing");
+                     statusTracker.requestTermination();
+                     return;
+                 }
+ 
                  // any of the reads may hit EOF
                  serializedSize = reader.readInt();
                  if (serializedSize == LEGACY_END_OF_SEGMENT_MARKER)


[08/10] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.11

Posted by jj...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.11


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a2399d4d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a2399d4d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a2399d4d

Branch: refs/heads/cassandra-3.11
Commit: a2399d4d309ac6b60a150ea20af8dc6f006d51ff
Parents: 2c111d1 44f79bf
Author: Jeff Jirsa <je...@jeffjirsa.net>
Authored: Sun Mar 12 21:56:11 2017 -0700
Committer: Jeff Jirsa <je...@jeffjirsa.net>
Committed: Sun Mar 12 21:57:25 2017 -0700

----------------------------------------------------------------------
 CHANGES.txt                                          |  1 +
 .../cassandra/db/commitlog/CommitLogReader.java      | 12 ++++++++++++
 .../apache/cassandra/db/commitlog/CommitLogTest.java | 15 ++++++++++++++-
 3 files changed, 27 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a2399d4d/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 302a028,140c860..ab28dd4
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -33,140 -43,6 +33,141 @@@ Merged from 3.0
     live rows in sstabledump (CASSANDRA-13177)
   * Provide user workaround when system_schema.columns does not contain entries
     for a table that's in system_schema.tables (CASSANDRA-13180)
 +Merged from 2.2:
++ * Commitlog replay may fail if last mutation is within 4 bytes of end of segment (CASSANDRA-13282)
 + * Fix queries updating multiple time the same list (CASSANDRA-13130)
 + * Fix GRANT/REVOKE when keyspace isn't specified (CASSANDRA-13053)
 + * Fix flaky LongLeveledCompactionStrategyTest (CASSANDRA-12202)
 + * Fix failing COPY TO STDOUT (CASSANDRA-12497)
 + * Fix ColumnCounter::countAll behaviour for reverse queries (CASSANDRA-13222)
 + * Exceptions encountered calling getSeeds() breaks OTC thread (CASSANDRA-13018)
 + * Fix negative mean latency metric (CASSANDRA-12876)
 + * Use only one file pointer when creating commitlog segments (CASSANDRA-12539)
 +Merged from 2.1:
 + * Remove unused repositories (CASSANDRA-13278)
 + * Log stacktrace of uncaught exceptions (CASSANDRA-13108)
 + * Use portable stderr for java error in startup (CASSANDRA-13211)
 + * Fix Thread Leak in OutboundTcpConnection (CASSANDRA-13204)
 + * Coalescing strategy can enter infinite loop (CASSANDRA-13159)
 +
 +
 +3.10
 + * Fix secondary index queries regression (CASSANDRA-13013)
 + * Add duration type to the protocol V5 (CASSANDRA-12850)
 + * Fix duration type validation (CASSANDRA-13143)
 + * Fix flaky GcCompactionTest (CASSANDRA-12664)
 + * Fix TestHintedHandoff.hintedhandoff_decom_test (CASSANDRA-13058)
 + * Fixed query monitoring for range queries (CASSANDRA-13050)
 + * Remove outboundBindAny configuration property (CASSANDRA-12673)
 + * Use correct bounds for all-data range when filtering (CASSANDRA-12666)
 + * Remove timing window in test case (CASSANDRA-12875)
 + * Resolve unit testing without JCE security libraries installed (CASSANDRA-12945)
 + * Fix inconsistencies in cassandra-stress load balancing policy (CASSANDRA-12919)
 + * Fix validation of non-frozen UDT cells (CASSANDRA-12916)
 + * Don't shut down socket input/output on StreamSession (CASSANDRA-12903)
 + * Fix Murmur3PartitionerTest (CASSANDRA-12858)
 + * Move cqlsh syntax rules into separate module and allow easier customization (CASSANDRA-12897)
 + * Fix CommitLogSegmentManagerTest (CASSANDRA-12283)
 + * Fix cassandra-stress truncate option (CASSANDRA-12695)
 + * Fix crossNode value when receiving messages (CASSANDRA-12791)
 + * Don't load MX4J beans twice (CASSANDRA-12869)
 + * Extend native protocol request flags, add versions to SUPPORTED, and introduce ProtocolVersion enum (CASSANDRA-12838)
 + * Set JOINING mode when running pre-join tasks (CASSANDRA-12836)
 + * remove net.mintern.primitive library due to license issue (CASSANDRA-12845)
 + * Properly format IPv6 addresses when logging JMX service URL (CASSANDRA-12454)
 + * Optimize the vnode allocation for single replica per DC (CASSANDRA-12777)
 + * Use non-token restrictions for bounds when token restrictions are overridden (CASSANDRA-12419)
 + * Fix CQLSH auto completion for PER PARTITION LIMIT (CASSANDRA-12803)
 + * Use different build directories for Eclipse and Ant (CASSANDRA-12466)
 + * Avoid potential AttributeError in cqlsh due to no table metadata (CASSANDRA-12815)
 + * Fix RandomReplicationAwareTokenAllocatorTest.testExistingCluster (CASSANDRA-12812)
 + * Upgrade commons-codec to 1.9 (CASSANDRA-12790)
 + * Make the fanout size for LeveledCompactionStrategy to be configurable (CASSANDRA-11550)
 + * Add duration data type (CASSANDRA-11873)
 + * Fix timeout in ReplicationAwareTokenAllocatorTest (CASSANDRA-12784)
 + * Improve sum aggregate functions (CASSANDRA-12417)
 + * Make cassandra.yaml docs for batch_size_*_threshold_in_kb reflect changes in CASSANDRA-10876 (CASSANDRA-12761)
 + * cqlsh fails to format collections when using aliases (CASSANDRA-11534)
 + * Check for hash conflicts in prepared statements (CASSANDRA-12733)
 + * Exit query parsing upon first error (CASSANDRA-12598)
 + * Fix cassandra-stress to use single seed in UUID generation (CASSANDRA-12729)
 + * CQLSSTableWriter does not allow Update statement (CASSANDRA-12450)
 + * Config class uses boxed types but DD exposes primitive types (CASSANDRA-12199)
 + * Add pre- and post-shutdown hooks to Storage Service (CASSANDRA-12461)
 + * Add hint delivery metrics (CASSANDRA-12693)
 + * Remove IndexInfo cache from FileIndexInfoRetriever (CASSANDRA-12731)
 + * ColumnIndex does not reuse buffer (CASSANDRA-12502)
 + * cdc column addition still breaks schema migration tasks (CASSANDRA-12697)
 + * Upgrade metrics-reporter dependencies (CASSANDRA-12089)
 + * Tune compaction thread count via nodetool (CASSANDRA-12248)
 + * Add +=/-= shortcut syntax for update queries (CASSANDRA-12232)
 + * Include repair session IDs in repair start message (CASSANDRA-12532)
 + * Add a blocking task to Index, run before joining the ring (CASSANDRA-12039)
 + * Fix NPE when using CQLSSTableWriter (CASSANDRA-12667)
 + * Support optional backpressure strategies at the coordinator (CASSANDRA-9318)
 + * Make randompartitioner work with new vnode allocation (CASSANDRA-12647)
 + * Fix cassandra-stress graphing (CASSANDRA-12237)
 + * Allow filtering on partition key columns for queries without secondary indexes (CASSANDRA-11031)
 + * Fix Cassandra Stress reporting thread model and precision (CASSANDRA-12585)
 + * Add JMH benchmarks.jar (CASSANDRA-12586)
 + * Cleanup uses of AlterTableStatementColumn (CASSANDRA-12567)
 + * Add keep-alive to streaming (CASSANDRA-11841)
 + * Tracing payload is passed through newSession(..) (CASSANDRA-11706)
 + * avoid deleting non existing sstable files and improve related log messages (CASSANDRA-12261)
 + * json/yaml output format for nodetool compactionhistory (CASSANDRA-12486)
 + * Retry all internode messages once after a connection is
 +   closed and reopened (CASSANDRA-12192)
 + * Add support to rebuild from targeted replica (CASSANDRA-9875)
 + * Add sequence distribution type to cassandra stress (CASSANDRA-12490)
 + * "SELECT * FROM foo LIMIT ;" does not error out (CASSANDRA-12154)
 + * Define executeLocally() at the ReadQuery Level (CASSANDRA-12474)
 + * Extend read/write failure messages with a map of replica addresses
 +   to error codes in the v5 native protocol (CASSANDRA-12311)
 + * Fix rebuild of SASI indexes with existing index files (CASSANDRA-12374)
 + * Let DatabaseDescriptor not implicitly startup services (CASSANDRA-9054, 12550)
 + * Fix clustering indexes in presence of static columns in SASI (CASSANDRA-12378)
 + * Fix queries on columns with reversed type on SASI indexes (CASSANDRA-12223)
 + * Added slow query log (CASSANDRA-12403)
 + * Count full coordinated request against timeout (CASSANDRA-12256)
 + * Allow TTL with null value on insert and update (CASSANDRA-12216)
 + * Make decommission operation resumable (CASSANDRA-12008)
 + * Add support to one-way targeted repair (CASSANDRA-9876)
 + * Remove clientutil jar (CASSANDRA-11635)
 + * Fix compaction throughput throttle (CASSANDRA-12366, CASSANDRA-12717)
 + * Delay releasing Memtable memory on flush until PostFlush has finished running (CASSANDRA-12358)
 + * Cassandra stress should dump all setting on startup (CASSANDRA-11914)
 + * Make it possible to compact a given token range (CASSANDRA-10643)
 + * Allow updating DynamicEndpointSnitch properties via JMX (CASSANDRA-12179)
 + * Collect metrics on queries by consistency level (CASSANDRA-7384)
 + * Add support for GROUP BY to SELECT statement (CASSANDRA-10707)
 + * Deprecate memtable_cleanup_threshold and update default for memtable_flush_writers (CASSANDRA-12228)
 + * Upgrade to OHC 0.4.4 (CASSANDRA-12133)
 + * Add version command to cassandra-stress (CASSANDRA-12258)
 + * Create compaction-stress tool (CASSANDRA-11844)
 + * Garbage-collecting compaction operation and schema option (CASSANDRA-7019)
 + * Add beta protocol flag for v5 native protocol (CASSANDRA-12142)
 + * Support filtering on non-PRIMARY KEY columns in the CREATE
 +   MATERIALIZED VIEW statement's WHERE clause (CASSANDRA-10368)
 + * Unify STDOUT and SYSTEMLOG logback format (CASSANDRA-12004)
 + * COPY FROM should raise error for non-existing input files (CASSANDRA-12174)
 + * Faster write path (CASSANDRA-12269)
 + * Option to leave omitted columns in INSERT JSON unset (CASSANDRA-11424)
 + * Support json/yaml output in nodetool tpstats (CASSANDRA-12035)
 + * Expose metrics for successful/failed authentication attempts (CASSANDRA-10635)
 + * Prepend snapshot name with "truncated" or "dropped" when a snapshot
 +   is taken before truncating or dropping a table (CASSANDRA-12178)
 + * Optimize RestrictionSet (CASSANDRA-12153)
 + * cqlsh does not automatically downgrade CQL version (CASSANDRA-12150)
 + * Omit (de)serialization of state variable in UDAs (CASSANDRA-9613)
 + * Create a system table to expose prepared statements (CASSANDRA-8831)
 + * Reuse DataOutputBuffer from ColumnIndex (CASSANDRA-11970)
 + * Remove DatabaseDescriptor dependency from SegmentedFile (CASSANDRA-11580)
 + * Add supplied username to authentication error messages (CASSANDRA-12076)
 + * Remove pre-startup check for open JMX port (CASSANDRA-12074)
 + * Remove compaction Severity from DynamicEndpointSnitch (CASSANDRA-11738)
 + * Restore resumable hints delivery (CASSANDRA-11960)
 + * Properly report LWT contention (CASSANDRA-12626)
 +Merged from 3.0:
   * Dump threads when unit tests time out (CASSANDRA-13117)
   * Better error when modifying function permissions without explicit keyspace (CASSANDRA-12925)
   * Indexer is not correctly invoked when building indexes over sstables (CASSANDRA-13075)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a2399d4d/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
index e6e2e1a,0000000..d1cb8d6
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
@@@ -1,502 -1,0 +1,514 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.db.commitlog;
 +
 +import java.io.*;
 +import java.util.*;
 +import java.util.concurrent.atomic.AtomicInteger;
 +import java.util.zip.CRC32;
 +
 +import com.google.common.annotations.VisibleForTesting;
 +import org.apache.commons.lang3.StringUtils;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.db.Mutation;
 +import org.apache.cassandra.db.UnknownColumnFamilyException;
 +import org.apache.cassandra.db.commitlog.CommitLogReadHandler.CommitLogReadErrorReason;
 +import org.apache.cassandra.db.commitlog.CommitLogReadHandler.CommitLogReadException;
 +import org.apache.cassandra.db.partitions.PartitionUpdate;
 +import org.apache.cassandra.db.rows.SerializationHelper;
 +import org.apache.cassandra.io.util.DataInputBuffer;
 +import org.apache.cassandra.io.util.FileDataInput;
 +import org.apache.cassandra.io.util.RandomAccessReader;
 +import org.apache.cassandra.io.util.RebufferingInputStream;
 +import org.apache.cassandra.utils.JVMStabilityInspector;
 +
 +import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt;
 +
 +public class CommitLogReader
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(CommitLogReader.class);
 +
 +    private static final int LEGACY_END_OF_SEGMENT_MARKER = 0;
 +
 +    @VisibleForTesting
 +    public static final int ALL_MUTATIONS = -1;
 +    private final CRC32 checksum;
 +    private final Map<UUID, AtomicInteger> invalidMutations;
 +
 +    private byte[] buffer;
 +
 +    public CommitLogReader()
 +    {
 +        checksum = new CRC32();
 +        invalidMutations = new HashMap<>();
 +        buffer = new byte[4096];
 +    }
 +
 +    public Set<Map.Entry<UUID, AtomicInteger>> getInvalidMutations()
 +    {
 +        return invalidMutations.entrySet();
 +    }
 +
 +    /**
 +     * Reads all passed in files with no minimum, no start, and no mutation limit.
 +     */
 +    public void readAllFiles(CommitLogReadHandler handler, File[] files) throws IOException
 +    {
 +        readAllFiles(handler, files, CommitLogPosition.NONE);
 +    }
 +
 +    /**
 +     * Reads all passed in files with minPosition, no start, and no mutation limit.
 +     */
 +    public void readAllFiles(CommitLogReadHandler handler, File[] files, CommitLogPosition minPosition) throws IOException
 +    {
 +        for (int i = 0; i < files.length; i++)
 +            readCommitLogSegment(handler, files[i], minPosition, ALL_MUTATIONS, i + 1 == files.length);
 +    }
 +
 +    /**
 +     * Reads passed in file fully
 +     */
 +    public void readCommitLogSegment(CommitLogReadHandler handler, File file, boolean tolerateTruncation) throws IOException
 +    {
 +        readCommitLogSegment(handler, file, CommitLogPosition.NONE, ALL_MUTATIONS, tolerateTruncation);
 +    }
 +
 +    /**
 +     * Reads passed in file fully, up to mutationLimit count
 +     */
 +    @VisibleForTesting
 +    public void readCommitLogSegment(CommitLogReadHandler handler, File file, int mutationLimit, boolean tolerateTruncation) throws IOException
 +    {
 +        readCommitLogSegment(handler, file, CommitLogPosition.NONE, mutationLimit, tolerateTruncation);
 +    }
 +
 +    /**
 +     * Reads mutations from file, handing them off to handler
 +     * @param handler Handler that will take action based on deserialized Mutations
 +     * @param file CommitLogSegment file to read
 +     * @param minPosition Optional minimum CommitLogPosition - all segments with id > or matching w/greater position will be read
 +     * @param mutationLimit Optional limit on # of mutations to replay. Local ALL_MUTATIONS serves as marker to play all.
 +     * @param tolerateTruncation Whether or not we should allow truncation of this file or throw if EOF found
 +     *
 +     * @throws IOException
 +     */
 +    public void readCommitLogSegment(CommitLogReadHandler handler,
 +                                     File file,
 +                                     CommitLogPosition minPosition,
 +                                     int mutationLimit,
 +                                     boolean tolerateTruncation) throws IOException
 +    {
 +        // just transform from the file name (no reading of headers) to determine version
 +        CommitLogDescriptor desc = CommitLogDescriptor.fromFileName(file.getName());
 +
 +        try(RandomAccessReader reader = RandomAccessReader.open(file))
 +        {
 +            if (desc.version < CommitLogDescriptor.VERSION_21)
 +            {
 +                if (!shouldSkipSegmentId(file, desc, minPosition))
 +                {
 +                    if (minPosition.segmentId == desc.id)
 +                        reader.seek(minPosition.position);
 +                    ReadStatusTracker statusTracker = new ReadStatusTracker(mutationLimit, tolerateTruncation);
 +                    statusTracker.errorContext = desc.fileName();
 +                    readSection(handler, reader, minPosition, (int) reader.length(), statusTracker, desc);
 +                }
 +                return;
 +            }
 +
 +            final long segmentIdFromFilename = desc.id;
 +            try
 +            {
 +                // The following call can either throw or legitimately return null. For either case, we need to check
 +                // desc outside this block and set it to null in the exception case.
 +                desc = CommitLogDescriptor.readHeader(reader, DatabaseDescriptor.getEncryptionContext());
 +            }
 +            catch (Exception e)
 +            {
 +                desc = null;
 +            }
 +            if (desc == null)
 +            {
 +                // don't care about whether or not the handler thinks we can continue. We can't w/out descriptor.
 +                handler.handleUnrecoverableError(new CommitLogReadException(
 +                    String.format("Could not read commit log descriptor in file %s", file),
 +                    CommitLogReadErrorReason.UNRECOVERABLE_DESCRIPTOR_ERROR,
 +                    false));
 +                return;
 +            }
 +
 +            if (segmentIdFromFilename != desc.id)
 +            {
 +                if (handler.shouldSkipSegmentOnError(new CommitLogReadException(String.format(
 +                    "Segment id mismatch (filename %d, descriptor %d) in file %s", segmentIdFromFilename, desc.id, file),
 +                                                                                CommitLogReadErrorReason.RECOVERABLE_DESCRIPTOR_ERROR,
 +                                                                                false)))
 +                {
 +                    return;
 +                }
 +            }
 +
 +            if (shouldSkipSegmentId(file, desc, minPosition))
 +                return;
 +
 +            CommitLogSegmentReader segmentReader;
 +            try
 +            {
 +                segmentReader = new CommitLogSegmentReader(handler, desc, reader, tolerateTruncation);
 +            }
 +            catch(Exception e)
 +            {
 +                handler.handleUnrecoverableError(new CommitLogReadException(
 +                    String.format("Unable to create segment reader for commit log file: %s", e),
 +                    CommitLogReadErrorReason.UNRECOVERABLE_UNKNOWN_ERROR,
 +                    tolerateTruncation));
 +                return;
 +            }
 +
 +            try
 +            {
 +                ReadStatusTracker statusTracker = new ReadStatusTracker(mutationLimit, tolerateTruncation);
 +                for (CommitLogSegmentReader.SyncSegment syncSegment : segmentReader)
 +                {
 +                    // Only tolerate truncation if we allow in both global and segment
 +                    statusTracker.tolerateErrorsInSection = tolerateTruncation & syncSegment.toleratesErrorsInSection;
 +
 +                    // Skip segments that are completely behind the desired minPosition
 +                    if (desc.id == minPosition.segmentId && syncSegment.endPosition < minPosition.position)
 +                        continue;
 +
 +                    statusTracker.errorContext = String.format("Next section at %d in %s", syncSegment.fileStartPosition, desc.fileName());
 +
 +                    readSection(handler, syncSegment.input, minPosition, syncSegment.endPosition, statusTracker, desc);
 +                    if (!statusTracker.shouldContinue())
 +                        break;
 +                }
 +            }
 +            // Unfortunately AbstractIterator cannot throw a checked exception, so we check to see if a RuntimeException
 +            // is wrapping an IOException.
 +            catch (RuntimeException re)
 +            {
 +                if (re.getCause() instanceof IOException)
 +                    throw (IOException) re.getCause();
 +                throw re;
 +            }
 +            logger.debug("Finished reading {}", file);
 +        }
 +    }
 +
 +    /**
 +     * Any segment with id >= minPosition.segmentId is a candidate for read.
 +     */
 +    private boolean shouldSkipSegmentId(File file, CommitLogDescriptor desc, CommitLogPosition minPosition)
 +    {
 +        logger.debug("Reading {} (CL version {}, messaging version {}, compression {})",
 +            file.getPath(),
 +            desc.version,
 +            desc.getMessagingVersion(),
 +            desc.compression);
 +
 +        if (minPosition.segmentId > desc.id)
 +        {
 +            logger.trace("Skipping read of fully-flushed {}", file);
 +            return true;
 +        }
 +        return false;
 +    }
 +
 +    /**
 +     * Reads a section of a file containing mutations
 +     *
 +     * @param handler Handler that will take action based on deserialized Mutations
 +     * @param reader FileDataInput / logical buffer containing commitlog mutations
 +     * @param minPosition CommitLogPosition indicating when we should start actively replaying mutations
 +     * @param end logical numeric end of the segment being read
 +     * @param statusTracker ReadStatusTracker with current state of mutation count, error state, etc
 +     * @param desc Descriptor for CommitLog serialization
 +     */
 +    private void readSection(CommitLogReadHandler handler,
 +                             FileDataInput reader,
 +                             CommitLogPosition minPosition,
 +                             int end,
 +                             ReadStatusTracker statusTracker,
 +                             CommitLogDescriptor desc) throws IOException
 +    {
 +        // seek rather than deserializing mutation-by-mutation to reach the desired minPosition in this SyncSegment
 +        if (desc.id == minPosition.segmentId && reader.getFilePointer() < minPosition.position)
 +            reader.seek(minPosition.position);
 +
 +        while (statusTracker.shouldContinue() && reader.getFilePointer() < end && !reader.isEOF())
 +        {
 +            long mutationStart = reader.getFilePointer();
 +            if (logger.isTraceEnabled())
 +                logger.trace("Reading mutation at {}", mutationStart);
 +
 +            long claimedCRC32;
 +            int serializedSize;
 +            try
 +            {
++                // We rely on reading serialized size == 0 (LEGACY_END_OF_SEGMENT_MARKER) to identify the end
++                // of a segment, which happens naturally due to the 0 padding of the empty segment on creation.
++                // However, it's possible with 2.1 era commitlogs that the last mutation ended less than 4 bytes
++                // from the end of the file, which means that we'll be unable to read a full int and instead
++                // read an EOF here
++                if(end - reader.getFilePointer() < 4)
++                {
++                    logger.trace("Not enough bytes left for another mutation in this CommitLog segment, continuing");
++                    statusTracker.requestTermination();
++                    return;
++                }
++
 +                // any of the reads may hit EOF
 +                serializedSize = reader.readInt();
 +                if (serializedSize == LEGACY_END_OF_SEGMENT_MARKER)
 +                {
 +                    logger.trace("Encountered end of segment marker at {}", reader.getFilePointer());
 +                    statusTracker.requestTermination();
 +                    return;
 +                }
 +
 +                // Mutation must be at LEAST 10 bytes:
 +                //    3 for a non-empty Keyspace
 +                //    3 for a Key (including the 2-byte length from writeUTF/writeWithShortLength)
 +                //    4 bytes for column count.
 +                // This prevents the CRC check from being fooled by special-case garbage in the file; see CASSANDRA-2128
 +                if (serializedSize < 10)
 +                {
 +                    if (handler.shouldSkipSegmentOnError(new CommitLogReadException(
 +                                                    String.format("Invalid mutation size %d at %d in %s", serializedSize, mutationStart, statusTracker.errorContext),
 +                                                    CommitLogReadErrorReason.MUTATION_ERROR,
 +                                                    statusTracker.tolerateErrorsInSection)))
 +                    {
 +                        statusTracker.requestTermination();
 +                    }
 +                    return;
 +                }
 +
 +                long claimedSizeChecksum = CommitLogFormat.calculateClaimedChecksum(reader, desc.version);
 +                checksum.reset();
 +                CommitLogFormat.updateChecksum(checksum, serializedSize, desc.version);
 +
 +                if (checksum.getValue() != claimedSizeChecksum)
 +                {
 +                    if (handler.shouldSkipSegmentOnError(new CommitLogReadException(
 +                                                    String.format("Mutation size checksum failure at %d in %s", mutationStart, statusTracker.errorContext),
 +                                                    CommitLogReadErrorReason.MUTATION_ERROR,
 +                                                    statusTracker.tolerateErrorsInSection)))
 +                    {
 +                        statusTracker.requestTermination();
 +                    }
 +                    return;
 +                }
 +
 +                if (serializedSize > buffer.length)
 +                    buffer = new byte[(int) (1.2 * serializedSize)];
 +                reader.readFully(buffer, 0, serializedSize);
 +
 +                claimedCRC32 = CommitLogFormat.calculateClaimedCRC32(reader, desc.version);
 +            }
 +            catch (EOFException eof)
 +            {
 +                if (handler.shouldSkipSegmentOnError(new CommitLogReadException(
 +                                                String.format("Unexpected end of segment at %d in %s", mutationStart, statusTracker.errorContext),
 +                                                CommitLogReadErrorReason.EOF,
 +                                                statusTracker.tolerateErrorsInSection)))
 +                {
 +                    statusTracker.requestTermination();
 +                }
 +                return;
 +            }
 +
 +            checksum.update(buffer, 0, serializedSize);
 +            if (claimedCRC32 != checksum.getValue())
 +            {
 +                if (handler.shouldSkipSegmentOnError(new CommitLogReadException(
 +                                                String.format("Mutation checksum failure at %d in %s", mutationStart, statusTracker.errorContext),
 +                                                CommitLogReadErrorReason.MUTATION_ERROR,
 +                                                statusTracker.tolerateErrorsInSection)))
 +                {
 +                    statusTracker.requestTermination();
 +                }
 +                continue;
 +            }
 +
 +            long mutationPosition = reader.getFilePointer();
 +            readMutation(handler, buffer, serializedSize, minPosition, (int)mutationPosition, desc);
 +
 +            // Only count this as a processed mutation if it is after our min as we suppress reading of mutations that
 +            // are before this mark.
 +            if (mutationPosition >= minPosition.position)
 +                statusTracker.addProcessedMutation();
 +        }
 +    }
 +
 +    /**
 +     * Deserializes and passes a Mutation to the ICommitLogReadHandler requested
 +     *
 +     * @param handler Handler that will take action based on deserialized Mutations
 +     * @param inputBuffer raw byte array w/Mutation data
 +     * @param size deserialized size of mutation
 +     * @param minPosition We need to suppress replay of mutations that are before the required minPosition
 +     * @param entryLocation filePointer offset of mutation within CommitLogSegment
 +     * @param desc CommitLogDescriptor being worked on
 +     */
 +    @VisibleForTesting
 +    protected void readMutation(CommitLogReadHandler handler,
 +                                byte[] inputBuffer,
 +                                int size,
 +                                CommitLogPosition minPosition,
 +                                final int entryLocation,
 +                                final CommitLogDescriptor desc) throws IOException
 +    {
 +        // For now, we need to go through the motions of deserializing the mutation to determine its size and move
 +        // the file pointer forward accordingly, even if we're behind the requested minPosition within this SyncSegment.
 +        boolean shouldReplay = entryLocation > minPosition.position;
 +
 +        final Mutation mutation;
 +        try (RebufferingInputStream bufIn = new DataInputBuffer(inputBuffer, 0, size))
 +        {
 +            mutation = Mutation.serializer.deserialize(bufIn,
 +                                                       desc.getMessagingVersion(),
 +                                                       SerializationHelper.Flag.LOCAL);
 +            // double-check that what we read is still valid for the current schema
 +            for (PartitionUpdate upd : mutation.getPartitionUpdates())
 +                upd.validate();
 +        }
 +        catch (UnknownColumnFamilyException ex)
 +        {
 +            if (ex.cfId == null)
 +                return;
 +            AtomicInteger i = invalidMutations.get(ex.cfId);
 +            if (i == null)
 +            {
 +                i = new AtomicInteger(1);
 +                invalidMutations.put(ex.cfId, i);
 +            }
 +            else
 +                i.incrementAndGet();
 +            return;
 +        }
 +        catch (Throwable t)
 +        {
 +            JVMStabilityInspector.inspectThrowable(t);
 +            File f = File.createTempFile("mutation", "dat");
 +
 +            try (DataOutputStream out = new DataOutputStream(new FileOutputStream(f)))
 +            {
 +                out.write(inputBuffer, 0, size);
 +            }
 +
 +            // Checksum passed so this error can't be permissible.
 +            handler.handleUnrecoverableError(new CommitLogReadException(
 +                String.format(
 +                    "Unexpected error deserializing mutation; saved to %s.  " +
 +                    "This may be caused by replaying a mutation against a table with the same name but incompatible schema.  " +
 +                    "Exception follows: %s", f.getAbsolutePath(), t),
 +                CommitLogReadErrorReason.MUTATION_ERROR,
 +                false));
 +            return;
 +        }
 +
 +        if (logger.isTraceEnabled())
 +            logger.trace("Read mutation for {}.{}: {}", mutation.getKeyspaceName(), mutation.key(),
 +                         "{" + StringUtils.join(mutation.getPartitionUpdates().iterator(), ", ") + "}");
 +
 +        if (shouldReplay)
 +            handler.handleMutation(mutation, size, entryLocation, desc);
 +    }
 +
 +    /**
 +     * Helper methods to deal with changing formats of internals of the CommitLog without polluting deserialization code.
 +     */
 +    private static class CommitLogFormat
 +    {
 +        public static long calculateClaimedChecksum(FileDataInput input, int commitLogVersion) throws IOException
 +        {
 +            switch (commitLogVersion)
 +            {
 +                case CommitLogDescriptor.VERSION_12:
 +                case CommitLogDescriptor.VERSION_20:
 +                    return input.readLong();
 +                // Changed format in 2.1
 +                default:
 +                    return input.readInt() & 0xffffffffL;
 +            }
 +        }
 +
 +        public static void updateChecksum(CRC32 checksum, int serializedSize, int commitLogVersion)
 +        {
 +            switch (commitLogVersion)
 +            {
 +                case CommitLogDescriptor.VERSION_12:
 +                    checksum.update(serializedSize);
 +                    break;
 +                // Changed format in 2.0
 +                default:
 +                    updateChecksumInt(checksum, serializedSize);
 +                    break;
 +            }
 +        }
 +
 +        public static long calculateClaimedCRC32(FileDataInput input, int commitLogVersion) throws IOException
 +        {
 +            switch (commitLogVersion)
 +            {
 +                case CommitLogDescriptor.VERSION_12:
 +                case CommitLogDescriptor.VERSION_20:
 +                    return input.readLong();
 +                // Changed format in 2.1
 +                default:
 +                    return input.readInt() & 0xffffffffL;
 +            }
 +        }
 +    }
 +
 +    private static class ReadStatusTracker
 +    {
 +        private int mutationsLeft;
 +        public String errorContext = "";
 +        public boolean tolerateErrorsInSection;
 +        private boolean error;
 +
 +        public ReadStatusTracker(int mutationLimit, boolean tolerateErrorsInSection)
 +        {
 +            this.mutationsLeft = mutationLimit;
 +            this.tolerateErrorsInSection = tolerateErrorsInSection;
 +        }
 +
 +        public void addProcessedMutation()
 +        {
 +            if (mutationsLeft == ALL_MUTATIONS)
 +                return;
 +            --mutationsLeft;
 +        }
 +
 +        public boolean shouldContinue()
 +        {
 +            return !error && (mutationsLeft != 0 || mutationsLeft == ALL_MUTATIONS);
 +        }
 +
 +        public void requestTermination()
 +        {
 +            error = true;
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a2399d4d/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
index 5476d03,90dc258..4000fbf
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
@@@ -171,10 -143,23 +171,23 @@@ public class CommitLogTes
      }
  
      @Test
+     public void testRecoveryWithShortPadding() throws Exception
+     {
 -        // If we have 0-3 bytes remaining, commitlog replayer
 -        // should pass, because there's insufficient room
 -        // left in the segment for the legacy size marker.
 -        testRecovery(new byte[1], null);
 -        testRecovery(new byte[2], null);
 -        testRecovery(new byte[3], null);
++            // If we have 0-3 bytes remaining, commitlog replayer
++            // should pass, because there's insufficient room
++            // left in the segment for the legacy size marker.
++            testRecovery(new byte[1], null);
++            testRecovery(new byte[2], null);
++            testRecovery(new byte[3], null);
+     }
+ 
+     @Test
      public void testRecoveryWithShortSize() throws Exception
      {
+         byte[] data = new byte[5];
+         data[3] = 1; // Not a legacy marker, give it a fake (short) size
          runExpecting(() -> {
-             testRecovery(new byte[2], CommitLogDescriptor.VERSION_20);
+             testRecovery(data, CommitLogDescriptor.VERSION_20);
              return null;
          }, CommitLogReplayException.class);
      }


[03/10] cassandra git commit: Commitlog replay may fail if last mutation is within 4 bytes of end of segment

Posted by jj...@apache.org.
Commitlog replay may fail if last mutation is within 4 bytes of end of segment

Patch by Jeff Jirsa; Reviewed by Branimir Lambov for CASSANDRA-13282


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/beb9658d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/beb9658d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/beb9658d

Branch: refs/heads/cassandra-3.11
Commit: beb9658dd5e18e3a6a4e8431b6549ae4c33365a9
Parents: 5ef8a8b
Author: Jeff Jirsa <je...@jeffjirsa.net>
Authored: Sun Mar 12 21:54:04 2017 -0700
Committer: Jeff Jirsa <je...@jeffjirsa.net>
Committed: Sun Mar 12 21:54:04 2017 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../db/commitlog/CommitLogReplayer.java         | 11 +++++++++++
 .../cassandra/db/commitlog/CommitLogTest.java   | 20 ++++++++++++++++----
 3 files changed, 28 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/beb9658d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 09e4039..2839291 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@
  * Fix failing COPY TO STDOUT (CASSANDRA-12497)
  * Fix ColumnCounter::countAll behaviour for reverse queries (CASSANDRA-13222)
  * Exceptions encountered calling getSeeds() breaks OTC thread (CASSANDRA-13018)
+ * Commitlog replay may fail if last mutation is within 4 bytes of end of segment (CASSANDRA-13282)
 Merged from 2.1:
  * Remove unused repositories (CASSANDRA-13278)
  * Log stacktrace of uncaught exceptions (CASSANDRA-13108)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/beb9658d/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index a58aeb4..3cf4d0f 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -439,6 +439,17 @@ public class CommitLogReplayer
             int serializedSize;
             try
             {
+                // We rely on reading serialized size == 0 (LEGACY_END_OF_SEGMENT_MARKER) to identify the end
+                // of a segment, which happens naturally due to the 0 padding of the empty segment on creation.
+                // However, with 2.1 era commitlogs it's possible that the last mutation ended less than 4 bytes 
+                // from the end of the file, which means that we'll be unable to read a full int and instead
+                // read an EOF here
+                if(end - reader.getFilePointer() < 4)
+                {
+                    logger.trace("Not enough bytes left for another mutation in this CommitLog segment, continuing");
+                    return false;
+                }
+
                 // any of the reads may hit EOF
                 serializedSize = reader.readInt();
                 if (serializedSize == LEGACY_END_OF_SEGMENT_MARKER)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/beb9658d/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
index 9999b42..9b63885 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
@@ -137,12 +137,24 @@ public class CommitLogTest
     }
 
     @Test
+    public void testRecoveryWithShortPadding() throws Exception
+    {
+        // If we have 0-3 bytes remaining, commitlog replayer
+        // should pass, because there's insufficient room
+        // left in the segment for the legacy size marker.
+        testRecovery(new byte[1], null);
+        testRecovery(new byte[2], null);
+        testRecovery(new byte[3], null);
+    }
+
+    @Test
     public void testRecoveryWithShortSize() throws Exception
     {
-        runExpecting(new WrappedRunnable() {
-            public void runMayThrow() throws Exception
-            {
-                testRecovery(new byte[2], CommitLogDescriptor.VERSION_20);
+        runExpecting(new WrappedRunnable()  {
+            public void runMayThrow() throws Exception {
+                byte[] data = new byte[5];
+                data[3] = 1; // Not a legacy marker, give it a fake (short) size
+                testRecovery(data, CommitLogDescriptor.VERSION_20);
             }
         }, CommitLogReplayException.class);
     }


[02/10] cassandra git commit: Commitlog replay may fail if last mutation is within 4 bytes of end of segment

Posted by jj...@apache.org.
Commitlog replay may fail if last mutation is within 4 bytes of end of segment

Patch by Jeff Jirsa; Reviewed by Branimir Lambov for CASSANDRA-13282


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/beb9658d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/beb9658d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/beb9658d

Branch: refs/heads/cassandra-3.0
Commit: beb9658dd5e18e3a6a4e8431b6549ae4c33365a9
Parents: 5ef8a8b
Author: Jeff Jirsa <je...@jeffjirsa.net>
Authored: Sun Mar 12 21:54:04 2017 -0700
Committer: Jeff Jirsa <je...@jeffjirsa.net>
Committed: Sun Mar 12 21:54:04 2017 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../db/commitlog/CommitLogReplayer.java         | 11 +++++++++++
 .../cassandra/db/commitlog/CommitLogTest.java   | 20 ++++++++++++++++----
 3 files changed, 28 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/beb9658d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 09e4039..2839291 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@
  * Fix failing COPY TO STDOUT (CASSANDRA-12497)
  * Fix ColumnCounter::countAll behaviour for reverse queries (CASSANDRA-13222)
  * Exceptions encountered calling getSeeds() breaks OTC thread (CASSANDRA-13018)
+ * Commitlog replay may fail if last mutation is within 4 bytes of end of segment (CASSANDRA-13282)
 Merged from 2.1:
  * Remove unused repositories (CASSANDRA-13278)
  * Log stacktrace of uncaught exceptions (CASSANDRA-13108)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/beb9658d/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index a58aeb4..3cf4d0f 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -439,6 +439,17 @@ public class CommitLogReplayer
             int serializedSize;
             try
             {
+                // We rely on reading serialized size == 0 (LEGACY_END_OF_SEGMENT_MARKER) to identify the end
+                // of a segment, which happens naturally due to the 0 padding of the empty segment on creation.
+                // However, with 2.1 era commitlogs it's possible that the last mutation ended less than 4 bytes 
+                // from the end of the file, which means that we'll be unable to read a full int and instead
+                // read an EOF here
+                if(end - reader.getFilePointer() < 4)
+                {
+                    logger.trace("Not enough bytes left for another mutation in this CommitLog segment, continuing");
+                    return false;
+                }
+
                 // any of the reads may hit EOF
                 serializedSize = reader.readInt();
                 if (serializedSize == LEGACY_END_OF_SEGMENT_MARKER)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/beb9658d/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
index 9999b42..9b63885 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
@@ -137,12 +137,24 @@ public class CommitLogTest
     }
 
     @Test
+    public void testRecoveryWithShortPadding() throws Exception
+    {
+        // If we have 0-3 bytes remaining, commitlog replayer
+        // should pass, because there's insufficient room
+        // left in the segment for the legacy size marker.
+        testRecovery(new byte[1], null);
+        testRecovery(new byte[2], null);
+        testRecovery(new byte[3], null);
+    }
+
+    @Test
     public void testRecoveryWithShortSize() throws Exception
     {
-        runExpecting(new WrappedRunnable() {
-            public void runMayThrow() throws Exception
-            {
-                testRecovery(new byte[2], CommitLogDescriptor.VERSION_20);
+        runExpecting(new WrappedRunnable()  {
+            public void runMayThrow() throws Exception {
+                byte[] data = new byte[5];
+                data[3] = 1; // Not a legacy marker, give it a fake (short) size
+                testRecovery(data, CommitLogDescriptor.VERSION_20);
             }
         }, CommitLogReplayException.class);
     }


[07/10] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0

Posted by jj...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/44f79bf2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/44f79bf2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/44f79bf2

Branch: refs/heads/trunk
Commit: 44f79bf2f7a3a05f802014492ecbec67c49c02d0
Parents: aeca1d2 beb9658
Author: Jeff Jirsa <je...@jeffjirsa.net>
Authored: Sun Mar 12 21:54:42 2017 -0700
Committer: Jeff Jirsa <je...@jeffjirsa.net>
Committed: Sun Mar 12 21:56:00 2017 -0700

----------------------------------------------------------------------
 CHANGES.txt                                          |  1 +
 .../cassandra/db/commitlog/CommitLogReplayer.java    | 11 +++++++++++
 .../apache/cassandra/db/commitlog/CommitLogTest.java | 15 ++++++++++++++-
 3 files changed, 26 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/44f79bf2/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 52a794b,2839291..140c860
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,21 -1,6 +1,22 @@@
 -2.2.10
 +3.0.13
 + * Slice.isEmpty() returns false for some empty slices (CASSANDRA-13305)
 + * Add formatted row output to assertEmpty in CQL Tester (CASSANDRA-13238)
 +Merged from 2.2:
++ * Commitlog replay may fail if last mutation is within 4 bytes of end of segment (CASSANDRA-13282)
   * Fix queries updating multiple time the same list (CASSANDRA-13130)
   * Fix GRANT/REVOKE when keyspace isn't specified (CASSANDRA-13053)
 +
 +
 +3.0.12
 + * Prevent data loss on upgrade 2.1 - 3.0 by adding component separator to LogRecord absolute path (CASSANDRA-13294)
 + * Improve testing on macOS by eliminating sigar logging (CASSANDRA-13233)
 + * Cqlsh copy-from should error out when csv contains invalid data for collections (CASSANDRA-13071)
 + * Update c.yaml doc for offheap memtables (CASSANDRA-13179)
 + * Faster StreamingHistogram (CASSANDRA-13038)
 + * Legacy deserializer can create unexpected boundary range tombstones (CASSANDRA-13237)
 + * Remove unnecessary assertion from AntiCompactionTest (CASSANDRA-13070)
 + * Fix cqlsh COPY for dates before 1900 (CASSANDRA-13185)
 +Merged from 2.2:
   * Avoid race on receiver by starting streaming sender thread after sending init message (CASSANDRA-12886)
   * Fix "multiple versions of ant detected..." when running ant test (CASSANDRA-13232)
   * Coalescing strategy sleeps too much (CASSANDRA-13090)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/44f79bf2/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index d53f0f8,3cf4d0f..205c36a
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@@ -483,6 -439,17 +483,17 @@@ public class CommitLogReplaye
              int serializedSize;
              try
              {
+                 // We rely on reading serialized size == 0 (LEGACY_END_OF_SEGMENT_MARKER) to identify the end
+                 // of a segment, which happens naturally due to the 0 padding of the empty segment on creation.
 -                // However, with 2.1 era commitlogs it's possible that the last mutation ended less than 4 bytes 
++                // However, it's possible with 2.1 era commitlogs that the last mutation ended less than 4 bytes 
 +                 // from the end of the file, which means that we'll be unable to read a full int and instead
+                 // read an EOF here
+                 if(end - reader.getFilePointer() < 4)
+                 {
+                     logger.trace("Not enough bytes left for another mutation in this CommitLog segment, continuing");
+                     return false;
+                 }
+ 
                  // any of the reads may hit EOF
                  serializedSize = reader.readInt();
                  if (serializedSize == LEGACY_END_OF_SEGMENT_MARKER)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/44f79bf2/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
index c4ab6ab,9b63885..90dc258
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
@@@ -143,11 -137,25 +143,24 @@@ public class CommitLogTes
      }
  
      @Test
+     public void testRecoveryWithShortPadding() throws Exception
+     {
+         // If we have 0-3 bytes remaining, commitlog replayer
+         // should pass, because there's insufficient room
+         // left in the segment for the legacy size marker.
+         testRecovery(new byte[1], null);
+         testRecovery(new byte[2], null);
+         testRecovery(new byte[3], null);
+     }
+ 
+     @Test
      public void testRecoveryWithShortSize() throws Exception
      {
 -        runExpecting(new WrappedRunnable()  {
 -            public void runMayThrow() throws Exception {
 -                byte[] data = new byte[5];
 -                data[3] = 1; // Not a legacy marker, give it a fake (short) size
 -                testRecovery(data, CommitLogDescriptor.VERSION_20);
 -            }
++        byte[] data = new byte[5];
++        data[3] = 1; // Not a legacy marker, give it a fake (short) size
 +        runExpecting(() -> {
-             testRecovery(new byte[2], CommitLogDescriptor.VERSION_20);
++            testRecovery(data, CommitLogDescriptor.VERSION_20);
 +            return null;
          }, CommitLogReplayException.class);
      }