Posted to commits@cassandra.apache.org by pa...@apache.org on 2018/02/11 13:26:05 UTC
[20/29] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c231ed5b/src/java/org/apache/cassandra/db/LivenessInfo.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/LivenessInfo.java
index 89e0578,0000000..f6c9b62
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/LivenessInfo.java
+++ b/src/java/org/apache/cassandra/db/LivenessInfo.java
@@@ -1,369 -1,0 +1,375 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.util.Objects;
+import java.security.MessageDigest;
+
+import org.apache.cassandra.config.CFMetaData;
++import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.serializers.MarshalException;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * Stores the information relating to the liveness of the primary key columns of a row.
+ * <p>
+ * A {@code LivenessInfo} can be empty. If it isn't, it contains at least a timestamp,
+ * which is the timestamp for the row primary key columns. On top of that, the info can be
+ * ttl'ed, in which case the {@code LivenessInfo} also has both a ttl and a local expiration time.
+ * <p>
+ * Please note that if a liveness info is ttl'ed, that expiration is <b>only</b> an expiration
+ * of the liveness info itself (so, of the timestamp), and once the info expires it becomes
+ * {@code EMPTY}. But if a row has a liveness info which expires, the rest of the row data is
+ * unaffected (of course, the rest of said row data might be ttl'ed on its own but this is
+ * separate).
+ */
+public class LivenessInfo
+{
+ public static final long NO_TIMESTAMP = Long.MIN_VALUE;
- public static final int NO_TTL = 0;
++ public static final int NO_TTL = Cell.NO_TTL;
+ /**
+ * Used as flag for representing an expired liveness.
+ *
+ * TTL per request is at most 20 yrs, so this shouldn't conflict
+ * (See {@link org.apache.cassandra.cql3.Attributes#MAX_TTL})
+ */
+ public static final int EXPIRED_LIVENESS_TTL = Integer.MAX_VALUE;
- public static final int NO_EXPIRATION_TIME = Integer.MAX_VALUE;
++ public static final int NO_EXPIRATION_TIME = Cell.NO_DELETION_TIME;
+
+ public static final LivenessInfo EMPTY = new LivenessInfo(NO_TIMESTAMP);
+
+ protected final long timestamp;
+
+ protected LivenessInfo(long timestamp)
+ {
+ this.timestamp = timestamp;
+ }
+
+ public static LivenessInfo create(CFMetaData metadata, long timestamp, int nowInSec)
+ {
+ int defaultTTL = metadata.params.defaultTimeToLive;
+ if (defaultTTL != NO_TTL)
+ return expiring(timestamp, defaultTTL, nowInSec);
+
+ return new LivenessInfo(timestamp);
+ }
+
+ public static LivenessInfo expiring(long timestamp, int ttl, int nowInSec)
+ {
+ assert ttl != EXPIRED_LIVENESS_TTL;
- return new ExpiringLivenessInfo(timestamp, ttl, nowInSec + ttl);
++ return new ExpiringLivenessInfo(timestamp, ttl, ExpirationDateOverflowHandling.computeLocalExpirationTime(nowInSec, ttl));
+ }
+
+ public static LivenessInfo create(CFMetaData metadata, long timestamp, int ttl, int nowInSec)
+ {
+ return ttl == NO_TTL
+ ? create(metadata, timestamp, nowInSec)
+ : expiring(timestamp, ttl, nowInSec);
+ }
+
+ // Note that this ctor ignores the default table ttl and takes the expiration time, not the current time.
+ // Use when you know that's what you want.
+ public static LivenessInfo create(long timestamp, int ttl, int localExpirationTime)
+ {
+ if (ttl == EXPIRED_LIVENESS_TTL)
+ return new ExpiredLivenessInfo(timestamp, ttl, localExpirationTime);
+ return ttl == NO_TTL ? new LivenessInfo(timestamp) : new ExpiringLivenessInfo(timestamp, ttl, localExpirationTime);
+ }
+
+ /**
+ * Whether this liveness info is empty (has no timestamp).
+ *
+ * @return whether this liveness info is empty or not.
+ */
+ public boolean isEmpty()
+ {
+ return timestamp == NO_TIMESTAMP;
+ }
+
+ /**
+ * The timestamp for this liveness info.
+ *
+ * @return the liveness info timestamp (or {@link #NO_TIMESTAMP} if the info is empty).
+ */
+ public long timestamp()
+ {
+ return timestamp;
+ }
+
+ /**
+ * Whether the info has a ttl.
+ */
+ public boolean isExpiring()
+ {
+ return false;
+ }
+
+ /**
+ * The ttl (if any) on the row primary key columns or {@link #NO_TTL} if it is not
+ * expiring.
+ *
+ * Please note that this value is the TTL that was set originally and is thus not
+ * changing.
+ */
+ public int ttl()
+ {
+ return NO_TTL;
+ }
+
+ /**
+ * The expiration time (in seconds) if the info is expiring ({@link #NO_EXPIRATION_TIME} otherwise).
+ */
+ public int localExpirationTime()
+ {
+ return NO_EXPIRATION_TIME;
+ }
+
+ /**
+ * Whether that info is still live.
+ *
+ * A {@code LivenessInfo} is live if it is either not expiring, or if its expiration time is after
+ * {@code nowInSec}.
+ *
+ * @param nowInSec the current time in seconds.
+ * @return whether this liveness info is live or not.
+ */
+ public boolean isLive(int nowInSec)
+ {
+ return !isEmpty();
+ }
+
+ /**
+ * Adds this liveness information to the provided digest.
+ *
+ * @param digest the digest to add this liveness information to.
+ */
+ public void digest(MessageDigest digest)
+ {
+ FBUtilities.updateWithLong(digest, timestamp());
+ }
+
+ /**
+ * Validate the data contained by this liveness information.
+ *
+ * @throws MarshalException if some of the data is corrupted.
+ */
+ public void validate()
+ {
+ }
+
+ /**
+ * The size of the (useful) data this liveness information contains.
+ *
+ * @return the size of the data this liveness information contains.
+ */
+ public int dataSize()
+ {
+ return TypeSizes.sizeof(timestamp());
+ }
+
+ /**
+ * Whether this liveness information supersedes another one (that is,
+ * whether it has a greater timestamp than the other or not).
+ *
+ * <br/>
+ *
+ * If the timestamps are the same and neither is an expired livenessInfo,
+ * the livenessInfo with the greater TTL supersedes the other. In particular, with equal
+ * timestamps, ttl supersedes no-ttl. This is the same rule as {@link Conflicts#resolveRegular}.
+ *
+ * If the timestamps are the same and one of them is an expired livenessInfo, the expired
+ * livenessInfo supersedes, i.e. the tombstone supersedes.
+ *
+ * If the timestamps are the same and both are expired livenessInfo (ideally this shouldn't
+ * happen), the greater localDeletionTime wins.
+ *
+ * @param other
+ * the {@code LivenessInfo} to compare this info to.
+ *
+ * @return whether this {@code LivenessInfo} supersedes {@code other}.
+ */
+ public boolean supersedes(LivenessInfo other)
+ {
+ if (timestamp != other.timestamp)
+ return timestamp > other.timestamp;
+ if (isExpired() ^ other.isExpired())
+ return isExpired();
+ if (isExpiring() == other.isExpiring())
+ return localExpirationTime() > other.localExpirationTime();
+ return isExpiring();
+ }
+
+ protected boolean isExpired()
+ {
+ return false;
+ }
+
+ /**
+ * Returns a copy of this liveness info updated with the provided timestamp.
+ *
+ * @param newTimestamp the timestamp for the returned info.
+ * @return if this liveness info has a timestamp, a copy of it with {@code newTimestamp}
+ * as timestamp. If it has no timestamp however, this liveness info is returned
+ * unchanged.
+ */
+ public LivenessInfo withUpdatedTimestamp(long newTimestamp)
+ {
+ return new LivenessInfo(newTimestamp);
+ }
+
++ public LivenessInfo withUpdatedTimestampAndLocalDeletionTime(long newTimestamp, int newLocalDeletionTime)
++ {
++ return LivenessInfo.create(newTimestamp, ttl(), newLocalDeletionTime);
++ }
++
+ @Override
+ public String toString()
+ {
+ return String.format("[ts=%d]", timestamp);
+ }
+
+ @Override
+ public boolean equals(Object other)
+ {
+ if(!(other instanceof LivenessInfo))
+ return false;
+
+ LivenessInfo that = (LivenessInfo)other;
+ return this.timestamp() == that.timestamp()
+ && this.ttl() == that.ttl()
+ && this.localExpirationTime() == that.localExpirationTime();
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(timestamp(), ttl(), localExpirationTime());
+ }
+
+ /**
+ * Effectively acts as a PK tombstone. This is used for Materialized Views to shadow
+ * updated entries while co-existing with row tombstones.
+ *
+ * See {@link org.apache.cassandra.db.view.ViewUpdateGenerator#deleteOldEntryInternal}.
+ */
+ private static class ExpiredLivenessInfo extends ExpiringLivenessInfo
+ {
+ private ExpiredLivenessInfo(long timestamp, int ttl, int localExpirationTime)
+ {
+ super(timestamp, ttl, localExpirationTime);
+ assert ttl == EXPIRED_LIVENESS_TTL;
+ assert timestamp != NO_TIMESTAMP;
+ }
+
+ @Override
+ public boolean isExpired()
+ {
+ return true;
+ }
+
+ @Override
+ public boolean isLive(int nowInSec)
+ {
+ // used as tombstone to shadow entire PK
+ return false;
+ }
+
+ @Override
+ public LivenessInfo withUpdatedTimestamp(long newTimestamp)
+ {
+ return new ExpiredLivenessInfo(newTimestamp, ttl(), localExpirationTime());
+ }
+ }
+
+ private static class ExpiringLivenessInfo extends LivenessInfo
+ {
+ private final int ttl;
+ private final int localExpirationTime;
+
+ private ExpiringLivenessInfo(long timestamp, int ttl, int localExpirationTime)
+ {
+ super(timestamp);
+ assert ttl != NO_TTL && localExpirationTime != NO_EXPIRATION_TIME;
+ this.ttl = ttl;
+ this.localExpirationTime = localExpirationTime;
+ }
+
+ @Override
+ public int ttl()
+ {
+ return ttl;
+ }
+
+ @Override
+ public int localExpirationTime()
+ {
+ return localExpirationTime;
+ }
+
+ @Override
+ public boolean isExpiring()
+ {
+ return true;
+ }
+
+ @Override
+ public boolean isLive(int nowInSec)
+ {
+ return nowInSec < localExpirationTime;
+ }
+
+ @Override
+ public void digest(MessageDigest digest)
+ {
+ super.digest(digest);
+ FBUtilities.updateWithInt(digest, localExpirationTime);
+ FBUtilities.updateWithInt(digest, ttl);
+ }
+
+ @Override
+ public void validate()
+ {
+ if (ttl < 0)
+ throw new MarshalException("A TTL should not be negative");
+ if (localExpirationTime < 0)
+ throw new MarshalException("A local expiration time should not be negative");
+ }
+
+ @Override
+ public int dataSize()
+ {
+ return super.dataSize()
+ + TypeSizes.sizeof(ttl)
+ + TypeSizes.sizeof(localExpirationTime);
+
+ }
+
+ @Override
+ public LivenessInfo withUpdatedTimestamp(long newTimestamp)
+ {
+ return new ExpiringLivenessInfo(newTimestamp, ttl, localExpirationTime);
+ }
+
+ @Override
+ public String toString()
+ {
+ return String.format("[ts=%d ttl=%d, let=%d]", timestamp, ttl, localExpirationTime);
+ }
+ }
+}
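
The substantive change in this file is that expiring() no longer computes the local
expiration time as the raw sum nowInSec + ttl, but routes it through
ExpirationDateOverflowHandling.computeLocalExpirationTime(nowInSec, ttl). A minimal,
self-contained sketch of the arithmetic guard this implies, assuming the real class
also applies a configurable overflow policy before capping (only the guard below is
shown; it is illustrative, not the committed implementation):

    public final class ExpirationOverflowSketch
    {
        // Mirrors Cell.MAX_DELETION_TIME introduced by this patch (see Cell.java below).
        static final int MAX_DELETION_TIME = Integer.MAX_VALUE - 1;

        static int computeLocalExpirationTime(int nowInSec, int ttl)
        {
            int localExpirationTime = nowInSec + ttl; // may wrap past 2038-01-19 (CASSANDRA-14092)
            // A wrapped sum is negative; cap it instead of storing a bogus
            // negative local expiration time.
            return localExpirationTime >= 0 ? localExpirationTime : MAX_DELETION_TIME;
        }

        public static void main(String[] args)
        {
            int nowInSec = (int) (System.currentTimeMillis() / 1000);
            int maxTtl = 20 * 365 * 24 * 60 * 60; // the 20-year per-request maximum TTL
            // The raw sum overflows, so the guard returns MAX_DELETION_TIME (2147483646).
            System.out.println(computeLocalExpirationTime(nowInSec, maxTtl));
        }
    }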
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c231ed5b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index eaf6dab,d90abe9..1d54667
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@@ -363,7 -348,13 +363,15 @@@ public class CompactionManager implemen
}
}
-- public AllSSTableOpStatus performScrub(final ColumnFamilyStore cfs, final boolean skipCorrupted, final boolean checkData, int jobs)
++ public AllSSTableOpStatus performScrub(final ColumnFamilyStore cfs, final boolean skipCorrupted, final boolean checkData,
++ int jobs)
+ throws InterruptedException, ExecutionException
+ {
+ return performScrub(cfs, skipCorrupted, checkData, false, jobs);
+ }
+
- public AllSSTableOpStatus performScrub(final ColumnFamilyStore cfs, final boolean skipCorrupted, final boolean checkData, final boolean reinsertOverflowedTTLRows, int jobs)
++ public AllSSTableOpStatus performScrub(final ColumnFamilyStore cfs, final boolean skipCorrupted, final boolean checkData,
++ final boolean reinsertOverflowedTTL, int jobs)
throws InterruptedException, ExecutionException
{
return parallelAllSSTableOperation(cfs, new OneSSTableOperation()
@@@ -377,7 -368,7 +385,7 @@@
@Override
public void execute(LifecycleTransaction input) throws IOException
{
- scrubOne(cfs, input, skipCorrupted, checkData);
- scrubOne(cfs, input, skipCorrupted, checkData, reinsertOverflowedTTLRows);
++ scrubOne(cfs, input, skipCorrupted, checkData, reinsertOverflowedTTL);
}
}, jobs, OperationType.SCRUB);
}
@@@ -745,11 -736,11 +753,11 @@@
}
}
- private void scrubOne(ColumnFamilyStore cfs, LifecycleTransaction modifier, boolean skipCorrupted, boolean checkData) throws IOException
- private void scrubOne(ColumnFamilyStore cfs, LifecycleTransaction modifier, boolean skipCorrupted, boolean checkData, boolean reinsertOverflowedTTLRows) throws IOException
++ private void scrubOne(ColumnFamilyStore cfs, LifecycleTransaction modifier, boolean skipCorrupted, boolean checkData, boolean reinsertOverflowedTTL) throws IOException
{
CompactionInfo.Holder scrubInfo = null;
- try (Scrubber scrubber = new Scrubber(cfs, modifier, skipCorrupted, checkData))
- try (Scrubber scrubber = new Scrubber(cfs, modifier, skipCorrupted, checkData, reinsertOverflowedTTLRows))
++ try (Scrubber scrubber = new Scrubber(cfs, modifier, skipCorrupted, checkData, reinsertOverflowedTTL))
{
scrubInfo = scrubber.getScrubInfo();
metrics.beginCompaction(scrubInfo);
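
Note the API-evolution idiom above: the pre-existing performScrub signature is kept as a
delegating overload that defaults the new flag to false, so existing callers compile and
behave unchanged. A condensed illustration of the idiom with hypothetical names
(ScrubService and doScrub are illustrative, not Cassandra classes):

    class ScrubService
    {
        // Pre-existing signature, preserved for current callers:
        int performScrub(boolean skipCorrupted, boolean checkData, int jobs)
        {
            // Default the new option off so legacy behaviour is unchanged.
            return performScrub(skipCorrupted, checkData, false, jobs);
        }

        // New signature carrying the CASSANDRA-14092 option:
        int performScrub(boolean skipCorrupted, boolean checkData,
                         boolean reinsertOverflowedTTL, int jobs)
        {
            return doScrub(skipCorrupted, checkData, reinsertOverflowedTTL, jobs);
        }

        private int doScrub(boolean skipCorrupted, boolean checkData,
                            boolean reinsertOverflowedTTL, int jobs)
        {
            return 0; // stand-in for the real per-SSTable scrub loop
        }
    }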
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c231ed5b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/Scrubber.java
index c8e0c53,affee11..bc11504
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@@ -35,8 -37,12 +35,9 @@@ import org.apache.cassandra.io.sstable.
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.io.util.RandomAccessReader;
import org.apache.cassandra.service.ActiveRepairService;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.JVMStabilityInspector;
-import org.apache.cassandra.utils.OutputHandler;
-import org.apache.cassandra.utils.memory.HeapAllocator;
-import org.apache.cassandra.utils.UUIDGen;
+import org.apache.cassandra.utils.*;
import org.apache.cassandra.utils.concurrent.Refs;
++import org.apache.cassandra.utils.memory.HeapAllocator;
public class Scrubber implements Closeable
{
@@@ -45,7 -51,9 +46,8 @@@
private final LifecycleTransaction transaction;
private final File destination;
private final boolean skipCorrupted;
+ private final boolean reinsertOverflowedTTLRows;
- private final CompactionController controller;
private final boolean isCommutative;
private final boolean isIndex;
private final boolean checkData;
@@@ -65,38 -76,41 +67,47 @@@
long currentRowPositionFromIndex;
long nextRowPositionFromIndex;
- private final OutputHandler outputHandler;
+ private NegativeLocalDeletionInfoMetrics negativeLocalDeletionInfoMetrics = new NegativeLocalDeletionInfoMetrics();
+
- private static final Comparator<Row> rowComparator = new Comparator<Row>()
+ private final OutputHandler outputHandler;
+
+ private static final Comparator<Partition> partitionComparator = new Comparator<Partition>()
{
- public int compare(Row r1, Row r2)
+ public int compare(Partition r1, Partition r2)
{
- return r1.key.compareTo(r2.key);
+ return r1.partitionKey().compareTo(r2.partitionKey());
}
};
- private final SortedSet<Row> outOfOrderRows = new TreeSet<>(rowComparator);
+ private final SortedSet<Partition> outOfOrder = new TreeSet<>(partitionComparator);
public Scrubber(ColumnFamilyStore cfs, LifecycleTransaction transaction, boolean skipCorrupted, boolean checkData) throws IOException
{
- this(cfs, transaction, skipCorrupted, new OutputHandler.LogOutput(), checkData);
- this(cfs, transaction, skipCorrupted, new OutputHandler.LogOutput(), checkData, false);
++ this(cfs, transaction, skipCorrupted, checkData, false);
+ }
+
+ public Scrubber(ColumnFamilyStore cfs, LifecycleTransaction transaction, boolean skipCorrupted, boolean checkData,
+ boolean reinsertOverflowedTTLRows) throws IOException
+ {
+ this(cfs, transaction, skipCorrupted, new OutputHandler.LogOutput(), checkData, reinsertOverflowedTTLRows);
}
@SuppressWarnings("resource")
- public Scrubber(ColumnFamilyStore cfs, LifecycleTransaction transaction, boolean skipCorrupted, OutputHandler outputHandler, boolean checkData,
+ public Scrubber(ColumnFamilyStore cfs,
+ LifecycleTransaction transaction,
+ boolean skipCorrupted,
+ OutputHandler outputHandler,
- boolean checkData) throws IOException
++ boolean checkData,
+ boolean reinsertOverflowedTTLRows) throws IOException
{
this.cfs = cfs;
this.transaction = transaction;
this.sstable = transaction.onlyOne();
this.outputHandler = outputHandler;
this.skipCorrupted = skipCorrupted;
- this.rowIndexEntrySerializer = sstable.descriptor.version.getSSTableFormat().getIndexSerializer(sstable.metadata);
+ this.reinsertOverflowedTTLRows = reinsertOverflowedTTLRows;
-
+ this.rowIndexEntrySerializer = sstable.descriptor.version.getSSTableFormat().getIndexSerializer(sstable.metadata,
+ sstable.descriptor.version,
+ sstable.header);
-
List<SSTableReader> toScrub = Collections.singletonList(sstable);
// Calculate the expected compacted filesize
@@@ -134,19 -150,15 +145,22 @@@
this.currentRowPositionFromIndex = 0;
this.nextRowPositionFromIndex = 0;
+
+ if (reinsertOverflowedTTLRows)
+ outputHandler.output("Starting scrub with reinsert overflowed TTL option");
}
+ private UnfilteredRowIterator withValidation(UnfilteredRowIterator iter, String filename)
+ {
+ return checkData ? UnfilteredRowIterators.withValidation(iter, filename) : iter;
+ }
+
public void scrub()
{
+ List<SSTableReader> finished = new ArrayList<>();
+ boolean completed = false;
outputHandler.output(String.format("Scrubbing %s (%s bytes)", sstable, dataFile.length()));
- try (SSTableRewriter writer = new SSTableRewriter(cfs, transaction, sstable.maxDataAge, transaction.isOffline());
+ try (SSTableRewriter writer = SSTableRewriter.construct(cfs, transaction, false, sstable.maxDataAge, transaction.isOffline());
Refs<SSTableReader> refs = Refs.ref(Collections.singleton(sstable)))
{
nextIndexKey = indexAvailable() ? ByteBufferUtil.readWithShortLength(indexFile) : null;
@@@ -285,57 -311,66 +299,71 @@@
}
finally
{
- controller.close();
- if (transaction.isOffline() && newSstable != null)
- newSstable.selfRef().release();
+ if (transaction.isOffline())
+ finished.forEach(sstable -> sstable.selfRef().release());
}
- if (newSstable == null)
- {
- if (badRows > 0)
- outputHandler.warn("No valid rows found while scrubbing " + sstable + "; it is marked for deletion now. If you want to attempt manual recovery, you can find a copy in the pre-scrub snapshot");
- else
- outputHandler.output("Scrub of " + sstable + " complete; looks like all " + emptyRows + " rows were tombstoned");
- }
- else
+ if (completed)
{
+ outputHandler.output("Scrub of " + sstable + " complete: " + goodRows + " rows in new sstable and " + emptyRows + " empty (tombstoned) rows dropped");
+ if (negativeLocalDeletionInfoMetrics.fixedRows > 0)
+ outputHandler.output("Fixed " + negativeLocalDeletionInfoMetrics.fixedRows + " rows with overflowed local deletion time.");
if (badRows > 0)
- outputHandler.warn("No valid rows found while scrubbing " + sstable + "; it is marked for deletion now. If you want to attempt manual recovery, you can find a copy in the pre-scrub snapshot");
- else
- outputHandler.output("Scrub of " + sstable + " complete; looks like all " + emptyRows + " rows were tombstoned");
+ outputHandler.warn("Unable to recover " + badRows + " rows that were skipped. You can attempt manual recovery from the pre-scrub snapshot. You can also run nodetool repair to transfer the data from a healthy replica, if any");
}
+ else
+ {
- outputHandler.output("Scrub of " + sstable + " complete: " + goodRows + " rows in new sstable and " + emptyRows + " empty (tombstoned) rows dropped");
+ if (badRows > 0)
- outputHandler.warn("Unable to recover " + badRows + " rows that were skipped. You can attempt manual recovery from the pre-scrub snapshot. You can also run nodetool repair to transfer the data from a healthy replica, if any");
++ outputHandler.warn("No valid rows found while scrubbing " + sstable + "; it is marked for deletion now. If you want to attempt manual recovery, you can find a copy in the pre-scrub snapshot");
++ else
++ outputHandler.output("Scrub of " + sstable + " complete; looks like all " + emptyRows + " rows were tombstoned");
+ }
}
@SuppressWarnings("resource")
private boolean tryAppend(DecoratedKey prevKey, DecoratedKey key, SSTableRewriter writer)
{
- // OrderCheckerIterator will check, at iteration time, that the cells are in the proper order. If it detects
- // that one cell is out of order, it will stop returning them. The remaining cells will be sorted and added
- // to the outOfOrderRows that will be later written to a new SSTable.
- OrderCheckerIterator atoms = new OrderCheckerIterator(getIterator(key),
- cfs.metadata.comparator.onDiskAtomComparator());
- if (prevKey != null && prevKey.compareTo(key) > 0)
- {
- saveOutOfOrderRow(prevKey, key, atoms);
- return false;
- }
+ // OrderCheckerIterator will check, at iteration time, that the rows are in the proper order. If it detects
+ // that one row is out of order, it will stop returning them. The remaining rows will be sorted and added
+ // to the outOfOrder set that will be later written to a new SSTable.
- OrderCheckerIterator sstableIterator = new OrderCheckerIterator(new RowMergingSSTableIterator(sstable, dataFile, key),
++ OrderCheckerIterator sstableIterator = new OrderCheckerIterator(getIterator(key),
+ cfs.metadata.comparator);
- AbstractCompactedRow compactedRow = new LazilyCompactedRow(controller, Collections.singletonList(atoms));
- if (writer.tryAppend(compactedRow) == null)
- emptyRows++;
- else
- goodRows++;
+ try (UnfilteredRowIterator iterator = withValidation(sstableIterator, dataFile.getPath()))
+ {
+ if (prevKey != null && prevKey.compareTo(key) > 0)
+ {
+ saveOutOfOrderRow(prevKey, key, iterator);
+ return false;
+ }
- if (atoms.hasOutOfOrderCells())
- saveOutOfOrderRow(key, atoms);
+ if (writer.tryAppend(iterator) == null)
+ emptyRows++;
+ else
+ goodRows++;
+ }
+
+ if (sstableIterator.hasRowsOutOfOrder())
+ {
+ outputHandler.warn(String.format("Out of order rows found in partition: %s", key));
+ outOfOrder.add(sstableIterator.getRowsOutOfOrder());
+ }
return true;
}
+ /**
+ * Only wrap with {@link FixNegativeLocalDeletionTimeIterator} if the {@link #reinsertOverflowedTTLRows}
+ * option is specified.
+ */
- private OnDiskAtomIterator getIterator(DecoratedKey key)
++ private UnfilteredRowIterator getIterator(DecoratedKey key)
+ {
- SSTableIdentityIterator sstableIdentityIterator = new SSTableIdentityIterator(sstable, dataFile, key, checkData);
- return reinsertOverflowedTTLRows ? new FixNegativeLocalDeletionTimeIterator(sstableIdentityIterator,
++ RowMergingSSTableIterator rowMergingIterator = new RowMergingSSTableIterator(sstable, dataFile, key);
++ return reinsertOverflowedTTLRows ? new FixNegativeLocalDeletionTimeIterator(rowMergingIterator,
+ outputHandler,
- negativeLocalDeletionInfoMetrics) : sstableIdentityIterator;
++ negativeLocalDeletionInfoMetrics) : rowMergingIterator;
+ }
+
private void updateIndexKey()
{
currentIndexKey = nextIndexKey;
@@@ -477,49 -555,12 +505,55 @@@
}
}
+ public class NegativeLocalDeletionInfoMetrics
+ {
+ public volatile int fixedRows = 0;
+ }
+
/**
+ * During the 2.x migration, under some circumstances rows might have gotten duplicated.
+ * This merging iterator merges rows with the same clustering.
+ *
+ * For more details, refer to CASSANDRA-12144.
+ */
+ private static class RowMergingSSTableIterator extends SSTableIdentityIterator
+ {
+ RowMergingSSTableIterator(SSTableReader sstable, RandomAccessReader file, DecoratedKey key)
+ {
+ super(sstable, file, key);
+ }
+
+ @Override
+ protected Unfiltered doCompute()
+ {
+ if (!iterator.hasNext())
+ return endOfData();
+
+ Unfiltered next = iterator.next();
+ if (!next.isRow())
+ return next;
+
+ while (iterator.hasNext())
+ {
+ Unfiltered peek = iterator.peek();
+ // If there was a duplicate row, merge it.
+ if (next.clustering().equals(peek.clustering()) && peek.isRow())
+ {
+ iterator.next(); // Make sure that the peeked item was consumed.
+ next = Rows.merge((Row) next, (Row) peek, FBUtilities.nowInSeconds());
+ }
+ else
+ {
+ break;
+ }
+ }
+
+ return next;
+ }
++
+ }
+
+ /**
* In some cases, like CASSANDRA-12127, the cells might have been stored in the wrong order. This decorator checks the
* cell order and collects the out-of-order cells to correct the problem.
*/
@@@ -571,53 -608,98 +605,199 @@@
}
@Override
- protected OnDiskAtom computeNext()
+ public boolean isEmpty()
{
- if (!iterator.hasNext())
- return endOfData();
+ return iterator.isEmpty();
+ }
- OnDiskAtom next = iterator.next();
+ public void close()
+ {
+ iterator.close();
+ }
- // If we detect that some cells are out of order we will store and sort the remaining once to insert them
- // in a separate SSTable.
- if (previous != null && comparator.compare(next, previous) < 0)
- {
- outOfOrderCells = collectOutOfOrderCells(next, iterator);
- return endOfData();
- }
- previous = next;
- return next;
+ public DeletionTime partitionLevelDeletion()
+ {
+ return iterator.partitionLevelDeletion();
}
- public boolean hasOutOfOrderCells()
+ public EncodingStats stats()
{
- return outOfOrderCells != null;
+ return iterator.stats();
}
- public ColumnFamily getOutOfOrderCells()
+ public boolean hasRowsOutOfOrder()
{
- return outOfOrderCells;
+ return rowsOutOfOrder != null;
}
- private static ColumnFamily collectOutOfOrderCells(OnDiskAtom atom, OnDiskAtomIterator iterator)
+ public Partition getRowsOutOfOrder()
{
- ColumnFamily cf = iterator.getColumnFamily().cloneMeShallow(ArrayBackedSortedColumns.factory, false);
- cf.addAtom(atom);
- while (iterator.hasNext())
- cf.addAtom(iterator.next());
- return cf;
+ return rowsOutOfOrder;
+ }
+
+ protected Unfiltered computeNext()
+ {
+ if (!iterator.hasNext())
+ return endOfData();
+
+ Unfiltered next = iterator.next();
+
+ // If we detect that some rows are out of order we will store and sort the remaining ones to insert them
+ // in a separate SSTable.
+ if (previous != null && comparator.compare(next, previous) < 0)
+ {
+ rowsOutOfOrder = ImmutableBTreePartition.create(UnfilteredRowIterators.concat(next, iterator), false);
+ return endOfData();
+ }
+ previous = next;
+ return next;
}
+ }
+
+ /**
- * This iterator converts negative {@link BufferExpiringCell#getLocalDeletionTime()} into {@link BufferExpiringCell#MAX_DELETION_TIME}
++ * This iterator converts negative {@link AbstractCell#localDeletionTime()} into {@link AbstractCell#MAX_DELETION_TIME}
+ *
+ * This is to recover entries with overflowed localExpirationTime due to CASSANDRA-14092
+ */
- private static final class FixNegativeLocalDeletionTimeIterator extends AbstractIterator<OnDiskAtom> implements OnDiskAtomIterator
++ private static final class FixNegativeLocalDeletionTimeIterator extends AbstractIterator<Unfiltered> implements UnfilteredRowIterator
+ {
+ /**
+ * The decorated iterator.
+ */
- private final OnDiskAtomIterator iterator;
++ private final UnfilteredRowIterator iterator;
+
+ private final OutputHandler outputHandler;
+ private final NegativeLocalDeletionInfoMetrics negativeLocalExpirationTimeMetrics;
+
- public FixNegativeLocalDeletionTimeIterator(OnDiskAtomIterator iterator, OutputHandler outputHandler,
++ public FixNegativeLocalDeletionTimeIterator(UnfilteredRowIterator iterator, OutputHandler outputHandler,
+ NegativeLocalDeletionInfoMetrics negativeLocalDeletionInfoMetrics)
+ {
+ this.iterator = iterator;
+ this.outputHandler = outputHandler;
+ this.negativeLocalExpirationTimeMetrics = negativeLocalDeletionInfoMetrics;
+ }
+
- public ColumnFamily getColumnFamily()
++ public CFMetaData metadata()
+ {
- return iterator.getColumnFamily();
++ return iterator.metadata();
+ }
+
- public DecoratedKey getKey()
++ public boolean isReverseOrder()
+ {
- return iterator.getKey();
++ return iterator.isReverseOrder();
+ }
+
- public void close() throws IOException
++ public PartitionColumns columns()
+ {
- iterator.close();
++ return iterator.columns();
++ }
++
++ public DecoratedKey partitionKey()
++ {
++ return iterator.partitionKey();
++ }
++
++ public Row staticRow()
++ {
++ return iterator.staticRow();
+ }
+
+ @Override
- protected OnDiskAtom computeNext()
++ public boolean isEmpty()
++ {
++ return iterator.isEmpty();
++ }
++
++ public void close()
++ {
++ iterator.close();
++ }
++
++ public DeletionTime partitionLevelDeletion()
++ {
++ return iterator.partitionLevelDeletion();
++ }
++
++ public EncodingStats stats()
++ {
++ return iterator.stats();
++ }
++
++ protected Unfiltered computeNext()
+ {
+ if (!iterator.hasNext())
+ return endOfData();
+
- OnDiskAtom next = iterator.next();
++ Unfiltered next = iterator.next();
++ if (!next.isRow())
++ return next;
+
- if (next instanceof ExpiringCell && next.getLocalDeletionTime() < 0)
++ if (hasNegativeLocalExpirationTime((Row) next))
+ {
- outputHandler.debug(String.format("Found cell with negative local expiration time: %s", ((ExpiringCell) next).getString(getColumnFamily().getComparator()), getColumnFamily()));
++ outputHandler.debug(String.format("Found row with negative local expiration time: %s", next.toString(metadata(), false)));
+ negativeLocalExpirationTimeMetrics.fixedRows++;
- next = ((Cell) next).localCopy(getColumnFamily().metadata(), HeapAllocator.instance).withUpdatedTimestampAndLocalDeletionTime(next.timestamp() + 1, BufferExpiringCell.MAX_DELETION_TIME);
++ return fixNegativeLocalExpirationTime((Row) next);
+ }
+
+ return next;
+ }
++
++ private boolean hasNegativeLocalExpirationTime(Row next)
++ {
++ Row row = next;
++ if (row.primaryKeyLivenessInfo().isExpiring() && row.primaryKeyLivenessInfo().localExpirationTime() < 0)
++ {
++ return true;
++ }
++
++ for (ColumnData cd : row)
++ {
++ if (cd.column().isSimple())
++ {
++ Cell cell = (Cell)cd;
++ if (cell.isExpiring() && cell.localDeletionTime() < 0)
++ return true;
++ }
++ else
++ {
++ ComplexColumnData complexData = (ComplexColumnData)cd;
++ for (Cell cell : complexData)
++ {
++ if (cell.isExpiring() && cell.localDeletionTime() < 0)
++ return true;
++ }
++ }
++ }
++
++ return false;
++ }
+
++ private Unfiltered fixNegativeLocalExpirationTime(Row row)
++ {
++ Row.Builder builder = HeapAllocator.instance.cloningBTreeRowBuilder();
++ builder.newRow(row.clustering());
++ builder.addPrimaryKeyLivenessInfo(row.primaryKeyLivenessInfo().isExpiring() && row.primaryKeyLivenessInfo().localExpirationTime() < 0 ?
++ row.primaryKeyLivenessInfo().withUpdatedTimestampAndLocalDeletionTime(row.primaryKeyLivenessInfo().timestamp() + 1, AbstractCell.MAX_DELETION_TIME)
++ :row.primaryKeyLivenessInfo());
++ builder.addRowDeletion(row.deletion());
++ for (ColumnData cd : row)
++ {
++ if (cd.column().isSimple())
++ {
++ Cell cell = (Cell)cd;
++ builder.addCell(cell.isExpiring() && cell.localDeletionTime() < 0 ? cell.withUpdatedTimestampAndLocalDeletionTime(cell.timestamp() + 1, AbstractCell.MAX_DELETION_TIME) : cell);
++ }
++ else
++ {
++ ComplexColumnData complexData = (ComplexColumnData)cd;
++ builder.addComplexDeletion(complexData.column(), complexData.complexDeletion());
++ for (Cell cell : complexData)
++ {
++ builder.addCell(cell.isExpiring() && cell.localDeletionTime() < 0 ? cell.withUpdatedTimestampAndLocalDeletionTime(cell.timestamp() + 1, AbstractCell.MAX_DELETION_TIME) : cell);
++ }
++ }
++ }
++ return builder.build();
++ }
}
}
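
The core repair rule in FixNegativeLocalDeletionTimeIterator above: any expiring cell (or
primary-key liveness info) whose localDeletionTime wrapped negative is rewritten with its
timestamp bumped by one and its deletion time capped at AbstractCell.MAX_DELETION_TIME.
A self-contained sketch of that per-cell rule, using an illustrative value class rather
than Cassandra's Cell:

    // Illustrative stand-in for a cell; not Cassandra's Cell type.
    final class CellData
    {
        final long timestamp;
        final int ttl;
        final int localDeletionTime;

        CellData(long timestamp, int ttl, int localDeletionTime)
        {
            this.timestamp = timestamp;
            this.ttl = ttl;
            this.localDeletionTime = localDeletionTime;
        }
    }

    final class NegativeLdtFixSketch
    {
        static final int NO_TTL = 0;
        static final int MAX_DELETION_TIME = Integer.MAX_VALUE - 1;

        // Mirrors the rewrite applied by fixNegativeLocalExpirationTime above.
        static CellData fix(CellData c)
        {
            boolean expiring = c.ttl != NO_TTL;
            if (expiring && c.localDeletionTime < 0)
                // Bump the timestamp by one and cap the local deletion time.
                return new CellData(c.timestamp + 1, c.ttl, MAX_DELETION_TIME);
            return c;
        }

        public static void main(String[] args)
        {
            CellData fixed = fix(new CellData(1000L, 630720000, -163778822));
            System.out.println(fixed.timestamp + " " + fixed.localDeletionTime); // 1001 2147483646
        }
    }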
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c231ed5b/src/java/org/apache/cassandra/db/rows/BufferCell.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/rows/BufferCell.java
index 82ae02c,0000000..df2619c
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/rows/BufferCell.java
+++ b/src/java/org/apache/cassandra/db/rows/BufferCell.java
@@@ -1,365 -1,0 +1,370 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.config.*;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.context.CounterContext;
+import org.apache.cassandra.db.marshal.ByteType;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.ObjectSizes;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.memory.AbstractAllocator;
+
+public class BufferCell extends AbstractCell
+{
+ private static final long EMPTY_SIZE = ObjectSizes.measure(new BufferCell(ColumnDefinition.regularDef("", "", "", ByteType.instance), 0L, 0, 0, ByteBufferUtil.EMPTY_BYTE_BUFFER, null));
+
+ private final long timestamp;
+ private final int ttl;
+ private final int localDeletionTime;
+
+ private final ByteBuffer value;
+ private final CellPath path;
+
+ public BufferCell(ColumnDefinition column, long timestamp, int ttl, int localDeletionTime, ByteBuffer value, CellPath path)
+ {
+ super(column);
+ assert column.isComplex() == (path != null);
+ this.timestamp = timestamp;
+ this.ttl = ttl;
+ this.localDeletionTime = localDeletionTime;
+ this.value = value;
+ this.path = path;
+ }
+
+ public static BufferCell live(CFMetaData metadata, ColumnDefinition column, long timestamp, ByteBuffer value)
+ {
+ return live(metadata, column, timestamp, value, null);
+ }
+
+ public static BufferCell live(CFMetaData metadata, ColumnDefinition column, long timestamp, ByteBuffer value, CellPath path)
+ {
+ if (metadata.params.defaultTimeToLive != NO_TTL)
+ return expiring(column, timestamp, metadata.params.defaultTimeToLive, FBUtilities.nowInSeconds(), value, path);
+
+ return new BufferCell(column, timestamp, NO_TTL, NO_DELETION_TIME, value, path);
+ }
+
+ public static BufferCell expiring(ColumnDefinition column, long timestamp, int ttl, int nowInSec, ByteBuffer value)
+ {
+ return expiring(column, timestamp, ttl, nowInSec, value, null);
+ }
+
+ public static BufferCell expiring(ColumnDefinition column, long timestamp, int ttl, int nowInSec, ByteBuffer value, CellPath path)
+ {
+ assert ttl != NO_TTL;
- return new BufferCell(column, timestamp, ttl, nowInSec + ttl, value, path);
++ return new BufferCell(column, timestamp, ttl, ExpirationDateOverflowHandling.computeLocalExpirationTime(nowInSec, ttl), value, path);
+ }
+
+ public static BufferCell tombstone(ColumnDefinition column, long timestamp, int nowInSec)
+ {
+ return tombstone(column, timestamp, nowInSec, null);
+ }
+
+ public static BufferCell tombstone(ColumnDefinition column, long timestamp, int nowInSec, CellPath path)
+ {
+ return new BufferCell(column, timestamp, NO_TTL, nowInSec, ByteBufferUtil.EMPTY_BYTE_BUFFER, path);
+ }
+
+ public boolean isCounterCell()
+ {
+ return !isTombstone() && column.isCounterColumn();
+ }
+
+ public boolean isLive(int nowInSec)
+ {
+ return localDeletionTime == NO_DELETION_TIME || (ttl != NO_TTL && nowInSec < localDeletionTime);
+ }
+
+ public boolean isTombstone()
+ {
+ return localDeletionTime != NO_DELETION_TIME && ttl == NO_TTL;
+ }
+
+ public boolean isExpiring()
+ {
+ return ttl != NO_TTL;
+ }
+
+ public long timestamp()
+ {
+ return timestamp;
+ }
+
+ public int ttl()
+ {
+ return ttl;
+ }
+
+ public int localDeletionTime()
+ {
+ return localDeletionTime;
+ }
+
+ public ByteBuffer value()
+ {
+ return value;
+ }
+
+ public CellPath path()
+ {
+ return path;
+ }
+
+ public Cell withUpdatedColumn(ColumnDefinition newColumn)
+ {
+ return new BufferCell(newColumn, timestamp, ttl, localDeletionTime, value, path);
+ }
+
+ public Cell withUpdatedValue(ByteBuffer newValue)
+ {
+ return new BufferCell(column, timestamp, ttl, localDeletionTime, newValue, path);
+ }
+
++ public Cell withUpdatedTimestampAndLocalDeletionTime(long newTimestamp, int newLocalDeletionTime)
++ {
++ return new BufferCell(column, newTimestamp, ttl, newLocalDeletionTime, value, path);
++ }
++
+ public Cell copy(AbstractAllocator allocator)
+ {
+ if (!value.hasRemaining())
+ return this;
+
+ return new BufferCell(column, timestamp, ttl, localDeletionTime, allocator.clone(value), path == null ? null : path.copy(allocator));
+ }
+
+ public Cell markCounterLocalToBeCleared()
+ {
+ if (!isCounterCell())
+ return this;
+
+ ByteBuffer marked = CounterContext.instance().markLocalToBeCleared(value());
+ return marked == value() ? this : new BufferCell(column, timestamp, ttl, localDeletionTime, marked, path);
+ }
+
+ public Cell purge(DeletionPurger purger, int nowInSec)
+ {
+ if (!isLive(nowInSec))
+ {
+ if (purger.shouldPurge(timestamp, localDeletionTime))
+ return null;
+
+ // We slightly hijack purging to convert expired but not purgeable columns to tombstones. The reason we do that is
+ // that once a column has expired it is equivalent to a tombstone but actually using a tombstone is more compact since
+ // we don't keep the column value. The reason we do it here is that 1) it's somewhat related to dealing with tombstones
+ * so hopefully not too surprising and 2) we want to do this and purging in the same places, so it's simpler/more efficient
+ // to do both here.
+ if (isExpiring())
+ {
+ // Note that as long as the expiring column and the tombstone put together live longer than GC grace seconds,
+ // we'll fulfil our responsibility to repair. See discussion at
+ // http://cassandra-user-incubator-apache-org.3065146.n2.nabble.com/repair-compaction-and-tombstone-rows-td7583481.html
+ return BufferCell.tombstone(column, timestamp, localDeletionTime - ttl, path).purge(purger, nowInSec);
+ }
+ }
+ return this;
+ }
+
+ public Cell updateAllTimestamp(long newTimestamp)
+ {
+ return new BufferCell(column, isTombstone() ? newTimestamp - 1 : newTimestamp, ttl, localDeletionTime, value, path);
+ }
+
+ public int dataSize()
+ {
+ return TypeSizes.sizeof(timestamp)
+ + TypeSizes.sizeof(ttl)
+ + TypeSizes.sizeof(localDeletionTime)
+ + value.remaining()
+ + (path == null ? 0 : path.dataSize());
+ }
+
+ public long unsharedHeapSizeExcludingData()
+ {
+ return EMPTY_SIZE + ObjectSizes.sizeOnHeapExcludingData(value) + (path == null ? 0 : path.unsharedHeapSizeExcludingData());
+ }
+
+ /**
+ * The serialization format for cell is:
+ * [ flags ][ timestamp ][ deletion time ][ ttl ][ path size ][ path ][ value size ][ value ]
+ * [ 1b ][ 8b (vint) ][ 4b (vint) ][ 4b (vint) ][ 4b (vint) ][ arb ][ 4b (vint) ][ arb ]
+ *
+ * where not all fields are always present (in fact, only [ flags ] is guaranteed to be present). The fields have the following
+ * meaning:
+ * - [ flags ] is the cell flags. It is a byte for which each bit represents a flag whose meaning is explained below (*_MASK constants)
+ * - [ timestamp ] is the cell timestamp. Present unless the cell has the USE_TIMESTAMP_MASK.
+ * - [ deletion time]: the local deletion time for the cell. Present if either the cell is deleted (IS_DELETED_MASK)
+ * or it is expiring (IS_EXPIRING_MASK) but doesn't have the USE_ROW_TTL_MASK.
+ * - [ ttl ]: the ttl for the cell. Present if the row is expiring (IS_EXPIRING_MASK) but doesn't have the
+ * USE_ROW_TTL_MASK.
+ * - [ value size ] is the size of the [ value ] field. It's present unless either the cell has the HAS_EMPTY_VALUE_MASK, or the value
+ * for columns of this type have a fixed length.
+ * - [ path size ] is the size of the [ path ] field. Present iff this is the cell of a complex column.
+ * - [ value ]: the cell value, unless it has the HAS_EMPTY_VALUE_MASK.
+ * - [ path ]: the cell path if the column this is a cell of is complex.
+ */
+ static class Serializer implements Cell.Serializer
+ {
+ private final static int IS_DELETED_MASK = 0x01; // Whether the cell is a tombstone or not.
+ private final static int IS_EXPIRING_MASK = 0x02; // Whether the cell is expiring.
+ private final static int HAS_EMPTY_VALUE_MASK = 0x04; // Whether the cell has an empty value. This will be the case for tombstones in particular.
+ private final static int USE_ROW_TIMESTAMP_MASK = 0x08; // Whether the cell has the same timestamp as the row this is a cell of.
+ private final static int USE_ROW_TTL_MASK = 0x10; // Whether the cell has the same ttl as the row this is a cell of.
+
+ public void serialize(Cell cell, ColumnDefinition column, DataOutputPlus out, LivenessInfo rowLiveness, SerializationHeader header) throws IOException
+ {
+ assert cell != null;
+ boolean hasValue = cell.value().hasRemaining();
+ boolean isDeleted = cell.isTombstone();
+ boolean isExpiring = cell.isExpiring();
+ boolean useRowTimestamp = !rowLiveness.isEmpty() && cell.timestamp() == rowLiveness.timestamp();
+ boolean useRowTTL = isExpiring && rowLiveness.isExpiring() && cell.ttl() == rowLiveness.ttl() && cell.localDeletionTime() == rowLiveness.localExpirationTime();
+ int flags = 0;
+ if (!hasValue)
+ flags |= HAS_EMPTY_VALUE_MASK;
+
+ if (isDeleted)
+ flags |= IS_DELETED_MASK;
+ else if (isExpiring)
+ flags |= IS_EXPIRING_MASK;
+
+ if (useRowTimestamp)
+ flags |= USE_ROW_TIMESTAMP_MASK;
+ if (useRowTTL)
+ flags |= USE_ROW_TTL_MASK;
+
+ out.writeByte((byte)flags);
+
+ if (!useRowTimestamp)
+ header.writeTimestamp(cell.timestamp(), out);
+
+ if ((isDeleted || isExpiring) && !useRowTTL)
+ header.writeLocalDeletionTime(cell.localDeletionTime(), out);
+ if (isExpiring && !useRowTTL)
+ header.writeTTL(cell.ttl(), out);
+
+ if (column.isComplex())
+ column.cellPathSerializer().serialize(cell.path(), out);
+
+ if (hasValue)
+ header.getType(column).writeValue(cell.value(), out);
+ }
+
+ public Cell deserialize(DataInputPlus in, LivenessInfo rowLiveness, ColumnDefinition column, SerializationHeader header, SerializationHelper helper) throws IOException
+ {
+ int flags = in.readUnsignedByte();
+ boolean hasValue = (flags & HAS_EMPTY_VALUE_MASK) == 0;
+ boolean isDeleted = (flags & IS_DELETED_MASK) != 0;
+ boolean isExpiring = (flags & IS_EXPIRING_MASK) != 0;
+ boolean useRowTimestamp = (flags & USE_ROW_TIMESTAMP_MASK) != 0;
+ boolean useRowTTL = (flags & USE_ROW_TTL_MASK) != 0;
+
+ long timestamp = useRowTimestamp ? rowLiveness.timestamp() : header.readTimestamp(in);
+
+ int localDeletionTime = useRowTTL
+ ? rowLiveness.localExpirationTime()
+ : (isDeleted || isExpiring ? header.readLocalDeletionTime(in) : NO_DELETION_TIME);
+
+ int ttl = useRowTTL ? rowLiveness.ttl() : (isExpiring ? header.readTTL(in) : NO_TTL);
+
+ CellPath path = column.isComplex()
+ ? column.cellPathSerializer().deserialize(in)
+ : null;
+
+ boolean isCounter = localDeletionTime == NO_DELETION_TIME && column.type.isCounter();
+
+ ByteBuffer value = ByteBufferUtil.EMPTY_BYTE_BUFFER;
+ if (hasValue)
+ {
+ if (helper.canSkipValue(column) || (path != null && helper.canSkipValue(path)))
+ {
+ header.getType(column).skipValue(in);
+ }
+ else
+ {
+ value = header.getType(column).readValue(in, DatabaseDescriptor.getMaxValueSize());
+ if (isCounter)
+ value = helper.maybeClearCounterValue(value);
+ }
+ }
+
+ return new BufferCell(column, timestamp, ttl, localDeletionTime, value, path);
+ }
+
+ public long serializedSize(Cell cell, ColumnDefinition column, LivenessInfo rowLiveness, SerializationHeader header)
+ {
+ long size = 1; // flags
+ boolean hasValue = cell.value().hasRemaining();
+ boolean isDeleted = cell.isTombstone();
+ boolean isExpiring = cell.isExpiring();
+ boolean useRowTimestamp = !rowLiveness.isEmpty() && cell.timestamp() == rowLiveness.timestamp();
+ boolean useRowTTL = isExpiring && rowLiveness.isExpiring() && cell.ttl() == rowLiveness.ttl() && cell.localDeletionTime() == rowLiveness.localExpirationTime();
+
+ if (!useRowTimestamp)
+ size += header.timestampSerializedSize(cell.timestamp());
+
+ if ((isDeleted || isExpiring) && !useRowTTL)
+ size += header.localDeletionTimeSerializedSize(cell.localDeletionTime());
+ if (isExpiring && !useRowTTL)
+ size += header.ttlSerializedSize(cell.ttl());
+
+ if (column.isComplex())
+ size += column.cellPathSerializer().serializedSize(cell.path());
+
+ if (hasValue)
+ size += header.getType(column).writtenLength(cell.value());
+
+ return size;
+ }
+
+ // Returns whether the skipped cell was an actual cell (i.e. it had its presence flag).
+ public boolean skip(DataInputPlus in, ColumnDefinition column, SerializationHeader header) throws IOException
+ {
+ int flags = in.readUnsignedByte();
+ boolean hasValue = (flags & HAS_EMPTY_VALUE_MASK) == 0;
+ boolean isDeleted = (flags & IS_DELETED_MASK) != 0;
+ boolean isExpiring = (flags & IS_EXPIRING_MASK) != 0;
+ boolean useRowTimestamp = (flags & USE_ROW_TIMESTAMP_MASK) != 0;
+ boolean useRowTTL = (flags & USE_ROW_TTL_MASK) != 0;
+
+ if (!useRowTimestamp)
+ header.skipTimestamp(in);
+
+ if (!useRowTTL && (isDeleted || isExpiring))
+ header.skipLocalDeletionTime(in);
+
+ if (!useRowTTL && isExpiring)
+ header.skipTTL(in);
+
+ if (column.isComplex())
+ column.cellPathSerializer().skip(in);
+
+ if (hasValue)
+ header.getType(column).skipValue(in);
+
+ return true;
+ }
+ }
+}
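
The serializer above packs cell metadata into a single flags byte via the five *_MASK
constants. A small standalone sketch reproducing just the flag assembly from serialize()
(mask values copied from the diff; the method is illustrative, not the real serializer):

    final class CellFlagsSketch
    {
        static final int IS_DELETED_MASK        = 0x01;
        static final int IS_EXPIRING_MASK       = 0x02;
        static final int HAS_EMPTY_VALUE_MASK   = 0x04;
        static final int USE_ROW_TIMESTAMP_MASK = 0x08;
        static final int USE_ROW_TTL_MASK       = 0x10;

        static int flags(boolean hasValue, boolean isDeleted, boolean isExpiring,
                         boolean useRowTimestamp, boolean useRowTTL)
        {
            int flags = 0;
            if (!hasValue)
                flags |= HAS_EMPTY_VALUE_MASK;
            if (isDeleted)
                flags |= IS_DELETED_MASK;      // deleted wins over expiring,
            else if (isExpiring)
                flags |= IS_EXPIRING_MASK;     // exactly as in serialize() above
            if (useRowTimestamp)
                flags |= USE_ROW_TIMESTAMP_MASK;
            if (useRowTTL)
                flags |= USE_ROW_TTL_MASK;
            return flags;
        }

        public static void main(String[] args)
        {
            // An expiring cell with a value and its own timestamp/ttl -> 0x02
            System.out.println(Integer.toHexString(flags(true, false, true, false, false)));
        }
    }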
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c231ed5b/src/java/org/apache/cassandra/db/rows/Cell.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/rows/Cell.java
index d10cc74,0000000..c69e11f
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/rows/Cell.java
+++ b/src/java/org/apache/cassandra/db/rows/Cell.java
@@@ -1,157 -1,0 +1,166 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Comparator;
+
++import com.google.common.annotations.VisibleForTesting;
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
++
++import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
++import org.apache.cassandra.cql3.Attributes;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.utils.memory.AbstractAllocator;
+
+/**
+ * A cell is our atomic unit for a single value of a single column.
+ * <p>
+ * A cell always holds at least a timestamp that determines how the cell reconciles. We then
+ * have 3 main types of cells:
+ * 1) live regular cells: those will also have a value and, if for a complex column, a path.
+ * 2) expiring cells: on top of regular cells, those have a ttl and a local deletion time (when they expire).
+ * 3) tombstone cells: those won't have a value, but they have a local deletion time (when the tombstone was created).
+ */
+public abstract class Cell extends ColumnData
+{
+ public static final int NO_TTL = 0;
+ public static final int NO_DELETION_TIME = Integer.MAX_VALUE;
++ public static final int MAX_DELETION_TIME = Integer.MAX_VALUE - 1;
+
+ public final static Comparator<Cell> comparator = (c1, c2) ->
+ {
+ int cmp = c1.column().compareTo(c2.column());
+ if (cmp != 0)
+ return cmp;
+
+ Comparator<CellPath> pathComparator = c1.column().cellPathComparator();
+ return pathComparator == null ? 0 : pathComparator.compare(c1.path(), c2.path());
+ };
+
+ public static final Serializer serializer = new BufferCell.Serializer();
+
+ protected Cell(ColumnDefinition column)
+ {
+ super(column);
+ }
+
+ /**
+ * Whether the cell is a counter cell or not.
+ *
+ * @return whether the cell is a counter cell or not.
+ */
+ public abstract boolean isCounterCell();
+
+ /**
+ * The cell value.
+ *
+ * @return the cell value.
+ */
+ public abstract ByteBuffer value();
+
+ /**
+ * The cell timestamp.
+ * <p>
+ * @return the cell timestamp.
+ */
+ public abstract long timestamp();
+
+ /**
+ * The cell ttl.
+ *
+ * @return the cell ttl, or {@code NO_TTL} if the cell isn't an expiring one.
+ */
+ public abstract int ttl();
+
+ /**
+ * The cell local deletion time.
+ *
+ * @return the cell local deletion time, or {@code NO_DELETION_TIME} if the cell is neither
+ * a tombstone nor an expiring one.
+ */
+ public abstract int localDeletionTime();
+
+ /**
+ * Whether the cell is a tombstone or not.
+ *
+ * @return whether the cell is a tombstone or not.
+ */
+ public abstract boolean isTombstone();
+
+ /**
+ * Whether the cell is an expiring one or not.
+ * <p>
+ * Note that this only corresponds to whether the cell liveness info
+ * has a TTL or not, but doesn't tell whether the cell is already expired
+ * or not. You should use {@link #isLive} for that latter information.
+ *
+ * @return whether the cell is an expiring one or not.
+ */
+ public abstract boolean isExpiring();
+
+ /**
+ * Whether the cell is live or not given the current time.
+ *
+ * @param nowInSec the current time in seconds. This is used to
+ * decide if an expiring cell is expired or live.
+ * @return whether the cell is live or not at {@code nowInSec}.
+ */
+ public abstract boolean isLive(int nowInSec);
+
+ /**
+ * For cells belonging to complex types (non-frozen collection and UDT), the
+ * path to the cell.
+ *
+ * @return the cell path for cells of complex column, and {@code null} for other cells.
+ */
+ public abstract CellPath path();
+
+ public abstract Cell withUpdatedColumn(ColumnDefinition newColumn);
+
+ public abstract Cell withUpdatedValue(ByteBuffer newValue);
+
++ public abstract Cell withUpdatedTimestampAndLocalDeletionTime(long newTimestamp, int newLocalDeletionTime);
++
+ public abstract Cell copy(AbstractAllocator allocator);
+
+ @Override
+ // Overrides super type to provide a more precise return type.
+ public abstract Cell markCounterLocalToBeCleared();
+
+ @Override
+ // Overrides super type to provide a more precise return type.
+ public abstract Cell purge(DeletionPurger purger, int nowInSec);
+
+ public interface Serializer
+ {
+ public void serialize(Cell cell, ColumnDefinition column, DataOutputPlus out, LivenessInfo rowLiveness, SerializationHeader header) throws IOException;
+
+ public Cell deserialize(DataInputPlus in, LivenessInfo rowLiveness, ColumnDefinition column, SerializationHeader header, SerializationHelper helper) throws IOException;
+
+ public long serializedSize(Cell cell, ColumnDefinition column, LivenessInfo rowLiveness, SerializationHeader header);
+
+ // Returns whether the skipped cell was an actual cell (i.e. it had its presence flag).
+ public boolean skip(DataInputPlus in, ColumnDefinition column, SerializationHeader header) throws IOException;
+ }
+}
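
Note that the new MAX_DELETION_TIME sits one second below NO_DELETION_TIME, so a capped
expiring cell stays distinguishable from a cell with no deletion time at all. A sketch of
the liveness predicate (same logic as BufferCell.isLive() above) exercising the three
constants:

    final class CellStateSketch
    {
        static final int NO_TTL = 0;
        static final int NO_DELETION_TIME = Integer.MAX_VALUE;
        static final int MAX_DELETION_TIME = Integer.MAX_VALUE - 1;

        // Same predicate as BufferCell.isLive(int nowInSec) in this patch.
        static boolean isLive(int ttl, int localDeletionTime, int nowInSec)
        {
            return localDeletionTime == NO_DELETION_TIME
                   || (ttl != NO_TTL && nowInSec < localDeletionTime);
        }

        public static void main(String[] args)
        {
            int now = (int) (System.currentTimeMillis() / 1000);
            System.out.println(isLive(NO_TTL, NO_DELETION_TIME, now)); // true: plain live cell
            System.out.println(isLive(86400, MAX_DELETION_TIME, now)); // true: capped expiring cell
            System.out.println(isLive(NO_TTL, now - 10, now));         // false: tombstone
        }
    }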
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c231ed5b/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageService.java
index e5a50dd,2c9ac4d..cf8e257
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -2705,22 -2599,28 +2705,27 @@@ public class StorageService extends Not
return status.statusCode;
}
- public int scrub(boolean disableSnapshot, boolean skipCorrupted, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
+ public int scrub(boolean disableSnapshot, boolean skipCorrupted, String keyspaceName, String... tables) throws IOException, ExecutionException, InterruptedException
{
- return scrub(disableSnapshot, skipCorrupted, true, 0, keyspaceName, columnFamilies);
+ return scrub(disableSnapshot, skipCorrupted, true, 0, keyspaceName, tables);
}
- public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
+ public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, String keyspaceName, String... tables) throws IOException, ExecutionException, InterruptedException
{
- return scrub(disableSnapshot, skipCorrupted, checkData, 0, keyspaceName, columnFamilies);
+ return scrub(disableSnapshot, skipCorrupted, checkData, 0, keyspaceName, tables);
}
- public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, int jobs, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
+ public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, int jobs, String keyspaceName, String... tables) throws IOException, ExecutionException, InterruptedException
{
- return scrub(disableSnapshot, skipCorrupted, checkData, false, jobs, keyspaceName, columnFamilies);
++ return scrub(disableSnapshot, skipCorrupted, checkData, false, jobs, keyspaceName, tables);
+ }
+
- public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, boolean reinsertOverflowedTTLRows,
- int jobs, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
++ public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, boolean reinsertOverflowedTTL, int jobs, String keyspaceName, String... tables) throws IOException, ExecutionException, InterruptedException
+ {
CompactionManager.AllSSTableOpStatus status = CompactionManager.AllSSTableOpStatus.SUCCESSFUL;
- for (ColumnFamilyStore cfStore : getValidColumnFamilies(true, false, keyspaceName, columnFamilies))
+ for (ColumnFamilyStore cfStore : getValidColumnFamilies(true, false, keyspaceName, tables))
{
- CompactionManager.AllSSTableOpStatus oneStatus = cfStore.scrub(disableSnapshot, skipCorrupted, checkData, jobs);
- CompactionManager.AllSSTableOpStatus oneStatus = cfStore.scrub(disableSnapshot, skipCorrupted, checkData, reinsertOverflowedTTLRows, jobs);
++ CompactionManager.AllSSTableOpStatus oneStatus = cfStore.scrub(disableSnapshot, skipCorrupted, reinsertOverflowedTTL, checkData, jobs);
if (oneStatus != CompactionManager.AllSSTableOpStatus.SUCCESSFUL)
status = oneStatus;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c231ed5b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 7344ca8,f336bcc..10d47f7
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@@ -262,14 -262,17 +262,17 @@@ public interface StorageServiceMBean ex
* Scrubbed CFs will be snapshotted first, if disableSnapshot is false
*/
@Deprecated
- public int scrub(boolean disableSnapshot, boolean skipCorrupted, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException;
+ public int scrub(boolean disableSnapshot, boolean skipCorrupted, String keyspaceName, String... tableNames) throws IOException, ExecutionException, InterruptedException;
@Deprecated
- public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException;
+ public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, String keyspaceName, String... tableNames) throws IOException, ExecutionException, InterruptedException;
+ @Deprecated
public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, int jobs, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException;
-public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, boolean reinsertOverflowedTTLRows, int jobs, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException;
++ public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, boolean reinsertOverflowedTTL, int jobs, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException;
+
/**
* Verify (checksums of) the given keyspace.
- * If columnFamilies array is empty, all CFs are verified.
+ * If tableNames array is empty, all CFs are verified.
*
* The entire sstable will be read to ensure each cell validates if extendedVerify is true
*/
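
A minimal JMX client sketch for the new MBean operation (host, port and names are placeholders, exception handling is elided; the ObjectName is the one StorageService conventionally registers under):

    import javax.management.JMX;
    import javax.management.MBeanServerConnection;
    import javax.management.ObjectName;
    import javax.management.remote.JMXConnector;
    import javax.management.remote.JMXConnectorFactory;
    import javax.management.remote.JMXServiceURL;

    JMXServiceURL url = new JMXServiceURL(
        "service:jmx:rmi:///jndi/rmi://127.0.0.1:7199/jmxrmi");
    try (JMXConnector connector = JMXConnectorFactory.connect(url))
    {
        MBeanServerConnection mbs = connector.getMBeanServerConnection();
        StorageServiceMBean ss = JMX.newMBeanProxy(
            mbs,
            new ObjectName("org.apache.cassandra.db:type=StorageService"),
            StorageServiceMBean.class);
        // An empty table list scrubs every table in the keyspace.
        int rc = ss.scrub(false, true, true, true /*reinsertOverflowedTTL*/, 0, "ks1");
    }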
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c231ed5b/src/java/org/apache/cassandra/thrift/ThriftValidation.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/thrift/ThriftValidation.java
index 6ad791d,8bdf9dc..2ab0330
--- a/src/java/org/apache/cassandra/thrift/ThriftValidation.java
+++ b/src/java/org/apache/cassandra/thrift/ThriftValidation.java
@@@ -332,9 -315,9 +332,9 @@@ public class ThriftValidatio
if (isCommutative)
throw new org.apache.cassandra.exceptions.InvalidRequestException("invalid operation for commutative table " + metadata.cfName);
- validateTtl(cosc.column);
+ validateTtl(metadata, cosc.column);
validateColumnPath(metadata, new ColumnPath(metadata.cfName).setSuper_column((ByteBuffer)null).setColumn(cosc.column.name));
- validateColumnData(metadata, key, null, cosc.column);
+ validateColumnData(metadata, null, cosc.column);
}
if (cosc.super_column != null)
@@@ -374,11 -357,13 +374,13 @@@
if (column.ttl <= 0)
throw new org.apache.cassandra.exceptions.InvalidRequestException("ttl must be positive");
- if (column.ttl > ExpiringCell.MAX_TTL)
- throw new org.apache.cassandra.exceptions.InvalidRequestException(String.format("ttl is too large. requested (%d) maximum (%d)", column.ttl, ExpiringCell.MAX_TTL));
- Attributes.maybeApplyExpirationDateOverflowPolicy(metadata, column.ttl, false);
+ if (column.ttl > Attributes.MAX_TTL)
+ throw new org.apache.cassandra.exceptions.InvalidRequestException(String.format("ttl is too large. requested (%d) maximum (%d)", column.ttl, Attributes.MAX_TTL));
++ ExpirationDateOverflowHandling.maybeApplyExpirationDateOverflowPolicy(metadata, column.ttl, false);
}
else
{
- Attributes.maybeApplyExpirationDateOverflowPolicy(metadata, metadata.getDefaultTimeToLive(), true);
++ ExpirationDateOverflowHandling.maybeApplyExpirationDateOverflowPolicy(metadata, metadata.params.defaultTimeToLive, true);
// if it's not set, then it should be zero -- here we are just checking to make sure Thrift doesn't change that contract with us.
assert column.ttl == 0;
}
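
The condition these policy calls guard is easier to see in isolation. A sketch (not the committed implementation) of the overflow being handled, with the policy behaviour summarized in the comment:

    // The local expiration time is stored as seconds since the epoch in a
    // signed 32-bit int, so nowInSec + ttl past Integer.MAX_VALUE
    // (2038-01-19) cannot be represented.
    int nowInSec = (int) (System.currentTimeMillis() / 1000);
    long expirationTime = (long) nowInSec + ttl;   // widen before adding
    if (expirationTime > Integer.MAX_VALUE)
    {
        // Depending on the configured policy, either reject the write or
        // cap the expiration at the maximum representable time, optionally
        // logging a warning.
    }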
@@@ -450,9 -435,9 +452,9 @@@
/**
* Validates the data part of the column (everything in the column object but the name, which is assumed to be valid)
*/
- public static void validateColumnData(CFMetaData metadata, ByteBuffer key, ByteBuffer scName, Column column) throws org.apache.cassandra.exceptions.InvalidRequestException
+ public static void validateColumnData(CFMetaData metadata, ByteBuffer scName, Column column) throws org.apache.cassandra.exceptions.InvalidRequestException
{
- validateTtl(column);
+ validateTtl(metadata, column);
if (!column.isSetValue())
throw new org.apache.cassandra.exceptions.InvalidRequestException("Column value is required");
if (!column.isSetTimestamp())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c231ed5b/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/tools/NodeProbe.java
index 172b505,17bef02..0d3c078
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@@ -235,35 -228,27 +235,35 @@@ public class NodeProbe implements AutoC
public void close() throws IOException
{
- jmxc.close();
+ try
+ {
+ jmxc.close();
+ }
+ catch (ConnectException e)
+ {
+ // result of 'stopdaemon' command - i.e. if the close() call fails, the daemon has already shut down
+ System.out.println("Cassandra has shutdown.");
+ }
}
- public int forceKeyspaceCleanup(int jobs, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
+ public int forceKeyspaceCleanup(int jobs, String keyspaceName, String... tables) throws IOException, ExecutionException, InterruptedException
{
- return ssProxy.forceKeyspaceCleanup(jobs, keyspaceName, columnFamilies);
+ return ssProxy.forceKeyspaceCleanup(jobs, keyspaceName, tables);
}
- public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, int jobs, String keyspaceName, String... tables) throws IOException, ExecutionException, InterruptedException
- public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, boolean reinsertOverflowedTTLRows, int jobs, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
++ public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, boolean reinsertOverflowedTTL, int jobs, String keyspaceName, String... tables) throws IOException, ExecutionException, InterruptedException
{
- return ssProxy.scrub(disableSnapshot, skipCorrupted, checkData, jobs, keyspaceName, tables);
- return ssProxy.scrub(disableSnapshot, skipCorrupted, checkData, reinsertOverflowedTTLRows, jobs, keyspaceName, columnFamilies);
++ return ssProxy.scrub(disableSnapshot, skipCorrupted, checkData, reinsertOverflowedTTL, jobs, keyspaceName, tables);
}
- public int verify(boolean extendedVerify, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
+ public int verify(boolean extendedVerify, String keyspaceName, String... tableNames) throws IOException, ExecutionException, InterruptedException
{
- return ssProxy.verify(extendedVerify, keyspaceName, columnFamilies);
+ return ssProxy.verify(extendedVerify, keyspaceName, tableNames);
}
- public int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, int jobs, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
+ public int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, int jobs, String... tableNames) throws IOException, ExecutionException, InterruptedException
{
- return ssProxy.upgradeSSTables(keyspaceName, excludeCurrentVersion, jobs, columnFamilies);
+ return ssProxy.upgradeSSTables(keyspaceName, excludeCurrentVersion, jobs, tableNames);
}
private void checkJobs(PrintStream out, int jobs)
@@@ -288,19 -267,13 +288,19 @@@
}
}
- public void scrub(PrintStream out, boolean disableSnapshot, boolean skipCorrupted, boolean checkData, int jobs, String keyspaceName, String... tables) throws IOException, ExecutionException, InterruptedException
- public void scrub(PrintStream out, boolean disableSnapshot, boolean skipCorrupted, boolean checkData, boolean reinsertOverflowedTTLRows, int jobs, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
++ public void scrub(PrintStream out, boolean disableSnapshot, boolean skipCorrupted, boolean checkData, boolean reinsertOverflowedTTL, int jobs, String keyspaceName, String... tables) throws IOException, ExecutionException, InterruptedException
{
checkJobs(out, jobs);
- switch (scrub(disableSnapshot, skipCorrupted, checkData, jobs, keyspaceName, tables))
- if (scrub(disableSnapshot, skipCorrupted, checkData, reinsertOverflowedTTLRows, jobs, keyspaceName, columnFamilies) != 0)
++ switch (ssProxy.scrub(disableSnapshot, skipCorrupted, checkData, reinsertOverflowedTTL, jobs, keyspaceName, tables))
{
- failed = true;
- out.println("Aborted scrubbing at least one table in keyspace "+keyspaceName+", check server logs for more information.");
+ case 1:
+ failed = true;
+ out.println("Aborted scrubbing at least one table in keyspace "+keyspaceName+", check server logs for more information.");
+ break;
+ case 2:
+ failed = true;
+ out.println("Failed marking some sstables compacting in keyspace "+keyspaceName+", check server logs for more information");
+ break;
}
}
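
A sketch of how a caller might surface the outcome recorded above; scrub stores failures (status 1 or 2) in a flag rather than throwing, and isFailed() is assumed here to be the accessor for that flag. Host, port and names are placeholders, exception handling is elided:

    NodeProbe probe = new NodeProbe("127.0.0.1", 7199);
    probe.scrub(System.out, false, true, true,
                true /*reinsertOverflowedTTL*/, 2 /*jobs*/, "ks1");
    System.exit(probe.isFailed() ? 1 : 0);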
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c231ed5b/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
index 6076e32,8319014..19af957
--- a/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
+++ b/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
@@@ -90,12 -65,13 +90,13 @@@ public class SSTableMetadataViewe
{
out.printf("Minimum timestamp: %s%n", stats.minTimestamp);
out.printf("Maximum timestamp: %s%n", stats.maxTimestamp);
++ out.printf("SSTable min local deletion time: %s%n", stats.minLocalDeletionTime);
out.printf("SSTable max local deletion time: %s%n", stats.maxLocalDeletionTime);
out.printf("Compression ratio: %s%n", stats.compressionRatio);
- out.printf("Estimated droppable tombstones: %s%n", stats.getEstimatedDroppableTombstoneRatio((int) (System.currentTimeMillis() / 1000)));
+ out.printf("Estimated droppable tombstones: %s%n", stats.getEstimatedDroppableTombstoneRatio((int) (System.currentTimeMillis() / 1000) - gcgs));
out.printf("SSTable Level: %d%n", stats.sstableLevel);
out.printf("Repaired at: %d%n", stats.repairedAt);
- out.printf("Minimum replay position: %s\n", stats.commitLogLowerBound);
- out.printf("Maximum replay position: %s\n", stats.commitLogUpperBound);
+ out.printf("Replay positions covered: %s\n", stats.commitLogIntervals);
out.println("Estimated tombstone drop times:");
for (Map.Entry<Double, Long> entry : stats.estimatedTombstoneDropTime.getAsMap().entrySet())
{
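
One note on the changed estimate above: tombstones only become droppable once they have aged past the table's gc_grace_seconds, so the reference time is shifted back by gcgs. In isolation:

    // Tombstones newer than gcBefore must be kept so they can still
    // propagate to replicas that missed the original deletion.
    int gcBefore = (int) (System.currentTimeMillis() / 1000) - gcgs;
    double droppable = stats.getEstimatedDroppableTombstoneRatio(gcBefore);
    out.printf("Estimated droppable tombstones: %s%n", droppable);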
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c231ed5b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/tools/StandaloneScrubber.java
index 4249430,f5e84c5..4778d72
--- a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
+++ b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
@@@ -122,7 -129,7 +129,7 @@@ public class StandaloneScrubbe
try (LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.SCRUB, sstable))
{
txn.obsoleteOriginals(); // make sure originals are deleted and avoid NPE if index is missing, CASSANDRA-9591
- try (Scrubber scrubber = new Scrubber(cfs, txn, options.skipCorrupted, handler, !options.noValidate))
- try (Scrubber scrubber = new Scrubber(cfs, txn, options.skipCorrupted, handler, !options.noValidate, options.reinsertOverflowedTTL))
++ try (Scrubber scrubber = new Scrubber(cfs, txn, options.skipCorrupted, handler, !options.noValidate, options.reinsertOverflowedTTL))
{
scrubber.scrub();
}
@@@ -199,6 -207,7 +206,7 @@@
public boolean manifestCheckOnly;
public boolean skipCorrupted;
public boolean noValidate;
- public boolean reinsertOverflowedTTL;
++ public boolean reinsertOverflowedTTL;
private Options(String keyspaceName, String cfName)
{
@@@ -239,6 -248,7 +247,7 @@@
opts.manifestCheckOnly = cmd.hasOption(MANIFEST_CHECK_OPTION);
opts.skipCorrupted = cmd.hasOption(SKIP_CORRUPTED_OPTION);
opts.noValidate = cmd.hasOption(NO_VALIDATE_OPTION);
- opts.reinsertOverflowedTTL = cmd.hasOption(REINSERT_OVERFLOWED_TTL_OPTION);
++ opts.reinsertOverflowedTTL = cmd.hasOption(REINSERT_OVERFLOWED_TTL_OPTION);
return opts;
}
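
A sketch of the wiring behind the new flag, assuming StandaloneScrubber parses with Apache commons-cli as the surrounding code suggests; the option string below is a placeholder for the real REINSERT_OVERFLOWED_TTL_OPTION constant:

    // Definition side: a boolean flag that takes no argument.
    options.addOption(null, "reinsert-overflowed-ttl", false,
                      "rewrite rows whose local expiration time overflowed");
    // Parsing side, as in parseArgs above:
    opts.reinsertOverflowedTTL = cmd.hasOption("reinsert-overflowed-ttl");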
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c231ed5b/src/java/org/apache/cassandra/tools/nodetool/Scrub.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/tools/nodetool/Scrub.java
index 2345a85,50224a0..ead2fd4
--- a/src/java/org/apache/cassandra/tools/nodetool/Scrub.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/Scrub.java
@@@ -48,11 -49,16 +49,16 @@@ public class Scrub extends NodeToolCm
description = "Do not validate columns using column validator")
private boolean noValidation = false;
- @Option(title = "jobs",
- name = {"-j", "--jobs"},
- description = "Number of sstables to scrub simultanously, set to 0 to use all available compaction threads")
- private int jobs = 2;
-
+ @Option(title = "reinsert_overflowed_ttl",
+ name = {"r", "--reinsert-overflowed-ttl"},
+ description = StandaloneScrubber.REINSERT_OVERFLOWED_TTL_OPTION_DESCRIPTION)
+ private boolean reinsertOverflowedTTL = false;
+
+ @Option(title = "jobs",
+ name = {"-j", "--jobs"},
+ description = "Number of sstables to scrub simultaneously, set to 0 to use all available compaction threads")
+ private int jobs = 2;
+
@Override
public void execute(NodeProbe probe)
{
@@@ -63,13 -69,11 +69,13 @@@
{
try
{
- probe.scrub(System.out, disableSnapshot, skipCorrupted, !noValidation, jobs, keyspace, tableNames);
- probe.scrub(System.out, disableSnapshot, skipCorrupted, !noValidation, reinsertOverflowedTTL, jobs, keyspace, cfnames);
- } catch (IllegalArgumentException e)
++ probe.scrub(System.out, disableSnapshot, skipCorrupted, !noValidation, reinsertOverflowedTTL, jobs, keyspace, tableNames);
+ }
+ catch (IllegalArgumentException e)
{
throw e;
- } catch (Exception e)
+ }
+ catch (Exception e)
{
throw new RuntimeException("Error occurred during scrubbing", e);
}
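
For illustration, a minimal harness (not part of this commit) mirroring what the command does once its options are parsed; host, port and names are placeholders:

    // Equivalent of: nodetool scrub -r -j 2 ks1 t1
    try (NodeProbe probe = new NodeProbe("127.0.0.1", 7199))
    {
        probe.scrub(System.out, false /*disableSnapshot*/, false /*skipCorrupted*/,
                    true /*checkData = !noValidation*/,
                    true /*reinsertOverflowedTTL*/, 2 /*jobs*/, "ks1", "t1");
    }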
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c231ed5b/test/data/negative-local-expiration-test/table1/mc-1-big-CompressionInfo.db
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table1/mc-1-big-CompressionInfo.db
index 0000000,0000000..d759cec
new file mode 100644
Binary files differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c231ed5b/test/data/negative-local-expiration-test/table1/mc-1-big-Data.db
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table1/mc-1-big-Data.db
index 0000000,0000000..e7a72da
new file mode 100644
Binary files differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c231ed5b/test/data/negative-local-expiration-test/table1/mc-1-big-Digest.crc32
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table1/mc-1-big-Digest.crc32
index 0000000,0000000..a3c633a
new file mode 100644
--- /dev/null
+++ b/test/data/negative-local-expiration-test/table1/mc-1-big-Digest.crc32
@@@ -1,0 -1,0 +1,1 @@@
++203700622
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c231ed5b/test/data/negative-local-expiration-test/table1/mc-1-big-Filter.db
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table1/mc-1-big-Filter.db
index 0000000,0000000..a397f35
new file mode 100644
Binary files differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c231ed5b/test/data/negative-local-expiration-test/table1/mc-1-big-Index.db
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table1/mc-1-big-Index.db
index 0000000,0000000..d742724
new file mode 100644
Binary files differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c231ed5b/test/data/negative-local-expiration-test/table1/mc-1-big-Statistics.db
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table1/mc-1-big-Statistics.db
index 0000000,0000000..faf367b
new file mode 100644
Binary files differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c231ed5b/test/data/negative-local-expiration-test/table1/mc-1-big-Summary.db
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table1/mc-1-big-Summary.db
index 0000000,0000000..66cf70f
new file mode 100644
Binary files differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c231ed5b/test/data/negative-local-expiration-test/table1/mc-1-big-TOC.txt
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table1/mc-1-big-TOC.txt
index 0000000,0000000..45113dc
new file mode 100644
--- /dev/null
+++ b/test/data/negative-local-expiration-test/table1/mc-1-big-TOC.txt
@@@ -1,0 -1,0 +1,8 @@@
++CompressionInfo.db
++Data.db
++Summary.db
++Filter.db
++Statistics.db
++TOC.txt
++Digest.crc32
++Index.db
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c231ed5b/test/data/negative-local-expiration-test/table2/mc-1-big-CompressionInfo.db
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table2/mc-1-big-CompressionInfo.db
index 0000000,0000000..1759c09
new file mode 100644
Binary files differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c231ed5b/test/data/negative-local-expiration-test/table2/mc-1-big-Data.db
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table2/mc-1-big-Data.db
index 0000000,0000000..c1de572
new file mode 100644
Binary files differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c231ed5b/test/data/negative-local-expiration-test/table2/mc-1-big-Digest.crc32
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table2/mc-1-big-Digest.crc32
index 0000000,0000000..0403b5b
new file mode 100644
--- /dev/null
+++ b/test/data/negative-local-expiration-test/table2/mc-1-big-Digest.crc32
@@@ -1,0 -1,0 +1,1 @@@
++82785930
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c231ed5b/test/data/negative-local-expiration-test/table2/mc-1-big-Filter.db
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table2/mc-1-big-Filter.db
index 0000000,0000000..a397f35
new file mode 100644
Binary files differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c231ed5b/test/data/negative-local-expiration-test/table2/mc-1-big-Index.db
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table2/mc-1-big-Index.db
index 0000000,0000000..a0477eb
new file mode 100644
Binary files differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c231ed5b/test/data/negative-local-expiration-test/table2/mc-1-big-Statistics.db
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table2/mc-1-big-Statistics.db
index 0000000,0000000..e9d6577
new file mode 100644
Binary files differ