You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by pa...@apache.org on 2018/02/11 13:25:57 UTC

[12/29] cassandra git commit: Merge branch 'cassandra-2.1' into cassandra-2.2

Merge branch 'cassandra-2.1' into cassandra-2.2


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1602e606
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1602e606
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1602e606

Branch: refs/heads/cassandra-2.2
Commit: 1602e606348959aead18531cb8027afb15f276e7
Parents: aa831c9 b294943
Author: Paulo Motta <pa...@apache.org>
Authored: Sat Feb 10 14:53:20 2018 -0200
Committer: Paulo Motta <pa...@apache.org>
Committed: Sat Feb 10 14:54:56 2018 -0200

----------------------------------------------------------------------
 CASSANDRA-14092.txt                             |  81 ++++
 CHANGES.txt                                     |   1 +
 NEWS.txt                                        |  21 +
 debian/rules                                    |   2 +-
 redhat/cassandra.spec                           |   2 +-
 .../org/apache/cassandra/cql3/Attributes.java   |  79 +++-
 .../cassandra/cql3/statements/CFPropDefs.java   |   7 +
 .../cql3/statements/ModificationStatement.java  |   2 +-
 .../apache/cassandra/db/AbstractNativeCell.java |   6 +
 .../org/apache/cassandra/db/BufferCell.java     |   6 +
 .../apache/cassandra/db/BufferDeletedCell.java  |   6 +
 .../apache/cassandra/db/BufferExpiringCell.java |  34 +-
 src/java/org/apache/cassandra/db/Cell.java      |   2 +
 .../apache/cassandra/db/ColumnFamilyStore.java  |  11 +-
 .../org/apache/cassandra/db/DeletionTime.java   |   1 +
 .../db/compaction/CompactionManager.java        |  14 +-
 .../cassandra/db/compaction/Scrubber.java       |  97 ++++-
 .../io/sstable/AbstractSSTableSimpleWriter.java |  17 +-
 .../cassandra/service/StorageService.java       |   8 +-
 .../cassandra/service/StorageServiceMBean.java  |   3 +
 .../cassandra/thrift/ThriftValidation.java      |   9 +-
 .../org/apache/cassandra/tools/NodeProbe.java   |   8 +-
 .../cassandra/tools/StandaloneScrubber.java     |  12 +-
 .../apache/cassandra/tools/nodetool/Scrub.java  |   8 +-
 .../table1/lb-1-big-CompressionInfo.db          | Bin 0 -> 43 bytes
 .../table1/lb-1-big-Data.db                     | Bin 0 -> 103 bytes
 .../table1/lb-1-big-Digest.adler32              |   1 +
 .../table1/lb-1-big-Filter.db                   | Bin 0 -> 16 bytes
 .../table1/lb-1-big-Index.db                    | Bin 0 -> 36 bytes
 .../table1/lb-1-big-Statistics.db               | Bin 0 -> 4466 bytes
 .../table1/lb-1-big-Summary.db                  | Bin 0 -> 84 bytes
 .../table1/lb-1-big-TOC.txt                     |   8 +
 .../table2/lb-1-big-CompressionInfo.db          | Bin 0 -> 43 bytes
 .../table2/lb-1-big-Data.db                     | Bin 0 -> 98 bytes
 .../table2/lb-1-big-Digest.adler32              |   1 +
 .../table2/lb-1-big-Filter.db                   | Bin 0 -> 16 bytes
 .../table2/lb-1-big-Index.db                    | Bin 0 -> 36 bytes
 .../table2/lb-1-big-Statistics.db               | Bin 0 -> 4478 bytes
 .../table2/lb-1-big-Summary.db                  | Bin 0 -> 84 bytes
 .../table2/lb-1-big-TOC.txt                     |   8 +
 .../table3/lb-1-big-CompressionInfo.db          | Bin 0 -> 43 bytes
 .../table3/lb-1-big-Data.db                     | Bin 0 -> 182 bytes
 .../table3/lb-1-big-Digest.adler32              |   1 +
 .../table3/lb-1-big-Filter.db                   | Bin 0 -> 16 bytes
 .../table3/lb-1-big-Index.db                    | Bin 0 -> 36 bytes
 .../table3/lb-1-big-Statistics.db               | Bin 0 -> 4482 bytes
 .../table3/lb-1-big-Summary.db                  | Bin 0 -> 84 bytes
 .../table3/lb-1-big-TOC.txt                     |   8 +
 .../table4/lb-1-big-CompressionInfo.db          | Bin 0 -> 43 bytes
 .../table4/lb-1-big-Data.db                     | Bin 0 -> 182 bytes
 .../table4/lb-1-big-Digest.adler32              |   1 +
 .../table4/lb-1-big-Filter.db                   | Bin 0 -> 16 bytes
 .../table4/lb-1-big-Index.db                    | Bin 0 -> 36 bytes
 .../table4/lb-1-big-Statistics.db               | Bin 0 -> 4494 bytes
 .../table4/lb-1-big-Summary.db                  | Bin 0 -> 84 bytes
 .../table4/lb-1-big-TOC.txt                     |   8 +
 .../cql3/validation/operations/TTLTest.java     | 405 +++++++++++++++++++
 .../unit/org/apache/cassandra/db/ScrubTest.java |   2 +-
 58 files changed, 847 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 0f6e61c,9332354..82da6ad
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,9 -1,5 +1,10 @@@
 -2.1.20
 +2.2.12
 + * Fix the inspectJvmOptions startup check (CASSANDRA-14112)
 + * Fix race that prevents submitting compaction for a table when executor is full (CASSANDRA-13801)
 + * Rely on the JVM to handle OutOfMemoryErrors (CASSANDRA-13006)
 + * Grab refs during scrub/index redistribution/cleanup (CASSANDRA-13873)
 +Merged from 2.1:
+  * Protect against overflow of local expiration time (CASSANDRA-14092)
   * More PEP8 compliance for cqlsh (CASSANDRA-14021)
   * RPM package spec: fix permissions for installed jars and config files (CASSANDRA-14181)
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/NEWS.txt
----------------------------------------------------------------------
diff --cc NEWS.txt
index 5747941,fb6b4ee..4fe3508
--- a/NEWS.txt
+++ b/NEWS.txt
@@@ -18,15 -38,9 +38,16 @@@ using the provided 'sstableupgrade' too
  
  Upgrading
  ---------
 -   - See MAXIMUM TTL EXPIRATION DATE NOTICE above.
 -
 -2.1.19
++    - See MAXIMUM TTL EXPIRATION DATE NOTICE above.
 +    - Cassandra is now relying on the JVM options to properly shutdown on OutOfMemoryError. By default it will
 +      rely on the OnOutOfMemoryError option as the ExitOnOutOfMemoryError and CrashOnOutOfMemoryError options
 +      are not supported by the older 1.7 and 1.8 JVMs. A warning will be logged at startup if none of those JVM
 +      options are used. See CASSANDRA-13006 for more details.
 +    - Cassandra is not logging anymore by default an Heap histogram on OutOfMemoryError. To enable that behavior
 +      set the 'cassandra.printHeapHistogramOnOutOfMemoryError' System property to 'true'. See CASSANDRA-13006
 +      for more details.
 +
 +2.2.11
  ======
  
  Upgrading

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/debian/rules
----------------------------------------------------------------------
diff --cc debian/rules
index 35f5a51,70db61c..ff1d64d
--- a/debian/rules
+++ b/debian/rules
@@@ -63,7 -64,7 +63,7 @@@ binary-indep: build instal
  	dh_testroot
  	dh_installchangelogs
  	dh_installinit -u'start 50 2 3 4 5 . stop 50 0 1 6 .'
- 	dh_installdocs README.asc CHANGES.txt NEWS.txt doc/cql3/CQL.css doc/cql3/CQL.html
 -	dh_installdocs README.asc CHANGES.txt NEWS.txt
++	dh_installdocs README.asc CHANGES.txt NEWS.txt doc/cql3/CQL.css doc/cql3/CQL.html CASSANDRA-14092.txt
  	dh_installexamples tools/*.yaml
  	dh_bash-completion
  	dh_compress

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/redhat/cassandra.spec
----------------------------------------------------------------------
diff --cc redhat/cassandra.spec
index 0d4b271,a3f09b0..07c3dc5
--- a/redhat/cassandra.spec
+++ b/redhat/cassandra.spec
@@@ -114,10 -113,10 +114,10 @@@ exit 
  
  %files
  %defattr(0644,root,root,0755)
--%doc CHANGES.txt LICENSE.txt README.asc NEWS.txt NOTICE.txt
 -%attr(755,root,root) %{_bindir}/cassandra-cli
++%doc CHANGES.txt LICENSE.txt README.asc NEWS.txt NOTICE.txt CASSANDRA-14092.txt
  %attr(755,root,root) %{_bindir}/cassandra-stress
  %attr(755,root,root) %{_bindir}/cqlsh
 +%attr(755,root,root) %{_bindir}/cqlsh.py
  %attr(755,root,root) %{_bindir}/debug-cql
  %attr(755,root,root) %{_bindir}/nodetool
  %attr(755,root,root) %{_bindir}/sstablekeys

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/src/java/org/apache/cassandra/cql3/Attributes.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/Attributes.java
index 7b38e9f,23571ca..84f423a
--- a/src/java/org/apache/cassandra/cql3/Attributes.java
+++ b/src/java/org/apache/cassandra/cql3/Attributes.java
@@@ -18,17 -18,19 +18,23 @@@
  package org.apache.cassandra.cql3;
  
  import java.nio.ByteBuffer;
 +import java.util.Collections;
+ import java.util.concurrent.TimeUnit;
  
 +import com.google.common.collect.Iterables;
+ import com.google.common.annotations.VisibleForTesting;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
  
+ import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.cql3.functions.Function;
  import org.apache.cassandra.db.ExpiringCell;
  import org.apache.cassandra.db.marshal.Int32Type;
  import org.apache.cassandra.db.marshal.LongType;
  import org.apache.cassandra.exceptions.InvalidRequestException;
  import org.apache.cassandra.serializers.MarshalException;
 +import org.apache.cassandra.utils.ByteBufferUtil;
+ import org.apache.cassandra.utils.NoSpamLogger;
  
  /**
   * Utility class for the Parser to gather attributes for modification

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/src/java/org/apache/cassandra/cql3/statements/CFPropDefs.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/src/java/org/apache/cassandra/db/AbstractNativeCell.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/src/java/org/apache/cassandra/db/BufferExpiringCell.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 7e36e11,2989b9d..45908de
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@@ -905,26 -879,13 +905,26 @@@ public class ColumnFamilyStore implemen
                                                  previousFlushFailure);
              logFlush();
              Flush flush = new Flush(false);
 -            ListenableFutureTask<?> flushTask = ListenableFutureTask.create(flush, null);
 -            flushExecutor.submit(flushTask);
 -            ListenableFutureTask<?> task = ListenableFutureTask.create(flush.postFlush, null);
 -            postFlushExecutor.submit(task);
 +            ListenableFutureTask<Void> flushTask = ListenableFutureTask.create(flush, null);
 +            flushExecutor.execute(flushTask);
 +            ListenableFutureTask<ReplayPosition> task = ListenableFutureTask.create(flush.postFlush);
 +            postFlushExecutor.execute(task);
  
              @SuppressWarnings("unchecked")
-             ListenableFuture<ReplayPosition> future = 
 -            ListenableFuture<?> future = Futures.allAsList(flushTask, task);
++            ListenableFuture<ReplayPosition> future =
 +                    // If either of the two tasks errors out, resulting future must also error out.
 +                    // Combine the two futures and only return post-flush result after both have completed.
 +                    // Note that flushTask will always yield null, but Futures.allAsList is
 +                    // order preserving, which is why the transform function returns the result
 +                    // from item 1 in it's input list (i.e. what was yielded by task).
 +                    Futures.transform(Futures.allAsList(flushTask, task),
 +                                      new Function<List<Object>, ReplayPosition>()
 +                                      {
 +                                          public ReplayPosition apply(List<Object> input)
 +                                          {
 +                                              return (ReplayPosition) input.get(1);
 +                                          }
 +                                      });
              return future;
          }
      }
@@@ -1610,29 -1516,48 +1610,28 @@@
          return CompactionManager.instance.performCleanup(ColumnFamilyStore.this, jobs);
      }
  
-     public CompactionManager.AllSSTableOpStatus scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, int jobs) throws ExecutionException, InterruptedException
+     public CompactionManager.AllSSTableOpStatus scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, boolean reinsertOverflowedTTLRows, int jobs) throws ExecutionException, InterruptedException
      {
-         return scrub(disableSnapshot, skipCorrupted, false, checkData, jobs);
 -        // skip snapshot creation during scrub, SEE JIRA 5891
 -        if(!disableSnapshot)
 -            snapshotWithoutFlush("pre-scrub-" + System.currentTimeMillis());
 -        return CompactionManager.instance.performScrub(ColumnFamilyStore.this, skipCorrupted, checkData, reinsertOverflowedTTLRows, jobs);
 -    }
 -
 -    public CompactionManager.AllSSTableOpStatus sstablesRewrite(boolean excludeCurrentVersion, int jobs) throws ExecutionException, InterruptedException
 -    {
 -        return CompactionManager.instance.performSSTableRewrite(ColumnFamilyStore.this, excludeCurrentVersion, jobs);
 -    }
 -
 -    public void markObsolete(Collection<SSTableReader> sstables, OperationType compactionType)
 -    {
 -        assert !sstables.isEmpty();
 -        data.markObsolete(sstables, compactionType);
 -    }
 -
 -    void replaceFlushed(Memtable memtable, SSTableReader sstable)
 -    {
 -        compactionStrategyWrapper.replaceFlushed(memtable, sstable);
 -    }
 -
 -    public boolean isValid()
 -    {
 -        return valid;
++        return scrub(disableSnapshot, skipCorrupted, false, checkData, reinsertOverflowedTTLRows, jobs);
      }
  
-     @VisibleForTesting
-     public CompactionManager.AllSSTableOpStatus scrub(boolean disableSnapshot, boolean skipCorrupted, boolean alwaysFail, boolean checkData, int jobs) throws ExecutionException, InterruptedException
 -    public long getMemtableColumnsCount()
++    public CompactionManager.AllSSTableOpStatus scrub(boolean disableSnapshot, boolean skipCorrupted, boolean alwaysFail, boolean checkData, boolean reinsertOverflowedTTLRows, int jobs) throws ExecutionException, InterruptedException
      {
 -        return metric.memtableColumnsCount.value();
 -    }
 +        // skip snapshot creation during scrub, SEE JIRA 5891
 +        if(!disableSnapshot)
 +            snapshotWithoutFlush("pre-scrub-" + System.currentTimeMillis());
  
 -    public long getMemtableDataSize()
 -    {
 -        return metric.memtableOnHeapSize.value();
 -    }
 +        try
 +        {
-             return CompactionManager.instance.performScrub(ColumnFamilyStore.this, skipCorrupted, checkData, jobs);
++            return CompactionManager.instance.performScrub(ColumnFamilyStore.this, skipCorrupted, checkData, reinsertOverflowedTTLRows, jobs);
 +        }
 +        catch(Throwable t)
 +        {
 +            if (!rebuildOnFailedScrub(t))
 +                throw t;
  
 -    public int getMemtableSwitchCount()
 -    {
 -        return (int) metric.memtableSwitchCount.count();
 +            return alwaysFail ? CompactionManager.AllSSTableOpStatus.ABORTED : CompactionManager.AllSSTableOpStatus.SUCCESSFUL;
 +        }
      }
  
      /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 3350b20,6e3634a..d90abe9
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@@ -348,9 -358,16 +348,15 @@@ public class CompactionManager implemen
          }
      }
  
 -    @Deprecated
 -    public AllSSTableOpStatus performScrub(final ColumnFamilyStore cfs, final boolean skipCorrupted, final boolean checkData, int jobs) throws InterruptedException, ExecutionException
 +    public AllSSTableOpStatus performScrub(final ColumnFamilyStore cfs, final boolean skipCorrupted, final boolean checkData, int jobs)
 +    throws InterruptedException, ExecutionException
      {
+         return performScrub(cfs, skipCorrupted, checkData, false, jobs);
+     }
+ 
 -    public AllSSTableOpStatus performScrub(final ColumnFamilyStore cfs, final boolean skipCorrupted, final boolean checkData,
 -                                           final boolean reinsertOverflowedTTLRows, int jobs) throws InterruptedException, ExecutionException
++    public AllSSTableOpStatus performScrub(final ColumnFamilyStore cfs, final boolean skipCorrupted, final boolean checkData, final boolean reinsertOverflowedTTLRows, int jobs)
++    throws InterruptedException, ExecutionException
+     {
 -        assert !cfs.isIndex();
          return parallelAllSSTableOperation(cfs, new OneSSTableOperation()
          {
              @Override
@@@ -360,30 -377,11 +366,30 @@@
              }
  
              @Override
 -            public void execute(SSTableReader input) throws IOException
 +            public void execute(LifecycleTransaction input) throws IOException
              {
-                 scrubOne(cfs, input, skipCorrupted, checkData);
+                 scrubOne(cfs, input, skipCorrupted, checkData, reinsertOverflowedTTLRows);
              }
 -        }, jobs);
 +        }, jobs, OperationType.SCRUB);
 +    }
 +
 +    public AllSSTableOpStatus performVerify(final ColumnFamilyStore cfs, final boolean extendedVerify) throws InterruptedException, ExecutionException
 +    {
 +        assert !cfs.isIndex();
 +        return parallelAllSSTableOperation(cfs, new OneSSTableOperation()
 +        {
 +            @Override
 +            public Iterable<SSTableReader> filterSSTables(LifecycleTransaction input)
 +            {
 +                return input.originals();
 +            }
 +
 +            @Override
 +            public void execute(LifecycleTransaction input) throws IOException
 +            {
 +                verifyOne(cfs, input.onlyOne(), extendedVerify);
 +            }
 +        }, 0, OperationType.VERIFY);
      }
  
      public AllSSTableOpStatus performSSTableRewrite(final ColumnFamilyStore cfs, final boolean excludeCurrentVersion, int jobs) throws InterruptedException, ExecutionException
@@@ -730,14 -717,14 +736,14 @@@
          }
      }
  
-     private void scrubOne(ColumnFamilyStore cfs, LifecycleTransaction modifier, boolean skipCorrupted, boolean checkData) throws IOException
 -    private void scrubOne(ColumnFamilyStore cfs, SSTableReader sstable, boolean skipCorrupted, boolean checkData, boolean reinsertOverflowedTTLRows) throws IOException
++    private void scrubOne(ColumnFamilyStore cfs, LifecycleTransaction modifier, boolean skipCorrupted, boolean checkData, boolean reinsertOverflowedTTLRows) throws IOException
      {
 -        Scrubber scrubber = new Scrubber(cfs, sstable, skipCorrupted, false, checkData, reinsertOverflowedTTLRows);
 +        CompactionInfo.Holder scrubInfo = null;
  
-         try (Scrubber scrubber = new Scrubber(cfs, modifier, skipCorrupted, checkData))
 -        CompactionInfo.Holder scrubInfo = scrubber.getScrubInfo();
 -        metrics.beginCompaction(scrubInfo);
 -        try
++        try (Scrubber scrubber = new Scrubber(cfs, modifier, skipCorrupted, checkData, reinsertOverflowedTTLRows))
          {
 +            scrubInfo = scrubber.getScrubInfo();
 +            metrics.beginCompaction(scrubInfo);
              scrubber.scrub();
          }
          finally

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/Scrubber.java
index b6b20fb,6d4537c..affee11
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@@ -22,33 -22,30 +22,36 @@@ import java.io.*
  import java.util.*;
  
  import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.base.Predicate;
 +import com.google.common.base.Predicates;
  import com.google.common.base.Throwables;
  import com.google.common.collect.AbstractIterator;
 -import com.google.common.collect.Sets;
  
  import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
  import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
+ import org.apache.cassandra.db.composites.CellNames;
  import org.apache.cassandra.io.sstable.*;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.io.sstable.format.SSTableWriter;
  import org.apache.cassandra.io.util.FileUtils;
  import org.apache.cassandra.io.util.RandomAccessReader;
  import org.apache.cassandra.service.ActiveRepairService;
  import org.apache.cassandra.utils.ByteBufferUtil;
  import org.apache.cassandra.utils.JVMStabilityInspector;
  import org.apache.cassandra.utils.OutputHandler;
+ import org.apache.cassandra.utils.memory.HeapAllocator;
 +import org.apache.cassandra.utils.UUIDGen;
 +import org.apache.cassandra.utils.concurrent.Refs;
  
  public class Scrubber implements Closeable
  {
      private final ColumnFamilyStore cfs;
      private final SSTableReader sstable;
 +    private final LifecycleTransaction transaction;
      private final File destination;
      private final boolean skipCorrupted;
 -    public final boolean validateColumns;
+     private final boolean reinsertOverflowedTTLRows;
  
      private final CompactionController controller;
      private final boolean isCommutative;
@@@ -84,20 -81,27 +88,28 @@@
      };
      private final SortedSet<Row> outOfOrderRows = new TreeSet<>(rowComparator);
  
 -    public Scrubber(ColumnFamilyStore cfs, SSTableReader sstable, boolean skipCorrupted, boolean isOffline, boolean checkData) throws IOException
 +    public Scrubber(ColumnFamilyStore cfs, LifecycleTransaction transaction, boolean skipCorrupted, boolean checkData) throws IOException
      {
-         this(cfs, transaction, skipCorrupted, new OutputHandler.LogOutput(), checkData);
 -        this(cfs, sstable, skipCorrupted, isOffline, checkData, false);
++        this(cfs, transaction, skipCorrupted, new OutputHandler.LogOutput(), checkData, false);
+     }
+ 
 -    public Scrubber(ColumnFamilyStore cfs, SSTableReader sstable, boolean skipCorrupted, boolean isOffline, boolean checkData,
++    public Scrubber(ColumnFamilyStore cfs, LifecycleTransaction transaction, boolean skipCorrupted, boolean checkData,
+                     boolean reinsertOverflowedTTLRows) throws IOException
+     {
 -        this(cfs, sstable, skipCorrupted, new OutputHandler.LogOutput(), isOffline, checkData, reinsertOverflowedTTLRows);
++        this(cfs, transaction, skipCorrupted, new OutputHandler.LogOutput(), checkData, reinsertOverflowedTTLRows);
      }
  
 -    public Scrubber(ColumnFamilyStore cfs, SSTableReader sstable, boolean skipCorrupted, OutputHandler outputHandler, boolean isOffline, boolean checkData,
 +    @SuppressWarnings("resource")
-     public Scrubber(ColumnFamilyStore cfs, LifecycleTransaction transaction, boolean skipCorrupted, OutputHandler outputHandler, boolean checkData) throws IOException
++    public Scrubber(ColumnFamilyStore cfs, LifecycleTransaction transaction, boolean skipCorrupted, OutputHandler outputHandler, boolean checkData,
+                     boolean reinsertOverflowedTTLRows) throws IOException
      {
          this.cfs = cfs;
 -        this.sstable = sstable;
 +        this.transaction = transaction;
 +        this.sstable = transaction.onlyOne();
          this.outputHandler = outputHandler;
          this.skipCorrupted = skipCorrupted;
 -        this.isOffline = isOffline;
 -        this.validateColumns = checkData;
 +        this.rowIndexEntrySerializer = sstable.descriptor.version.getSSTableFormat().getIndexSerializer(sstable.metadata);
+         this.reinsertOverflowedTTLRows = reinsertOverflowedTTLRows;
  
          List<SSTableReader> toScrub = Collections.singletonList(sstable);
  
@@@ -322,7 -339,7 +339,7 @@@
          // OrderCheckerIterator will check, at iteration time, that the cells are in the proper order. If it detects
          // that one cell is out of order, it will stop returning them. The remaining cells will be sorted and added
          // to the outOfOrderRows that will be later written to a new SSTable.
-         OrderCheckerIterator atoms = new OrderCheckerIterator(new SSTableIdentityIterator(sstable, dataFile, key, checkData),
 -        OrderCheckerIterator atoms = new OrderCheckerIterator(getIterator(key, dataSize),
++        OrderCheckerIterator atoms = new OrderCheckerIterator(getIterator(key),
                                                                cfs.metadata.comparator.onDiskAtomComparator());
          if (prevKey != null && prevKey.compareTo(key) > 0)
          {
@@@ -342,6 -359,18 +359,18 @@@
          return true;
      }
  
+     /**
+      * Only wrap with {@link FixNegativeLocalDeletionTimeIterator} if {@link #reinsertOverflowedTTLRows} option
+      * is specified
+      */
 -    private OnDiskAtomIterator getIterator(DecoratedKey key, long dataSize)
++    private OnDiskAtomIterator getIterator(DecoratedKey key)
+     {
 -        SSTableIdentityIterator sstableIdentityIterator = new SSTableIdentityIterator(sstable, dataFile, key, dataSize, validateColumns);
++        SSTableIdentityIterator sstableIdentityIterator = new SSTableIdentityIterator(sstable, dataFile, key, checkData);
+         return reinsertOverflowedTTLRows ? new FixNegativeLocalDeletionTimeIterator(sstableIdentityIterator,
+                                                                                     outputHandler,
+                                                                                     negativeLocalDeletionInfoMetrics) : sstableIdentityIterator;
+     }
+ 
      private void updateIndexKey()
      {
          currentIndexKey = nextIndexKey;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
index 6896062,d718765..e416c7b
--- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
@@@ -31,8 -32,8 +32,9 @@@ import org.apache.cassandra.cql3.Attrib
  import org.apache.cassandra.db.*;
  import org.apache.cassandra.db.context.CounterContext;
  import org.apache.cassandra.dht.IPartitioner;
+ import org.apache.cassandra.exceptions.InvalidRequestException;
 -import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 +import org.apache.cassandra.io.sstable.format.SSTableFormat;
 +import org.apache.cassandra.io.sstable.format.SSTableWriter;
  import org.apache.cassandra.service.ActiveRepairService;
  import org.apache.cassandra.utils.CounterId;
  import org.apache.cassandra.utils.Pair;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageService.java
index 1ecedac,a7a8ca7..2c9ac4d
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -2611,10 -2415,16 +2611,16 @@@ public class StorageService extends Not
  
      public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, int jobs, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
      {
+         return scrub(disableSnapshot, skipCorrupted, checkData, false, jobs, keyspaceName, columnFamilies);
+     }
+ 
+     public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, boolean reinsertOverflowedTTLRows,
+                      int jobs, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
+     {
          CompactionManager.AllSSTableOpStatus status = CompactionManager.AllSSTableOpStatus.SUCCESSFUL;
 -        for (ColumnFamilyStore cfStore : getValidColumnFamilies(false, false, keyspaceName, columnFamilies))
 +        for (ColumnFamilyStore cfStore : getValidColumnFamilies(true, false, keyspaceName, columnFamilies))
          {
-             CompactionManager.AllSSTableOpStatus oneStatus = cfStore.scrub(disableSnapshot, skipCorrupted, checkData, jobs);
+             CompactionManager.AllSSTableOpStatus oneStatus = cfStore.scrub(disableSnapshot, skipCorrupted, checkData, reinsertOverflowedTTLRows, jobs);
              if (oneStatus != CompactionManager.AllSSTableOpStatus.SUCCESSFUL)
                  status = oneStatus;
          }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 761eed6,90c0fb5..f336bcc
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@@ -265,15 -274,10 +265,18 @@@ public interface StorageServiceMBean ex
      public int scrub(boolean disableSnapshot, boolean skipCorrupted, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException;
      @Deprecated
      public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException;
+     @Deprecated
      public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, int jobs, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException;
  
 -    public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, boolean reinsertOverflowedTTLRows, int jobs, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException;
++public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, boolean reinsertOverflowedTTLRows, int jobs, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException;
++
 +    /**
 +     * Verify (checksums of) the given keyspace.
 +     * If columnFamilies array is empty, all CFs are verified.
 +     *
 +     * The entire sstable will be read to ensure each cell validates if extendedVerify is true
 +     */
 +    public int verify(boolean extendedVerify, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException;
  
      /**
       * Rewrite all sstables to the latest version.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/src/java/org/apache/cassandra/thrift/ThriftValidation.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/thrift/ThriftValidation.java
index d735676,10e7185..8bdf9dc
--- a/src/java/org/apache/cassandra/thrift/ThriftValidation.java
+++ b/src/java/org/apache/cassandra/thrift/ThriftValidation.java
@@@ -312,9 -313,9 +313,9 @@@ public class ThriftValidatio
          if (cosc.column != null)
          {
              if (isCommutative)
 -                throw new org.apache.cassandra.exceptions.InvalidRequestException("invalid operation for commutative columnfamily " + metadata.cfName);
 +                throw new org.apache.cassandra.exceptions.InvalidRequestException("invalid operation for commutative table " + metadata.cfName);
  
-             validateTtl(cosc.column);
+             validateTtl(metadata, cosc.column);
              validateColumnPath(metadata, new ColumnPath(metadata.cfName).setSuper_column((ByteBuffer)null).setColumn(cosc.column.name));
              validateColumnData(metadata, key, null, cosc.column);
          }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/tools/NodeProbe.java
index 24c5874,fcd4110..17bef02
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@@ -236,16 -243,11 +236,16 @@@ public class NodeProbe implements AutoC
          return ssProxy.forceKeyspaceCleanup(jobs, keyspaceName, columnFamilies);
      }
  
-     public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, int jobs, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
+     public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, boolean reinsertOverflowedTTLRows, int jobs, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
      {
-         return ssProxy.scrub(disableSnapshot, skipCorrupted, checkData, jobs, keyspaceName, columnFamilies);
+         return ssProxy.scrub(disableSnapshot, skipCorrupted, checkData, reinsertOverflowedTTLRows, jobs, keyspaceName, columnFamilies);
      }
  
 +    public int verify(boolean extendedVerify, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
 +    {
 +        return ssProxy.verify(extendedVerify, keyspaceName, columnFamilies);
 +    }
 +
      public int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, int jobs, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
      {
          return ssProxy.upgradeSSTables(keyspaceName, excludeCurrentVersion, jobs, columnFamilies);
@@@ -267,22 -268,13 +267,22 @@@
          }
      }
  
-     public void scrub(PrintStream out, boolean disableSnapshot, boolean skipCorrupted, boolean checkData, int jobs, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
+     public void scrub(PrintStream out, boolean disableSnapshot, boolean skipCorrupted, boolean checkData, boolean reinsertOverflowedTTLRows, int jobs, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
      {
          checkJobs(out, jobs);
-         if (scrub(disableSnapshot, skipCorrupted, checkData, jobs, keyspaceName, columnFamilies) != 0)
+         if (scrub(disableSnapshot, skipCorrupted, checkData, reinsertOverflowedTTLRows, jobs, keyspaceName, columnFamilies) != 0)
          {
              failed = true;
 -            out.println("Aborted scrubbing atleast one column family in keyspace "+keyspaceName+", check server logs for more information.");
 +            out.println("Aborted scrubbing at least one table in keyspace "+keyspaceName+", check server logs for more information.");
 +        }
 +    }
 +
 +    public void verify(PrintStream out, boolean extendedVerify, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
 +    {
 +        if (verify(extendedVerify, keyspaceName, columnFamilies) != 0)
 +        {
 +            failed = true;
 +            out.println("Aborted verifying at least one table in keyspace "+keyspaceName+", check server logs for more information.");
          }
      }
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/tools/StandaloneScrubber.java
index a486a13,59d13d5..f5e84c5
--- a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
+++ b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
@@@ -119,10 -115,10 +126,10 @@@ public class StandaloneScrubbe
              {
                  for (SSTableReader sstable : sstables)
                  {
 -                    try
 +                    try (LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.SCRUB, sstable))
                      {
 -                        Scrubber scrubber = new Scrubber(cfs, sstable, options.skipCorrupted, handler, true, !options.noValidate, options.reinsertOverflowedTTL);
 -                        try
 +                        txn.obsoleteOriginals(); // make sure originals are deleted and avoid NPE if index is missing, CASSANDRA-9591
-                         try (Scrubber scrubber = new Scrubber(cfs, txn, options.skipCorrupted, handler, !options.noValidate))
++                        try (Scrubber scrubber = new Scrubber(cfs, txn, options.skipCorrupted, handler, !options.noValidate, options.reinsertOverflowedTTL))
                          {
                              scrubber.scrub();
                          }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/src/java/org/apache/cassandra/tools/nodetool/Scrub.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/tools/nodetool/Scrub.java
index dafe8d1,0000000..50224a0
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/tools/nodetool/Scrub.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/Scrub.java
@@@ -1,76 -1,0 +1,82 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.tools.nodetool;
 +
 +import io.airlift.command.Arguments;
 +import io.airlift.command.Command;
 +import io.airlift.command.Option;
 +
 +import java.util.ArrayList;
 +import java.util.List;
 +
 +import org.apache.cassandra.tools.NodeProbe;
 +import org.apache.cassandra.tools.NodeTool.NodeToolCmd;
++import org.apache.cassandra.tools.StandaloneScrubber;
 +
 +@Command(name = "scrub", description = "Scrub (rebuild sstables for) one or more tables")
 +public class Scrub extends NodeToolCmd
 +{
 +    @Arguments(usage = "[<keyspace> <tables>...]", description = "The keyspace followed by one or many tables")
 +    private List<String> args = new ArrayList<>();
 +
 +    @Option(title = "disable_snapshot",
 +            name = {"-ns", "--no-snapshot"},
 +            description = "Scrubbed CFs will be snapshotted first, if disableSnapshot is false. (default false)")
 +    private boolean disableSnapshot = false;
 +
 +    @Option(title = "skip_corrupted",
 +            name = {"-s", "--skip-corrupted"},
 +            description = "Skip corrupted partitions even when scrubbing counter tables. (default false)")
 +    private boolean skipCorrupted = false;
 +
 +    @Option(title = "no_validate",
 +                   name = {"-n", "--no-validate"},
 +                   description = "Do not validate columns using column validator")
 +    private boolean noValidation = false;
 +
 +    @Option(title = "jobs",
 +            name = {"-j", "--jobs"},
 +            description = "Number of sstables to scrub simultanously, set to 0 to use all available compaction threads")
 +    private int jobs = 2;
 +
++    @Option(title = "reinsert_overflowed_ttl",
++    name = {"r", "--reinsert-overflowed-ttl"},
++    description = StandaloneScrubber.REINSERT_OVERFLOWED_TTL_OPTION_DESCRIPTION)
++    private boolean reinsertOverflowedTTL = false;
++
 +    @Override
 +    public void execute(NodeProbe probe)
 +    {
 +        List<String> keyspaces = parseOptionalKeyspace(args, probe);
 +        String[] cfnames = parseOptionalColumnFamilies(args);
 +
 +        for (String keyspace : keyspaces)
 +        {
 +            try
 +            {
-                 probe.scrub(System.out, disableSnapshot, skipCorrupted, !noValidation, jobs, keyspace, cfnames);
++                probe.scrub(System.out, disableSnapshot, skipCorrupted, !noValidation, reinsertOverflowedTTL, jobs, keyspace, cfnames);
 +            } catch (IllegalArgumentException e)
 +            {
 +                throw e;
 +            } catch (Exception e)
 +            {
 +                throw new RuntimeException("Error occurred during scrubbing", e);
 +            }
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/test/data/negative-local-expiration-test/table1/lb-1-big-CompressionInfo.db
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table1/lb-1-big-CompressionInfo.db
index 0000000,0000000..d7cc13b
new file mode 100644
Binary files differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/test/data/negative-local-expiration-test/table1/lb-1-big-Data.db
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table1/lb-1-big-Data.db
index 0000000,0000000..51213c2
new file mode 100644
Binary files differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/test/data/negative-local-expiration-test/table1/lb-1-big-Digest.adler32
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table1/lb-1-big-Digest.adler32
index 0000000,0000000..d5b12df
new file mode 100644
--- /dev/null
+++ b/test/data/negative-local-expiration-test/table1/lb-1-big-Digest.adler32
@@@ -1,0 -1,0 +1,1 @@@
++2292388625

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/test/data/negative-local-expiration-test/table1/lb-1-big-Filter.db
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table1/lb-1-big-Filter.db
index 0000000,0000000..f8e53be
new file mode 100644
Binary files differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/test/data/negative-local-expiration-test/table1/lb-1-big-Index.db
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table1/lb-1-big-Index.db
index 0000000,0000000..3ab96ee
new file mode 100644
Binary files differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/test/data/negative-local-expiration-test/table1/lb-1-big-Statistics.db
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table1/lb-1-big-Statistics.db
index 0000000,0000000..e8cc7e0
new file mode 100644
Binary files differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/test/data/negative-local-expiration-test/table1/lb-1-big-Summary.db
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table1/lb-1-big-Summary.db
index 0000000,0000000..1a3f81f
new file mode 100644
Binary files differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/test/data/negative-local-expiration-test/table1/lb-1-big-TOC.txt
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table1/lb-1-big-TOC.txt
index 0000000,0000000..26c7025
new file mode 100644
--- /dev/null
+++ b/test/data/negative-local-expiration-test/table1/lb-1-big-TOC.txt
@@@ -1,0 -1,0 +1,8 @@@
++Statistics.db
++Summary.db
++Digest.adler32
++Data.db
++Index.db
++CompressionInfo.db
++Filter.db
++TOC.txt

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/test/data/negative-local-expiration-test/table2/lb-1-big-CompressionInfo.db
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table2/lb-1-big-CompressionInfo.db
index 0000000,0000000..38373b4
new file mode 100644
Binary files differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/test/data/negative-local-expiration-test/table2/lb-1-big-Data.db
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table2/lb-1-big-Data.db
index 0000000,0000000..762a229
new file mode 100644
Binary files differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/test/data/negative-local-expiration-test/table2/lb-1-big-Digest.adler32
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table2/lb-1-big-Digest.adler32
index 0000000,0000000..ae89849
new file mode 100644
--- /dev/null
+++ b/test/data/negative-local-expiration-test/table2/lb-1-big-Digest.adler32
@@@ -1,0 -1,0 +1,1 @@@
++3829731931

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/test/data/negative-local-expiration-test/table2/lb-1-big-Filter.db
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table2/lb-1-big-Filter.db
index 0000000,0000000..f8e53be
new file mode 100644
Binary files differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/test/data/negative-local-expiration-test/table2/lb-1-big-Index.db
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table2/lb-1-big-Index.db
index 0000000,0000000..38a6e4c
new file mode 100644
Binary files differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/test/data/negative-local-expiration-test/table2/lb-1-big-Statistics.db
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table2/lb-1-big-Statistics.db
index 0000000,0000000..64dab43
new file mode 100644
Binary files differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/test/data/negative-local-expiration-test/table2/lb-1-big-Summary.db
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table2/lb-1-big-Summary.db
index 0000000,0000000..1a3f81f
new file mode 100644
Binary files differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/test/data/negative-local-expiration-test/table2/lb-1-big-TOC.txt
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table2/lb-1-big-TOC.txt
index 0000000,0000000..26c7025
new file mode 100644
--- /dev/null
+++ b/test/data/negative-local-expiration-test/table2/lb-1-big-TOC.txt
@@@ -1,0 -1,0 +1,8 @@@
++Statistics.db
++Summary.db
++Digest.adler32
++Data.db
++Index.db
++CompressionInfo.db
++Filter.db
++TOC.txt

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/test/data/negative-local-expiration-test/table3/lb-1-big-CompressionInfo.db
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table3/lb-1-big-CompressionInfo.db
index 0000000,0000000..04a7384
new file mode 100644
Binary files differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/test/data/negative-local-expiration-test/table3/lb-1-big-Data.db
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table3/lb-1-big-Data.db
index 0000000,0000000..33145df
new file mode 100644
Binary files differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/test/data/negative-local-expiration-test/table3/lb-1-big-Digest.adler32
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table3/lb-1-big-Digest.adler32
index 0000000,0000000..2a542cd
new file mode 100644
--- /dev/null
+++ b/test/data/negative-local-expiration-test/table3/lb-1-big-Digest.adler32
@@@ -1,0 -1,0 +1,1 @@@
++3574474340

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/test/data/negative-local-expiration-test/table3/lb-1-big-Filter.db
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table3/lb-1-big-Filter.db
index 0000000,0000000..f8e53be
new file mode 100644
Binary files differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/test/data/negative-local-expiration-test/table3/lb-1-big-Index.db
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table3/lb-1-big-Index.db
index 0000000,0000000..5fb34e8
new file mode 100644
Binary files differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/test/data/negative-local-expiration-test/table3/lb-1-big-Statistics.db
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table3/lb-1-big-Statistics.db
index 0000000,0000000..51203ae
new file mode 100644
Binary files differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/test/data/negative-local-expiration-test/table3/lb-1-big-Summary.db
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table3/lb-1-big-Summary.db
index 0000000,0000000..1a3f81f
new file mode 100644
Binary files differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/test/data/negative-local-expiration-test/table3/lb-1-big-TOC.txt
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table3/lb-1-big-TOC.txt
index 0000000,0000000..26c7025
new file mode 100644
--- /dev/null
+++ b/test/data/negative-local-expiration-test/table3/lb-1-big-TOC.txt
@@@ -1,0 -1,0 +1,8 @@@
++Statistics.db
++Summary.db
++Digest.adler32
++Data.db
++Index.db
++CompressionInfo.db
++Filter.db
++TOC.txt

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/test/data/negative-local-expiration-test/table4/lb-1-big-CompressionInfo.db
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table4/lb-1-big-CompressionInfo.db
index 0000000,0000000..c814fef
new file mode 100644
Binary files differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/test/data/negative-local-expiration-test/table4/lb-1-big-Data.db
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table4/lb-1-big-Data.db
index 0000000,0000000..f40e71f
new file mode 100644
Binary files differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/test/data/negative-local-expiration-test/table4/lb-1-big-Digest.adler32
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table4/lb-1-big-Digest.adler32
index 0000000,0000000..e6675e4
new file mode 100644
--- /dev/null
+++ b/test/data/negative-local-expiration-test/table4/lb-1-big-Digest.adler32
@@@ -1,0 -1,0 +1,1 @@@
++2405377913

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/test/data/negative-local-expiration-test/table4/lb-1-big-Filter.db
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table4/lb-1-big-Filter.db
index 0000000,0000000..f8e53be
new file mode 100644
Binary files differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/test/data/negative-local-expiration-test/table4/lb-1-big-Index.db
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table4/lb-1-big-Index.db
index 0000000,0000000..8291383
new file mode 100644
Binary files differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/test/data/negative-local-expiration-test/table4/lb-1-big-Statistics.db
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table4/lb-1-big-Statistics.db
index 0000000,0000000..2217c2d
new file mode 100644
Binary files differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/test/data/negative-local-expiration-test/table4/lb-1-big-Summary.db
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table4/lb-1-big-Summary.db
index 0000000,0000000..1a3f81f
new file mode 100644
Binary files differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/test/data/negative-local-expiration-test/table4/lb-1-big-TOC.txt
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table4/lb-1-big-TOC.txt
index 0000000,0000000..26c7025
new file mode 100644
--- /dev/null
+++ b/test/data/negative-local-expiration-test/table4/lb-1-big-TOC.txt
@@@ -1,0 -1,0 +1,8 @@@
++Statistics.db
++Summary.db
++Digest.adler32
++Data.db
++Index.db
++CompressionInfo.db
++Filter.db
++TOC.txt

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/test/unit/org/apache/cassandra/cql3/validation/operations/TTLTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/cql3/validation/operations/TTLTest.java
index 0000000,ab4ef21..b1eaac1
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/cql3/validation/operations/TTLTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/operations/TTLTest.java
@@@ -1,0 -1,410 +1,405 @@@
+ package org.apache.cassandra.cql3.validation.operations;
+ 
+ import java.io.File;
+ import java.io.FileInputStream;
+ import java.io.FileOutputStream;
+ import java.io.IOException;
+ 
+ import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.assertTrue;
+ import static org.junit.Assert.fail;
+ 
+ import org.apache.cassandra.cql3.Attributes;
+ import org.apache.cassandra.cql3.CQLTester;
+ import org.apache.cassandra.cql3.UntypedResultSet;
+ import org.apache.cassandra.db.BufferExpiringCell;
+ import org.apache.cassandra.db.ColumnFamilyStore;
+ import org.apache.cassandra.db.ExpiringCell;
+ import org.apache.cassandra.db.Keyspace;
+ import org.apache.cassandra.exceptions.InvalidRequestException;
+ import org.apache.cassandra.utils.FBUtilities;
+ 
+ import org.junit.Test;
+ 
+ public class TTLTest extends CQLTester
+ {
+     public static String NEGATIVE_LOCAL_EXPIRATION_TEST_DIR = "test/data/negative-local-expiration-test/%s";
+ 
+     public static int MAX_TTL = ExpiringCell.MAX_TTL;
+ 
+     public static final String SIMPLE_NOCLUSTERING = "table1";
+     public static final String SIMPLE_CLUSTERING = "table2";
+     public static final String COMPLEX_NOCLUSTERING = "table3";
+     public static final String COMPLEX_CLUSTERING = "table4";
+ 
+     @Test
+     public void testTTLPerRequestLimit() throws Throwable
+     {
+         createTable("CREATE TABLE %s (k int PRIMARY KEY, i int)");
+         // insert with low TTL should not be denied
+         execute("INSERT INTO %s (k, i) VALUES (1, 1) USING TTL ?", 10); // max ttl
+ 
+         try
+         {
+             execute("INSERT INTO %s (k, i) VALUES (1, 1) USING TTL ?", MAX_TTL + 1);
+             fail("Expect InvalidRequestException");
+         }
+         catch (InvalidRequestException e)
+         {
+             assertTrue(e.getMessage().contains("ttl is too large."));
+         }
+ 
+         try
+         {
+             execute("INSERT INTO %s (k, i) VALUES (1, 1) USING TTL ?", -1);
+             fail("Expect InvalidRequestException");
+         }
+         catch (InvalidRequestException e)
+         {
+             assertTrue(e.getMessage().contains("A TTL must be greater or equal to 0"));
+         }
+         execute("TRUNCATE %s");
+ 
+         // insert with low TTL should not be denied
+         execute("UPDATE %s USING TTL ? SET i = 1 WHERE k = 2", 5); // max ttl
+ 
+         try
+         {
+             execute("UPDATE %s USING TTL ? SET i = 1 WHERE k = 2", MAX_TTL + 1);
+             fail("Expect InvalidRequestException");
+         }
+         catch (InvalidRequestException e)
+         {
+             assertTrue(e.getMessage().contains("ttl is too large."));
+         }
+ 
+         try
+         {
+             execute("UPDATE %s USING TTL ? SET i = 1 WHERE k = 2", -1);
+             fail("Expect InvalidRequestException");
+         }
+         catch (InvalidRequestException e)
+         {
+             assertTrue(e.getMessage().contains("A TTL must be greater or equal to 0"));
+         }
+     }
+ 
+ 
+     @Test
+     public void testTTLDefaultLimit() throws Throwable
+     {
+         try
+         {
+             createTable("CREATE TABLE %s (k int PRIMARY KEY, i int) WITH default_time_to_live=-1");
+             fail("Expect Invalid schema");
+         }
+         catch (RuntimeException e)
+         {
+             assertTrue(e.getCause()
 -                        .getCause()
+                         .getMessage()
+                         .contains("default_time_to_live cannot be smaller than 0"));
+         }
+         try
+         {
+             createTable("CREATE TABLE %s (k int PRIMARY KEY, i int) WITH default_time_to_live="
+                         + (MAX_TTL + 1));
+             fail("Expect Invalid schema");
+         }
+         catch (RuntimeException e)
+         {
+             assertTrue(e.getCause()
 -                        .getCause()
+                         .getMessage()
+                         .contains("default_time_to_live must be less than or equal to " + MAX_TTL + " (got "
+                                   + (MAX_TTL + 1) + ")"));
+         }
+ 
+         // table with default low TTL should not be denied
+         createTable("CREATE TABLE %s (k int PRIMARY KEY, i int) WITH default_time_to_live=" + 5);
+         execute("INSERT INTO %s (k, i) VALUES (1, 1)");
+     }
+ 
+     @Test
+     public void testRejectExpirationDateOverflowPolicy() throws Throwable
+     {
+         Attributes.policy = Attributes.ExpirationDateOverflowPolicy.REJECT;
+         createTable("CREATE TABLE %s (k int PRIMARY KEY, i int)");
+         try
+         {
+             execute("INSERT INTO %s (k, i) VALUES (1, 1) USING TTL " + MAX_TTL);
+         }
+         catch (InvalidRequestException e)
+         {
+             assertTrue(e.getMessage().contains("exceeds maximum supported expiration date"));
+         }
+         try
+         {
+             createTable("CREATE TABLE %s (k int PRIMARY KEY, i int) WITH default_time_to_live=" + MAX_TTL);
+             execute("INSERT INTO %s (k, i) VALUES (1, 1)");
+         }
+         catch (InvalidRequestException e)
+         {
+             assertTrue(e.getMessage().contains("exceeds maximum supported expiration date"));
+         }
+     }
+ 
+     @Test
+     public void testCapExpirationDatePolicyDefaultTTL() throws Throwable
+     {
+         Attributes.policy = Attributes.ExpirationDateOverflowPolicy.CAP;
+         createTable("CREATE TABLE %s (k int PRIMARY KEY, i int) WITH default_time_to_live=" + MAX_TTL);
+         execute("INSERT INTO %s (k, i) VALUES (1, 1)");
+         checkTTLIsCapped("i");
+         Attributes.policy = Attributes.ExpirationDateOverflowPolicy.REJECT;
+     }
+ 
+     @Test
+     public void testCapExpirationDatePolicyPerRequest() throws Throwable
+     {
+         // Test cap policy
+         Attributes.policy = Attributes.ExpirationDateOverflowPolicy.CAP;
+ 
+         // simple column, clustering, flush
+         baseCapExpirationDateOverflowTest(true, true, true);
+         // simple column, clustering, noflush
+         baseCapExpirationDateOverflowTest(true, true, false);
+         // simple column, noclustering, flush
+         baseCapExpirationDateOverflowTest(true, false, true);
+         // simple column, noclustering, noflush
+         baseCapExpirationDateOverflowTest(true, false, false);
+         // complex column, clustering, flush
+         baseCapExpirationDateOverflowTest(false, true, true);
+         // complex column, clustering, noflush
+         baseCapExpirationDateOverflowTest(false, true, false);
+         // complex column, noclustering, flush
+         baseCapExpirationDateOverflowTest(false, false, true);
+         // complex column, noclustering, noflush
+         baseCapExpirationDateOverflowTest(false, false, false);
+         // complex column, noclustering, flush
+         baseCapExpirationDateOverflowTest(false, false, false);
+ 
+         // Return to previous policy
+         Attributes.policy = Attributes.ExpirationDateOverflowPolicy.REJECT;
+     }
+ 
+     @Test
+     public void testRecoverOverflowedExpirationWithScrub() throws Throwable
+     {
 -        createTable(true, true);
 -        createTable(true, false);
 -        createTable(false, true);
 -        createTable(false, false);
 -
+         baseTestRecoverOverflowedExpiration(false, false);
+         baseTestRecoverOverflowedExpiration(true, false);
+         baseTestRecoverOverflowedExpiration(true, true);
+     }
+ 
+     public void baseCapExpirationDateOverflowTest(boolean simple, boolean clustering, boolean flush) throws Throwable
+     {
+         // Create Table
+         if (simple)
+         {
+             if (clustering)
+                 createTable("create table %s (k int, a int, b int, primary key(k, a))");
+             else
+                 createTable("create table %s (k int primary key, a int, b int)");
+         }
+         else
+         {
+             if (clustering)
+                 createTable("create table %s (k int, a int, b set<text>, primary key(k, a))");
+             else
+                 createTable("create table %s (k int primary key, a int, b set<text>)");
+         }
+ 
+         // Insert data with INSERT and UPDATE
+         if (simple)
+         {
+             execute("INSERT INTO %s (k, a, b) VALUES (?, ?, ?) USING TTL " + MAX_TTL, 2, 2, 2);
+             if (clustering)
+                 execute("UPDATE %s USING TTL " + MAX_TTL + " SET b = 1 WHERE k = 1 AND a = 1;");
+             else
+                 execute("UPDATE %s USING TTL " + MAX_TTL + " SET a = 1, b = 1 WHERE k = 1;");
+         }
+         else
+         {
+             execute("INSERT INTO %s (k, a, b) VALUES (?, ?, ?) USING TTL " + MAX_TTL, 2, 2, set("v21", "v22", "v23", "v24"));
+             if (clustering)
+                 execute("UPDATE  %s USING TTL " + MAX_TTL + " SET b = ? WHERE k = 1 AND a = 1;", set("v11", "v12", "v13", "v14"));
+             else
+                 execute("UPDATE  %s USING TTL " + MAX_TTL + " SET a = 1, b = ? WHERE k = 1;", set("v11", "v12", "v13", "v14"));
+         }
+ 
+         // Maybe Flush
+         Keyspace ks = Keyspace.open(keyspace());
+         if (flush)
+             FBUtilities.waitOnFutures(ks.flush());
+ 
+         // Verify data
+         verifyData(simple);
+ 
+         // Maybe major compact
+         if (flush)
+         {
+             // Major compact and check data is still present
+             ks.getColumnFamilyStore(currentTable()).forceMajorCompaction();
+ 
+             // Verify data again
+             verifyData(simple);
+         }
+     }
+ 
+     public void baseTestRecoverOverflowedExpiration(boolean runScrub, boolean reinsertOverflowedTTL) throws Throwable
+     {
+         // simple column, clustering
+         testRecoverOverflowedExpirationWithScrub(true, true, runScrub, reinsertOverflowedTTL);
+         // simple column, noclustering
+         testRecoverOverflowedExpirationWithScrub(true, false, runScrub, reinsertOverflowedTTL);
+         // complex column, clustering
+         testRecoverOverflowedExpirationWithScrub(false, true, runScrub, reinsertOverflowedTTL);
+         // complex column, noclustering
+         testRecoverOverflowedExpirationWithScrub(false, false, runScrub, reinsertOverflowedTTL);
+     }
+ 
+     private void verifyData(boolean simple) throws Throwable
+     {
+         if (simple)
+         {
+             assertRows(execute("SELECT * from %s"), row(1, 1, 1), row(2, 2, 2));
+         }
+         else
+         {
+             assertRows(execute("SELECT * from %s"), row(1, 1, set("v11", "v12", "v13", "v14")), row(2, 2, set("v21", "v22", "v23", "v24")));
+         }
+         // Cannot retrieve TTL from collections
+         if (simple)
+             checkTTLIsCapped("b");
+     }
+ 
+     /**
+      * Verify that the computed TTL is approximately equal to the maximum allowed ttl given the
+      * {@link ExpiringCell#getLocalDeletionTime()} field limitation (CASSANDRA-14092)
+      */
+     private void checkTTLIsCapped(String field) throws Throwable
+     {
+ 
+         // TTL is computed dynamically from row expiration time, so if it is
+         // equal or higher to the minimum max TTL we compute before the query
+         // we are fine.
+         int minMaxTTL = computeMaxTTL();
+         UntypedResultSet execute = execute("SELECT ttl(" + field + ") FROM %s");
+         for (UntypedResultSet.Row row : execute)
+         {
+             int ttl = row.getInt("ttl(" + field + ")");
+             assertTrue(ttl >= minMaxTTL);
+         }
+     }
+ 
+     /**
+      * The max TTL is computed such that the TTL summed with the current time is equal to the maximum
+      * allowed expiration time {@link BufferExpiringCell#getLocalDeletionTime()} (2038-01-19T03:14:06+00:00)
+      */
+     private int computeMaxTTL()
+     {
+         int nowInSecs = (int) (System.currentTimeMillis() / 1000);
+         return BufferExpiringCell.MAX_DELETION_TIME - nowInSecs;
+     }
+ 
+     public void testRecoverOverflowedExpirationWithScrub(boolean simple, boolean clustering, boolean runScrub, boolean reinsertOverflowedTTL) throws Throwable
+     {
+         if (reinsertOverflowedTTL)
+         {
+             assert runScrub;
+         }
+ 
++        createTable(simple, clustering);
++
+         Keyspace keyspace = Keyspace.open(KEYSPACE);
 -        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(getTableName(simple, clustering));
++        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(currentTable());
+ 
 -        assertEquals(0, cfs.getLiveSSTableCount());
++        assertEquals(0, cfs.getSSTables().size());
+ 
 -        copySSTablesToTableDir(simple, clustering);
++        copySSTablesToTableDir(currentTable(), simple, clustering);
+ 
+         cfs.loadNewSSTables();
+ 
+         if (runScrub)
+         {
+             cfs.scrub(true, false, false, reinsertOverflowedTTL, 1);
+         }
+ 
+         if (reinsertOverflowedTTL)
+         {
+             if (simple)
 -            {
 -                UntypedResultSet execute = execute(String.format("SELECT * from %s.%s", KEYSPACE, getTableName(simple, clustering)));
 -                assertRows(execute, row(1, 1, 1), row(2, 2, 2));
 -
 -            }
++                assertRows(execute("SELECT * from %s"), row(1, 1, 1), row(2, 2, 2));
+             else
 -                assertRows(execute(String.format("SELECT * from %s.%s", KEYSPACE, getTableName(simple, clustering))), row(1, 1, set("v11", "v12", "v13", "v14")), row(2, 2, set("v21", "v22", "v23", "v24")));
++                assertRows(execute("SELECT * from %s"), row(1, 1, set("v11", "v12", "v13", "v14")), row(2, 2, set("v21", "v22", "v23", "v24")));
+ 
+             cfs.forceMajorCompaction();
+ 
+             if (simple)
 -                assertRows(execute(String.format("SELECT * from %s.%s", KEYSPACE, getTableName(simple, clustering))), row(1, 1, 1), row(2, 2, 2));
++                assertRows(execute("SELECT * from %s"), row(1, 1, 1), row(2, 2, 2));
+             else
 -                assertRows(execute(String.format("SELECT * from %s.%s", KEYSPACE, getTableName(simple, clustering))), row(1, 1, set("v11", "v12", "v13", "v14")), row(2, 2, set("v21", "v22", "v23", "v24")));
++                assertRows(execute("SELECT * from %s"), row(1, 1, set("v11", "v12", "v13", "v14")), row(2, 2, set("v21", "v22", "v23", "v24")));
+         }
+         else
+         {
 -            assertEmpty(execute(String.format("SELECT * from %s.%s", KEYSPACE, getTableName(simple, clustering))));
++            assertEmpty(execute("SELECT * from %s"));
+         }
 -        cfs.truncateBlocking(); //cleanup for next tests
+     }
+ 
 -    private void copySSTablesToTableDir(boolean simple, boolean clustering) throws IOException
++    private void copySSTablesToTableDir(String table, boolean simple, boolean clustering) throws IOException
+     {
 -        File destDir = Keyspace.open(KEYSPACE).getColumnFamilyStore(getTableName(simple, clustering)).directories.getCFDirectories().iterator().next();
 -        File sourceDir = getTableDir(simple, clustering);
++        File destDir = Keyspace.open(keyspace()).getColumnFamilyStore(table).directories.getCFDirectories().iterator().next();
++        File sourceDir = getTableDir(table, simple, clustering);
+         for (File file : sourceDir.listFiles())
+         {
+             copyFile(file, destDir);
+         }
+     }
+ 
 -    private void createTable(boolean simple, boolean clustering) throws Throwable
++    private static File getTableDir(String table, boolean simple, boolean clustering)
++    {
++        return new File(String.format(NEGATIVE_LOCAL_EXPIRATION_TEST_DIR, getTableName(simple, clustering)));
++    }
++
++    private void createTable(boolean simple, boolean clustering)
+     {
+         if (simple)
+         {
+             if (clustering)
 -                execute(String.format("create table %s.%s (k int, a int, b int, primary key(k, a))", KEYSPACE, getTableName(simple, clustering)));
++                createTable("create table %s (k int, a int, b int, primary key(k, a))");
+             else
 -                execute(String.format("create table %s.%s (k int primary key, a int, b int)", KEYSPACE, getTableName(simple, clustering)));
++                createTable("create table %s (k int primary key, a int, b int)");
+         }
+         else
+         {
+             if (clustering)
 -                execute(String.format("create table %s.%s (k int, a int, b set<text>, primary key(k, a))", KEYSPACE, getTableName(simple, clustering)));
++                createTable("create table %s (k int, a int, b set<text>, primary key(k, a))");
+             else
 -                execute(String.format("create table %s.%s (k int primary key, a int, b set<text>)", KEYSPACE, getTableName(simple, clustering)));
++                createTable("create table %s (k int primary key, a int, b set<text>)");
+         }
+     }
+ 
+     private static File getTableDir(boolean simple, boolean clustering)
+     {
+         return new File(String.format(NEGATIVE_LOCAL_EXPIRATION_TEST_DIR, getTableName(simple, clustering)));
+     }
+ 
+     private static void copyFile(File src, File dest) throws IOException
+     {
+         byte[] buf = new byte[65536];
+         if (src.isFile())
+         {
+             File target = new File(dest, src.getName());
+             int rd;
+             FileInputStream is = new FileInputStream(src);
+             FileOutputStream os = new FileOutputStream(target);
+             while ((rd = is.read(buf)) >= 0)
+                 os.write(buf, 0, rd);
+         }
+     }
+ 
+     public static String getTableName(boolean simple, boolean clustering)
+     {
+         if (simple)
+             return clustering ? SIMPLE_CLUSTERING : SIMPLE_NOCLUSTERING;
+         else
+             return clustering ? COMPLEX_CLUSTERING : COMPLEX_NOCLUSTERING;
+     }
+ }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/test/unit/org/apache/cassandra/db/ScrubTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/ScrubTest.java
index 4cca7ff,4efd082..9b1ede4
--- a/test/unit/org/apache/cassandra/db/ScrubTest.java
+++ b/test/unit/org/apache/cassandra/db/ScrubTest.java
@@@ -654,129 -565,4 +654,129 @@@ public class ScrubTes
          assertEquals("bar", iter.next().getString("c"));
          assertEquals("boo", iter.next().getString("c"));
      }
 +
 +    @Test /* CASSANDRA-5174 */
 +    public void testScrubKeysIndex_preserveOrder() throws IOException, ExecutionException, InterruptedException
 +    {
 +        //If the partitioner preserves the order then SecondaryIndex uses BytesType comparator,
 +        // otherwise it uses LocalByPartitionerType
 +        setKeyComparator(BytesType.instance);
 +        testScrubIndex(CF_INDEX1, COL_KEYS_INDEX, false, true);
 +    }
 +
 +    @Test /* CASSANDRA-5174 */
 +    public void testScrubCompositeIndex_preserveOrder() throws IOException, ExecutionException, InterruptedException
 +    {
 +        setKeyComparator(BytesType.instance);
 +        testScrubIndex(CF_INDEX2, COL_COMPOSITES_INDEX, true, true);
 +    }
 +
 +    @Test /* CASSANDRA-5174 */
 +    public void testScrubKeysIndex() throws IOException, ExecutionException, InterruptedException
 +    {
 +        setKeyComparator(new LocalByPartionerType(StorageService.getPartitioner()));
 +        testScrubIndex(CF_INDEX1, COL_KEYS_INDEX, false, true);
 +    }
 +
 +    @Test /* CASSANDRA-5174 */
 +    public void testScrubCompositeIndex() throws IOException, ExecutionException, InterruptedException
 +    {
 +        setKeyComparator(new LocalByPartionerType(StorageService.getPartitioner()));
 +        testScrubIndex(CF_INDEX2, COL_COMPOSITES_INDEX, true, true);
 +    }
 +
 +    @Test /* CASSANDRA-5174 */
 +    public void testFailScrubKeysIndex() throws IOException, ExecutionException, InterruptedException
 +    {
 +        testScrubIndex(CF_INDEX1, COL_KEYS_INDEX, false, false);
 +    }
 +
 +    @Test /* CASSANDRA-5174 */
 +    public void testFailScrubCompositeIndex() throws IOException, ExecutionException, InterruptedException
 +    {
 +        testScrubIndex(CF_INDEX2, COL_COMPOSITES_INDEX, true, false);
 +    }
 +
 +    @Test /* CASSANDRA-5174 */
 +    public void testScrubTwice() throws IOException, ExecutionException, InterruptedException
 +    {
 +        testScrubIndex(CF_INDEX1, COL_KEYS_INDEX, false, true, true);
 +    }
 +
 +    /** The SecondaryIndex class is used for custom indexes so to avoid
 +     * making a public final field into a private field with getters
 +     * and setters, we resort to this hack in order to test it properly
 +     * since it can have two values which influence the scrubbing behavior.
 +     * @param comparator - the key comparator we want to test
 +     */
 +    private void setKeyComparator(AbstractType<?> comparator)
 +    {
 +        try
 +        {
 +            Field keyComparator = SecondaryIndex.class.getDeclaredField("keyComparator");
 +            keyComparator.setAccessible(true);
 +            int modifiers = keyComparator.getModifiers();
 +            Field modifierField = keyComparator.getClass().getDeclaredField("modifiers");
 +            modifiers = modifiers & ~Modifier.FINAL;
 +            modifierField.setAccessible(true);
 +            modifierField.setInt(keyComparator, modifiers);
 +
 +            keyComparator.set(null, comparator);
 +        }
 +        catch (Exception ex)
 +        {
 +            fail("Failed to change key comparator in secondary index : " + ex.getMessage());
 +            ex.printStackTrace();
 +        }
 +    }
 +
 +    private void testScrubIndex(String cfName, String colName, boolean composite, boolean ... scrubs)
 +            throws IOException, ExecutionException, InterruptedException
 +    {
 +        CompactionManager.instance.disableAutoCompaction();
 +        Keyspace keyspace = Keyspace.open(KEYSPACE);
 +        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfName);
 +        cfs.clearUnsafe();
 +
 +        int numRows = 1000;
 +        long[] colValues = new long [numRows * 2]; // each row has two columns
 +        for (int i = 0; i < colValues.length; i+=2)
 +        {
 +            colValues[i] = (i % 4 == 0 ? 1L : 2L); // index column
 +            colValues[i+1] = 3L; //other column
 +        }
 +        fillIndexCF(cfs, composite, colValues);
 +
 +        // check index
 +        IndexExpression expr = new IndexExpression(ByteBufferUtil.bytes(colName), Operator.EQ, ByteBufferUtil.bytes(1L));
 +        List<Row> rows = cfs.search(Util.range("", ""), Arrays.asList(expr), new IdentityQueryFilter(), numRows);
 +        assertNotNull(rows);
 +        assertEquals(numRows / 2, rows.size());
 +
 +        // scrub index
 +        Set<ColumnFamilyStore> indexCfss = cfs.indexManager.getIndexesBackedByCfs();
 +        assertTrue(indexCfss.size() == 1);
 +        for(ColumnFamilyStore indexCfs : indexCfss)
 +        {
 +            for (int i = 0; i < scrubs.length; i++)
 +            {
 +                boolean failure = !scrubs[i];
 +                if (failure)
 +                { //make sure the next scrub fails
 +                    overrideWithGarbage(indexCfs.getSSTables().iterator().next(), ByteBufferUtil.bytes(1L), ByteBufferUtil.bytes(2L));
 +                }
-                 CompactionManager.AllSSTableOpStatus result = indexCfs.scrub(false, false, true, true, 0);
++                CompactionManager.AllSSTableOpStatus result = indexCfs.scrub(false, false, true, true, true, 0);
 +                assertEquals(failure ?
 +                             CompactionManager.AllSSTableOpStatus.ABORTED :
 +                             CompactionManager.AllSSTableOpStatus.SUCCESSFUL,
 +                                result);
 +            }
 +        }
 +
 +
 +        // check index is still working
 +        rows = cfs.search(Util.range("", ""), Arrays.asList(expr), new IdentityQueryFilter(), numRows);
 +        assertNotNull(rows);
 +        assertEquals(numRows / 2, rows.size());
 +    }
  }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org