You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2018/06/05 20:04:06 UTC

cassandra git commit: Add option to sanity check tombstones on reads/compaction

Repository: cassandra
Updated Branches:
  refs/heads/trunk 4413fdbd3 -> 5d8767765


Add option to sanity check tombstones on reads/compaction

Patch by marcuse; reviewed by Ariel Weisberg for CASSANDRA-14467


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5d876776
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5d876776
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5d876776

Branch: refs/heads/trunk
Commit: 5d8767765090cd968c39008f76b0cd795d6e5032
Parents: 4413fdb
Author: Marcus Eriksson <ma...@apache.org>
Authored: Tue May 22 13:43:22 2018 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Tue Jun 5 12:47:20 2018 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 conf/cassandra.yaml                             |   3 +
 .../org/apache/cassandra/config/Config.java     |   9 +-
 .../cassandra/config/DatabaseDescriptor.java    |  10 +
 .../org/apache/cassandra/db/DeletionTime.java   |   9 +
 .../cassandra/db/UnfilteredValidation.java      | 113 ++++++++++
 .../columniterator/AbstractSSTableIterator.java |   2 +
 .../db/columniterator/SSTableIterator.java      |   2 +
 .../columniterator/SSTableReversedIterator.java |   1 +
 .../apache/cassandra/db/rows/AbstractCell.java  |   7 +
 .../apache/cassandra/db/rows/AbstractRow.java   |  12 +
 .../apache/cassandra/db/rows/ColumnData.java    |   7 +
 .../cassandra/db/rows/ComplexColumnData.java    |  10 +
 .../db/rows/RangeTombstoneBoundMarker.java      |   5 +
 .../db/rows/RangeTombstoneBoundaryMarker.java   |   5 +
 .../apache/cassandra/db/rows/Unfiltered.java    |   6 +
 .../io/sstable/SSTableIdentityIterator.java     |   6 +-
 .../io/sstable/format/SSTableReader.java        |   6 +
 .../cassandra/service/StorageService.java       |  12 +
 .../cassandra/service/StorageServiceMBean.java  |   2 +
 test/conf/cassandra.yaml                        |   1 +
 .../config/DatabaseDescriptorRefTest.java       |   1 +
 .../cql3/validation/operations/TTLTest.java     |  19 ++
 .../db/compaction/CompactionsCQLTest.java       | 223 +++++++++++++++++++
 .../sstable/SSTableCorruptionDetectionTest.java |   5 +
 25 files changed, 475 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d876776/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 351ae37..eb064be 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Add option to sanity check tombstones on reads/compactions (CASSANDRA-14467)
  * Add a virtual table to expose all running sstable tasks (CASSANDRA-14457)
  * Let nodetool import take a list of directories (CASSANDRA-14442)
  * Avoid unneeded memory allocations / cpu for disabled log levels (CASSANDRA-14488)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d876776/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 49c6f03..7ff056d 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -1198,3 +1198,6 @@ audit_logging_options:
     # included_users:
     # excluded_users:
 
+# validate tombstones on reads and compaction
+# can be either "disabled", "warn" or "exception"
+# corrupted_tombstone_strategy: disabled

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d876776/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index d945368..d9250bb 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -383,7 +383,7 @@ public class Config
 
     public volatile AuditLogOptions audit_logging_options = new AuditLogOptions();
 
-
+    public CorruptedTombstoneStrategy corrupted_tombstone_strategy = CorruptedTombstoneStrategy.disabled;
     /**
      * @deprecated migrate to {@link DatabaseDescriptor#isClientInitialized()}
      */
@@ -468,6 +468,13 @@ public class Config
         reject
     }
 
+    public enum CorruptedTombstoneStrategy
+    {
+        disabled,
+        warn,
+        exception
+    }
+
     private static final List<String> SENSITIVE_KEYS = new ArrayList<String>() {{
         add("client_encryption_options");
         add("server_encryption_options");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d876776/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 592b96e..91ee63a 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -2596,4 +2596,14 @@ public class DatabaseDescriptor
     {
         conf.audit_logging_options = auditLoggingOptions;
     }
+
+    public static Config.CorruptedTombstoneStrategy getCorruptedTombstoneStrategy()
+    {
+        return conf.corrupted_tombstone_strategy;
+    }
+
+    public static void setCorruptedTombstoneStrategy(Config.CorruptedTombstoneStrategy strategy)
+    {
+        conf.corrupted_tombstone_strategy = strategy;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d876776/src/java/org/apache/cassandra/db/DeletionTime.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DeletionTime.java b/src/java/org/apache/cassandra/db/DeletionTime.java
index 9dcbb07..14e846d 100644
--- a/src/java/org/apache/cassandra/db/DeletionTime.java
+++ b/src/java/org/apache/cassandra/db/DeletionTime.java
@@ -88,6 +88,15 @@ public class DeletionTime implements Comparable<DeletionTime>, IMeasurableMemory
         HashingUtils.updateWithLong(hasher, markedForDeleteAt());
     }
 
+    /**
+     * check if this deletion time is valid - localDeletionTime can never be negative
+     * @return true if it is valid
+     */
+    public boolean validate()
+    {
+        return localDeletionTime >= 0;
+    }
+
     @Override
     public boolean equals(Object o)
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d876776/src/java/org/apache/cassandra/db/UnfilteredValidation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/UnfilteredValidation.java b/src/java/org/apache/cassandra/db/UnfilteredValidation.java
new file mode 100644
index 0000000..6d8bbfd
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/UnfilteredValidation.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.rows.Unfiltered;
+
+import org.apache.cassandra.io.sstable.CorruptSSTableException;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.serializers.MarshalException;
+import org.apache.cassandra.utils.NoSpamLogger;
+
+
+/**
+ * Handles unfiltered validation - if configured, it checks if the provided unfiltered has
+ * invalid deletions (if the local deletion time is negative or if the ttl is negative) and
+ * then either logs or throws an exception if so.
+ */
+public class UnfilteredValidation
+{
+    private static final Logger logger = LoggerFactory.getLogger(UnfilteredValidation.class);
+    private static final NoSpamLogger nospam1m = NoSpamLogger.getLogger(logger, 1, TimeUnit.MINUTES);
+
+    public static void maybeValidateUnfiltered(Unfiltered unfiltered, TableMetadata metadata, DecoratedKey key, SSTableReader sstable)
+    {
+        Config.CorruptedTombstoneStrategy strat = DatabaseDescriptor.getCorruptedTombstoneStrategy();
+        if (strat != Config.CorruptedTombstoneStrategy.disabled && unfiltered != null && !unfiltered.isEmpty())
+        {
+            boolean hasInvalidDeletions = false;
+            try
+            {
+                hasInvalidDeletions = unfiltered.hasInvalidDeletions();
+            }
+            catch (Throwable t) // make sure no unknown exceptions fail the read/compaction
+            {
+                nospam1m.error("Could not check if Unfiltered in {} had any invalid deletions", sstable, t);
+            }
+
+            if (hasInvalidDeletions)
+            {
+                String content;
+                try
+                {
+                    content = unfiltered.toString(metadata, true);
+                }
+                catch (Throwable t)
+                {
+                    content = "Could not get string representation: " + t.getMessage();
+                }
+                handleInvalid(metadata, key, sstable, content);
+            }
+        }
+    }
+
+    public static void handleInvalid(TableMetadata metadata, DecoratedKey key, SSTableReader sstable, String invalidContent)
+    {
+        Config.CorruptedTombstoneStrategy strat = DatabaseDescriptor.getCorruptedTombstoneStrategy();
+        String keyString;
+        try
+        {
+            keyString = metadata.partitionKeyType.getString(key.getKey());
+        }
+        catch (Throwable t)
+        {
+            keyString = "[corrupt token="+key.getToken()+"]";
+        }
+
+        if (strat == Config.CorruptedTombstoneStrategy.exception)
+        {
+            String msg = String.format("Key %s in %s.%s is invalid in %s: %s",
+                                       keyString,
+                                       metadata.keyspace,
+                                       metadata.name,
+                                       sstable,
+                                       invalidContent);
+            // we mark suspect to make sure this sstable is not included in future compactions - it would just keep
+            // throwing exceptions
+            sstable.markSuspect();
+            throw new CorruptSSTableException(new MarshalException(msg), sstable.getFilename());
+        }
+        else if (strat == Config.CorruptedTombstoneStrategy.warn)
+        {
+            String msgTemplate = String.format("Key {} in %s.%s is invalid in %s: {}",
+                                               metadata.keyspace,
+                                               metadata.name,
+                                               sstable);
+            nospam1m.warn(msgTemplate, keyString, invalidContent);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d876776/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java b/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
index 9496878..443fe49 100644
--- a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
@@ -111,6 +111,8 @@ public abstract class AbstractSSTableIterator implements UnfilteredRowIterator
                     this.staticRow = Rows.EMPTY_STATIC_ROW;
                     this.reader = createReader(indexEntry, file, shouldCloseFile);
                 }
+                if (!partitionLevelDeletion.validate())
+                    UnfilteredValidation.handleInvalid(metadata(), key, sstable, "partitionLevelDeletion="+partitionLevelDeletion.toString());
 
                 if (reader != null && !slices.isEmpty())
                     reader.setForSlice(nextSlice());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d876776/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java b/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java
index 6b8a3ad..9346345 100644
--- a/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java
@@ -150,6 +150,7 @@ public class SSTableIterator extends AbstractSSTableIterator
                     return null;
 
                 Unfiltered next = deserializer.readNext();
+                UnfilteredValidation.maybeValidateUnfiltered(next, metadata(), key, sstable);
                 // We may get empty row for the same reason expressed on UnfilteredSerializer.deserializeOne.
                 if (next.isEmpty())
                     continue;
@@ -299,6 +300,7 @@ public class SSTableIterator extends AbstractSSTableIterator
 
 
                 Unfiltered next = deserializer.readNext();
+                UnfilteredValidation.maybeValidateUnfiltered(next, metadata(), key, sstable);
                 // We may get empty row for the same reason expressed on UnfilteredSerializer.deserializeOne.
                 if (next.isEmpty())
                     continue;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d876776/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java b/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
index 9a30d19..9f449a0 100644
--- a/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
@@ -223,6 +223,7 @@ public class SSTableReversedIterator extends AbstractSSTableIterator
                    && !stopReadingDisk())
             {
                 Unfiltered unfiltered = deserializer.readNext();
+                UnfilteredValidation.maybeValidateUnfiltered(unfiltered, metadata(), key, sstable);
                 // We may get empty row for the same reason expressed on UnfilteredSerializer.deserializeOne.
                 if (!unfiltered.isEmpty())
                     buffer.add(unfiltered);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d876776/src/java/org/apache/cassandra/db/rows/AbstractCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/AbstractCell.java b/src/java/org/apache/cassandra/db/rows/AbstractCell.java
index 4946a46..bfe7396 100644
--- a/src/java/org/apache/cassandra/db/rows/AbstractCell.java
+++ b/src/java/org/apache/cassandra/db/rows/AbstractCell.java
@@ -154,6 +154,13 @@ public abstract class AbstractCell extends Cell
         column().validateCell(this);
     }
 
+    public boolean hasInvalidDeletions()
+    {
+        if (ttl() < 0 || localDeletionTime() < 0 || (isExpiring() && localDeletionTime() == NO_DELETION_TIME))
+            return true;
+        return false;
+    }
+
     public long maxTimestamp()
     {
         return timestamp();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d876776/src/java/org/apache/cassandra/db/rows/AbstractRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/AbstractRow.java b/src/java/org/apache/cassandra/db/rows/AbstractRow.java
index 211b13f..24b088f 100644
--- a/src/java/org/apache/cassandra/db/rows/AbstractRow.java
+++ b/src/java/org/apache/cassandra/db/rows/AbstractRow.java
@@ -91,6 +91,18 @@ public abstract class AbstractRow extends AbstractCollection<ColumnData> impleme
             cd.validate();
     }
 
+    public boolean hasInvalidDeletions()
+    {
+        if (primaryKeyLivenessInfo().isExpiring() && (primaryKeyLivenessInfo().ttl() < 0 || primaryKeyLivenessInfo().localExpirationTime() < 0))
+            return true;
+        if (!deletion().time().validate())
+            return true;
+        for (ColumnData cd : this)
+            if (cd.hasInvalidDeletions())
+                return true;
+        return false;
+    }
+
     public String toString(TableMetadata metadata)
     {
         return toString(metadata, false);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d876776/src/java/org/apache/cassandra/db/rows/ColumnData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/ColumnData.java b/src/java/org/apache/cassandra/db/rows/ColumnData.java
index ccfcfa5..f2da132 100644
--- a/src/java/org/apache/cassandra/db/rows/ColumnData.java
+++ b/src/java/org/apache/cassandra/db/rows/ColumnData.java
@@ -66,6 +66,13 @@ public abstract class ColumnData
     public abstract void validate();
 
     /**
+     * Validates the deletions (ttl and local deletion time) if any.
+     *
+     * @return true if it has any invalid deletions, false otherwise
+     */
+    public abstract boolean hasInvalidDeletions();
+
+    /**
      * Adds the data to the provided digest.
      *
      * @param hasher the {@link Hasher} to add the data to.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d876776/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java b/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
index 57851d8..3073e5f 100644
--- a/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
+++ b/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
@@ -138,6 +138,16 @@ public class ComplexColumnData extends ColumnData implements Iterable<Cell>
             cell.digest(hasher);
     }
 
+    public boolean hasInvalidDeletions()
+    {
+        if (!complexDeletion.validate())
+            return true;
+        for (Cell cell : this)
+            if (cell.hasInvalidDeletions())
+                return true;
+        return false;
+    }
+
     public ComplexColumnData markCounterLocalToBeCleared()
     {
         return transformAndFilter(complexDeletion, Cell::markCounterLocalToBeCleared);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d876776/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java
index c0c6afd..094cf72 100644
--- a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java
+++ b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java
@@ -68,6 +68,11 @@ public class RangeTombstoneBoundMarker extends AbstractRangeTombstoneMarker<Clus
         return false;
     }
 
+    public boolean hasInvalidDeletions()
+    {
+        return !deletionTime().validate();
+    }
+
     /**
      * The deletion time for the range tombstone this is a bound of.
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d876776/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java
index 6e6c8cd..79d5b1a 100644
--- a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java
+++ b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java
@@ -116,6 +116,11 @@ public class RangeTombstoneBoundaryMarker extends AbstractRangeTombstoneMarker<C
         return true;
     }
 
+    public boolean hasInvalidDeletions()
+    {
+        return !startDeletion.validate() || !endDeletion.validate();
+    }
+
     public RangeTombstoneBoundaryMarker copy(AbstractAllocator allocator)
     {
         return new RangeTombstoneBoundaryMarker(clustering().copy(allocator), endDeletion, startDeletion);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d876776/src/java/org/apache/cassandra/db/rows/Unfiltered.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/Unfiltered.java b/src/java/org/apache/cassandra/db/rows/Unfiltered.java
index 3a65f4e..81b63b7 100644
--- a/src/java/org/apache/cassandra/db/rows/Unfiltered.java
+++ b/src/java/org/apache/cassandra/db/rows/Unfiltered.java
@@ -55,6 +55,12 @@ public interface Unfiltered extends Clusterable
      */
     public void validateData(TableMetadata metadata);
 
+    /**
+     * Do a quick validation of the deletions of the unfiltered (if any)
+     *
+     * @return true if any deletion is invalid
+     */
+    public boolean hasInvalidDeletions();
     public boolean isEmpty();
 
     public String toString(TableMetadata metadata);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d876776/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
index f9c6e82..a49e7b4 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
@@ -55,6 +55,8 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat
         try
         {
             DeletionTime partitionLevelDeletion = DeletionTime.serializer.deserialize(file);
+            if (!partitionLevelDeletion.validate())
+                UnfilteredValidation.handleInvalid(sstable.metadata(), key, sstable, "partitionLevelDeletion="+partitionLevelDeletion.toString());
             SerializationHelper helper = new SerializationHelper(sstable.metadata(), sstable.descriptor.version.correspondingMessagingVersion(), SerializationHelper.Flag.LOCAL);
             SSTableSimpleIterator iterator = SSTableSimpleIterator.create(sstable.metadata(), file, sstable.header, helper, partitionLevelDeletion);
             return new SSTableIdentityIterator(sstable, key, partitionLevelDeletion, file.getPath(), iterator);
@@ -169,7 +171,9 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat
 
     protected Unfiltered doCompute()
     {
-        return iterator.next();
+        Unfiltered unfiltered = iterator.next();
+        UnfilteredValidation.maybeValidateUnfiltered(unfiltered, metadata(), key, sstable);
+        return unfiltered;
     }
 
     public void close()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d876776/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index ee7e445..2fade21 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -1729,6 +1729,12 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
         isSuspect.getAndSet(true);
     }
 
+    @VisibleForTesting
+    public void unmarkSuspect()
+    {
+        isSuspect.getAndSet(false);
+    }
+
     public boolean isMarkedSuspect()
     {
         return isSuspect.get();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d876776/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 4214644..96fd63f 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -63,6 +63,7 @@ import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.commitlog.CommitLog;
@@ -5471,4 +5472,15 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     {
         return AuditLogManager.getInstance().isAuditingEnabled();
     }
+
+    public String getCorruptedTombstoneStrategy()
+    {
+        return DatabaseDescriptor.getCorruptedTombstoneStrategy().toString();
+    }
+
+    public void setCorruptedTombstoneStrategy(String strategy)
+    {
+        DatabaseDescriptor.setCorruptedTombstoneStrategy(Config.CorruptedTombstoneStrategy.valueOf(strategy));
+        logger.info("Setting corrupted tombstone strategy to {}", strategy);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d876776/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index ab165b3..e54a95e 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -683,4 +683,6 @@ public interface StorageServiceMBean extends NotificationEmitter
     public void disableAuditLog();
     public void enableAuditLog(String loggerName, String includedKeyspaces, String excludedKeyspaces, String includedCategories, String excludedCategories, String includedUsers, String excludedUsers) throws ConfigurationException;
     public boolean isAuditLogEnabled();
+    public String getCorruptedTombstoneStrategy();
+    public void setCorruptedTombstoneStrategy(String strategy);
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d876776/test/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/test/conf/cassandra.yaml b/test/conf/cassandra.yaml
index 640f9b3..5893bab 100644
--- a/test/conf/cassandra.yaml
+++ b/test/conf/cassandra.yaml
@@ -45,3 +45,4 @@ row_cache_size_in_mb: 16
 enable_user_defined_functions: true
 enable_scripted_user_defined_functions: true
 prepared_statements_cache_size_mb: 1
+corrupted_tombstone_strategy: exception

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d876776/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
index 4078e2a..68435a8 100644
--- a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
+++ b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
@@ -73,6 +73,7 @@ public class DatabaseDescriptorRefTest
     "org.apache.cassandra.config.Config$MemtableAllocationType",
     "org.apache.cassandra.config.Config$RepairCommandPoolFullStrategy",
     "org.apache.cassandra.config.Config$UserFunctionTimeoutPolicy",
+    "org.apache.cassandra.config.Config$CorruptedTombstoneStrategy",
     "org.apache.cassandra.config.ParameterizedClass",
     "org.apache.cassandra.config.EncryptionOptions",
     "org.apache.cassandra.config.EncryptionOptions$ClientEncryptionOptions",

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d876776/test/unit/org/apache/cassandra/cql3/validation/operations/TTLTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/TTLTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/TTLTest.java
index fc70974..99ca7dc 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/operations/TTLTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/operations/TTLTest.java
@@ -9,6 +9,8 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.Attributes;
 import org.apache.cassandra.cql3.CQLTester;
 import org.apache.cassandra.cql3.UntypedResultSet;
@@ -19,6 +21,8 @@ import org.apache.cassandra.db.rows.AbstractCell;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.utils.FBUtilities;
 
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 public class TTLTest extends CQLTester
@@ -31,6 +35,18 @@ public class TTLTest extends CQLTester
     public static final String SIMPLE_CLUSTERING = "table2";
     public static final String COMPLEX_NOCLUSTERING = "table3";
     public static final String COMPLEX_CLUSTERING = "table4";
+    private Config.CorruptedTombstoneStrategy corruptTombstoneStrategy;
+    @Before
+    public void before()
+    {
+        corruptTombstoneStrategy = DatabaseDescriptor.getCorruptedTombstoneStrategy();
+    }
+
+    @After
+    public void after()
+    {
+        DatabaseDescriptor.setCorruptedTombstoneStrategy(corruptTombstoneStrategy);
+    }
 
     @Test
     public void testTTLPerRequestLimit() throws Throwable
@@ -167,9 +183,12 @@ public class TTLTest extends CQLTester
     @Test
     public void testRecoverOverflowedExpirationWithScrub() throws Throwable
     {
+        // this tests writes corrupt tombstones on purpose, disable the strategy:
+        DatabaseDescriptor.setCorruptedTombstoneStrategy(Config.CorruptedTombstoneStrategy.disabled);
         baseTestRecoverOverflowedExpiration(false, false);
         baseTestRecoverOverflowedExpiration(true, false);
         baseTestRecoverOverflowedExpiration(true, true);
+        // we reset the corrupted ts strategy after each test in @After above
     }
 
     public void testCapExpirationDateOverflowPolicy(ExpirationDateOverflowHandling.ExpirationDateOverflowPolicy policy) throws Throwable

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d876776/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
index 4d5215e..ca420da 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
@@ -17,23 +17,38 @@
  */
 package org.apache.cassandra.db.compaction;
 
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.commons.lang.StringUtils;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 import org.apache.cassandra.cql3.CQLTester;
 import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.RangeTombstone;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.db.Slice;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.db.rows.Cell;
 import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.db.rows.Unfiltered;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.io.sstable.CorruptSSTableException;
 import org.apache.cassandra.io.sstable.ISSTableScanner;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.serializers.MarshalException;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -42,6 +57,21 @@ public class CompactionsCQLTest extends CQLTester
 
     public static final int SLEEP_TIME = 5000;
 
+    private Config.CorruptedTombstoneStrategy strategy;
+
+    @Before
+    public void before()
+    {
+        strategy = DatabaseDescriptor.getCorruptedTombstoneStrategy();
+    }
+
+    @After
+    public void after()
+    {
+        DatabaseDescriptor.setCorruptedTombstoneStrategy(DatabaseDescriptor.getCorruptedTombstoneStrategy());
+    }
+
+
     @Test
     public void testTriggerMinorCompactionSTCS() throws Throwable
     {
@@ -245,6 +275,189 @@ public class CompactionsCQLTest extends CQLTester
         testPerCFSNeverPurgeTombstonesHelper(false);
     }
 
+    @Test
+    public void testCompactionInvalidRTs() throws Throwable
+    {
+        // set the corruptedTombstoneStrategy to exception since these tests require it - if someone changed the default
+        // in test/conf/cassandra.yaml they would start failing
+        DatabaseDescriptor.setCorruptedTombstoneStrategy(Config.CorruptedTombstoneStrategy.exception);
+        prepare();
+        // write a range tombstone with negative local deletion time (LDTs are not set by user and should not be negative):
+        RangeTombstone rt = new RangeTombstone(Slice.ALL, new DeletionTime(System.currentTimeMillis(), -1));
+        RowUpdateBuilder rub = new RowUpdateBuilder(getCurrentColumnFamilyStore().metadata(), System.currentTimeMillis() * 1000, 22).clustering(33).addRangeTombstone(rt);
+        rub.build().apply();
+        getCurrentColumnFamilyStore().forceBlockingFlush();
+        compactAndValidate();
+        readAndValidate(true);
+        readAndValidate(false);
+    }
+
+    @Test
+    public void testCompactionInvalidTombstone() throws Throwable
+    {
+        DatabaseDescriptor.setCorruptedTombstoneStrategy(Config.CorruptedTombstoneStrategy.exception);
+        prepare();
+        // write a standard tombstone with negative local deletion time (LDTs are not set by user and should not be negative):
+        RowUpdateBuilder rub = new RowUpdateBuilder(getCurrentColumnFamilyStore().metadata(), -1, System.currentTimeMillis() * 1000, 22).clustering(33).delete("b");
+        rub.build().apply();
+        getCurrentColumnFamilyStore().forceBlockingFlush();
+        compactAndValidate();
+        readAndValidate(true);
+        readAndValidate(false);
+    }
+
+    @Test
+    public void testCompactionInvalidPartitionDeletion() throws Throwable
+    {
+        DatabaseDescriptor.setCorruptedTombstoneStrategy(Config.CorruptedTombstoneStrategy.exception);
+        prepare();
+        // write a partition deletion with negative local deletion time (LDTs are not set by user and should not be negative)::
+        PartitionUpdate pu = PartitionUpdate.simpleBuilder(getCurrentColumnFamilyStore().metadata(), 22).nowInSec(-1).delete().build();
+        new Mutation(pu).apply();
+        getCurrentColumnFamilyStore().forceBlockingFlush();
+        compactAndValidate();
+        readAndValidate(true);
+        readAndValidate(false);
+    }
+
+    @Test
+    public void testCompactionInvalidRowDeletion() throws Throwable
+    {
+        DatabaseDescriptor.setCorruptedTombstoneStrategy(Config.CorruptedTombstoneStrategy.exception);
+        prepare();
+        // write a row deletion with negative local deletion time (LDTs are not set by user and should not be negative):
+        RowUpdateBuilder.deleteRowAt(getCurrentColumnFamilyStore().metadata(), System.currentTimeMillis() * 1000, -1, 22, 33).apply();
+        getCurrentColumnFamilyStore().forceBlockingFlush();
+        compactAndValidate();
+        readAndValidate(true);
+        readAndValidate(false);
+    }
+
+    private void prepare() throws Throwable
+    {
+        createTable("CREATE TABLE %s (id int, id2 int, b text, primary key (id, id2))");
+        for (int i = 0; i < 2; i++)
+            execute("INSERT INTO %s (id, id2, b) VALUES (?, ?, ?)", i, i, String.valueOf(i));
+    }
+
+    @Test
+    public void testIndexedReaderRowDeletion() throws Throwable
+    {
+        // write enough data to make sure we use an IndexedReader when doing a read, and make sure it fails when reading a corrupt row deletion
+        DatabaseDescriptor.setCorruptedTombstoneStrategy(Config.CorruptedTombstoneStrategy.exception);
+        int maxSizePre = DatabaseDescriptor.getColumnIndexSize();
+        DatabaseDescriptor.setColumnIndexSize(1024);
+        prepareWide();
+        RowUpdateBuilder.deleteRowAt(getCurrentColumnFamilyStore().metadata(), System.currentTimeMillis() * 1000, -1, 22, 33).apply();
+        getCurrentColumnFamilyStore().forceBlockingFlush();
+        readAndValidate(true);
+        readAndValidate(false);
+        DatabaseDescriptor.setColumnIndexSize(maxSizePre);
+    }
+
+    @Test
+    public void testIndexedReaderTombstone() throws Throwable
+    {
+        // write enough data to make sure we use an IndexedReader when doing a read, and make sure it fails when reading a corrupt standard tombstone
+        DatabaseDescriptor.setCorruptedTombstoneStrategy(Config.CorruptedTombstoneStrategy.exception);
+        int maxSizePre = DatabaseDescriptor.getColumnIndexSize();
+        DatabaseDescriptor.setColumnIndexSize(1024);
+        prepareWide();
+        RowUpdateBuilder rub = new RowUpdateBuilder(getCurrentColumnFamilyStore().metadata(), -1, System.currentTimeMillis() * 1000, 22).clustering(33).delete("b");
+        rub.build().apply();
+        getCurrentColumnFamilyStore().forceBlockingFlush();
+        readAndValidate(true);
+        readAndValidate(false);
+        DatabaseDescriptor.setColumnIndexSize(maxSizePre);
+    }
+
+    @Test
+    public void testIndexedReaderRT() throws Throwable
+    {
+        // write enough data to make sure we use an IndexedReader when doing a read, and make sure it fails when reading a corrupt range tombstone
+        DatabaseDescriptor.setCorruptedTombstoneStrategy(Config.CorruptedTombstoneStrategy.exception);
+        int maxSizePre = DatabaseDescriptor.getColumnIndexSize();
+        DatabaseDescriptor.setColumnIndexSize(1024);
+        prepareWide();
+        RangeTombstone rt = new RangeTombstone(Slice.ALL, new DeletionTime(System.currentTimeMillis(), -1));
+        RowUpdateBuilder rub = new RowUpdateBuilder(getCurrentColumnFamilyStore().metadata(), System.currentTimeMillis() * 1000, 22).clustering(33).addRangeTombstone(rt);
+        rub.build().apply();
+        getCurrentColumnFamilyStore().forceBlockingFlush();
+        readAndValidate(true);
+        readAndValidate(false);
+        DatabaseDescriptor.setColumnIndexSize(maxSizePre);
+    }
+
+    private void prepareWide() throws Throwable
+    {
+        createTable("CREATE TABLE %s (id int, id2 int, b text, primary key (id, id2))");
+        for (int i = 0; i < 100; i++)
+            execute("INSERT INTO %s (id, id2, b) VALUES (?, ?, ?)", 22, i, StringUtils.repeat("ABCDEFG", 10));
+    }
+
+    private void compactAndValidate()
+    {
+        boolean gotException = false;
+        try
+        {
+            getCurrentColumnFamilyStore().forceMajorCompaction();
+        }
+        catch(Throwable t)
+        {
+            gotException = true;
+            Throwable cause = t;
+            while (cause != null && !(cause instanceof MarshalException))
+                cause = cause.getCause();
+            assertNotNull(cause);
+            MarshalException me = (MarshalException) cause;
+            assertTrue(me.getMessage().contains(getCurrentColumnFamilyStore().metadata.keyspace+"."+getCurrentColumnFamilyStore().metadata.name));
+            assertTrue(me.getMessage().contains("Key 22"));
+        }
+        assertTrue(gotException);
+        assertSuspectAndReset(getCurrentColumnFamilyStore().getLiveSSTables());
+    }
+
+    private void readAndValidate(boolean asc) throws Throwable
+    {
+        execute("select * from %s where id = 0 order by id2 "+(asc ? "ASC" : "DESC"));
+
+        boolean gotException = false;
+        try
+        {
+            for (UntypedResultSet.Row r : execute("select * from %s")) {}
+        }
+        catch (Throwable t)
+        {
+            assertTrue(t instanceof CorruptSSTableException);
+            gotException = true;
+            Throwable cause = t;
+            while (cause != null && !(cause instanceof MarshalException))
+                cause = cause.getCause();
+            assertNotNull(cause);
+            MarshalException me = (MarshalException) cause;
+            assertTrue(me.getMessage().contains("Key 22"));
+        }
+        assertSuspectAndReset(getCurrentColumnFamilyStore().getLiveSSTables());
+        assertTrue(gotException);
+        gotException = false;
+        try
+        {
+            execute("select * from %s where id = 22 order by id2 "+(asc ? "ASC" : "DESC"));
+        }
+        catch (Throwable t)
+        {
+            assertTrue(t instanceof CorruptSSTableException);
+            gotException = true;
+            Throwable cause = t;
+            while (cause != null && !(cause instanceof MarshalException))
+                cause = cause.getCause();
+            assertNotNull(cause);
+            MarshalException me = (MarshalException) cause;
+            assertTrue(me.getMessage().contains("Key 22"));
+        }
+        assertTrue(gotException);
+        assertSuspectAndReset(getCurrentColumnFamilyStore().getLiveSSTables());
+    }
 
     public void testPerCFSNeverPurgeTombstonesHelper(boolean deletedCell) throws Throwable
     {
@@ -281,6 +494,16 @@ public class CompactionsCQLTest extends CQLTester
         getCurrentColumnFamilyStore().truncateBlocking();
     }
 
+    private void assertSuspectAndReset(Collection<SSTableReader> sstables)
+    {
+        assertFalse(sstables.isEmpty());
+        for (SSTableReader sstable : sstables)
+        {
+            assertTrue(sstable.isMarkedSuspect());
+            sstable.unmarkSuspect();
+        }
+    }
+
     private void assertTombstones(SSTableReader sstable, boolean expectTS)
     {
         boolean foundTombstone = false;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d876776/test/unit/org/apache/cassandra/io/sstable/SSTableCorruptionDetectionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableCorruptionDetectionTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableCorruptionDetectionTest.java
index 581109c..2510c5e 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableCorruptionDetectionTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableCorruptionDetectionTest.java
@@ -70,10 +70,14 @@ public class SSTableCorruptionDetectionTest extends SSTableWriterTestBase
     private static LifecycleTransaction txn;
     private static ColumnFamilyStore cfs;
     private static SSTableReader ssTableReader;
+    private static Config.CorruptedTombstoneStrategy original;
 
     @BeforeClass
     public static void setUp()
     {
+        // this test writes corrupted data on purpose, disable corrupted tombstone detection
+        original = DatabaseDescriptor.getCorruptedTombstoneStrategy();
+        DatabaseDescriptor.setCorruptedTombstoneStrategy(Config.CorruptedTombstoneStrategy.disabled);
         TableMetadata.Builder cfm =
             TableMetadata.builder(keyspace, table)
                          .addPartitionKeyColumn("pk", AsciiType.instance)
@@ -127,6 +131,7 @@ public class SSTableCorruptionDetectionTest extends SSTableWriterTestBase
 
         txn.abort();
         writer.close();
+        DatabaseDescriptor.setCorruptedTombstoneStrategy(original);
     }
 
     @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org