You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by pa...@apache.org on 2018/02/11 13:25:57 UTC
[12/29] cassandra git commit: Merge branch 'cassandra-2.1' into
cassandra-2.2
Merge branch 'cassandra-2.1' into cassandra-2.2
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1602e606
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1602e606
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1602e606
Branch: refs/heads/cassandra-2.2
Commit: 1602e606348959aead18531cb8027afb15f276e7
Parents: aa831c9 b294943
Author: Paulo Motta <pa...@apache.org>
Authored: Sat Feb 10 14:53:20 2018 -0200
Committer: Paulo Motta <pa...@apache.org>
Committed: Sat Feb 10 14:54:56 2018 -0200
----------------------------------------------------------------------
CASSANDRA-14092.txt | 81 ++++
CHANGES.txt | 1 +
NEWS.txt | 21 +
debian/rules | 2 +-
redhat/cassandra.spec | 2 +-
.../org/apache/cassandra/cql3/Attributes.java | 79 +++-
.../cassandra/cql3/statements/CFPropDefs.java | 7 +
.../cql3/statements/ModificationStatement.java | 2 +-
.../apache/cassandra/db/AbstractNativeCell.java | 6 +
.../org/apache/cassandra/db/BufferCell.java | 6 +
.../apache/cassandra/db/BufferDeletedCell.java | 6 +
.../apache/cassandra/db/BufferExpiringCell.java | 34 +-
src/java/org/apache/cassandra/db/Cell.java | 2 +
.../apache/cassandra/db/ColumnFamilyStore.java | 11 +-
.../org/apache/cassandra/db/DeletionTime.java | 1 +
.../db/compaction/CompactionManager.java | 14 +-
.../cassandra/db/compaction/Scrubber.java | 97 ++++-
.../io/sstable/AbstractSSTableSimpleWriter.java | 17 +-
.../cassandra/service/StorageService.java | 8 +-
.../cassandra/service/StorageServiceMBean.java | 3 +
.../cassandra/thrift/ThriftValidation.java | 9 +-
.../org/apache/cassandra/tools/NodeProbe.java | 8 +-
.../cassandra/tools/StandaloneScrubber.java | 12 +-
.../apache/cassandra/tools/nodetool/Scrub.java | 8 +-
.../table1/lb-1-big-CompressionInfo.db | Bin 0 -> 43 bytes
.../table1/lb-1-big-Data.db | Bin 0 -> 103 bytes
.../table1/lb-1-big-Digest.adler32 | 1 +
.../table1/lb-1-big-Filter.db | Bin 0 -> 16 bytes
.../table1/lb-1-big-Index.db | Bin 0 -> 36 bytes
.../table1/lb-1-big-Statistics.db | Bin 0 -> 4466 bytes
.../table1/lb-1-big-Summary.db | Bin 0 -> 84 bytes
.../table1/lb-1-big-TOC.txt | 8 +
.../table2/lb-1-big-CompressionInfo.db | Bin 0 -> 43 bytes
.../table2/lb-1-big-Data.db | Bin 0 -> 98 bytes
.../table2/lb-1-big-Digest.adler32 | 1 +
.../table2/lb-1-big-Filter.db | Bin 0 -> 16 bytes
.../table2/lb-1-big-Index.db | Bin 0 -> 36 bytes
.../table2/lb-1-big-Statistics.db | Bin 0 -> 4478 bytes
.../table2/lb-1-big-Summary.db | Bin 0 -> 84 bytes
.../table2/lb-1-big-TOC.txt | 8 +
.../table3/lb-1-big-CompressionInfo.db | Bin 0 -> 43 bytes
.../table3/lb-1-big-Data.db | Bin 0 -> 182 bytes
.../table3/lb-1-big-Digest.adler32 | 1 +
.../table3/lb-1-big-Filter.db | Bin 0 -> 16 bytes
.../table3/lb-1-big-Index.db | Bin 0 -> 36 bytes
.../table3/lb-1-big-Statistics.db | Bin 0 -> 4482 bytes
.../table3/lb-1-big-Summary.db | Bin 0 -> 84 bytes
.../table3/lb-1-big-TOC.txt | 8 +
.../table4/lb-1-big-CompressionInfo.db | Bin 0 -> 43 bytes
.../table4/lb-1-big-Data.db | Bin 0 -> 182 bytes
.../table4/lb-1-big-Digest.adler32 | 1 +
.../table4/lb-1-big-Filter.db | Bin 0 -> 16 bytes
.../table4/lb-1-big-Index.db | Bin 0 -> 36 bytes
.../table4/lb-1-big-Statistics.db | Bin 0 -> 4494 bytes
.../table4/lb-1-big-Summary.db | Bin 0 -> 84 bytes
.../table4/lb-1-big-TOC.txt | 8 +
.../cql3/validation/operations/TTLTest.java | 405 +++++++++++++++++++
.../unit/org/apache/cassandra/db/ScrubTest.java | 2 +-
58 files changed, 847 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 0f6e61c,9332354..82da6ad
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,9 -1,5 +1,10 @@@
-2.1.20
+2.2.12
+ * Fix the inspectJvmOptions startup check (CASSANDRA-14112)
+ * Fix race that prevents submitting compaction for a table when executor is full (CASSANDRA-13801)
+ * Rely on the JVM to handle OutOfMemoryErrors (CASSANDRA-13006)
+ * Grab refs during scrub/index redistribution/cleanup (CASSANDRA-13873)
+Merged from 2.1:
+ * Protect against overflow of local expiration time (CASSANDRA-14092)
* More PEP8 compliance for cqlsh (CASSANDRA-14021)
* RPM package spec: fix permissions for installed jars and config files (CASSANDRA-14181)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/NEWS.txt
----------------------------------------------------------------------
diff --cc NEWS.txt
index 5747941,fb6b4ee..4fe3508
--- a/NEWS.txt
+++ b/NEWS.txt
@@@ -18,15 -38,9 +38,16 @@@ using the provided 'sstableupgrade' too
Upgrading
---------
- - See MAXIMUM TTL EXPIRATION DATE NOTICE above.
-
-2.1.19
++ - See MAXIMUM TTL EXPIRATION DATE NOTICE above.
+ - Cassandra is now relying on the JVM options to properly shutdown on OutOfMemoryError. By default it will
+ rely on the OnOutOfMemoryError option as the ExitOnOutOfMemoryError and CrashOnOutOfMemoryError options
+ are not supported by the older 1.7 and 1.8 JVMs. A warning will be logged at startup if none of those JVM
+ options are used. See CASSANDRA-13006 for more details.
+ - Cassandra is not logging anymore by default an Heap histogram on OutOfMemoryError. To enable that behavior
+ set the 'cassandra.printHeapHistogramOnOutOfMemoryError' System property to 'true'. See CASSANDRA-13006
+ for more details.
+
+2.2.11
======
Upgrading
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/debian/rules
----------------------------------------------------------------------
diff --cc debian/rules
index 35f5a51,70db61c..ff1d64d
--- a/debian/rules
+++ b/debian/rules
@@@ -63,7 -64,7 +63,7 @@@ binary-indep: build instal
dh_testroot
dh_installchangelogs
dh_installinit -u'start 50 2 3 4 5 . stop 50 0 1 6 .'
- dh_installdocs README.asc CHANGES.txt NEWS.txt doc/cql3/CQL.css doc/cql3/CQL.html
- dh_installdocs README.asc CHANGES.txt NEWS.txt
++ dh_installdocs README.asc CHANGES.txt NEWS.txt doc/cql3/CQL.css doc/cql3/CQL.html CASSANDRA-14092.txt
dh_installexamples tools/*.yaml
dh_bash-completion
dh_compress
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/redhat/cassandra.spec
----------------------------------------------------------------------
diff --cc redhat/cassandra.spec
index 0d4b271,a3f09b0..07c3dc5
--- a/redhat/cassandra.spec
+++ b/redhat/cassandra.spec
@@@ -114,10 -113,10 +114,10 @@@ exit
%files
%defattr(0644,root,root,0755)
--%doc CHANGES.txt LICENSE.txt README.asc NEWS.txt NOTICE.txt
-%attr(755,root,root) %{_bindir}/cassandra-cli
++%doc CHANGES.txt LICENSE.txt README.asc NEWS.txt NOTICE.txt CASSANDRA-14092.txt
%attr(755,root,root) %{_bindir}/cassandra-stress
%attr(755,root,root) %{_bindir}/cqlsh
+%attr(755,root,root) %{_bindir}/cqlsh.py
%attr(755,root,root) %{_bindir}/debug-cql
%attr(755,root,root) %{_bindir}/nodetool
%attr(755,root,root) %{_bindir}/sstablekeys
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/src/java/org/apache/cassandra/cql3/Attributes.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/Attributes.java
index 7b38e9f,23571ca..84f423a
--- a/src/java/org/apache/cassandra/cql3/Attributes.java
+++ b/src/java/org/apache/cassandra/cql3/Attributes.java
@@@ -18,17 -18,19 +18,23 @@@
package org.apache.cassandra.cql3;
import java.nio.ByteBuffer;
+import java.util.Collections;
+ import java.util.concurrent.TimeUnit;
+import com.google.common.collect.Iterables;
+ import com.google.common.annotations.VisibleForTesting;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.cql3.functions.Function;
import org.apache.cassandra.db.ExpiringCell;
import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.db.marshal.LongType;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.serializers.MarshalException;
+import org.apache.cassandra.utils.ByteBufferUtil;
+ import org.apache.cassandra.utils.NoSpamLogger;
/**
* Utility class for the Parser to gather attributes for modification
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/src/java/org/apache/cassandra/cql3/statements/CFPropDefs.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/src/java/org/apache/cassandra/db/AbstractNativeCell.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/src/java/org/apache/cassandra/db/BufferExpiringCell.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 7e36e11,2989b9d..45908de
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@@ -905,26 -879,13 +905,26 @@@ public class ColumnFamilyStore implemen
previousFlushFailure);
logFlush();
Flush flush = new Flush(false);
- ListenableFutureTask<?> flushTask = ListenableFutureTask.create(flush, null);
- flushExecutor.submit(flushTask);
- ListenableFutureTask<?> task = ListenableFutureTask.create(flush.postFlush, null);
- postFlushExecutor.submit(task);
+ ListenableFutureTask<Void> flushTask = ListenableFutureTask.create(flush, null);
+ flushExecutor.execute(flushTask);
+ ListenableFutureTask<ReplayPosition> task = ListenableFutureTask.create(flush.postFlush);
+ postFlushExecutor.execute(task);
@SuppressWarnings("unchecked")
- ListenableFuture<ReplayPosition> future =
- ListenableFuture<?> future = Futures.allAsList(flushTask, task);
++ ListenableFuture<ReplayPosition> future =
+ // If either of the two tasks errors out, resulting future must also error out.
+ // Combine the two futures and only return post-flush result after both have completed.
+ // Note that flushTask will always yield null, but Futures.allAsList is
+ // order preserving, which is why the transform function returns the result
+ // from item 1 in it's input list (i.e. what was yielded by task).
+ Futures.transform(Futures.allAsList(flushTask, task),
+ new Function<List<Object>, ReplayPosition>()
+ {
+ public ReplayPosition apply(List<Object> input)
+ {
+ return (ReplayPosition) input.get(1);
+ }
+ });
return future;
}
}
@@@ -1610,29 -1516,48 +1610,28 @@@
return CompactionManager.instance.performCleanup(ColumnFamilyStore.this, jobs);
}
- public CompactionManager.AllSSTableOpStatus scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, int jobs) throws ExecutionException, InterruptedException
+ public CompactionManager.AllSSTableOpStatus scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, boolean reinsertOverflowedTTLRows, int jobs) throws ExecutionException, InterruptedException
{
- return scrub(disableSnapshot, skipCorrupted, false, checkData, jobs);
- // skip snapshot creation during scrub, SEE JIRA 5891
- if(!disableSnapshot)
- snapshotWithoutFlush("pre-scrub-" + System.currentTimeMillis());
- return CompactionManager.instance.performScrub(ColumnFamilyStore.this, skipCorrupted, checkData, reinsertOverflowedTTLRows, jobs);
- }
-
- public CompactionManager.AllSSTableOpStatus sstablesRewrite(boolean excludeCurrentVersion, int jobs) throws ExecutionException, InterruptedException
- {
- return CompactionManager.instance.performSSTableRewrite(ColumnFamilyStore.this, excludeCurrentVersion, jobs);
- }
-
- public void markObsolete(Collection<SSTableReader> sstables, OperationType compactionType)
- {
- assert !sstables.isEmpty();
- data.markObsolete(sstables, compactionType);
- }
-
- void replaceFlushed(Memtable memtable, SSTableReader sstable)
- {
- compactionStrategyWrapper.replaceFlushed(memtable, sstable);
- }
-
- public boolean isValid()
- {
- return valid;
++ return scrub(disableSnapshot, skipCorrupted, false, checkData, reinsertOverflowedTTLRows, jobs);
}
- @VisibleForTesting
- public CompactionManager.AllSSTableOpStatus scrub(boolean disableSnapshot, boolean skipCorrupted, boolean alwaysFail, boolean checkData, int jobs) throws ExecutionException, InterruptedException
- public long getMemtableColumnsCount()
++ public CompactionManager.AllSSTableOpStatus scrub(boolean disableSnapshot, boolean skipCorrupted, boolean alwaysFail, boolean checkData, boolean reinsertOverflowedTTLRows, int jobs) throws ExecutionException, InterruptedException
{
- return metric.memtableColumnsCount.value();
- }
+ // skip snapshot creation during scrub, SEE JIRA 5891
+ if(!disableSnapshot)
+ snapshotWithoutFlush("pre-scrub-" + System.currentTimeMillis());
- public long getMemtableDataSize()
- {
- return metric.memtableOnHeapSize.value();
- }
+ try
+ {
- return CompactionManager.instance.performScrub(ColumnFamilyStore.this, skipCorrupted, checkData, jobs);
++ return CompactionManager.instance.performScrub(ColumnFamilyStore.this, skipCorrupted, checkData, reinsertOverflowedTTLRows, jobs);
+ }
+ catch(Throwable t)
+ {
+ if (!rebuildOnFailedScrub(t))
+ throw t;
- public int getMemtableSwitchCount()
- {
- return (int) metric.memtableSwitchCount.count();
+ return alwaysFail ? CompactionManager.AllSSTableOpStatus.ABORTED : CompactionManager.AllSSTableOpStatus.SUCCESSFUL;
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 3350b20,6e3634a..d90abe9
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@@ -348,9 -358,16 +348,15 @@@ public class CompactionManager implemen
}
}
- @Deprecated
- public AllSSTableOpStatus performScrub(final ColumnFamilyStore cfs, final boolean skipCorrupted, final boolean checkData, int jobs) throws InterruptedException, ExecutionException
+ public AllSSTableOpStatus performScrub(final ColumnFamilyStore cfs, final boolean skipCorrupted, final boolean checkData, int jobs)
+ throws InterruptedException, ExecutionException
{
+ return performScrub(cfs, skipCorrupted, checkData, false, jobs);
+ }
+
- public AllSSTableOpStatus performScrub(final ColumnFamilyStore cfs, final boolean skipCorrupted, final boolean checkData,
- final boolean reinsertOverflowedTTLRows, int jobs) throws InterruptedException, ExecutionException
++ public AllSSTableOpStatus performScrub(final ColumnFamilyStore cfs, final boolean skipCorrupted, final boolean checkData, final boolean reinsertOverflowedTTLRows, int jobs)
++ throws InterruptedException, ExecutionException
+ {
- assert !cfs.isIndex();
return parallelAllSSTableOperation(cfs, new OneSSTableOperation()
{
@Override
@@@ -360,30 -377,11 +366,30 @@@
}
@Override
- public void execute(SSTableReader input) throws IOException
+ public void execute(LifecycleTransaction input) throws IOException
{
- scrubOne(cfs, input, skipCorrupted, checkData);
+ scrubOne(cfs, input, skipCorrupted, checkData, reinsertOverflowedTTLRows);
}
- }, jobs);
+ }, jobs, OperationType.SCRUB);
+ }
+
+ public AllSSTableOpStatus performVerify(final ColumnFamilyStore cfs, final boolean extendedVerify) throws InterruptedException, ExecutionException
+ {
+ assert !cfs.isIndex();
+ return parallelAllSSTableOperation(cfs, new OneSSTableOperation()
+ {
+ @Override
+ public Iterable<SSTableReader> filterSSTables(LifecycleTransaction input)
+ {
+ return input.originals();
+ }
+
+ @Override
+ public void execute(LifecycleTransaction input) throws IOException
+ {
+ verifyOne(cfs, input.onlyOne(), extendedVerify);
+ }
+ }, 0, OperationType.VERIFY);
}
public AllSSTableOpStatus performSSTableRewrite(final ColumnFamilyStore cfs, final boolean excludeCurrentVersion, int jobs) throws InterruptedException, ExecutionException
@@@ -730,14 -717,14 +736,14 @@@
}
}
- private void scrubOne(ColumnFamilyStore cfs, LifecycleTransaction modifier, boolean skipCorrupted, boolean checkData) throws IOException
- private void scrubOne(ColumnFamilyStore cfs, SSTableReader sstable, boolean skipCorrupted, boolean checkData, boolean reinsertOverflowedTTLRows) throws IOException
++ private void scrubOne(ColumnFamilyStore cfs, LifecycleTransaction modifier, boolean skipCorrupted, boolean checkData, boolean reinsertOverflowedTTLRows) throws IOException
{
- Scrubber scrubber = new Scrubber(cfs, sstable, skipCorrupted, false, checkData, reinsertOverflowedTTLRows);
+ CompactionInfo.Holder scrubInfo = null;
- try (Scrubber scrubber = new Scrubber(cfs, modifier, skipCorrupted, checkData))
- CompactionInfo.Holder scrubInfo = scrubber.getScrubInfo();
- metrics.beginCompaction(scrubInfo);
- try
++ try (Scrubber scrubber = new Scrubber(cfs, modifier, skipCorrupted, checkData, reinsertOverflowedTTLRows))
{
+ scrubInfo = scrubber.getScrubInfo();
+ metrics.beginCompaction(scrubInfo);
scrubber.scrub();
}
finally
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/Scrubber.java
index b6b20fb,6d4537c..affee11
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@@ -22,33 -22,30 +22,36 @@@ import java.io.*
import java.util.*;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
import com.google.common.base.Throwables;
import com.google.common.collect.AbstractIterator;
-import com.google.common.collect.Sets;
import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
+ import org.apache.cassandra.db.composites.CellNames;
import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.io.util.RandomAccessReader;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.OutputHandler;
+ import org.apache.cassandra.utils.memory.HeapAllocator;
+import org.apache.cassandra.utils.UUIDGen;
+import org.apache.cassandra.utils.concurrent.Refs;
public class Scrubber implements Closeable
{
private final ColumnFamilyStore cfs;
private final SSTableReader sstable;
+ private final LifecycleTransaction transaction;
private final File destination;
private final boolean skipCorrupted;
- public final boolean validateColumns;
+ private final boolean reinsertOverflowedTTLRows;
private final CompactionController controller;
private final boolean isCommutative;
@@@ -84,20 -81,27 +88,28 @@@
};
private final SortedSet<Row> outOfOrderRows = new TreeSet<>(rowComparator);
- public Scrubber(ColumnFamilyStore cfs, SSTableReader sstable, boolean skipCorrupted, boolean isOffline, boolean checkData) throws IOException
+ public Scrubber(ColumnFamilyStore cfs, LifecycleTransaction transaction, boolean skipCorrupted, boolean checkData) throws IOException
{
- this(cfs, transaction, skipCorrupted, new OutputHandler.LogOutput(), checkData);
- this(cfs, sstable, skipCorrupted, isOffline, checkData, false);
++ this(cfs, transaction, skipCorrupted, new OutputHandler.LogOutput(), checkData, false);
+ }
+
- public Scrubber(ColumnFamilyStore cfs, SSTableReader sstable, boolean skipCorrupted, boolean isOffline, boolean checkData,
++ public Scrubber(ColumnFamilyStore cfs, LifecycleTransaction transaction, boolean skipCorrupted, boolean checkData,
+ boolean reinsertOverflowedTTLRows) throws IOException
+ {
- this(cfs, sstable, skipCorrupted, new OutputHandler.LogOutput(), isOffline, checkData, reinsertOverflowedTTLRows);
++ this(cfs, transaction, skipCorrupted, new OutputHandler.LogOutput(), checkData, reinsertOverflowedTTLRows);
}
- public Scrubber(ColumnFamilyStore cfs, SSTableReader sstable, boolean skipCorrupted, OutputHandler outputHandler, boolean isOffline, boolean checkData,
+ @SuppressWarnings("resource")
- public Scrubber(ColumnFamilyStore cfs, LifecycleTransaction transaction, boolean skipCorrupted, OutputHandler outputHandler, boolean checkData) throws IOException
++ public Scrubber(ColumnFamilyStore cfs, LifecycleTransaction transaction, boolean skipCorrupted, OutputHandler outputHandler, boolean checkData,
+ boolean reinsertOverflowedTTLRows) throws IOException
{
this.cfs = cfs;
- this.sstable = sstable;
+ this.transaction = transaction;
+ this.sstable = transaction.onlyOne();
this.outputHandler = outputHandler;
this.skipCorrupted = skipCorrupted;
- this.isOffline = isOffline;
- this.validateColumns = checkData;
+ this.rowIndexEntrySerializer = sstable.descriptor.version.getSSTableFormat().getIndexSerializer(sstable.metadata);
+ this.reinsertOverflowedTTLRows = reinsertOverflowedTTLRows;
List<SSTableReader> toScrub = Collections.singletonList(sstable);
@@@ -322,7 -339,7 +339,7 @@@
// OrderCheckerIterator will check, at iteration time, that the cells are in the proper order. If it detects
// that one cell is out of order, it will stop returning them. The remaining cells will be sorted and added
// to the outOfOrderRows that will be later written to a new SSTable.
- OrderCheckerIterator atoms = new OrderCheckerIterator(new SSTableIdentityIterator(sstable, dataFile, key, checkData),
- OrderCheckerIterator atoms = new OrderCheckerIterator(getIterator(key, dataSize),
++ OrderCheckerIterator atoms = new OrderCheckerIterator(getIterator(key),
cfs.metadata.comparator.onDiskAtomComparator());
if (prevKey != null && prevKey.compareTo(key) > 0)
{
@@@ -342,6 -359,18 +359,18 @@@
return true;
}
+ /**
+ * Only wrap with {@link FixNegativeLocalDeletionTimeIterator} if {@link #reinsertOverflowedTTLRows} option
+ * is specified
+ */
- private OnDiskAtomIterator getIterator(DecoratedKey key, long dataSize)
++ private OnDiskAtomIterator getIterator(DecoratedKey key)
+ {
- SSTableIdentityIterator sstableIdentityIterator = new SSTableIdentityIterator(sstable, dataFile, key, dataSize, validateColumns);
++ SSTableIdentityIterator sstableIdentityIterator = new SSTableIdentityIterator(sstable, dataFile, key, checkData);
+ return reinsertOverflowedTTLRows ? new FixNegativeLocalDeletionTimeIterator(sstableIdentityIterator,
+ outputHandler,
+ negativeLocalDeletionInfoMetrics) : sstableIdentityIterator;
+ }
+
private void updateIndexKey()
{
currentIndexKey = nextIndexKey;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
index 6896062,d718765..e416c7b
--- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
@@@ -31,8 -32,8 +32,9 @@@ import org.apache.cassandra.cql3.Attrib
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.context.CounterContext;
import org.apache.cassandra.dht.IPartitioner;
+ import org.apache.cassandra.exceptions.InvalidRequestException;
-import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.io.sstable.format.SSTableFormat;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.utils.CounterId;
import org.apache.cassandra.utils.Pair;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageService.java
index 1ecedac,a7a8ca7..2c9ac4d
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -2611,10 -2415,16 +2611,16 @@@ public class StorageService extends Not
public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, int jobs, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
{
+ return scrub(disableSnapshot, skipCorrupted, checkData, false, jobs, keyspaceName, columnFamilies);
+ }
+
+ public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, boolean reinsertOverflowedTTLRows,
+ int jobs, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
+ {
CompactionManager.AllSSTableOpStatus status = CompactionManager.AllSSTableOpStatus.SUCCESSFUL;
- for (ColumnFamilyStore cfStore : getValidColumnFamilies(false, false, keyspaceName, columnFamilies))
+ for (ColumnFamilyStore cfStore : getValidColumnFamilies(true, false, keyspaceName, columnFamilies))
{
- CompactionManager.AllSSTableOpStatus oneStatus = cfStore.scrub(disableSnapshot, skipCorrupted, checkData, jobs);
+ CompactionManager.AllSSTableOpStatus oneStatus = cfStore.scrub(disableSnapshot, skipCorrupted, checkData, reinsertOverflowedTTLRows, jobs);
if (oneStatus != CompactionManager.AllSSTableOpStatus.SUCCESSFUL)
status = oneStatus;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 761eed6,90c0fb5..f336bcc
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@@ -265,15 -274,10 +265,18 @@@ public interface StorageServiceMBean ex
public int scrub(boolean disableSnapshot, boolean skipCorrupted, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException;
@Deprecated
public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException;
+ @Deprecated
public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, int jobs, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException;
- public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, boolean reinsertOverflowedTTLRows, int jobs, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException;
++public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, boolean reinsertOverflowedTTLRows, int jobs, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException;
++
+ /**
+ * Verify (checksums of) the given keyspace.
+ * If columnFamilies array is empty, all CFs are verified.
+ *
+ * The entire sstable will be read to ensure each cell validates if extendedVerify is true
+ */
+ public int verify(boolean extendedVerify, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException;
/**
* Rewrite all sstables to the latest version.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/src/java/org/apache/cassandra/thrift/ThriftValidation.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/thrift/ThriftValidation.java
index d735676,10e7185..8bdf9dc
--- a/src/java/org/apache/cassandra/thrift/ThriftValidation.java
+++ b/src/java/org/apache/cassandra/thrift/ThriftValidation.java
@@@ -312,9 -313,9 +313,9 @@@ public class ThriftValidatio
if (cosc.column != null)
{
if (isCommutative)
- throw new org.apache.cassandra.exceptions.InvalidRequestException("invalid operation for commutative columnfamily " + metadata.cfName);
+ throw new org.apache.cassandra.exceptions.InvalidRequestException("invalid operation for commutative table " + metadata.cfName);
- validateTtl(cosc.column);
+ validateTtl(metadata, cosc.column);
validateColumnPath(metadata, new ColumnPath(metadata.cfName).setSuper_column((ByteBuffer)null).setColumn(cosc.column.name));
validateColumnData(metadata, key, null, cosc.column);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/tools/NodeProbe.java
index 24c5874,fcd4110..17bef02
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@@ -236,16 -243,11 +236,16 @@@ public class NodeProbe implements AutoC
return ssProxy.forceKeyspaceCleanup(jobs, keyspaceName, columnFamilies);
}
- public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, int jobs, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
+ public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, boolean reinsertOverflowedTTLRows, int jobs, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
{
- return ssProxy.scrub(disableSnapshot, skipCorrupted, checkData, jobs, keyspaceName, columnFamilies);
+ return ssProxy.scrub(disableSnapshot, skipCorrupted, checkData, reinsertOverflowedTTLRows, jobs, keyspaceName, columnFamilies);
}
+ public int verify(boolean extendedVerify, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
+ {
+ return ssProxy.verify(extendedVerify, keyspaceName, columnFamilies);
+ }
+
public int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, int jobs, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
{
return ssProxy.upgradeSSTables(keyspaceName, excludeCurrentVersion, jobs, columnFamilies);
@@@ -267,22 -268,13 +267,22 @@@
}
}
- public void scrub(PrintStream out, boolean disableSnapshot, boolean skipCorrupted, boolean checkData, int jobs, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
+ public void scrub(PrintStream out, boolean disableSnapshot, boolean skipCorrupted, boolean checkData, boolean reinsertOverflowedTTLRows, int jobs, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
{
checkJobs(out, jobs);
- if (scrub(disableSnapshot, skipCorrupted, checkData, jobs, keyspaceName, columnFamilies) != 0)
+ if (scrub(disableSnapshot, skipCorrupted, checkData, reinsertOverflowedTTLRows, jobs, keyspaceName, columnFamilies) != 0)
{
failed = true;
- out.println("Aborted scrubbing atleast one column family in keyspace "+keyspaceName+", check server logs for more information.");
+ out.println("Aborted scrubbing at least one table in keyspace "+keyspaceName+", check server logs for more information.");
+ }
+ }
+
+ public void verify(PrintStream out, boolean extendedVerify, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
+ {
+ if (verify(extendedVerify, keyspaceName, columnFamilies) != 0)
+ {
+ failed = true;
+ out.println("Aborted verifying at least one table in keyspace "+keyspaceName+", check server logs for more information.");
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/tools/StandaloneScrubber.java
index a486a13,59d13d5..f5e84c5
--- a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
+++ b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
@@@ -119,10 -115,10 +126,10 @@@ public class StandaloneScrubbe
{
for (SSTableReader sstable : sstables)
{
- try
+ try (LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.SCRUB, sstable))
{
- Scrubber scrubber = new Scrubber(cfs, sstable, options.skipCorrupted, handler, true, !options.noValidate, options.reinsertOverflowedTTL);
- try
+ txn.obsoleteOriginals(); // make sure originals are deleted and avoid NPE if index is missing, CASSANDRA-9591
- try (Scrubber scrubber = new Scrubber(cfs, txn, options.skipCorrupted, handler, !options.noValidate))
++ try (Scrubber scrubber = new Scrubber(cfs, txn, options.skipCorrupted, handler, !options.noValidate, options.reinsertOverflowedTTL))
{
scrubber.scrub();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/src/java/org/apache/cassandra/tools/nodetool/Scrub.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/tools/nodetool/Scrub.java
index dafe8d1,0000000..50224a0
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/tools/nodetool/Scrub.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/Scrub.java
@@@ -1,76 -1,0 +1,82 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.tools.nodetool;
+
+import io.airlift.command.Arguments;
+import io.airlift.command.Command;
+import io.airlift.command.Option;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.cassandra.tools.NodeProbe;
+import org.apache.cassandra.tools.NodeTool.NodeToolCmd;
++import org.apache.cassandra.tools.StandaloneScrubber;
+
+@Command(name = "scrub", description = "Scrub (rebuild sstables for) one or more tables")
+public class Scrub extends NodeToolCmd
+{
+ @Arguments(usage = "[<keyspace> <tables>...]", description = "The keyspace followed by one or many tables")
+ private List<String> args = new ArrayList<>();
+
+ @Option(title = "disable_snapshot",
+ name = {"-ns", "--no-snapshot"},
+ description = "Scrubbed CFs will be snapshotted first, if disableSnapshot is false. (default false)")
+ private boolean disableSnapshot = false;
+
+ @Option(title = "skip_corrupted",
+ name = {"-s", "--skip-corrupted"},
+ description = "Skip corrupted partitions even when scrubbing counter tables. (default false)")
+ private boolean skipCorrupted = false;
+
+ @Option(title = "no_validate",
+ name = {"-n", "--no-validate"},
+ description = "Do not validate columns using column validator")
+ private boolean noValidation = false;
+
+ @Option(title = "jobs",
+ name = {"-j", "--jobs"},
+ description = "Number of sstables to scrub simultanously, set to 0 to use all available compaction threads")
+ private int jobs = 2;
+
++ @Option(title = "reinsert_overflowed_ttl",
++ name = {"r", "--reinsert-overflowed-ttl"},
++ description = StandaloneScrubber.REINSERT_OVERFLOWED_TTL_OPTION_DESCRIPTION)
++ private boolean reinsertOverflowedTTL = false;
++
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ List<String> keyspaces = parseOptionalKeyspace(args, probe);
+ String[] cfnames = parseOptionalColumnFamilies(args);
+
+ for (String keyspace : keyspaces)
+ {
+ try
+ {
- probe.scrub(System.out, disableSnapshot, skipCorrupted, !noValidation, jobs, keyspace, cfnames);
++ probe.scrub(System.out, disableSnapshot, skipCorrupted, !noValidation, reinsertOverflowedTTL, jobs, keyspace, cfnames);
+ } catch (IllegalArgumentException e)
+ {
+ throw e;
+ } catch (Exception e)
+ {
+ throw new RuntimeException("Error occurred during scrubbing", e);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/test/data/negative-local-expiration-test/table1/lb-1-big-CompressionInfo.db
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table1/lb-1-big-CompressionInfo.db
index 0000000,0000000..d7cc13b
new file mode 100644
Binary files differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/test/data/negative-local-expiration-test/table1/lb-1-big-Data.db
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table1/lb-1-big-Data.db
index 0000000,0000000..51213c2
new file mode 100644
Binary files differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/test/data/negative-local-expiration-test/table1/lb-1-big-Digest.adler32
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table1/lb-1-big-Digest.adler32
index 0000000,0000000..d5b12df
new file mode 100644
--- /dev/null
+++ b/test/data/negative-local-expiration-test/table1/lb-1-big-Digest.adler32
@@@ -1,0 -1,0 +1,1 @@@
++2292388625
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/test/data/negative-local-expiration-test/table1/lb-1-big-Filter.db
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table1/lb-1-big-Filter.db
index 0000000,0000000..f8e53be
new file mode 100644
Binary files differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/test/data/negative-local-expiration-test/table1/lb-1-big-Index.db
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table1/lb-1-big-Index.db
index 0000000,0000000..3ab96ee
new file mode 100644
Binary files differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/test/data/negative-local-expiration-test/table1/lb-1-big-Statistics.db
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table1/lb-1-big-Statistics.db
index 0000000,0000000..e8cc7e0
new file mode 100644
Binary files differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/test/data/negative-local-expiration-test/table1/lb-1-big-Summary.db
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table1/lb-1-big-Summary.db
index 0000000,0000000..1a3f81f
new file mode 100644
Binary files differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/test/data/negative-local-expiration-test/table1/lb-1-big-TOC.txt
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table1/lb-1-big-TOC.txt
index 0000000,0000000..26c7025
new file mode 100644
--- /dev/null
+++ b/test/data/negative-local-expiration-test/table1/lb-1-big-TOC.txt
@@@ -1,0 -1,0 +1,8 @@@
++Statistics.db
++Summary.db
++Digest.adler32
++Data.db
++Index.db
++CompressionInfo.db
++Filter.db
++TOC.txt
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/test/data/negative-local-expiration-test/table2/lb-1-big-CompressionInfo.db
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table2/lb-1-big-CompressionInfo.db
index 0000000,0000000..38373b4
new file mode 100644
Binary files differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/test/data/negative-local-expiration-test/table2/lb-1-big-Data.db
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table2/lb-1-big-Data.db
index 0000000,0000000..762a229
new file mode 100644
Binary files differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/test/data/negative-local-expiration-test/table2/lb-1-big-Digest.adler32
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table2/lb-1-big-Digest.adler32
index 0000000,0000000..ae89849
new file mode 100644
--- /dev/null
+++ b/test/data/negative-local-expiration-test/table2/lb-1-big-Digest.adler32
@@@ -1,0 -1,0 +1,1 @@@
++3829731931
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/test/data/negative-local-expiration-test/table2/lb-1-big-Filter.db
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table2/lb-1-big-Filter.db
index 0000000,0000000..f8e53be
new file mode 100644
Binary files differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/test/data/negative-local-expiration-test/table2/lb-1-big-Index.db
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table2/lb-1-big-Index.db
index 0000000,0000000..38a6e4c
new file mode 100644
Binary files differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/test/data/negative-local-expiration-test/table2/lb-1-big-Statistics.db
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table2/lb-1-big-Statistics.db
index 0000000,0000000..64dab43
new file mode 100644
Binary files differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/test/data/negative-local-expiration-test/table2/lb-1-big-Summary.db
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table2/lb-1-big-Summary.db
index 0000000,0000000..1a3f81f
new file mode 100644
Binary files differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/test/data/negative-local-expiration-test/table2/lb-1-big-TOC.txt
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table2/lb-1-big-TOC.txt
index 0000000,0000000..26c7025
new file mode 100644
--- /dev/null
+++ b/test/data/negative-local-expiration-test/table2/lb-1-big-TOC.txt
@@@ -1,0 -1,0 +1,8 @@@
++Statistics.db
++Summary.db
++Digest.adler32
++Data.db
++Index.db
++CompressionInfo.db
++Filter.db
++TOC.txt
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/test/data/negative-local-expiration-test/table3/lb-1-big-CompressionInfo.db
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table3/lb-1-big-CompressionInfo.db
index 0000000,0000000..04a7384
new file mode 100644
Binary files differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/test/data/negative-local-expiration-test/table3/lb-1-big-Data.db
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table3/lb-1-big-Data.db
index 0000000,0000000..33145df
new file mode 100644
Binary files differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/test/data/negative-local-expiration-test/table3/lb-1-big-Digest.adler32
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table3/lb-1-big-Digest.adler32
index 0000000,0000000..2a542cd
new file mode 100644
--- /dev/null
+++ b/test/data/negative-local-expiration-test/table3/lb-1-big-Digest.adler32
@@@ -1,0 -1,0 +1,1 @@@
++3574474340
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/test/data/negative-local-expiration-test/table3/lb-1-big-Filter.db
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table3/lb-1-big-Filter.db
index 0000000,0000000..f8e53be
new file mode 100644
Binary files differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/test/data/negative-local-expiration-test/table3/lb-1-big-Index.db
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table3/lb-1-big-Index.db
index 0000000,0000000..5fb34e8
new file mode 100644
Binary files differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/test/data/negative-local-expiration-test/table3/lb-1-big-Statistics.db
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table3/lb-1-big-Statistics.db
index 0000000,0000000..51203ae
new file mode 100644
Binary files differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/test/data/negative-local-expiration-test/table3/lb-1-big-Summary.db
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table3/lb-1-big-Summary.db
index 0000000,0000000..1a3f81f
new file mode 100644
Binary files differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/test/data/negative-local-expiration-test/table3/lb-1-big-TOC.txt
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table3/lb-1-big-TOC.txt
index 0000000,0000000..26c7025
new file mode 100644
--- /dev/null
+++ b/test/data/negative-local-expiration-test/table3/lb-1-big-TOC.txt
@@@ -1,0 -1,0 +1,8 @@@
++Statistics.db
++Summary.db
++Digest.adler32
++Data.db
++Index.db
++CompressionInfo.db
++Filter.db
++TOC.txt
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/test/data/negative-local-expiration-test/table4/lb-1-big-CompressionInfo.db
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table4/lb-1-big-CompressionInfo.db
index 0000000,0000000..c814fef
new file mode 100644
Binary files differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/test/data/negative-local-expiration-test/table4/lb-1-big-Data.db
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table4/lb-1-big-Data.db
index 0000000,0000000..f40e71f
new file mode 100644
Binary files differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/test/data/negative-local-expiration-test/table4/lb-1-big-Digest.adler32
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table4/lb-1-big-Digest.adler32
index 0000000,0000000..e6675e4
new file mode 100644
--- /dev/null
+++ b/test/data/negative-local-expiration-test/table4/lb-1-big-Digest.adler32
@@@ -1,0 -1,0 +1,1 @@@
++2405377913
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/test/data/negative-local-expiration-test/table4/lb-1-big-Filter.db
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table4/lb-1-big-Filter.db
index 0000000,0000000..f8e53be
new file mode 100644
Binary files differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/test/data/negative-local-expiration-test/table4/lb-1-big-Index.db
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table4/lb-1-big-Index.db
index 0000000,0000000..8291383
new file mode 100644
Binary files differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/test/data/negative-local-expiration-test/table4/lb-1-big-Statistics.db
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table4/lb-1-big-Statistics.db
index 0000000,0000000..2217c2d
new file mode 100644
Binary files differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/test/data/negative-local-expiration-test/table4/lb-1-big-Summary.db
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table4/lb-1-big-Summary.db
index 0000000,0000000..1a3f81f
new file mode 100644
Binary files differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/test/data/negative-local-expiration-test/table4/lb-1-big-TOC.txt
----------------------------------------------------------------------
diff --cc test/data/negative-local-expiration-test/table4/lb-1-big-TOC.txt
index 0000000,0000000..26c7025
new file mode 100644
--- /dev/null
+++ b/test/data/negative-local-expiration-test/table4/lb-1-big-TOC.txt
@@@ -1,0 -1,0 +1,8 @@@
++Statistics.db
++Summary.db
++Digest.adler32
++Data.db
++Index.db
++CompressionInfo.db
++Filter.db
++TOC.txt
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/test/unit/org/apache/cassandra/cql3/validation/operations/TTLTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/cql3/validation/operations/TTLTest.java
index 0000000,ab4ef21..b1eaac1
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/cql3/validation/operations/TTLTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/operations/TTLTest.java
@@@ -1,0 -1,410 +1,405 @@@
+ package org.apache.cassandra.cql3.validation.operations;
+
+ import java.io.File;
+ import java.io.FileInputStream;
+ import java.io.FileOutputStream;
+ import java.io.IOException;
+
+ import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.assertTrue;
+ import static org.junit.Assert.fail;
+
+ import org.apache.cassandra.cql3.Attributes;
+ import org.apache.cassandra.cql3.CQLTester;
+ import org.apache.cassandra.cql3.UntypedResultSet;
+ import org.apache.cassandra.db.BufferExpiringCell;
+ import org.apache.cassandra.db.ColumnFamilyStore;
+ import org.apache.cassandra.db.ExpiringCell;
+ import org.apache.cassandra.db.Keyspace;
+ import org.apache.cassandra.exceptions.InvalidRequestException;
+ import org.apache.cassandra.utils.FBUtilities;
+
+ import org.junit.Test;
+
+ public class TTLTest extends CQLTester
+ {
+ public static String NEGATIVE_LOCAL_EXPIRATION_TEST_DIR = "test/data/negative-local-expiration-test/%s";
+
+ public static int MAX_TTL = ExpiringCell.MAX_TTL;
+
+ public static final String SIMPLE_NOCLUSTERING = "table1";
+ public static final String SIMPLE_CLUSTERING = "table2";
+ public static final String COMPLEX_NOCLUSTERING = "table3";
+ public static final String COMPLEX_CLUSTERING = "table4";
+
+ @Test
+ public void testTTLPerRequestLimit() throws Throwable
+ {
+ createTable("CREATE TABLE %s (k int PRIMARY KEY, i int)");
+ // insert with low TTL should not be denied
+ execute("INSERT INTO %s (k, i) VALUES (1, 1) USING TTL ?", 10); // max ttl
+
+ try
+ {
+ execute("INSERT INTO %s (k, i) VALUES (1, 1) USING TTL ?", MAX_TTL + 1);
+ fail("Expect InvalidRequestException");
+ }
+ catch (InvalidRequestException e)
+ {
+ assertTrue(e.getMessage().contains("ttl is too large."));
+ }
+
+ try
+ {
+ execute("INSERT INTO %s (k, i) VALUES (1, 1) USING TTL ?", -1);
+ fail("Expect InvalidRequestException");
+ }
+ catch (InvalidRequestException e)
+ {
+ assertTrue(e.getMessage().contains("A TTL must be greater or equal to 0"));
+ }
+ execute("TRUNCATE %s");
+
+ // insert with low TTL should not be denied
+ execute("UPDATE %s USING TTL ? SET i = 1 WHERE k = 2", 5); // max ttl
+
+ try
+ {
+ execute("UPDATE %s USING TTL ? SET i = 1 WHERE k = 2", MAX_TTL + 1);
+ fail("Expect InvalidRequestException");
+ }
+ catch (InvalidRequestException e)
+ {
+ assertTrue(e.getMessage().contains("ttl is too large."));
+ }
+
+ try
+ {
+ execute("UPDATE %s USING TTL ? SET i = 1 WHERE k = 2", -1);
+ fail("Expect InvalidRequestException");
+ }
+ catch (InvalidRequestException e)
+ {
+ assertTrue(e.getMessage().contains("A TTL must be greater or equal to 0"));
+ }
+ }
+
+
+ @Test
+ public void testTTLDefaultLimit() throws Throwable
+ {
+ try
+ {
+ createTable("CREATE TABLE %s (k int PRIMARY KEY, i int) WITH default_time_to_live=-1");
+ fail("Expect Invalid schema");
+ }
+ catch (RuntimeException e)
+ {
+ assertTrue(e.getCause()
- .getCause()
+ .getMessage()
+ .contains("default_time_to_live cannot be smaller than 0"));
+ }
+ try
+ {
+ createTable("CREATE TABLE %s (k int PRIMARY KEY, i int) WITH default_time_to_live="
+ + (MAX_TTL + 1));
+ fail("Expect Invalid schema");
+ }
+ catch (RuntimeException e)
+ {
+ assertTrue(e.getCause()
- .getCause()
+ .getMessage()
+ .contains("default_time_to_live must be less than or equal to " + MAX_TTL + " (got "
+ + (MAX_TTL + 1) + ")"));
+ }
+
+ // table with default low TTL should not be denied
+ createTable("CREATE TABLE %s (k int PRIMARY KEY, i int) WITH default_time_to_live=" + 5);
+ execute("INSERT INTO %s (k, i) VALUES (1, 1)");
+ }
+
+ @Test
+ public void testRejectExpirationDateOverflowPolicy() throws Throwable
+ {
+ Attributes.policy = Attributes.ExpirationDateOverflowPolicy.REJECT;
+ createTable("CREATE TABLE %s (k int PRIMARY KEY, i int)");
+ try
+ {
+ execute("INSERT INTO %s (k, i) VALUES (1, 1) USING TTL " + MAX_TTL);
+ }
+ catch (InvalidRequestException e)
+ {
+ assertTrue(e.getMessage().contains("exceeds maximum supported expiration date"));
+ }
+ try
+ {
+ createTable("CREATE TABLE %s (k int PRIMARY KEY, i int) WITH default_time_to_live=" + MAX_TTL);
+ execute("INSERT INTO %s (k, i) VALUES (1, 1)");
+ }
+ catch (InvalidRequestException e)
+ {
+ assertTrue(e.getMessage().contains("exceeds maximum supported expiration date"));
+ }
+ }
+
+ @Test
+ public void testCapExpirationDatePolicyDefaultTTL() throws Throwable
+ {
+ Attributes.policy = Attributes.ExpirationDateOverflowPolicy.CAP;
+ createTable("CREATE TABLE %s (k int PRIMARY KEY, i int) WITH default_time_to_live=" + MAX_TTL);
+ execute("INSERT INTO %s (k, i) VALUES (1, 1)");
+ checkTTLIsCapped("i");
+ Attributes.policy = Attributes.ExpirationDateOverflowPolicy.REJECT;
+ }
+
+ @Test
+ public void testCapExpirationDatePolicyPerRequest() throws Throwable
+ {
+ // Test cap policy
+ Attributes.policy = Attributes.ExpirationDateOverflowPolicy.CAP;
+
+ // simple column, clustering, flush
+ baseCapExpirationDateOverflowTest(true, true, true);
+ // simple column, clustering, noflush
+ baseCapExpirationDateOverflowTest(true, true, false);
+ // simple column, noclustering, flush
+ baseCapExpirationDateOverflowTest(true, false, true);
+ // simple column, noclustering, noflush
+ baseCapExpirationDateOverflowTest(true, false, false);
+ // complex column, clustering, flush
+ baseCapExpirationDateOverflowTest(false, true, true);
+ // complex column, clustering, noflush
+ baseCapExpirationDateOverflowTest(false, true, false);
+ // complex column, noclustering, flush
+ baseCapExpirationDateOverflowTest(false, false, true);
+ // complex column, noclustering, noflush
+ baseCapExpirationDateOverflowTest(false, false, false);
+ // complex column, noclustering, flush
+ baseCapExpirationDateOverflowTest(false, false, false);
+
+ // Return to previous policy
+ Attributes.policy = Attributes.ExpirationDateOverflowPolicy.REJECT;
+ }
+
+ @Test
+ public void testRecoverOverflowedExpirationWithScrub() throws Throwable
+ {
- createTable(true, true);
- createTable(true, false);
- createTable(false, true);
- createTable(false, false);
-
+ baseTestRecoverOverflowedExpiration(false, false);
+ baseTestRecoverOverflowedExpiration(true, false);
+ baseTestRecoverOverflowedExpiration(true, true);
+ }
+
+ public void baseCapExpirationDateOverflowTest(boolean simple, boolean clustering, boolean flush) throws Throwable
+ {
+ // Create Table
+ if (simple)
+ {
+ if (clustering)
+ createTable("create table %s (k int, a int, b int, primary key(k, a))");
+ else
+ createTable("create table %s (k int primary key, a int, b int)");
+ }
+ else
+ {
+ if (clustering)
+ createTable("create table %s (k int, a int, b set<text>, primary key(k, a))");
+ else
+ createTable("create table %s (k int primary key, a int, b set<text>)");
+ }
+
+ // Insert data with INSERT and UPDATE
+ if (simple)
+ {
+ execute("INSERT INTO %s (k, a, b) VALUES (?, ?, ?) USING TTL " + MAX_TTL, 2, 2, 2);
+ if (clustering)
+ execute("UPDATE %s USING TTL " + MAX_TTL + " SET b = 1 WHERE k = 1 AND a = 1;");
+ else
+ execute("UPDATE %s USING TTL " + MAX_TTL + " SET a = 1, b = 1 WHERE k = 1;");
+ }
+ else
+ {
+ execute("INSERT INTO %s (k, a, b) VALUES (?, ?, ?) USING TTL " + MAX_TTL, 2, 2, set("v21", "v22", "v23", "v24"));
+ if (clustering)
+ execute("UPDATE %s USING TTL " + MAX_TTL + " SET b = ? WHERE k = 1 AND a = 1;", set("v11", "v12", "v13", "v14"));
+ else
+ execute("UPDATE %s USING TTL " + MAX_TTL + " SET a = 1, b = ? WHERE k = 1;", set("v11", "v12", "v13", "v14"));
+ }
+
+ // Maybe Flush
+ Keyspace ks = Keyspace.open(keyspace());
+ if (flush)
+ FBUtilities.waitOnFutures(ks.flush());
+
+ // Verify data
+ verifyData(simple);
+
+ // Maybe major compact
+ if (flush)
+ {
+ // Major compact and check data is still present
+ ks.getColumnFamilyStore(currentTable()).forceMajorCompaction();
+
+ // Verify data again
+ verifyData(simple);
+ }
+ }
+
+ public void baseTestRecoverOverflowedExpiration(boolean runScrub, boolean reinsertOverflowedTTL) throws Throwable
+ {
+ // simple column, clustering
+ testRecoverOverflowedExpirationWithScrub(true, true, runScrub, reinsertOverflowedTTL);
+ // simple column, noclustering
+ testRecoverOverflowedExpirationWithScrub(true, false, runScrub, reinsertOverflowedTTL);
+ // complex column, clustering
+ testRecoverOverflowedExpirationWithScrub(false, true, runScrub, reinsertOverflowedTTL);
+ // complex column, noclustering
+ testRecoverOverflowedExpirationWithScrub(false, false, runScrub, reinsertOverflowedTTL);
+ }
+
+ private void verifyData(boolean simple) throws Throwable
+ {
+ if (simple)
+ {
+ assertRows(execute("SELECT * from %s"), row(1, 1, 1), row(2, 2, 2));
+ }
+ else
+ {
+ assertRows(execute("SELECT * from %s"), row(1, 1, set("v11", "v12", "v13", "v14")), row(2, 2, set("v21", "v22", "v23", "v24")));
+ }
+ // Cannot retrieve TTL from collections
+ if (simple)
+ checkTTLIsCapped("b");
+ }
+
+ /**
+ * Verify that the computed TTL is approximately equal to the maximum allowed ttl given the
+ * {@link ExpiringCell#getLocalDeletionTime()} field limitation (CASSANDRA-14092)
+ */
+ private void checkTTLIsCapped(String field) throws Throwable
+ {
+
+ // TTL is computed dynamically from row expiration time, so if it is
+ // equal or higher to the minimum max TTL we compute before the query
+ // we are fine.
+ int minMaxTTL = computeMaxTTL();
+ UntypedResultSet execute = execute("SELECT ttl(" + field + ") FROM %s");
+ for (UntypedResultSet.Row row : execute)
+ {
+ int ttl = row.getInt("ttl(" + field + ")");
+ assertTrue(ttl >= minMaxTTL);
+ }
+ }
+
+ /**
+ * The max TTL is computed such that the TTL summed with the current time is equal to the maximum
+ * allowed expiration time {@link BufferExpiringCell#getLocalDeletionTime()} (2038-01-19T03:14:06+00:00)
+ */
+ private int computeMaxTTL()
+ {
+ int nowInSecs = (int) (System.currentTimeMillis() / 1000);
+ return BufferExpiringCell.MAX_DELETION_TIME - nowInSecs;
+ }
+
+ public void testRecoverOverflowedExpirationWithScrub(boolean simple, boolean clustering, boolean runScrub, boolean reinsertOverflowedTTL) throws Throwable
+ {
+ if (reinsertOverflowedTTL)
+ {
+ assert runScrub;
+ }
+
++ createTable(simple, clustering);
++
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
- ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(getTableName(simple, clustering));
++ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(currentTable());
+
- assertEquals(0, cfs.getLiveSSTableCount());
++ assertEquals(0, cfs.getSSTables().size());
+
- copySSTablesToTableDir(simple, clustering);
++ copySSTablesToTableDir(currentTable(), simple, clustering);
+
+ cfs.loadNewSSTables();
+
+ if (runScrub)
+ {
+ cfs.scrub(true, false, false, reinsertOverflowedTTL, 1);
+ }
+
+ if (reinsertOverflowedTTL)
+ {
+ if (simple)
- {
- UntypedResultSet execute = execute(String.format("SELECT * from %s.%s", KEYSPACE, getTableName(simple, clustering)));
- assertRows(execute, row(1, 1, 1), row(2, 2, 2));
-
- }
++ assertRows(execute("SELECT * from %s"), row(1, 1, 1), row(2, 2, 2));
+ else
- assertRows(execute(String.format("SELECT * from %s.%s", KEYSPACE, getTableName(simple, clustering))), row(1, 1, set("v11", "v12", "v13", "v14")), row(2, 2, set("v21", "v22", "v23", "v24")));
++ assertRows(execute("SELECT * from %s"), row(1, 1, set("v11", "v12", "v13", "v14")), row(2, 2, set("v21", "v22", "v23", "v24")));
+
+ cfs.forceMajorCompaction();
+
+ if (simple)
- assertRows(execute(String.format("SELECT * from %s.%s", KEYSPACE, getTableName(simple, clustering))), row(1, 1, 1), row(2, 2, 2));
++ assertRows(execute("SELECT * from %s"), row(1, 1, 1), row(2, 2, 2));
+ else
- assertRows(execute(String.format("SELECT * from %s.%s", KEYSPACE, getTableName(simple, clustering))), row(1, 1, set("v11", "v12", "v13", "v14")), row(2, 2, set("v21", "v22", "v23", "v24")));
++ assertRows(execute("SELECT * from %s"), row(1, 1, set("v11", "v12", "v13", "v14")), row(2, 2, set("v21", "v22", "v23", "v24")));
+ }
+ else
+ {
- assertEmpty(execute(String.format("SELECT * from %s.%s", KEYSPACE, getTableName(simple, clustering))));
++ assertEmpty(execute("SELECT * from %s"));
+ }
- cfs.truncateBlocking(); //cleanup for next tests
+ }
+
- private void copySSTablesToTableDir(boolean simple, boolean clustering) throws IOException
++ private void copySSTablesToTableDir(String table, boolean simple, boolean clustering) throws IOException
+ {
- File destDir = Keyspace.open(KEYSPACE).getColumnFamilyStore(getTableName(simple, clustering)).directories.getCFDirectories().iterator().next();
- File sourceDir = getTableDir(simple, clustering);
++ File destDir = Keyspace.open(keyspace()).getColumnFamilyStore(table).directories.getCFDirectories().iterator().next();
++ File sourceDir = getTableDir(table, simple, clustering);
+ for (File file : sourceDir.listFiles())
+ {
+ copyFile(file, destDir);
+ }
+ }
+
- private void createTable(boolean simple, boolean clustering) throws Throwable
++ private static File getTableDir(String table, boolean simple, boolean clustering)
++ {
++ return new File(String.format(NEGATIVE_LOCAL_EXPIRATION_TEST_DIR, getTableName(simple, clustering)));
++ }
++
++ private void createTable(boolean simple, boolean clustering)
+ {
+ if (simple)
+ {
+ if (clustering)
- execute(String.format("create table %s.%s (k int, a int, b int, primary key(k, a))", KEYSPACE, getTableName(simple, clustering)));
++ createTable("create table %s (k int, a int, b int, primary key(k, a))");
+ else
- execute(String.format("create table %s.%s (k int primary key, a int, b int)", KEYSPACE, getTableName(simple, clustering)));
++ createTable("create table %s (k int primary key, a int, b int)");
+ }
+ else
+ {
+ if (clustering)
- execute(String.format("create table %s.%s (k int, a int, b set<text>, primary key(k, a))", KEYSPACE, getTableName(simple, clustering)));
++ createTable("create table %s (k int, a int, b set<text>, primary key(k, a))");
+ else
- execute(String.format("create table %s.%s (k int primary key, a int, b set<text>)", KEYSPACE, getTableName(simple, clustering)));
++ createTable("create table %s (k int primary key, a int, b set<text>)");
+ }
+ }
+
+ private static File getTableDir(boolean simple, boolean clustering)
+ {
+ return new File(String.format(NEGATIVE_LOCAL_EXPIRATION_TEST_DIR, getTableName(simple, clustering)));
+ }
+
+ private static void copyFile(File src, File dest) throws IOException
+ {
+ byte[] buf = new byte[65536];
+ if (src.isFile())
+ {
+ File target = new File(dest, src.getName());
+ int rd;
+ FileInputStream is = new FileInputStream(src);
+ FileOutputStream os = new FileOutputStream(target);
+ while ((rd = is.read(buf)) >= 0)
+ os.write(buf, 0, rd);
+ }
+ }
+
+ public static String getTableName(boolean simple, boolean clustering)
+ {
+ if (simple)
+ return clustering ? SIMPLE_CLUSTERING : SIMPLE_NOCLUSTERING;
+ else
+ return clustering ? COMPLEX_CLUSTERING : COMPLEX_NOCLUSTERING;
+ }
+ }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1602e606/test/unit/org/apache/cassandra/db/ScrubTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/ScrubTest.java
index 4cca7ff,4efd082..9b1ede4
--- a/test/unit/org/apache/cassandra/db/ScrubTest.java
+++ b/test/unit/org/apache/cassandra/db/ScrubTest.java
@@@ -654,129 -565,4 +654,129 @@@ public class ScrubTes
assertEquals("bar", iter.next().getString("c"));
assertEquals("boo", iter.next().getString("c"));
}
+
+ @Test /* CASSANDRA-5174 */
+ public void testScrubKeysIndex_preserveOrder() throws IOException, ExecutionException, InterruptedException
+ {
+ //If the partitioner preserves the order then SecondaryIndex uses BytesType comparator,
+ // otherwise it uses LocalByPartitionerType
+ setKeyComparator(BytesType.instance);
+ testScrubIndex(CF_INDEX1, COL_KEYS_INDEX, false, true);
+ }
+
+ @Test /* CASSANDRA-5174 */
+ public void testScrubCompositeIndex_preserveOrder() throws IOException, ExecutionException, InterruptedException
+ {
+ setKeyComparator(BytesType.instance);
+ testScrubIndex(CF_INDEX2, COL_COMPOSITES_INDEX, true, true);
+ }
+
+ @Test /* CASSANDRA-5174 */
+ public void testScrubKeysIndex() throws IOException, ExecutionException, InterruptedException
+ {
+ setKeyComparator(new LocalByPartionerType(StorageService.getPartitioner()));
+ testScrubIndex(CF_INDEX1, COL_KEYS_INDEX, false, true);
+ }
+
+ @Test /* CASSANDRA-5174 */
+ public void testScrubCompositeIndex() throws IOException, ExecutionException, InterruptedException
+ {
+ setKeyComparator(new LocalByPartionerType(StorageService.getPartitioner()));
+ testScrubIndex(CF_INDEX2, COL_COMPOSITES_INDEX, true, true);
+ }
+
+ @Test /* CASSANDRA-5174 */
+ public void testFailScrubKeysIndex() throws IOException, ExecutionException, InterruptedException
+ {
+ testScrubIndex(CF_INDEX1, COL_KEYS_INDEX, false, false);
+ }
+
+ @Test /* CASSANDRA-5174 */
+ public void testFailScrubCompositeIndex() throws IOException, ExecutionException, InterruptedException
+ {
+ testScrubIndex(CF_INDEX2, COL_COMPOSITES_INDEX, true, false);
+ }
+
+ @Test /* CASSANDRA-5174 */
+ public void testScrubTwice() throws IOException, ExecutionException, InterruptedException
+ {
+ testScrubIndex(CF_INDEX1, COL_KEYS_INDEX, false, true, true);
+ }
+
+ /** The SecondaryIndex class is used for custom indexes so to avoid
+ * making a public final field into a private field with getters
+ * and setters, we resort to this hack in order to test it properly
+ * since it can have two values which influence the scrubbing behavior.
+ * @param comparator - the key comparator we want to test
+ */
+ private void setKeyComparator(AbstractType<?> comparator)
+ {
+ try
+ {
+ Field keyComparator = SecondaryIndex.class.getDeclaredField("keyComparator");
+ keyComparator.setAccessible(true);
+ int modifiers = keyComparator.getModifiers();
+ Field modifierField = keyComparator.getClass().getDeclaredField("modifiers");
+ modifiers = modifiers & ~Modifier.FINAL;
+ modifierField.setAccessible(true);
+ modifierField.setInt(keyComparator, modifiers);
+
+ keyComparator.set(null, comparator);
+ }
+ catch (Exception ex)
+ {
+ fail("Failed to change key comparator in secondary index : " + ex.getMessage());
+ ex.printStackTrace();
+ }
+ }
+
+ private void testScrubIndex(String cfName, String colName, boolean composite, boolean ... scrubs)
+ throws IOException, ExecutionException, InterruptedException
+ {
+ CompactionManager.instance.disableAutoCompaction();
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfName);
+ cfs.clearUnsafe();
+
+ int numRows = 1000;
+ long[] colValues = new long [numRows * 2]; // each row has two columns
+ for (int i = 0; i < colValues.length; i+=2)
+ {
+ colValues[i] = (i % 4 == 0 ? 1L : 2L); // index column
+ colValues[i+1] = 3L; //other column
+ }
+ fillIndexCF(cfs, composite, colValues);
+
+ // check index
+ IndexExpression expr = new IndexExpression(ByteBufferUtil.bytes(colName), Operator.EQ, ByteBufferUtil.bytes(1L));
+ List<Row> rows = cfs.search(Util.range("", ""), Arrays.asList(expr), new IdentityQueryFilter(), numRows);
+ assertNotNull(rows);
+ assertEquals(numRows / 2, rows.size());
+
+ // scrub index
+ Set<ColumnFamilyStore> indexCfss = cfs.indexManager.getIndexesBackedByCfs();
+ assertTrue(indexCfss.size() == 1);
+ for(ColumnFamilyStore indexCfs : indexCfss)
+ {
+ for (int i = 0; i < scrubs.length; i++)
+ {
+ boolean failure = !scrubs[i];
+ if (failure)
+ { //make sure the next scrub fails
+ overrideWithGarbage(indexCfs.getSSTables().iterator().next(), ByteBufferUtil.bytes(1L), ByteBufferUtil.bytes(2L));
+ }
- CompactionManager.AllSSTableOpStatus result = indexCfs.scrub(false, false, true, true, 0);
++ CompactionManager.AllSSTableOpStatus result = indexCfs.scrub(false, false, true, true, true, 0);
+ assertEquals(failure ?
+ CompactionManager.AllSSTableOpStatus.ABORTED :
+ CompactionManager.AllSSTableOpStatus.SUCCESSFUL,
+ result);
+ }
+ }
+
+
+ // check index is still working
+ rows = cfs.search(Util.range("", ""), Arrays.asList(expr), new IdentityQueryFilter(), numRows);
+ assertNotNull(rows);
+ assertEquals(numRows / 2, rows.size());
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org