Posted to commits@cassandra.apache.org by pa...@apache.org on 2018/02/11 13:26:05 UTC

[20/29] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c231ed5b/src/java/org/apache/cassandra/db/LivenessInfo.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/LivenessInfo.java
index 89e0578,0000000..f6c9b62
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/LivenessInfo.java
+++ b/src/java/org/apache/cassandra/db/LivenessInfo.java
@@@ -1,369 -1,0 +1,375 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.db;
 +
 +import java.util.Objects;
 +import java.security.MessageDigest;
 +
 +import org.apache.cassandra.config.CFMetaData;
++import org.apache.cassandra.db.rows.Cell;
 +import org.apache.cassandra.serializers.MarshalException;
 +import org.apache.cassandra.utils.FBUtilities;
 +
 +/**
 + * Stores the information relating to the liveness of the primary key columns of a row.
 + * <p>
 + * A {@code LivenessInfo} can be empty. If it isn't, it contains at least a timestamp,
 + * which is the timestamp for the row primary key columns. On top of that, the info can be
 + * ttl'ed, in which case the {@code LivenessInfo} also has both a ttl and a local expiration time.
 + * <p>
 + * Please note that if a liveness info is ttl'ed, that expiration is <b>only</b> an expiration
 + * of the liveness info itself (so, of the timestamp), and once the info expires it becomes
 + * {@code EMPTY}. But if a row has a liveness info which expires, the rest of the row data is
 + * unaffected (of course, the rest of said row data might be ttl'ed on its own but this is
 + * separate).
 + */
 +public class LivenessInfo
 +{
 +    public static final long NO_TIMESTAMP = Long.MIN_VALUE;
-     public static final int NO_TTL = 0;
++    public static final int NO_TTL = Cell.NO_TTL;
 +    /**
 +     * Used as a flag for representing an expired liveness.
 +     *
 +     * TTL per request is at most 20 years, so this shouldn't conflict
 +     * (See {@link org.apache.cassandra.cql3.Attributes#MAX_TTL})
 +     */
 +    public static final int EXPIRED_LIVENESS_TTL = Integer.MAX_VALUE;
-     public static final int NO_EXPIRATION_TIME = Integer.MAX_VALUE;
++    public static final int NO_EXPIRATION_TIME = Cell.NO_DELETION_TIME;
 +
 +    public static final LivenessInfo EMPTY = new LivenessInfo(NO_TIMESTAMP);
 +
 +    protected final long timestamp;
 +
 +    protected LivenessInfo(long timestamp)
 +    {
 +        this.timestamp = timestamp;
 +    }
 +
 +    public static LivenessInfo create(CFMetaData metadata, long timestamp, int nowInSec)
 +    {
 +        int defaultTTL = metadata.params.defaultTimeToLive;
 +        if (defaultTTL != NO_TTL)
 +            return expiring(timestamp, defaultTTL, nowInSec);
 +
 +        return new LivenessInfo(timestamp);
 +    }
 +
 +    public static LivenessInfo expiring(long timestamp, int ttl, int nowInSec)
 +    {
 +        assert ttl != EXPIRED_LIVENESS_TTL;
-         return new ExpiringLivenessInfo(timestamp, ttl, nowInSec + ttl);
++        return new ExpiringLivenessInfo(timestamp, ttl, ExpirationDateOverflowHandling.computeLocalExpirationTime(nowInSec, ttl));
 +    }
 +
 +    public static LivenessInfo create(CFMetaData metadata, long timestamp, int ttl, int nowInSec)
 +    {
 +        return ttl == NO_TTL
 +             ? create(metadata, timestamp, nowInSec)
 +             : expiring(timestamp, ttl, nowInSec);
 +    }
 +
 +    // Note that this factory method ignores the default table ttl and takes the expiration time, not the current time.
 +    // Use it when you know that's what you want.
 +    public static LivenessInfo create(long timestamp, int ttl, int localExpirationTime)
 +    {
 +        if (ttl == EXPIRED_LIVENESS_TTL)
 +            return new ExpiredLivenessInfo(timestamp, ttl, localExpirationTime);
 +        return ttl == NO_TTL ? new LivenessInfo(timestamp) : new ExpiringLivenessInfo(timestamp, ttl, localExpirationTime);
 +    }
 +
 +    /**
 +     * Whether this liveness info is empty (has no timestamp).
 +     *
 +     * @return whether this liveness info is empty or not.
 +     */
 +    public boolean isEmpty()
 +    {
 +        return timestamp == NO_TIMESTAMP;
 +    }
 +
 +    /**
 +     * The timestamp for this liveness info.
 +     *
 +     * @return the liveness info timestamp (or {@link #NO_TIMESTAMP} if the info is empty).
 +     */
 +    public long timestamp()
 +    {
 +        return timestamp;
 +    }
 +
 +    /**
 +     * Whether the info has a ttl.
 +     */
 +    public boolean isExpiring()
 +    {
 +        return false;
 +    }
 +
 +    /**
 +     * The ttl (if any) on the row primary key columns or {@link #NO_TTL} if it is not
 +     * expiring.
 +     *
 +     * Please note that this value is the TTL that was set originally and is thus not
 +     * changing.
 +     */
 +    public int ttl()
 +    {
 +        return NO_TTL;
 +    }
 +
 +    /**
 +     * The expiration time (in seconds) if the info is expiring,
 +     * or {@link #NO_EXPIRATION_TIME} otherwise.
 +     */
 +    public int localExpirationTime()
 +    {
 +        return NO_EXPIRATION_TIME;
 +    }
 +
 +    /**
 +     * Whether that info is still live.
 +     *
 +     * A {@code LivenessInfo} is live if it is either not expiring, or if its expiration time is after
 +     * {@code nowInSec}.
 +     *
 +     * @param nowInSec the current time in seconds.
 +     * @return whether this liveness info is live or not.
 +     */
 +    public boolean isLive(int nowInSec)
 +    {
 +        return !isEmpty();
 +    }
 +
 +    /**
 +     * Adds this liveness information to the provided digest.
 +     *
 +     * @param digest the digest to add this liveness information to.
 +     */
 +    public void digest(MessageDigest digest)
 +    {
 +        FBUtilities.updateWithLong(digest, timestamp());
 +    }
 +
 +    /**
 +     * Validate the data contained by this liveness information.
 +     *
 +     * @throws MarshalException if some of the data is corrupted.
 +     */
 +    public void validate()
 +    {
 +    }
 +
 +    /**
 +     * The size of the (useful) data this liveness information contains.
 +     *
 +     * @return the size of the data this liveness information contains.
 +     */
 +    public int dataSize()
 +    {
 +        return TypeSizes.sizeof(timestamp());
 +    }
 +
 +    /**
 +     * Whether this liveness information supersedes another one (that is,
 +     * whether it has a greater timestamp than the other or not).
 +     *
 +     * <p>
 +     *
 +     * If timestamps are the same and neither is an expired liveness info, the one
 +     * with the greater TTL supersedes the other; in particular, with equal timestamps,
 +     * a ttl'ed info supersedes one without a ttl. This is the same rule as {@link Conflicts#resolveRegular}.
 +     *
 +     * If timestamps are the same and exactly one of them is an expired liveness info,
 +     * the expired one supersedes, i.e. the tombstone wins.
 +     *
 +     * If timestamps are the same and both are expired liveness infos (ideally this
 +     * shouldn't happen), the greater localExpirationTime wins.
 +     *
 +     * @param other
 +     *            the {@code LivenessInfo} to compare this info to.
 +     *
 +     * @return whether this {@code LivenessInfo} supersedes {@code other}.
 +     */
 +    public boolean supersedes(LivenessInfo other)
 +    {
 +        if (timestamp != other.timestamp)
 +            return timestamp > other.timestamp;
 +        if (isExpired() ^ other.isExpired())
 +            return isExpired();
 +        if (isExpiring() == other.isExpiring())
 +            return localExpirationTime() > other.localExpirationTime();
 +        return isExpiring();
 +    }
 +
 +    protected boolean isExpired()
 +    {
 +        return false;
 +    }
 +
 +    /**
 +     * Returns a copy of this liveness info updated with the provided timestamp.
 +     *
 +     * @param newTimestamp the timestamp for the returned info.
 +     * @return if this liveness info has a timestamp, a copy of it with {@code newTimestamp}
 +     * as timestamp. If it has no timestamp however, this liveness info is returned
 +     * unchanged.
 +     */
 +    public LivenessInfo withUpdatedTimestamp(long newTimestamp)
 +    {
 +        return new LivenessInfo(newTimestamp);
 +    }
 +
++    public LivenessInfo withUpdatedTimestampAndLocalDeletionTime(long newTimestamp, int newLocalDeletionTime)
++    {
++        return LivenessInfo.create(newTimestamp, ttl(), newLocalDeletionTime);
++    }
++
 +    @Override
 +    public String toString()
 +    {
 +        return String.format("[ts=%d]", timestamp);
 +    }
 +
 +    @Override
 +    public boolean equals(Object other)
 +    {
 +        if(!(other instanceof LivenessInfo))
 +            return false;
 +
 +        LivenessInfo that = (LivenessInfo)other;
 +        return this.timestamp() == that.timestamp()
 +            && this.ttl() == that.ttl()
 +            && this.localExpirationTime() == that.localExpirationTime();
 +    }
 +
 +    @Override
 +    public int hashCode()
 +    {
 +        return Objects.hash(timestamp(), ttl(), localExpirationTime());
 +    }
 +
 +    /**
 +     * Effectively acts as a PK tombstone. This is used for Materialized Views to shadow
 +     * updated entries while co-existing with row tombstones.
 +     *
 +     * See {@link org.apache.cassandra.db.view.ViewUpdateGenerator#deleteOldEntryInternal}.
 +     */
 +    private static class ExpiredLivenessInfo extends ExpiringLivenessInfo
 +    {
 +        private ExpiredLivenessInfo(long timestamp, int ttl, int localExpirationTime)
 +        {
 +            super(timestamp, ttl, localExpirationTime);
 +            assert ttl == EXPIRED_LIVENESS_TTL;
 +            assert timestamp != NO_TIMESTAMP;
 +        }
 +
 +        @Override
 +        public boolean isExpired()
 +        {
 +            return true;
 +        }
 +
 +        @Override
 +        public boolean isLive(int nowInSec)
 +        {
 +            // used as tombstone to shadow entire PK
 +            return false;
 +        }
 +
 +        @Override
 +        public LivenessInfo withUpdatedTimestamp(long newTimestamp)
 +        {
 +            return new ExpiredLivenessInfo(newTimestamp, ttl(), localExpirationTime());
 +        }
 +    }
 +
 +    private static class ExpiringLivenessInfo extends LivenessInfo
 +    {
 +        private final int ttl;
 +        private final int localExpirationTime;
 +
 +        private ExpiringLivenessInfo(long timestamp, int ttl, int localExpirationTime)
 +        {
 +            super(timestamp);
 +            assert ttl != NO_TTL && localExpirationTime != NO_EXPIRATION_TIME;
 +            this.ttl = ttl;
 +            this.localExpirationTime = localExpirationTime;
 +        }
 +
 +        @Override
 +        public int ttl()
 +        {
 +            return ttl;
 +        }
 +
 +        @Override
 +        public int localExpirationTime()
 +        {
 +            return localExpirationTime;
 +        }
 +
 +        @Override
 +        public boolean isExpiring()
 +        {
 +            return true;
 +        }
 +
 +        @Override
 +        public boolean isLive(int nowInSec)
 +        {
 +            return nowInSec < localExpirationTime;
 +        }
 +
 +        @Override
 +        public void digest(MessageDigest digest)
 +        {
 +            super.digest(digest);
 +            FBUtilities.updateWithInt(digest, localExpirationTime);
 +            FBUtilities.updateWithInt(digest, ttl);
 +        }
 +
 +        @Override
 +        public void validate()
 +        {
 +            if (ttl < 0)
 +                throw new MarshalException("A TTL should not be negative");
 +            if (localExpirationTime < 0)
 +                throw new MarshalException("A local expiration time should not be negative");
 +        }
 +
 +        @Override
 +        public int dataSize()
 +        {
 +            return super.dataSize()
 +                 + TypeSizes.sizeof(ttl)
 +                 + TypeSizes.sizeof(localExpirationTime);
 +
 +        }
 +
 +        @Override
 +        public LivenessInfo withUpdatedTimestamp(long newTimestamp)
 +        {
 +            return new ExpiringLivenessInfo(newTimestamp, ttl, localExpirationTime);
 +        }
 +
 +        @Override
 +        public String toString()
 +        {
 +            return String.format("[ts=%d ttl=%d, let=%d]", timestamp, ttl, localExpirationTime);
 +        }
 +    }
 +}

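A note on the change above: liveness expiration is now computed via ExpirationDateOverflowHandling.computeLocalExpirationTime() rather than the bare "nowInSec + ttl" int addition, which can wrap negative. A minimal sketch of the capping idea, assuming the helper simply clamps on overflow (the class introduced by this patch may additionally apply a configurable overflow policy):

    // Hedged sketch: overflow-safe local expiration time. If the int addition
    // nowInSec + ttl wraps to a negative value, cap at the largest storable
    // deletion time (Cell.MAX_DELETION_TIME = Integer.MAX_VALUE - 1) instead
    // of persisting a negative localExpirationTime.
    public static int computeLocalExpirationTime(int nowInSec, int ttl)
    {
        int localExpirationTime = nowInSec + ttl;
        return localExpirationTime >= 0 ? localExpirationTime : Cell.MAX_DELETION_TIME;
    }
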
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c231ed5b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index eaf6dab,d90abe9..1d54667
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@@ -363,7 -348,13 +363,15 @@@ public class CompactionManager implemen
          }
      }
  
--    public AllSSTableOpStatus performScrub(final ColumnFamilyStore cfs, final boolean skipCorrupted, final boolean checkData, int jobs)
++    public AllSSTableOpStatus performScrub(final ColumnFamilyStore cfs, final boolean skipCorrupted, final boolean checkData,
++                                           int jobs)
+     throws InterruptedException, ExecutionException
+     {
+         return performScrub(cfs, skipCorrupted, checkData, false, jobs);
+     }
+ 
 -    public AllSSTableOpStatus performScrub(final ColumnFamilyStore cfs, final boolean skipCorrupted, final boolean checkData, final boolean reinsertOverflowedTTLRows, int jobs)
++    public AllSSTableOpStatus performScrub(final ColumnFamilyStore cfs, final boolean skipCorrupted, final boolean checkData,
++                                           final boolean reinsertOverflowedTTL, int jobs)
      throws InterruptedException, ExecutionException
      {
          return parallelAllSSTableOperation(cfs, new OneSSTableOperation()
@@@ -377,7 -368,7 +385,7 @@@
              @Override
              public void execute(LifecycleTransaction input) throws IOException
              {
-                 scrubOne(cfs, input, skipCorrupted, checkData);
 -                scrubOne(cfs, input, skipCorrupted, checkData, reinsertOverflowedTTLRows);
++                scrubOne(cfs, input, skipCorrupted, checkData, reinsertOverflowedTTL);
              }
          }, jobs, OperationType.SCRUB);
      }
@@@ -745,11 -736,11 +753,11 @@@
          }
      }
  
-     private void scrubOne(ColumnFamilyStore cfs, LifecycleTransaction modifier, boolean skipCorrupted, boolean checkData) throws IOException
 -    private void scrubOne(ColumnFamilyStore cfs, LifecycleTransaction modifier, boolean skipCorrupted, boolean checkData, boolean reinsertOverflowedTTLRows) throws IOException
++    private void scrubOne(ColumnFamilyStore cfs, LifecycleTransaction modifier, boolean skipCorrupted, boolean checkData, boolean reinsertOverflowedTTL) throws IOException
      {
          CompactionInfo.Holder scrubInfo = null;
  
-         try (Scrubber scrubber = new Scrubber(cfs, modifier, skipCorrupted, checkData))
 -        try (Scrubber scrubber = new Scrubber(cfs, modifier, skipCorrupted, checkData, reinsertOverflowedTTLRows))
++        try (Scrubber scrubber = new Scrubber(cfs, modifier, skipCorrupted, checkData, reinsertOverflowedTTL))
          {
              scrubInfo = scrubber.getScrubInfo();
              metrics.beginCompaction(scrubInfo);

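For reference, the new five-argument performScrub overload can be invoked directly; a hedged usage sketch (the keyspace and table names are hypothetical):

    // Hedged usage sketch: scrub all sstables of a hypothetical table "ks.tbl",
    // asking the scrubber to rewrite rows whose TTL expiration overflowed.
    ColumnFamilyStore cfs = Keyspace.open("ks").getColumnFamilyStore("tbl");
    CompactionManager.AllSSTableOpStatus status =
        CompactionManager.instance.performScrub(cfs,
                                                false, // skipCorrupted
                                                true,  // checkData
                                                true,  // reinsertOverflowedTTL
                                                2);    // jobs
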
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c231ed5b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/Scrubber.java
index c8e0c53,affee11..bc11504
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@@ -35,8 -37,12 +35,9 @@@ import org.apache.cassandra.io.sstable.
  import org.apache.cassandra.io.util.FileUtils;
  import org.apache.cassandra.io.util.RandomAccessReader;
  import org.apache.cassandra.service.ActiveRepairService;
 -import org.apache.cassandra.utils.ByteBufferUtil;
 -import org.apache.cassandra.utils.JVMStabilityInspector;
 -import org.apache.cassandra.utils.OutputHandler;
 -import org.apache.cassandra.utils.memory.HeapAllocator;
 -import org.apache.cassandra.utils.UUIDGen;
 +import org.apache.cassandra.utils.*;
  import org.apache.cassandra.utils.concurrent.Refs;
++import org.apache.cassandra.utils.memory.HeapAllocator;
  
  public class Scrubber implements Closeable
  {
@@@ -45,7 -51,9 +46,8 @@@
      private final LifecycleTransaction transaction;
      private final File destination;
      private final boolean skipCorrupted;
+     private final boolean reinsertOverflowedTTLRows;
  
 -    private final CompactionController controller;
      private final boolean isCommutative;
      private final boolean isIndex;
      private final boolean checkData;
@@@ -65,38 -76,41 +67,47 @@@
      long currentRowPositionFromIndex;
      long nextRowPositionFromIndex;
  
 -    private final OutputHandler outputHandler;
+     private NegativeLocalDeletionInfoMetrics negativeLocalDeletionInfoMetrics = new NegativeLocalDeletionInfoMetrics();
+ 
 -    private static final Comparator<Row> rowComparator = new Comparator<Row>()
 +    private final OutputHandler outputHandler;
 +
 +    private static final Comparator<Partition> partitionComparator = new Comparator<Partition>()
      {
 -         public int compare(Row r1, Row r2)
 +         public int compare(Partition r1, Partition r2)
           {
 -             return r1.key.compareTo(r2.key);
 +             return r1.partitionKey().compareTo(r2.partitionKey());
           }
      };
 -    private final SortedSet<Row> outOfOrderRows = new TreeSet<>(rowComparator);
 +    private final SortedSet<Partition> outOfOrder = new TreeSet<>(partitionComparator);
  
      public Scrubber(ColumnFamilyStore cfs, LifecycleTransaction transaction, boolean skipCorrupted, boolean checkData) throws IOException
      {
-         this(cfs, transaction, skipCorrupted, new OutputHandler.LogOutput(), checkData);
 -        this(cfs, transaction, skipCorrupted, new OutputHandler.LogOutput(), checkData, false);
++        this(cfs, transaction, skipCorrupted, checkData, false);
+     }
+ 
+     public Scrubber(ColumnFamilyStore cfs, LifecycleTransaction transaction, boolean skipCorrupted, boolean checkData,
+                     boolean reinsertOverflowedTTLRows) throws IOException
+     {
+         this(cfs, transaction, skipCorrupted, new OutputHandler.LogOutput(), checkData, reinsertOverflowedTTLRows);
      }
  
      @SuppressWarnings("resource")
 -    public Scrubber(ColumnFamilyStore cfs, LifecycleTransaction transaction, boolean skipCorrupted, OutputHandler outputHandler, boolean checkData,
 +    public Scrubber(ColumnFamilyStore cfs,
 +                    LifecycleTransaction transaction,
 +                    boolean skipCorrupted,
 +                    OutputHandler outputHandler,
-                     boolean checkData) throws IOException
++                    boolean checkData,
+                     boolean reinsertOverflowedTTLRows) throws IOException
      {
          this.cfs = cfs;
          this.transaction = transaction;
          this.sstable = transaction.onlyOne();
          this.outputHandler = outputHandler;
          this.skipCorrupted = skipCorrupted;
 -        this.rowIndexEntrySerializer = sstable.descriptor.version.getSSTableFormat().getIndexSerializer(sstable.metadata);
+         this.reinsertOverflowedTTLRows = reinsertOverflowedTTLRows;
 -
 +        this.rowIndexEntrySerializer = sstable.descriptor.version.getSSTableFormat().getIndexSerializer(sstable.metadata,
 +                                                                                                        sstable.descriptor.version,
 +                                                                                                        sstable.header);
- 
          List<SSTableReader> toScrub = Collections.singletonList(sstable);
  
          // Calculate the expected compacted filesize
@@@ -134,19 -150,15 +145,22 @@@
  
          this.currentRowPositionFromIndex = 0;
          this.nextRowPositionFromIndex = 0;
+ 
+         if (reinsertOverflowedTTLRows)
+             outputHandler.output("Starting scrub with reinsert overflowed TTL option");
      }
  
 +    private UnfilteredRowIterator withValidation(UnfilteredRowIterator iter, String filename)
 +    {
 +        return checkData ? UnfilteredRowIterators.withValidation(iter, filename) : iter;
 +    }
 +
      public void scrub()
      {
 +        List<SSTableReader> finished = new ArrayList<>();
 +        boolean completed = false;
          outputHandler.output(String.format("Scrubbing %s (%s bytes)", sstable, dataFile.length()));
 -        try (SSTableRewriter writer = new SSTableRewriter(cfs, transaction, sstable.maxDataAge, transaction.isOffline());
 +        try (SSTableRewriter writer = SSTableRewriter.construct(cfs, transaction, false, sstable.maxDataAge, transaction.isOffline());
               Refs<SSTableReader> refs = Refs.ref(Collections.singleton(sstable)))
          {
              nextIndexKey = indexAvailable() ? ByteBufferUtil.readWithShortLength(indexFile) : null;
@@@ -285,57 -311,66 +299,71 @@@
          }
          finally
          {
 -            controller.close();
 -            if (transaction.isOffline() && newSstable != null)
 -                newSstable.selfRef().release();
 +            if (transaction.isOffline())
 +                finished.forEach(sstable -> sstable.selfRef().release());
          }
  
 -        if (newSstable == null)
 -        {
 -            if (badRows > 0)
 -                outputHandler.warn("No valid rows found while scrubbing " + sstable + "; it is marked for deletion now. If you want to attempt manual recovery, you can find a copy in the pre-scrub snapshot");
 -            else
 -                outputHandler.output("Scrub of " + sstable + " complete; looks like all " + emptyRows + " rows were tombstoned");
 -        }
 -        else
 +        if (completed)
          {
+             outputHandler.output("Scrub of " + sstable + " complete: " + goodRows + " rows in new sstable and " + emptyRows + " empty (tombstoned) rows dropped");
+             if (negativeLocalDeletionInfoMetrics.fixedRows > 0)
+                 outputHandler.output("Fixed " + negativeLocalDeletionInfoMetrics.fixedRows + " rows with overflowed local deletion time.");
              if (badRows > 0)
-                 outputHandler.warn("No valid rows found while scrubbing " + sstable + "; it is marked for deletion now. If you want to attempt manual recovery, you can find a copy in the pre-scrub snapshot");
-             else
-                 outputHandler.output("Scrub of " + sstable + " complete; looks like all " + emptyRows + " rows were tombstoned");
+                 outputHandler.warn("Unable to recover " + badRows + " rows that were skipped.  You can attempt manual recovery from the pre-scrub snapshot.  You can also run nodetool repair to transfer the data from a healthy replica, if any");
          }
 +        else
 +        {
-             outputHandler.output("Scrub of " + sstable + " complete: " + goodRows + " rows in new sstable and " + emptyRows + " empty (tombstoned) rows dropped");
 +            if (badRows > 0)
-                 outputHandler.warn("Unable to recover " + badRows + " rows that were skipped.  You can attempt manual recovery from the pre-scrub snapshot.  You can also run nodetool repair to transfer the data from a healthy replica, if any");
++                outputHandler.warn("No valid rows found while scrubbing " + sstable + "; it is marked for deletion now. If you want to attempt manual recovery, you can find a copy in the pre-scrub snapshot");
++            else
++                outputHandler.output("Scrub of " + sstable + " complete; looks like all " + emptyRows + " rows were tombstoned");
 +        }
      }
  
      @SuppressWarnings("resource")
      private boolean tryAppend(DecoratedKey prevKey, DecoratedKey key, SSTableRewriter writer)
      {
 -        // OrderCheckerIterator will check, at iteration time, that the cells are in the proper order. If it detects
 -        // that one cell is out of order, it will stop returning them. The remaining cells will be sorted and added
 -        // to the outOfOrderRows that will be later written to a new SSTable.
 -        OrderCheckerIterator atoms = new OrderCheckerIterator(getIterator(key),
 -                                                              cfs.metadata.comparator.onDiskAtomComparator());
 -        if (prevKey != null && prevKey.compareTo(key) > 0)
 -        {
 -            saveOutOfOrderRow(prevKey, key, atoms);
 -            return false;
 -        }
 +        // OrderCheckerIterator will check, at iteration time, that the rows are in the proper order. If it detects
 +        // that one row is out of order, it will stop returning them. The remaining rows will be sorted and added
 +        // to the outOfOrder set that will be later written to a new SSTable.
-         OrderCheckerIterator sstableIterator = new OrderCheckerIterator(new RowMergingSSTableIterator(sstable, dataFile, key),
++        OrderCheckerIterator sstableIterator = new OrderCheckerIterator(getIterator(key),
 +                                                                        cfs.metadata.comparator);
  
 -        AbstractCompactedRow compactedRow = new LazilyCompactedRow(controller, Collections.singletonList(atoms));
 -        if (writer.tryAppend(compactedRow) == null)
 -            emptyRows++;
 -        else
 -            goodRows++;
 +        try (UnfilteredRowIterator iterator = withValidation(sstableIterator, dataFile.getPath()))
 +        {
 +            if (prevKey != null && prevKey.compareTo(key) > 0)
 +            {
 +                saveOutOfOrderRow(prevKey, key, iterator);
 +                return false;
 +            }
  
 -        if (atoms.hasOutOfOrderCells())
 -            saveOutOfOrderRow(key, atoms);
 +            if (writer.tryAppend(iterator) == null)
 +                emptyRows++;
 +            else
 +                goodRows++;
 +        }
 +
 +        if (sstableIterator.hasRowsOutOfOrder())
 +        {
 +            outputHandler.warn(String.format("Out of order rows found in partition: %s", key));
 +            outOfOrder.add(sstableIterator.getRowsOutOfOrder());
 +        }
  
          return true;
      }
  
+     /**
+      * Only wrap with {@link FixNegativeLocalDeletionTimeIterator} if {@link #reinsertOverflowedTTLRows} option
+      * is specified
+      */
 -    private OnDiskAtomIterator getIterator(DecoratedKey key)
++    private UnfilteredRowIterator getIterator(DecoratedKey key)
+     {
 -        SSTableIdentityIterator sstableIdentityIterator = new SSTableIdentityIterator(sstable, dataFile, key, checkData);
 -        return reinsertOverflowedTTLRows ? new FixNegativeLocalDeletionTimeIterator(sstableIdentityIterator,
++        RowMergingSSTableIterator rowMergingIterator = new RowMergingSSTableIterator(sstable, dataFile, key);
++        return reinsertOverflowedTTLRows ? new FixNegativeLocalDeletionTimeIterator(rowMergingIterator,
+                                                                                     outputHandler,
 -                                                                                    negativeLocalDeletionInfoMetrics) : sstableIdentityIterator;
++                                                                                    negativeLocalDeletionInfoMetrics) : rowMergingIterator;
+     }
+ 
      private void updateIndexKey()
      {
          currentIndexKey = nextIndexKey;
@@@ -477,49 -555,12 +505,55 @@@
          }
      }
  
+     public class NegativeLocalDeletionInfoMetrics
+     {
+         public volatile int fixedRows = 0;
+     }
+ 
      /**
 +     * During the 2.x migration, rows might have been duplicated under some circumstances.
 +     * This merging iterator merges rows with the same clustering.
 +     *
 +     * For more details, refer to CASSANDRA-12144.
 +     */
 +    private static class RowMergingSSTableIterator extends SSTableIdentityIterator
 +    {
 +        RowMergingSSTableIterator(SSTableReader sstable, RandomAccessReader file, DecoratedKey key)
 +        {
 +            super(sstable, file, key);
 +        }
 +
 +        @Override
 +        protected Unfiltered doCompute()
 +        {
 +            if (!iterator.hasNext())
 +                return endOfData();
 +
 +            Unfiltered next = iterator.next();
 +            if (!next.isRow())
 +                return next;
 +
 +            while (iterator.hasNext())
 +            {
 +                Unfiltered peek = iterator.peek();
 +                // If there was a duplicate row, merge it.
 +                if (next.clustering().equals(peek.clustering()) && peek.isRow())
 +                {
 +                    iterator.next(); // Make sure that the peeked item was consumed.
 +                    next = Rows.merge((Row) next, (Row) peek, FBUtilities.nowInSeconds());
 +                }
 +                else
 +                {
 +                    break;
 +                }
 +            }
 +
 +            return next;
 +        }
++
 +    }
 +
 +    /**
      * In some cases, like CASSANDRA-12127, the cells might have been stored in the wrong order. This decorator checks the
      * cell order and collects the out-of-order cells to correct the problem.
       */
@@@ -571,53 -608,98 +605,199 @@@
          }
  
          @Override
 -        protected OnDiskAtom computeNext()
 +        public boolean isEmpty()
          {
 -            if (!iterator.hasNext())
 -                return endOfData();
 +            return iterator.isEmpty();
 +        }
  
 -            OnDiskAtom next = iterator.next();
 +        public void close()
 +        {
 +            iterator.close();
 +        }
  
 -            // If we detect that some cells are out of order we will store and sort the remaining once to insert them
 -            // in a separate SSTable.
 -            if (previous != null && comparator.compare(next, previous) < 0)
 -            {
 -                outOfOrderCells = collectOutOfOrderCells(next, iterator);
 -                return endOfData();
 -            }
 -            previous = next;
 -            return next;
 +        public DeletionTime partitionLevelDeletion()
 +        {
 +            return iterator.partitionLevelDeletion();
          }
  
 -        public boolean hasOutOfOrderCells()
 +        public EncodingStats stats()
          {
 -            return outOfOrderCells != null;
 +            return iterator.stats();
          }
  
 -        public ColumnFamily getOutOfOrderCells()
 +        public boolean hasRowsOutOfOrder()
          {
 -            return outOfOrderCells;
 +            return rowsOutOfOrder != null;
          }
  
 -        private static ColumnFamily collectOutOfOrderCells(OnDiskAtom atom, OnDiskAtomIterator iterator)
 +        public Partition getRowsOutOfOrder()
          {
 -            ColumnFamily cf = iterator.getColumnFamily().cloneMeShallow(ArrayBackedSortedColumns.factory, false);
 -            cf.addAtom(atom);
 -            while (iterator.hasNext())
 -                cf.addAtom(iterator.next());
 -            return cf;
 +            return rowsOutOfOrder;
 +        }
 +
 +        protected Unfiltered computeNext()
 +        {
 +            if (!iterator.hasNext())
 +                return endOfData();
 +
 +            Unfiltered next = iterator.next();
 +
 +            // If we detect that some rows are out of order we will store and sort the remaining ones to insert them
 +            // in a separate SSTable.
 +            if (previous != null && comparator.compare(next, previous) < 0)
 +            {
 +                rowsOutOfOrder = ImmutableBTreePartition.create(UnfilteredRowIterators.concat(next, iterator), false);
 +                return endOfData();
 +            }
 +            previous = next;
 +            return next;
          }
+     }
+ 
+     /**
 -     * This iterator converts negative {@link BufferExpiringCell#getLocalDeletionTime()} into {@link BufferExpiringCell#MAX_DELETION_TIME}
++     * This iterator converts negative {@link AbstractCell#localDeletionTime()} into {@link AbstractCell#MAX_DELETION_TIME}
+      *
+      * This is to recover entries with overflowed localExpirationTime due to CASSANDRA-14092
+      */
 -    private static final class FixNegativeLocalDeletionTimeIterator extends AbstractIterator<OnDiskAtom> implements OnDiskAtomIterator
++    private static final class FixNegativeLocalDeletionTimeIterator extends AbstractIterator<Unfiltered> implements UnfilteredRowIterator
+     {
+         /**
+          * The decorated iterator.
+          */
 -        private final OnDiskAtomIterator iterator;
++        private final UnfilteredRowIterator iterator;
+ 
+         private final OutputHandler outputHandler;
+         private final NegativeLocalDeletionInfoMetrics negativeLocalExpirationTimeMetrics;
+ 
 -        public FixNegativeLocalDeletionTimeIterator(OnDiskAtomIterator iterator, OutputHandler outputHandler,
++        public FixNegativeLocalDeletionTimeIterator(UnfilteredRowIterator iterator, OutputHandler outputHandler,
+                                                     NegativeLocalDeletionInfoMetrics negativeLocalDeletionInfoMetrics)
+         {
+             this.iterator = iterator;
+             this.outputHandler = outputHandler;
+             this.negativeLocalExpirationTimeMetrics = negativeLocalDeletionInfoMetrics;
+         }
+ 
 -        public ColumnFamily getColumnFamily()
++        public CFMetaData metadata()
+         {
 -            return iterator.getColumnFamily();
++            return iterator.metadata();
+         }
+ 
 -        public DecoratedKey getKey()
++        public boolean isReverseOrder()
+         {
 -            return iterator.getKey();
++            return iterator.isReverseOrder();
+         }
+ 
 -        public void close() throws IOException
++        public PartitionColumns columns()
+         {
 -            iterator.close();
++            return iterator.columns();
++        }
++
++        public DecoratedKey partitionKey()
++        {
++            return iterator.partitionKey();
++        }
++
++        public Row staticRow()
++        {
++            return iterator.staticRow();
+         }
+ 
+         @Override
 -        protected OnDiskAtom computeNext()
++        public boolean isEmpty()
++        {
++            return iterator.isEmpty();
++        }
++
++        public void close()
++        {
++            iterator.close();
++        }
++
++        public DeletionTime partitionLevelDeletion()
++        {
++            return iterator.partitionLevelDeletion();
++        }
++
++        public EncodingStats stats()
++        {
++            return iterator.stats();
++        }
++
++        protected Unfiltered computeNext()
+         {
+             if (!iterator.hasNext())
+                 return endOfData();
+ 
 -            OnDiskAtom next = iterator.next();
++            Unfiltered next = iterator.next();
++            if (!next.isRow())
++                return next;
+ 
 -            if (next instanceof ExpiringCell && next.getLocalDeletionTime() < 0)
++            if (hasNegativeLocalExpirationTime((Row) next))
+             {
 -                outputHandler.debug(String.format("Found cell with negative local expiration time: %s", ((ExpiringCell) next).getString(getColumnFamily().getComparator()), getColumnFamily()));
++                outputHandler.debug(String.format("Found row with negative local expiration time: %s", next.toString(metadata(), false)));
+                 negativeLocalExpirationTimeMetrics.fixedRows++;
 -                next = ((Cell) next).localCopy(getColumnFamily().metadata(), HeapAllocator.instance).withUpdatedTimestampAndLocalDeletionTime(next.timestamp() + 1, BufferExpiringCell.MAX_DELETION_TIME);
++                return fixNegativeLocalExpirationTime((Row) next);
+             }
+ 
+             return next;
+         }
++
++        private boolean hasNegativeLocalExpirationTime(Row row)
++        {
++            // Check the primary key liveness info first, then every cell.
++            if (row.primaryKeyLivenessInfo().isExpiring() && row.primaryKeyLivenessInfo().localExpirationTime() < 0)
++            {
++                return true;
++            }
++
++            for (ColumnData cd : row)
++            {
++                if (cd.column().isSimple())
++                {
++                    Cell cell = (Cell)cd;
++                    if (cell.isExpiring() && cell.localDeletionTime() < 0)
++                        return true;
++                }
++                else
++                {
++                    ComplexColumnData complexData = (ComplexColumnData)cd;
++                    for (Cell cell : complexData)
++                    {
++                        if (cell.isExpiring() && cell.localDeletionTime() < 0)
++                            return true;
++                    }
++                }
++            }
++
++            return false;
++        }
 +
++        private Unfiltered fixNegativeLocalExpirationTime(Row row)
++        {
++            Row.Builder builder = HeapAllocator.instance.cloningBTreeRowBuilder();
++            builder.newRow(row.clustering());
++            builder.addPrimaryKeyLivenessInfo(row.primaryKeyLivenessInfo().isExpiring() && row.primaryKeyLivenessInfo().localExpirationTime() < 0 ?
++                                              row.primaryKeyLivenessInfo().withUpdatedTimestampAndLocalDeletionTime(row.primaryKeyLivenessInfo().timestamp() + 1, AbstractCell.MAX_DELETION_TIME)
++                                              : row.primaryKeyLivenessInfo());
++            builder.addRowDeletion(row.deletion());
++            for (ColumnData cd : row)
++            {
++                if (cd.column().isSimple())
++                {
++                    Cell cell = (Cell)cd;
++                    builder.addCell(cell.isExpiring() && cell.localDeletionTime() < 0 ? cell.withUpdatedTimestampAndLocalDeletionTime(cell.timestamp() + 1, AbstractCell.MAX_DELETION_TIME) : cell);
++                }
++                else
++                {
++                    ComplexColumnData complexData = (ComplexColumnData)cd;
++                    builder.addComplexDeletion(complexData.column(), complexData.complexDeletion());
++                    for (Cell cell : complexData)
++                    {
++                        builder.addCell(cell.isExpiring() && cell.localDeletionTime() < 0 ? cell.withUpdatedTimestampAndLocalDeletionTime(cell.timestamp() + 1, AbstractCell.MAX_DELETION_TIME) : cell);
++                    }
++                }
++            }
++            return builder.build();
++        }
      }
  }

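The heart of FixNegativeLocalDeletionTimeIterator above is the per-cell rewrite: any expiring cell (or primary key liveness info) whose local deletion time overflowed to a negative value is given the maximum storable deletion time, and its timestamp is bumped by one so the repaired cell supersedes the broken original during reconciliation. A condensed sketch of that transform:

    // Hedged sketch of the per-cell repair applied by the iterator above.
    private static Cell fixCell(Cell cell)
    {
        if (cell.isExpiring() && cell.localDeletionTime() < 0)
            // timestamp + 1 makes the repaired cell supersede the broken one
            return cell.withUpdatedTimestampAndLocalDeletionTime(cell.timestamp() + 1,
                                                                 AbstractCell.MAX_DELETION_TIME);
        return cell;
    }
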
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c231ed5b/src/java/org/apache/cassandra/db/rows/BufferCell.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/rows/BufferCell.java
index 82ae02c,0000000..df2619c
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/rows/BufferCell.java
+++ b/src/java/org/apache/cassandra/db/rows/BufferCell.java
@@@ -1,365 -1,0 +1,370 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.db.rows;
 +
 +import java.io.IOException;
 +import java.nio.ByteBuffer;
 +
 +import org.apache.cassandra.config.*;
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.context.CounterContext;
 +import org.apache.cassandra.db.marshal.ByteType;
 +import org.apache.cassandra.io.util.DataInputPlus;
 +import org.apache.cassandra.io.util.DataOutputPlus;
 +import org.apache.cassandra.utils.ByteBufferUtil;
 +import org.apache.cassandra.utils.ObjectSizes;
 +import org.apache.cassandra.utils.FBUtilities;
 +import org.apache.cassandra.utils.memory.AbstractAllocator;
 +
 +public class BufferCell extends AbstractCell
 +{
 +    private static final long EMPTY_SIZE = ObjectSizes.measure(new BufferCell(ColumnDefinition.regularDef("", "", "", ByteType.instance), 0L, 0, 0, ByteBufferUtil.EMPTY_BYTE_BUFFER, null));
 +
 +    private final long timestamp;
 +    private final int ttl;
 +    private final int localDeletionTime;
 +
 +    private final ByteBuffer value;
 +    private final CellPath path;
 +
 +    public BufferCell(ColumnDefinition column, long timestamp, int ttl, int localDeletionTime, ByteBuffer value, CellPath path)
 +    {
 +        super(column);
 +        assert column.isComplex() == (path != null);
 +        this.timestamp = timestamp;
 +        this.ttl = ttl;
 +        this.localDeletionTime = localDeletionTime;
 +        this.value = value;
 +        this.path = path;
 +    }
 +
 +    public static BufferCell live(CFMetaData metadata, ColumnDefinition column, long timestamp, ByteBuffer value)
 +    {
 +        return live(metadata, column, timestamp, value, null);
 +    }
 +
 +    public static BufferCell live(CFMetaData metadata, ColumnDefinition column, long timestamp, ByteBuffer value, CellPath path)
 +    {
 +        if (metadata.params.defaultTimeToLive != NO_TTL)
 +            return expiring(column, timestamp, metadata.params.defaultTimeToLive, FBUtilities.nowInSeconds(), value, path);
 +
 +        return new BufferCell(column, timestamp, NO_TTL, NO_DELETION_TIME, value, path);
 +    }
 +
 +    public static BufferCell expiring(ColumnDefinition column, long timestamp, int ttl, int nowInSec, ByteBuffer value)
 +    {
 +        return expiring(column, timestamp, ttl, nowInSec, value, null);
 +    }
 +
 +    public static BufferCell expiring(ColumnDefinition column, long timestamp, int ttl, int nowInSec, ByteBuffer value, CellPath path)
 +    {
 +        assert ttl != NO_TTL;
-         return new BufferCell(column, timestamp, ttl, nowInSec + ttl, value, path);
++        return new BufferCell(column, timestamp, ttl, ExpirationDateOverflowHandling.computeLocalExpirationTime(nowInSec, ttl), value, path);
 +    }
 +
 +    public static BufferCell tombstone(ColumnDefinition column, long timestamp, int nowInSec)
 +    {
 +        return tombstone(column, timestamp, nowInSec, null);
 +    }
 +
 +    public static BufferCell tombstone(ColumnDefinition column, long timestamp, int nowInSec, CellPath path)
 +    {
 +        return new BufferCell(column, timestamp, NO_TTL, nowInSec, ByteBufferUtil.EMPTY_BYTE_BUFFER, path);
 +    }
 +
 +    public boolean isCounterCell()
 +    {
 +        return !isTombstone() && column.isCounterColumn();
 +    }
 +
 +    public boolean isLive(int nowInSec)
 +    {
 +        return localDeletionTime == NO_DELETION_TIME || (ttl != NO_TTL && nowInSec < localDeletionTime);
 +    }
 +
 +    public boolean isTombstone()
 +    {
 +        return localDeletionTime != NO_DELETION_TIME && ttl == NO_TTL;
 +    }
 +
 +    public boolean isExpiring()
 +    {
 +        return ttl != NO_TTL;
 +    }
 +
 +    public long timestamp()
 +    {
 +        return timestamp;
 +    }
 +
 +    public int ttl()
 +    {
 +        return ttl;
 +    }
 +
 +    public int localDeletionTime()
 +    {
 +        return localDeletionTime;
 +    }
 +
 +    public ByteBuffer value()
 +    {
 +        return value;
 +    }
 +
 +    public CellPath path()
 +    {
 +        return path;
 +    }
 +
 +    public Cell withUpdatedColumn(ColumnDefinition newColumn)
 +    {
 +        return new BufferCell(newColumn, timestamp, ttl, localDeletionTime, value, path);
 +    }
 +
 +    public Cell withUpdatedValue(ByteBuffer newValue)
 +    {
 +        return new BufferCell(column, timestamp, ttl, localDeletionTime, newValue, path);
 +    }
 +
++    public Cell withUpdatedTimestampAndLocalDeletionTime(long newTimestamp, int newLocalDeletionTime)
++    {
++        return new BufferCell(column, newTimestamp, ttl, newLocalDeletionTime, value, path);
++    }
++
 +    public Cell copy(AbstractAllocator allocator)
 +    {
 +        if (!value.hasRemaining())
 +            return this;
 +
 +        return new BufferCell(column, timestamp, ttl, localDeletionTime, allocator.clone(value), path == null ? null : path.copy(allocator));
 +    }
 +
 +    public Cell markCounterLocalToBeCleared()
 +    {
 +        if (!isCounterCell())
 +            return this;
 +
 +        ByteBuffer marked = CounterContext.instance().markLocalToBeCleared(value());
 +        return marked == value() ? this : new BufferCell(column, timestamp, ttl, localDeletionTime, marked, path);
 +    }
 +
 +    public Cell purge(DeletionPurger purger, int nowInSec)
 +    {
 +        if (!isLive(nowInSec))
 +        {
 +            if (purger.shouldPurge(timestamp, localDeletionTime))
 +                return null;
 +
 +            // We slightly hijack purging to convert expired but not purgeable columns to tombstones. The reason we do that is
 +            // that once a column has expired it is equivalent to a tombstone but actually using a tombstone is more compact since
 +            // we don't keep the column value. The reason we do it here is that 1) it's somewhat related to dealing with tombstones
 +            // so hopefully not too surprising and 2) we want to do this and purging at the same place, so it's simpler/more efficient
 +            // to do both here.
 +            if (isExpiring())
 +            {
 +                // Note that as long as the expiring column and the tombstone put together live longer than GC grace seconds,
 +                // we'll fulfil our responsibility to repair. See discussion at
 +                // http://cassandra-user-incubator-apache-org.3065146.n2.nabble.com/repair-compaction-and-tombstone-rows-td7583481.html
 +                return BufferCell.tombstone(column, timestamp, localDeletionTime - ttl, path).purge(purger, nowInSec);
 +            }
 +        }
 +        return this;
 +    }
 +
 +    public Cell updateAllTimestamp(long newTimestamp)
 +    {
 +        return new BufferCell(column, isTombstone() ? newTimestamp - 1 : newTimestamp, ttl, localDeletionTime, value, path);
 +    }
 +
 +    public int dataSize()
 +    {
 +        return TypeSizes.sizeof(timestamp)
 +             + TypeSizes.sizeof(ttl)
 +             + TypeSizes.sizeof(localDeletionTime)
 +             + value.remaining()
 +             + (path == null ? 0 : path.dataSize());
 +    }
 +
 +    public long unsharedHeapSizeExcludingData()
 +    {
 +        return EMPTY_SIZE + ObjectSizes.sizeOnHeapExcludingData(value) + (path == null ? 0 : path.unsharedHeapSizeExcludingData());
 +    }
 +
 +    /**
 +     * The serialization format for cell is:
 +     *     [ flags ][ timestamp ][ deletion time ][    ttl    ][ path size ][ path ][ value size ][ value ]
 +     *     [   1b  ][ 8b (vint) ][   4b (vint)   ][ 4b (vint) ][ 4b (vint) ][  arb ][  4b (vint) ][  arb  ]
 +     *
 +     * where not all fields are always present (in fact, only the [ flags ] are guaranteed to be present). The fields have the following
 +     * meaning:
 +     *   - [ flags ] is the cell flags. It is a byte for which each bit represents a flag whose meaning is explained below (*_MASK constants)
 +     *   - [ timestamp ] is the cell timestamp. Present unless the cell has the USE_ROW_TIMESTAMP_MASK.
 +     *   - [ deletion time]: the local deletion time for the cell. Present if either the cell is deleted (IS_DELETED_MASK)
 +     *       or it is expiring (IS_EXPIRING_MASK) but doesn't have the USE_ROW_TTL_MASK.
 +     *   - [ ttl ]: the ttl for the cell. Present if the row is expiring (IS_EXPIRING_MASK) but doesn't have the
 +     *       USE_ROW_TTL_MASK.
 +     *   - [ value size ] is the size of the [ value ] field. It's present unless either the cell has the HAS_EMPTY_VALUE_MASK, or the value
 +     *       for columns of this type have a fixed length.
 +     *   - [ path size ] is the size of the [ path ] field. Present iff this is the cell of a complex column.
 +     *   - [ value ]: the cell value, unless it has the HAS_EMPTY_VALUE_MASK.
 +     *   - [ path ]: the cell path if the column this is a cell of is complex.
 +     */
 +    static class Serializer implements Cell.Serializer
 +    {
 +        private final static int IS_DELETED_MASK             = 0x01; // Whether the cell is a tombstone or not.
 +        private final static int IS_EXPIRING_MASK            = 0x02; // Whether the cell is expiring.
 +        private final static int HAS_EMPTY_VALUE_MASK        = 0x04; // Whether the cell has an empty value. This will be the case for tombstones in particular.
 +        private final static int USE_ROW_TIMESTAMP_MASK      = 0x08; // Whether the cell has the same timestamp as the row it is a cell of.
 +        private final static int USE_ROW_TTL_MASK            = 0x10; // Whether the cell has the same ttl as the row it is a cell of.
 +
 +        public void serialize(Cell cell, ColumnDefinition column, DataOutputPlus out, LivenessInfo rowLiveness, SerializationHeader header) throws IOException
 +        {
 +            assert cell != null;
 +            boolean hasValue = cell.value().hasRemaining();
 +            boolean isDeleted = cell.isTombstone();
 +            boolean isExpiring = cell.isExpiring();
 +            boolean useRowTimestamp = !rowLiveness.isEmpty() && cell.timestamp() == rowLiveness.timestamp();
 +            boolean useRowTTL = isExpiring && rowLiveness.isExpiring() && cell.ttl() == rowLiveness.ttl() && cell.localDeletionTime() == rowLiveness.localExpirationTime();
 +            int flags = 0;
 +            if (!hasValue)
 +                flags |= HAS_EMPTY_VALUE_MASK;
 +
 +            if (isDeleted)
 +                flags |= IS_DELETED_MASK;
 +            else if (isExpiring)
 +                flags |= IS_EXPIRING_MASK;
 +
 +            if (useRowTimestamp)
 +                flags |= USE_ROW_TIMESTAMP_MASK;
 +            if (useRowTTL)
 +                flags |= USE_ROW_TTL_MASK;
 +
 +            out.writeByte((byte)flags);
 +
 +            if (!useRowTimestamp)
 +                header.writeTimestamp(cell.timestamp(), out);
 +
 +            if ((isDeleted || isExpiring) && !useRowTTL)
 +                header.writeLocalDeletionTime(cell.localDeletionTime(), out);
 +            if (isExpiring && !useRowTTL)
 +                header.writeTTL(cell.ttl(), out);
 +
 +            if (column.isComplex())
 +                column.cellPathSerializer().serialize(cell.path(), out);
 +
 +            if (hasValue)
 +                header.getType(column).writeValue(cell.value(), out);
 +        }
 +
 +        public Cell deserialize(DataInputPlus in, LivenessInfo rowLiveness, ColumnDefinition column, SerializationHeader header, SerializationHelper helper) throws IOException
 +        {
 +            int flags = in.readUnsignedByte();
 +            boolean hasValue = (flags & HAS_EMPTY_VALUE_MASK) == 0;
 +            boolean isDeleted = (flags & IS_DELETED_MASK) != 0;
 +            boolean isExpiring = (flags & IS_EXPIRING_MASK) != 0;
 +            boolean useRowTimestamp = (flags & USE_ROW_TIMESTAMP_MASK) != 0;
 +            boolean useRowTTL = (flags & USE_ROW_TTL_MASK) != 0;
 +
 +            long timestamp = useRowTimestamp ? rowLiveness.timestamp() : header.readTimestamp(in);
 +
 +            int localDeletionTime = useRowTTL
 +                                  ? rowLiveness.localExpirationTime()
 +                                  : (isDeleted || isExpiring ? header.readLocalDeletionTime(in) : NO_DELETION_TIME);
 +
 +            int ttl = useRowTTL ? rowLiveness.ttl() : (isExpiring ? header.readTTL(in) : NO_TTL);
 +
 +            CellPath path = column.isComplex()
 +                          ? column.cellPathSerializer().deserialize(in)
 +                          : null;
 +
 +            boolean isCounter = localDeletionTime == NO_DELETION_TIME && column.type.isCounter();
 +
 +            ByteBuffer value = ByteBufferUtil.EMPTY_BYTE_BUFFER;
 +            if (hasValue)
 +            {
 +                if (helper.canSkipValue(column) || (path != null && helper.canSkipValue(path)))
 +                {
 +                    header.getType(column).skipValue(in);
 +                }
 +                else
 +                {
 +                    value = header.getType(column).readValue(in, DatabaseDescriptor.getMaxValueSize());
 +                    if (isCounter)
 +                        value = helper.maybeClearCounterValue(value);
 +                }
 +            }
 +
 +            return new BufferCell(column, timestamp, ttl, localDeletionTime, value, path);
 +        }
 +
 +        public long serializedSize(Cell cell, ColumnDefinition column, LivenessInfo rowLiveness, SerializationHeader header)
 +        {
 +            long size = 1; // flags
 +            boolean hasValue = cell.value().hasRemaining();
 +            boolean isDeleted = cell.isTombstone();
 +            boolean isExpiring = cell.isExpiring();
 +            boolean useRowTimestamp = !rowLiveness.isEmpty() && cell.timestamp() == rowLiveness.timestamp();
 +            boolean useRowTTL = isExpiring && rowLiveness.isExpiring() && cell.ttl() == rowLiveness.ttl() && cell.localDeletionTime() == rowLiveness.localExpirationTime();
 +
 +            if (!useRowTimestamp)
 +                size += header.timestampSerializedSize(cell.timestamp());
 +
 +            if ((isDeleted || isExpiring) && !useRowTTL)
 +                size += header.localDeletionTimeSerializedSize(cell.localDeletionTime());
 +            if (isExpiring && !useRowTTL)
 +                size += header.ttlSerializedSize(cell.ttl());
 +
 +            if (column.isComplex())
 +                size += column.cellPathSerializer().serializedSize(cell.path());
 +
 +            if (hasValue)
 +                size += header.getType(column).writtenLength(cell.value());
 +
 +            return size;
 +        }
 +
 +        // Returns whether the skipped cell was an actual cell (i.e. it had its presence flag).
 +        public boolean skip(DataInputPlus in, ColumnDefinition column, SerializationHeader header) throws IOException
 +        {
 +            int flags = in.readUnsignedByte();
 +            boolean hasValue = (flags & HAS_EMPTY_VALUE_MASK) == 0;
 +            boolean isDeleted = (flags & IS_DELETED_MASK) != 0;
 +            boolean isExpiring = (flags & IS_EXPIRING_MASK) != 0;
 +            boolean useRowTimestamp = (flags & USE_ROW_TIMESTAMP_MASK) != 0;
 +            boolean useRowTTL = (flags & USE_ROW_TTL_MASK) != 0;
 +
 +            if (!useRowTimestamp)
 +                header.skipTimestamp(in);
 +
 +            if (!useRowTTL && (isDeleted || isExpiring))
 +                header.skipLocalDeletionTime(in);
 +
 +            if (!useRowTTL && isExpiring)
 +                header.skipTTL(in);
 +
 +            if (column.isComplex())
 +                column.cellPathSerializer().skip(in);
 +
 +            if (hasValue)
 +                header.getType(column).skipValue(in);
 +
 +            return true;
 +        }
 +    }
 +}
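
A note on the format decoded above: each serialized cell begins with a single
flags byte, and every subsequent field (timestamp, local deletion time, ttl,
value) is conditional on it. The sketch below shows the encoding side of that
contract; the mask values are illustrative assumptions, not constants copied
from BufferCell.Serializer.

    // Sketch: the flags byte that deserialize()/skip() above decode.
    // Mask values are assumptions for illustration only.
    static final int IS_DELETED_MASK        = 0x01; // cell is a tombstone
    static final int IS_EXPIRING_MASK       = 0x02; // cell has a ttl
    static final int HAS_EMPTY_VALUE_MASK   = 0x04; // cell value is empty
    static final int USE_ROW_TIMESTAMP_MASK = 0x08; // timestamp taken from row liveness
    static final int USE_ROW_TTL_MASK       = 0x10; // ttl/deletion time taken from row liveness

    static int encodeFlags(boolean hasValue, boolean isDeleted, boolean isExpiring,
                           boolean useRowTimestamp, boolean useRowTTL)
    {
        int flags = 0;
        if (!hasValue)       flags |= HAS_EMPTY_VALUE_MASK;   // note the inverted sense
        if (isDeleted)       flags |= IS_DELETED_MASK;
        if (isExpiring)      flags |= IS_EXPIRING_MASK;
        if (useRowTimestamp) flags |= USE_ROW_TIMESTAMP_MASK;
        if (useRowTTL)       flags |= USE_ROW_TTL_MASK;
        return flags;
    }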

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c231ed5b/src/java/org/apache/cassandra/db/rows/Cell.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/rows/Cell.java
index d10cc74,0000000..c69e11f
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/rows/Cell.java
+++ b/src/java/org/apache/cassandra/db/rows/Cell.java
@@@ -1,157 -1,0 +1,166 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.db.rows;
 +
 +import java.io.IOException;
 +import java.nio.ByteBuffer;
 +import java.util.Comparator;
 +
++import com.google.common.annotations.VisibleForTesting;
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
++
++import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.config.ColumnDefinition;
++import org.apache.cassandra.cql3.Attributes;
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.io.util.DataOutputPlus;
 +import org.apache.cassandra.io.util.DataInputPlus;
 +import org.apache.cassandra.utils.memory.AbstractAllocator;
 +
 +/**
 + * A cell is our atomic unit for a single value of a single column.
 + * <p>
 + * A cell always holds at least a timestamp that tells us how the cell reconciles. We then
 + * have 3 main types of cells:
 + *   1) live regular cells: those will also have a value and, if for a complex column, a path.
 + *   2) expiring cells: on top of regular cells, those have a ttl and a local deletion time (when they expire).
 + *   3) tombstone cells: those won't have a value, but they have a local deletion time (when the tombstone was created).
 + */
 +public abstract class Cell extends ColumnData
 +{
 +    public static final int NO_TTL = 0;
 +    public static final int NO_DELETION_TIME = Integer.MAX_VALUE;
++    public static final int MAX_DELETION_TIME = Integer.MAX_VALUE - 1;
 +
 +    public final static Comparator<Cell> comparator = (c1, c2) ->
 +    {
 +        int cmp = c1.column().compareTo(c2.column());
 +        if (cmp != 0)
 +            return cmp;
 +
 +        Comparator<CellPath> pathComparator = c1.column().cellPathComparator();
 +        return pathComparator == null ? 0 : pathComparator.compare(c1.path(), c2.path());
 +    };
 +
 +    public static final Serializer serializer = new BufferCell.Serializer();
 +
 +    protected Cell(ColumnDefinition column)
 +    {
 +        super(column);
 +    }
 +
 +    /**
 +     * Whether the cell is a counter cell or not.
 +     *
 +     * @return whether the cell is a counter cell or not.
 +     */
 +    public abstract boolean isCounterCell();
 +
 +    /**
 +     * The cell value.
 +     *
 +     * @return the cell value.
 +     */
 +    public abstract ByteBuffer value();
 +
 +    /**
 +     * The cell timestamp.
 +     * <p>
 +     * @return the cell timestamp.
 +     */
 +    public abstract long timestamp();
 +
 +    /**
 +     * The cell ttl.
 +     *
 +     * @return the cell ttl, or {@code NO_TTL} if the cell isn't an expiring one.
 +     */
 +    public abstract int ttl();
 +
 +    /**
 +     * The cell local deletion time.
 +     *
 +     * @return the cell local deletion time, or {@code NO_DELETION_TIME} if the cell is neither
 +     * a tombstone nor an expiring one.
 +     */
 +    public abstract int localDeletionTime();
 +
 +    /**
 +     * Whether the cell is a tombstone or not.
 +     *
 +     * @return whether the cell is a tombstone or not.
 +     */
 +    public abstract boolean isTombstone();
 +
 +    /**
 +     * Whether the cell is an expiring one or not.
 +     * <p>
 +     * Note that this only corresponds to whether the cell liveness info
 +     * has a TTL or not, but doesn't tell whether the cell is already expired
 +     * or not. You should use {@link #isLive} for that latter information.
 +     *
 +     * @return whether the cell is an expiring one or not.
 +     */
 +    public abstract boolean isExpiring();
 +
 +    /**
 +     * Whether the cell is live or not given the current time.
 +     *
 +     * @param nowInSec the current time in seconds. This is used to
 +     * decide if an expiring cell is expired or live.
 +     * @return whether the cell is live or not at {@code nowInSec}.
 +     */
 +    public abstract boolean isLive(int nowInSec);
 +
 +    /**
 +     * For cells belonging to complex types (non-frozen collection and UDT), the
 +     * path to the cell.
 +     *
 +     * @return the cell path for cells of complex column, and {@code null} for other cells.
 +     */
 +    public abstract CellPath path();
 +
 +    public abstract Cell withUpdatedColumn(ColumnDefinition newColumn);
 +
 +    public abstract Cell withUpdatedValue(ByteBuffer newValue);
 +
++    public abstract Cell withUpdatedTimestampAndLocalDeletionTime(long newTimestamp, int newLocalDeletionTime);
++
 +    public abstract Cell copy(AbstractAllocator allocator);
 +
 +    @Override
 +    // Overrides super type to provide a more precise return type.
 +    public abstract Cell markCounterLocalToBeCleared();
 +
 +    @Override
 +    // Overrides super type to provide a more precise return type.
 +    public abstract Cell purge(DeletionPurger purger, int nowInSec);
 +
 +    public interface Serializer
 +    {
 +        public void serialize(Cell cell, ColumnDefinition column, DataOutputPlus out, LivenessInfo rowLiveness, SerializationHeader header) throws IOException;
 +
 +        public Cell deserialize(DataInputPlus in, LivenessInfo rowLiveness, ColumnDefinition column, SerializationHeader header, SerializationHelper helper) throws IOException;
 +
 +        public long serializedSize(Cell cell, ColumnDefinition column, LivenessInfo rowLiveness, SerializationHeader header);
 +
 +        // Returns whether the skipped cell was an actual cell (i.e. it had its presence flag set).
 +        public boolean skip(DataInputPlus in, ColumnDefinition column, SerializationHeader header) throws IOException;
 +    }
 +}
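
The new MAX_DELETION_TIME constant deserves a note: NO_DELETION_TIME is
Integer.MAX_VALUE, so the largest local deletion time a live cell may carry is
one second below it, otherwise an expiring cell would become indistinguishable
from one with no deletion time at all. A hedged sketch of the capping
arithmetic follows; computeLocalExpirationTime is a hypothetical helper, and
the actual cap-vs-reject policy is decided by ExpirationDateOverflowHandling
elsewhere in this patch.

    // Hypothetical helper illustrating why MAX_DELETION_TIME = Integer.MAX_VALUE - 1.
    static int computeLocalExpirationTime(int nowInSec, int ttl)
    {
        long expiration = (long) nowInSec + ttl;   // widen so the sum cannot wrap
        return expiration > Cell.MAX_DELETION_TIME
             ? Cell.MAX_DELETION_TIME              // cap one second below the sentinel
             : (int) expiration;
    }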

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c231ed5b/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageService.java
index e5a50dd,2c9ac4d..cf8e257
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -2705,22 -2599,28 +2705,27 @@@ public class StorageService extends Not
          return status.statusCode;
      }
  
 -    public int scrub(boolean disableSnapshot, boolean skipCorrupted, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
 +    public int scrub(boolean disableSnapshot, boolean skipCorrupted, String keyspaceName, String... tables) throws IOException, ExecutionException, InterruptedException
      {
 -        return scrub(disableSnapshot, skipCorrupted, true, 0, keyspaceName, columnFamilies);
 +        return scrub(disableSnapshot, skipCorrupted, true, 0, keyspaceName, tables);
      }
  
 -    public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
 +    public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, String keyspaceName, String... tables) throws IOException, ExecutionException, InterruptedException
      {
 -        return scrub(disableSnapshot, skipCorrupted, checkData, 0, keyspaceName, columnFamilies);
 +        return scrub(disableSnapshot, skipCorrupted, checkData, 0, keyspaceName, tables);
      }
  
 -    public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, int jobs, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
 +    public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, int jobs, String keyspaceName, String... tables) throws IOException, ExecutionException, InterruptedException
      {
 -        return scrub(disableSnapshot, skipCorrupted, checkData, false, jobs, keyspaceName, columnFamilies);
++        return scrub(disableSnapshot, skipCorrupted, checkData, false, jobs, keyspaceName, tables);
+     }
+ 
 -    public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, boolean reinsertOverflowedTTLRows,
 -                     int jobs, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
++    public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, boolean reinsertOverflowedTTL, int jobs, String keyspaceName, String... tables) throws IOException, ExecutionException, InterruptedException
+     {
          CompactionManager.AllSSTableOpStatus status = CompactionManager.AllSSTableOpStatus.SUCCESSFUL;
 -        for (ColumnFamilyStore cfStore : getValidColumnFamilies(true, false, keyspaceName, columnFamilies))
 +        for (ColumnFamilyStore cfStore : getValidColumnFamilies(true, false, keyspaceName, tables))
          {
-             CompactionManager.AllSSTableOpStatus oneStatus = cfStore.scrub(disableSnapshot, skipCorrupted, checkData, jobs);
 -            CompactionManager.AllSSTableOpStatus oneStatus = cfStore.scrub(disableSnapshot, skipCorrupted, checkData, reinsertOverflowedTTLRows, jobs);
++            CompactionManager.AllSSTableOpStatus oneStatus = cfStore.scrub(disableSnapshot, skipCorrupted, reinsertOverflowedTTL, checkData, jobs);
              if (oneStatus != CompactionManager.AllSSTableOpStatus.SUCCESSFUL)
                  status = oneStatus;
          }
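
For JMX clients, the deprecated overloads above all funnel into the new widest
signature, with reinsertOverflowedTTL defaulting to false so existing callers
keep their old behaviour. A minimal sketch of invoking the new overload
directly over JMX, assuming a local node on the default JMX port 7199 and
hypothetical keyspace/table names:

    import javax.management.MBeanServerConnection;
    import javax.management.ObjectName;
    import javax.management.remote.JMXConnector;
    import javax.management.remote.JMXConnectorFactory;
    import javax.management.remote.JMXServiceURL;

    public class ScrubViaJmx
    {
        public static void main(String[] args) throws Exception
        {
            // 7199 is Cassandra's default JMX port; host and names are assumptions.
            JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://127.0.0.1:7199/jmxrmi");
            try (JMXConnector jmxc = JMXConnectorFactory.connect(url))
            {
                MBeanServerConnection mbs = jmxc.getMBeanServerConnection();
                ObjectName ss = new ObjectName("org.apache.cassandra.db:type=StorageService");
                // disableSnapshot=false, skipCorrupted=true, checkData=true,
                // reinsertOverflowedTTL=true, jobs=2
                Object status = mbs.invoke(ss, "scrub",
                    new Object[] { false, true, true, true, 2, "mykeyspace", new String[] { "mytable" } },
                    new String[] { "boolean", "boolean", "boolean", "boolean", "int",
                                   "java.lang.String", "[Ljava.lang.String;" });
                System.out.println("scrub returned status code " + status);
            }
        }
    }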

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c231ed5b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 7344ca8,f336bcc..10d47f7
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@@ -262,14 -262,17 +262,17 @@@ public interface StorageServiceMBean ex
       * Scrubbed CFs will be snapshotted first, if disableSnapshot is false
       */
      @Deprecated
 -    public int scrub(boolean disableSnapshot, boolean skipCorrupted, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException;
 +    public int scrub(boolean disableSnapshot, boolean skipCorrupted, String keyspaceName, String... tableNames) throws IOException, ExecutionException, InterruptedException;
      @Deprecated
 -    public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException;
 +    public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, String keyspaceName, String... tableNames) throws IOException, ExecutionException, InterruptedException;
+     @Deprecated
      public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, int jobs, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException;
  
 -public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, boolean reinsertOverflowedTTLRows, int jobs, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException;
++    public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, boolean reinsertOverflowedTTL, int jobs, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException;
+ 
      /**
       * Verify (checksums of) the given keyspace.
 -     * If columnFamilies array is empty, all CFs are verified.
 +     * If tableNames array is empty, all CFs are verified.
       *
       * The entire sstable will be read to ensure each cell validates if extendedVerify is true
       */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c231ed5b/src/java/org/apache/cassandra/thrift/ThriftValidation.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/thrift/ThriftValidation.java
index 6ad791d,8bdf9dc..2ab0330
--- a/src/java/org/apache/cassandra/thrift/ThriftValidation.java
+++ b/src/java/org/apache/cassandra/thrift/ThriftValidation.java
@@@ -332,9 -315,9 +332,9 @@@ public class ThriftValidatio
              if (isCommutative)
                  throw new org.apache.cassandra.exceptions.InvalidRequestException("invalid operation for commutative table " + metadata.cfName);
  
-             validateTtl(cosc.column);
+             validateTtl(metadata, cosc.column);
              validateColumnPath(metadata, new ColumnPath(metadata.cfName).setSuper_column((ByteBuffer)null).setColumn(cosc.column.name));
 -            validateColumnData(metadata, key, null, cosc.column);
 +            validateColumnData(metadata, null, cosc.column);
          }
  
          if (cosc.super_column != null)
@@@ -374,11 -357,13 +374,13 @@@
              if (column.ttl <= 0)
                  throw new org.apache.cassandra.exceptions.InvalidRequestException("ttl must be positive");
  
 -            if (column.ttl > ExpiringCell.MAX_TTL)
 -                throw new org.apache.cassandra.exceptions.InvalidRequestException(String.format("ttl is too large. requested (%d) maximum (%d)", column.ttl, ExpiringCell.MAX_TTL));
 -            Attributes.maybeApplyExpirationDateOverflowPolicy(metadata, column.ttl, false);
 +            if (column.ttl > Attributes.MAX_TTL)
 +                throw new org.apache.cassandra.exceptions.InvalidRequestException(String.format("ttl is too large. requested (%d) maximum (%d)", column.ttl, Attributes.MAX_TTL));
++            ExpirationDateOverflowHandling.maybeApplyExpirationDateOverflowPolicy(metadata, column.ttl, false);
          }
          else
          {
 -            Attributes.maybeApplyExpirationDateOverflowPolicy(metadata, metadata.getDefaultTimeToLive(), true);
++            ExpirationDateOverflowHandling.maybeApplyExpirationDateOverflowPolicy(metadata, metadata.params.defaultTimeToLive, true);
              // if it's not set, then it should be zero -- here we are just checking to make sure Thrift doesn't change that contract with us.
              assert column.ttl == 0;
          }
@@@ -450,9 -435,9 +452,9 @@@
      /**
       * Validates the data part of the column (everything in the column object but the name, which is assumed to be valid)
       */
 -    public static void validateColumnData(CFMetaData metadata, ByteBuffer key, ByteBuffer scName, Column column) throws org.apache.cassandra.exceptions.InvalidRequestException
 +    public static void validateColumnData(CFMetaData metadata, ByteBuffer scName, Column column) throws org.apache.cassandra.exceptions.InvalidRequestException
      {
-         validateTtl(column);
+         validateTtl(metadata, column);
          if (!column.isSetValue())
              throw new org.apache.cassandra.exceptions.InvalidRequestException("Column value is required");
          if (!column.isSetTimestamp())
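
To make the TTL rules above concrete: a request TTL must be strictly positive
and at most Attributes.MAX_TTL (20 years in seconds), and requests whose
computed expiration would still overflow are routed through the expiration
date overflow policy. A self-contained sketch of the bounds check, with
IllegalArgumentException standing in for InvalidRequestException:

    // Restatement of the bounds enforced by validateTtl above.
    static void checkTtl(int ttl)
    {
        final int MAX_TTL = 20 * 365 * 24 * 60 * 60; // 630,720,000 seconds, as in Attributes.MAX_TTL
        if (ttl <= 0)
            throw new IllegalArgumentException("ttl must be positive");
        if (ttl > MAX_TTL)
            throw new IllegalArgumentException(
                String.format("ttl is too large. requested (%d) maximum (%d)", ttl, MAX_TTL));
        // Beyond these bounds, ExpirationDateOverflowHandling decides what to do
        // when nowInSec + ttl would overflow the 2038 local-expiration limit.
    }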

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c231ed5b/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/tools/NodeProbe.java
index 172b505,17bef02..0d3c078
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@@ -235,35 -228,27 +235,35 @@@ public class NodeProbe implements AutoC
  
      public void close() throws IOException
      {
 -        jmxc.close();
 +        try
 +        {
 +            jmxc.close();
 +        }
 +        catch (ConnectException e)
 +        {
 +            // result of 'stopdaemon' command - i.e. if the close() call fails, the daemon has already shut down
 +            System.out.println("Cassandra has shutdown.");
 +        }
      }
  
 -    public int forceKeyspaceCleanup(int jobs, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
 +    public int forceKeyspaceCleanup(int jobs, String keyspaceName, String... tables) throws IOException, ExecutionException, InterruptedException
      {
 -        return ssProxy.forceKeyspaceCleanup(jobs, keyspaceName, columnFamilies);
 +        return ssProxy.forceKeyspaceCleanup(jobs, keyspaceName, tables);
      }
  
-     public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, int jobs, String keyspaceName, String... tables) throws IOException, ExecutionException, InterruptedException
 -    public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, boolean reinsertOverflowedTTLRows, int jobs, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
++    public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, boolean reinsertOverflowedTTL, int jobs, String keyspaceName, String... tables) throws IOException, ExecutionException, InterruptedException
      {
-         return ssProxy.scrub(disableSnapshot, skipCorrupted, checkData, jobs, keyspaceName, tables);
 -        return ssProxy.scrub(disableSnapshot, skipCorrupted, checkData, reinsertOverflowedTTLRows, jobs, keyspaceName, columnFamilies);
++        return ssProxy.scrub(disableSnapshot, skipCorrupted, checkData, reinsertOverflowedTTL, jobs, keyspaceName, tables);
      }
  
 -    public int verify(boolean extendedVerify, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
 +    public int verify(boolean extendedVerify, String keyspaceName, String... tableNames) throws IOException, ExecutionException, InterruptedException
      {
 -        return ssProxy.verify(extendedVerify, keyspaceName, columnFamilies);
 +        return ssProxy.verify(extendedVerify, keyspaceName, tableNames);
      }
  
 -    public int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, int jobs, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
 +    public int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, int jobs, String... tableNames) throws IOException, ExecutionException, InterruptedException
      {
 -        return ssProxy.upgradeSSTables(keyspaceName, excludeCurrentVersion, jobs, columnFamilies);
 +        return ssProxy.upgradeSSTables(keyspaceName, excludeCurrentVersion, jobs, tableNames);
      }
  
      private void checkJobs(PrintStream out, int jobs)
@@@ -288,19 -267,13 +288,19 @@@
          }
      }
  
-     public void scrub(PrintStream out, boolean disableSnapshot, boolean skipCorrupted, boolean checkData, int jobs, String keyspaceName, String... tables) throws IOException, ExecutionException, InterruptedException
 -    public void scrub(PrintStream out, boolean disableSnapshot, boolean skipCorrupted, boolean checkData, boolean reinsertOverflowedTTLRows, int jobs, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
++    public void scrub(PrintStream out, boolean disableSnapshot, boolean skipCorrupted, boolean checkData, boolean reinsertOverflowedTTL, int jobs, String keyspaceName, String... tables) throws IOException, ExecutionException, InterruptedException
      {
          checkJobs(out, jobs);
-         switch (scrub(disableSnapshot, skipCorrupted, checkData, jobs, keyspaceName, tables))
 -        if (scrub(disableSnapshot, skipCorrupted, checkData, reinsertOverflowedTTLRows, jobs, keyspaceName, columnFamilies) != 0)
++        switch (ssProxy.scrub(disableSnapshot, skipCorrupted, checkData, reinsertOverflowedTTL, jobs, keyspaceName, tables))
          {
 -            failed = true;
 -            out.println("Aborted scrubbing at least one table in keyspace "+keyspaceName+", check server logs for more information.");
 +            case 1:
 +                failed = true;
 +                out.println("Aborted scrubbing at least one table in keyspace "+keyspaceName+", check server logs for more information.");
 +                break;
 +            case 2:
 +                failed = true;
 +                out.println("Failed marking some sstables compacting in keyspace "+keyspaceName+", check server logs for more information");
 +                break;
          }
      }
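
The switch above maps the server's integer scrub status to operator-facing
messages. The enum below restates the assumed mapping; the names follow
CompactionManager.AllSSTableOpStatus, but the numbering is inferred from the
messages printed, not copied from the source.

    // Assumed mapping of scrub status codes to outcomes.
    enum ScrubStatus
    {
        SUCCESSFUL(0), ABORTED(1), UNABLE_TO_CANCEL(2);

        final int statusCode;
        ScrubStatus(int statusCode) { this.statusCode = statusCode; }
    }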
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c231ed5b/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
index 6076e32,8319014..19af957
--- a/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
+++ b/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
@@@ -90,12 -65,13 +90,13 @@@ public class SSTableMetadataViewe
                  {
                      out.printf("Minimum timestamp: %s%n", stats.minTimestamp);
                      out.printf("Maximum timestamp: %s%n", stats.maxTimestamp);
++                    out.printf("SSTable min local deletion time: %s%n", stats.minLocalDeletionTime);
                      out.printf("SSTable max local deletion time: %s%n", stats.maxLocalDeletionTime);
                      out.printf("Compression ratio: %s%n", stats.compressionRatio);
 -                    out.printf("Estimated droppable tombstones: %s%n", stats.getEstimatedDroppableTombstoneRatio((int) (System.currentTimeMillis() / 1000)));
 +                    out.printf("Estimated droppable tombstones: %s%n", stats.getEstimatedDroppableTombstoneRatio((int) (System.currentTimeMillis() / 1000) - gcgs));
                      out.printf("SSTable Level: %d%n", stats.sstableLevel);
                      out.printf("Repaired at: %d%n", stats.repairedAt);
 -                    out.printf("Minimum replay position: %s\n", stats.commitLogLowerBound);
 -                    out.printf("Maximum replay position: %s\n", stats.commitLogUpperBound);
 +                    out.printf("Replay positions covered: %s\n", stats.commitLogIntervals);
                      out.println("Estimated tombstone drop times:");
                      for (Map.Entry<Double, Long> entry : stats.estimatedTombstoneDropTime.getAsMap().entrySet())
                      {
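
The droppable-tombstone tweak above is easy to miss: a tombstone only becomes
purgeable once it is older than gc_grace_seconds, so the viewer now evaluates
the ratio as of (now - gcgs) rather than now. Assuming gcgs holds a
gc_grace_seconds value supplied to the tool, the call reduces to:

    // Only tombstones that have already cleared the grace period count as droppable.
    int nowInSec = (int) (System.currentTimeMillis() / 1000);
    double droppable = stats.getEstimatedDroppableTombstoneRatio(nowInSec - gcgs);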

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c231ed5b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/tools/StandaloneScrubber.java
index 4249430,f5e84c5..4778d72
--- a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
+++ b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
@@@ -122,7 -129,7 +129,7 @@@ public class StandaloneScrubbe
                      try (LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.SCRUB, sstable))
                      {
                          txn.obsoleteOriginals(); // make sure originals are deleted and avoid NPE if index is missing, CASSANDRA-9591
-                         try (Scrubber scrubber = new Scrubber(cfs, txn, options.skipCorrupted, handler, !options.noValidate))
 -                        try (Scrubber scrubber = new Scrubber(cfs, txn, options.skipCorrupted, handler, !options.noValidate, options.reinsertOverflowedTTL))
++                        try (Scrubber scrubber = new Scrubber(cfs, txn, options.skipCorrupted, handler, !options.noValidate, options.reinsertOverflowedTTL))
                          {
                              scrubber.scrub();
                          }
@@@ -199,6 -207,7 +206,7 @@@
          public boolean manifestCheckOnly;
          public boolean skipCorrupted;
          public boolean noValidate;
 -        public boolean reinsertOverflowedTTL;
++        public boolean reinsertOverflowedTTL;
  
          private Options(String keyspaceName, String cfName)
          {
@@@ -239,6 -248,7 +247,7 @@@
                  opts.manifestCheckOnly = cmd.hasOption(MANIFEST_CHECK_OPTION);
                  opts.skipCorrupted = cmd.hasOption(SKIP_CORRUPTED_OPTION);
                  opts.noValidate = cmd.hasOption(NO_VALIDATE_OPTION);
 -                opts.reinsertOverflowedTTL = cmd.hasOption(REINSERT_OVERFLOWED_TTL_OPTION);
++                opts.reinsertOverflowedTTL = cmd.hasOption(REINSERT_OVERFLOWED_TTL_OPTION);
  
                  return opts;
              }
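
For offline sstables, the same behaviour is reachable through the standalone
scrubber. Assuming the flag keeps the name implied by
REINSERT_OVERFLOWED_TTL_OPTION, a typical invocation would be:

    sstablescrub --reinsert-overflowed-ttl <keyspace> <table>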

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c231ed5b/src/java/org/apache/cassandra/tools/nodetool/Scrub.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/tools/nodetool/Scrub.java
index 2345a85,50224a0..ead2fd4
--- a/src/java/org/apache/cassandra/tools/nodetool/Scrub.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/Scrub.java
@@@ -48,11 -49,16 +49,16 @@@ public class Scrub extends NodeToolCm
                     description = "Do not validate columns using column validator")
      private boolean noValidation = false;
  
 -    @Option(title = "jobs",
 -            name = {"-j", "--jobs"},
 -            description = "Number of sstables to scrub simultanously, set to 0 to use all available compaction threads")
 -    private int jobs = 2;
 -
+     @Option(title = "reinsert_overflowed_ttl",
+             name = {"-r", "--reinsert-overflowed-ttl"},
+             description = StandaloneScrubber.REINSERT_OVERFLOWED_TTL_OPTION_DESCRIPTION)
+     private boolean reinsertOverflowedTTL = false;
+ 
 +    @Option(title = "jobs",
 +            name = {"-j", "--jobs"},
 +            description = "Number of sstables to scrub simultanously, set to 0 to use all available compaction threads")
 +    private int jobs = 2;
 +
      @Override
      public void execute(NodeProbe probe)
      {
@@@ -63,13 -69,11 +69,13 @@@
          {
              try
              {
-                 probe.scrub(System.out, disableSnapshot, skipCorrupted, !noValidation, jobs, keyspace, tableNames);
 -                probe.scrub(System.out, disableSnapshot, skipCorrupted, !noValidation, reinsertOverflowedTTL, jobs, keyspace, cfnames);
 -            } catch (IllegalArgumentException e)
++                probe.scrub(System.out, disableSnapshot, skipCorrupted, !noValidation, reinsertOverflowedTTL, jobs, keyspace, tableNames);
 +            }
 +            catch (IllegalArgumentException e)
              {
                  throw e;
 -            } catch (Exception e)
 +            }
 +            catch (Exception e)
              {
                  throw new RuntimeException("Error occurred during scrubbing", e);
              }
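
On a live node the new behaviour is driven through nodetool. Assuming the
option names registered above, a typical invocation would be:

    nodetool scrub --reinsert-overflowed-ttl <keyspace> <table>

with -r as the equivalent short form.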

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c231ed5b/test/data/negative-local-expiration-test/table1/mc-1-big-CompressionInfo.db
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table1/mc-1-big-CompressionInfo.db
index 0000000,0000000..d759cec
new file mode 100644
Binary files differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c231ed5b/test/data/negative-local-expiration-test/table1/mc-1-big-Data.db
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table1/mc-1-big-Data.db
index 0000000,0000000..e7a72da
new file mode 100644
Binary files differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c231ed5b/test/data/negative-local-expiration-test/table1/mc-1-big-Digest.crc32
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table1/mc-1-big-Digest.crc32
index 0000000,0000000..a3c633a
new file mode 100644
--- /dev/null
+++ b/test/data/negative-local-expiration-test/table1/mc-1-big-Digest.crc32
@@@ -1,0 -1,0 +1,1 @@@
++203700622

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c231ed5b/test/data/negative-local-expiration-test/table1/mc-1-big-Filter.db
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table1/mc-1-big-Filter.db
index 0000000,0000000..a397f35
new file mode 100644
Binary files differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c231ed5b/test/data/negative-local-expiration-test/table1/mc-1-big-Index.db
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table1/mc-1-big-Index.db
index 0000000,0000000..d742724
new file mode 100644
Binary files differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c231ed5b/test/data/negative-local-expiration-test/table1/mc-1-big-Statistics.db
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table1/mc-1-big-Statistics.db
index 0000000,0000000..faf367b
new file mode 100644
Binary files differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c231ed5b/test/data/negative-local-expiration-test/table1/mc-1-big-Summary.db
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table1/mc-1-big-Summary.db
index 0000000,0000000..66cf70f
new file mode 100644
Binary files differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c231ed5b/test/data/negative-local-expiration-test/table1/mc-1-big-TOC.txt
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table1/mc-1-big-TOC.txt
index 0000000,0000000..45113dc
new file mode 100644
--- /dev/null
+++ b/test/data/negative-local-expiration-test/table1/mc-1-big-TOC.txt
@@@ -1,0 -1,0 +1,8 @@@
++CompressionInfo.db
++Data.db
++Summary.db
++Filter.db
++Statistics.db
++TOC.txt
++Digest.crc32
++Index.db

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c231ed5b/test/data/negative-local-expiration-test/table2/mc-1-big-CompressionInfo.db
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table2/mc-1-big-CompressionInfo.db
index 0000000,0000000..1759c09
new file mode 100644
Binary files differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c231ed5b/test/data/negative-local-expiration-test/table2/mc-1-big-Data.db
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table2/mc-1-big-Data.db
index 0000000,0000000..c1de572
new file mode 100644
Binary files differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c231ed5b/test/data/negative-local-expiration-test/table2/mc-1-big-Digest.crc32
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table2/mc-1-big-Digest.crc32
index 0000000,0000000..0403b5b
new file mode 100644
--- /dev/null
+++ b/test/data/negative-local-expiration-test/table2/mc-1-big-Digest.crc32
@@@ -1,0 -1,0 +1,1 @@@
++82785930

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c231ed5b/test/data/negative-local-expiration-test/table2/mc-1-big-Filter.db
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table2/mc-1-big-Filter.db
index 0000000,0000000..a397f35
new file mode 100644
Binary files differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c231ed5b/test/data/negative-local-expiration-test/table2/mc-1-big-Index.db
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table2/mc-1-big-Index.db
index 0000000,0000000..a0477eb
new file mode 100644
Binary files differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c231ed5b/test/data/negative-local-expiration-test/table2/mc-1-big-Statistics.db
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table2/mc-1-big-Statistics.db
index 0000000,0000000..e9d6577
new file mode 100644
Binary files differ


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org