You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by bl...@apache.org on 2018/11/13 11:05:09 UTC
[1/3] cassandra git commit: Correct sstable sorting for
garbagecollect and levelled compaction
Repository: cassandra
Updated Branches:
refs/heads/cassandra-3.11 d17836dec -> a03424ef9
refs/heads/trunk caf50de31 -> b80f6c65f
Correct sstable sorting for garbagecollect and levelled compaction
patch by Branimir Lambov and Vincent White; reviewed by Zhao Yang for CASSANDRA-14870
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a03424ef
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a03424ef
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a03424ef
Branch: refs/heads/cassandra-3.11
Commit: a03424ef95559c9df2bb7f86e1ac1edca1436058
Parents: d17836d
Author: Branimir Lambov <br...@datastax.com>
Authored: Wed Nov 7 13:10:39 2018 +0200
Committer: Branimir Lambov <br...@datastax.com>
Committed: Tue Nov 13 12:50:08 2018 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../db/SinglePartitionReadCommand.java | 4 +-
.../db/compaction/CompactionManager.java | 2 +-
.../db/compaction/LeveledManifest.java | 5 +-
.../io/sstable/format/SSTableReader.java | 4 +-
.../tools/nodetool/GarbageCollect.java | 8 ++-
.../apache/cassandra/cql3/GcCompactionTest.java | 73 +++++++++++++++++++-
.../LeveledCompactionStrategyTest.java | 33 +++++++++
8 files changed, 119 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a03424ef/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e07099a..83e8b08 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.11.4
+ * Correct sstable sorting for garbagecollect and levelled compaction (CASSANDRA-14870)
Merged from 3.0:
* Move TWCS message 'No compaction necessary for bucket size' to Trace level (CASSANDRA-14884)
* Sstable min/max metadata can cause data loss (CASSANDRA-14861)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a03424ef/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index ed98e28..bee4961 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -728,7 +728,7 @@ public class SinglePartitionReadCommand extends ReadCommand
* In other words, iterating in maxTimestamp order allow to do our mostRecentPartitionTombstone elimination
* in one pass, and minimize the number of sstables for which we read a partition tombstone.
*/
- Collections.sort(view.sstables, SSTableReader.maxTimestampComparator);
+ Collections.sort(view.sstables, SSTableReader.maxTimestampDescending);
long mostRecentPartitionTombstone = Long.MIN_VALUE;
int nonIntersectingSSTables = 0;
List<SSTableReader> skippedSSTablesWithTombstones = null;
@@ -916,7 +916,7 @@ public class SinglePartitionReadCommand extends ReadCommand
}
/* add the SSTables on disk */
- Collections.sort(view.sstables, SSTableReader.maxTimestampComparator);
+ Collections.sort(view.sstables, SSTableReader.maxTimestampDescending);
boolean onlyUnrepaired = true;
// read sorted sstables
SSTableReadMetricsCollector metricsCollector = new SSTableReadMetricsCollector();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a03424ef/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 235fe2b..61da975 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -498,7 +498,7 @@ public class CompactionManager implements CompactionManagerMBean
if (cfStore.getCompactionStrategyManager().onlyPurgeRepairedTombstones())
originals = Iterables.filter(originals, SSTableReader::isRepaired);
List<SSTableReader> sortedSSTables = Lists.newArrayList(originals);
- Collections.sort(sortedSSTables, SSTableReader.maxTimestampComparator);
+ Collections.sort(sortedSSTables, SSTableReader.maxTimestampAscending);
return sortedSSTables;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a03424ef/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
index ceb3811..520b08d 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
@@ -746,10 +746,11 @@ public class LeveledManifest
return sstables;
}
- private List<SSTableReader> ageSortedSSTables(Collection<SSTableReader> candidates)
+ @VisibleForTesting
+ List<SSTableReader> ageSortedSSTables(Collection<SSTableReader> candidates)
{
List<SSTableReader> ageSortedCandidates = new ArrayList<>(candidates);
- Collections.sort(ageSortedCandidates, SSTableReader.maxTimestampComparator);
+ Collections.sort(ageSortedCandidates, SSTableReader.maxTimestampAscending);
return ageSortedCandidates;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a03424ef/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index 2f1af58..116d489 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -154,8 +154,8 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
}
private static final RateLimiter meterSyncThrottle = RateLimiter.create(100.0);
- // Descending order
- public static final Comparator<SSTableReader> maxTimestampComparator = (o1, o2) -> Long.compare(o2.getMaxTimestamp(), o1.getMaxTimestamp());
+ public static final Comparator<SSTableReader> maxTimestampDescending = (o1, o2) -> Long.compare(o2.getMaxTimestamp(), o1.getMaxTimestamp());
+ public static final Comparator<SSTableReader> maxTimestampAscending = (o1, o2) -> Long.compare(o1.getMaxTimestamp(), o2.getMaxTimestamp());
// it's just an object, which we use regular Object equality on; we introduce a special class just for easy recognition
public static final class UniqueIdentifier {}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a03424ef/src/java/org/apache/cassandra/tools/nodetool/GarbageCollect.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/GarbageCollect.java b/src/java/org/apache/cassandra/tools/nodetool/GarbageCollect.java
index 37daf09..baa245f 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/GarbageCollect.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/GarbageCollect.java
@@ -41,8 +41,10 @@ public class GarbageCollect extends NodeToolCmd
@Option(title = "jobs",
name = {"-j", "--jobs"},
- description = "Number of sstables to cleanup simultanously, set to 0 to use all available compaction threads")
- private int jobs = 2;
+ description = "Number of sstables to cleanup simultanously, set to 0 to use all available compaction " +
+ "threads. Defaults to 1 so that collections of newer tables can see the data is deleted " +
+ "and also remove tombstones.")
+ private int jobs = 1;
@Override
public void execute(NodeProbe probe)
@@ -61,4 +63,4 @@ public class GarbageCollect extends NodeToolCmd
}
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a03424ef/test/unit/org/apache/cassandra/cql3/GcCompactionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/GcCompactionTest.java b/test/unit/org/apache/cassandra/cql3/GcCompactionTest.java
index 84a20de..548cdc1 100644
--- a/test/unit/org/apache/cassandra/cql3/GcCompactionTest.java
+++ b/test/unit/org/apache/cassandra/cql3/GcCompactionTest.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.cql3;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
@@ -34,6 +35,7 @@ import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.io.sstable.ISSTableScanner;
import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.schema.CompactionParams;
import org.apache.cassandra.utils.FBUtilities;
public class GcCompactionTest extends CQLTester
@@ -149,6 +151,75 @@ public class GcCompactionTest extends CQLTester
}
@Test
+ public void testGarbageCollectOrder() throws Throwable
+ {
+ // partition-level deletions, 0 gc_grace
+ createTable("CREATE TABLE %s(" +
+ " key int," +
+ " column int," +
+ " col2 int," +
+ " data int," +
+ " extra text," +
+ " PRIMARY KEY((key, column))" +
+ ") WITH gc_grace_seconds = 0;"
+ );
+
+ assertEquals(1, getCurrentColumnFamilyStore().gcBefore(1)); // make sure gc_grace is 0
+
+ for (int i = 0; i < KEY_COUNT; ++i)
+ for (int j = 0; j < CLUSTERING_COUNT; ++j)
+ execute("INSERT INTO %s (key, column, data, extra) VALUES (?, ?, ?, ?)", i, j, i+j, "" + i + ":" + j);
+
+
+ Set<SSTableReader> readers = new HashSet<>();
+ ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+
+ flush();
+ assertEquals(1, cfs.getLiveSSTables().size());
+ SSTableReader table0 = getNewTable(readers);
+ assertEquals(0, countTombstoneMarkers(table0));
+ int rowCount0 = countRows(table0);
+
+ deleteWithSomeInserts(3, 5, 10);
+ flush();
+ assertEquals(2, cfs.getLiveSSTables().size());
+ SSTableReader table1 = getNewTable(readers);
+ final int rowCount1 = countRows(table1);
+ assertTrue(rowCount1 > 0);
+ assertTrue(countTombstoneMarkers(table1) > 0);
+
+ deleteWithSomeInserts(2, 4, 0);
+ flush();
+ assertEquals(3, cfs.getLiveSSTables().size());
+ SSTableReader table2 = getNewTable(readers);
+ assertEquals(0, countRows(table2));
+ assertTrue(countTombstoneMarkers(table2) > 0);
+
+ // Wait a little to make sure nowInSeconds is greater than gcBefore
+ Thread.sleep(1000);
+
+ CompactionManager.AllSSTableOpStatus status =
+ CompactionManager.instance.performGarbageCollection(getCurrentColumnFamilyStore(), CompactionParams.TombstoneOption.ROW, 1);
+ assertEquals(CompactionManager.AllSSTableOpStatus.SUCCESSFUL, status);
+
+ SSTableReader[] tables = cfs.getLiveSSTables().toArray(new SSTableReader[0]);
+ Arrays.sort(tables, (o1, o2) -> Integer.compare(o1.descriptor.generation, o2.descriptor.generation)); // by order of compaction
+
+ // Make sure deleted data was removed
+ assertTrue(rowCount0 > countRows(tables[0]));
+ assertTrue(rowCount1 > countRows(tables[1]));
+
+ // Make sure all tombstones got purged
+ for (SSTableReader t : tables)
+ {
+ assertEquals("Table " + t + " has tombstones", 0, countTombstoneMarkers(t));
+ }
+
+ // The last table should have become empty and be removed
+ assertEquals(2, tables.length);
+ }
+
+ @Test
public void testGcCompactionCells() throws Throwable
{
createTable("CREATE TABLE %s(" +
@@ -387,4 +458,4 @@ public class GcCompactionTest extends CQLTester
}
return instances;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a03424ef/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
index de8efd7..b1d467e 100644
--- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@ -447,4 +447,37 @@ public class LeveledCompactionStrategyTest
// the 11 tables containing key1 should all compact to 1 table
assertEquals(1, cfs.getLiveSSTables().size());
}
+
+ @Test
+ public void testCompactionCandidateOrdering() throws Exception
+ {
+ // add some data
+ byte [] b = new byte[100 * 1024];
+ new Random().nextBytes(b);
+ ByteBuffer value = ByteBuffer.wrap(b);
+ int rows = 4;
+ int columns = 10;
+ // Just keep sstables in L0 for this test
+ cfs.disableAutoCompaction();
+ for (int r = 0; r < rows; r++)
+ {
+ UpdateBuilder update = UpdateBuilder.create(cfs.metadata, String.valueOf(r));
+ for (int c = 0; c < columns; c++)
+ update.newRow("column" + c).add("val", value);
+ update.applyUnsafe();
+ cfs.forceBlockingFlush();
+ }
+ LeveledCompactionStrategy strategy = (LeveledCompactionStrategy) (cfs.getCompactionStrategyManager()).getStrategies().get(1).get(0);
+ // get readers for level 0 sstables
+ Collection<SSTableReader> sstables = strategy.manifest.getLevel(0);
+ Collection<SSTableReader> sortedCandidates = strategy.manifest.ageSortedSSTables(sstables);
+ assertTrue(String.format("More than 1 sstable required for test, found: %d .", sortedCandidates.size()), sortedCandidates.size() > 1);
+ long lastMaxTimeStamp = Long.MIN_VALUE;
+ for (SSTableReader sstable : sortedCandidates)
+ {
+ assertTrue(String.format("SStables not sorted into oldest to newest by maxTimestamp. Current sstable: %d , last sstable: %d", sstable.getMaxTimestamp(), lastMaxTimeStamp),
+ sstable.getMaxTimestamp() > lastMaxTimeStamp);
+ lastMaxTimeStamp = sstable.getMaxTimestamp();
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org
[3/3] cassandra git commit: Merge branch 'cassandra-3.11' into trunk
Posted by bl...@apache.org.
Merge branch 'cassandra-3.11' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b80f6c65
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b80f6c65
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b80f6c65
Branch: refs/heads/trunk
Commit: b80f6c65fb0b97a8c79f6da027deac06a4af9801
Parents: caf50de a03424e
Author: Branimir Lambov <br...@datastax.com>
Authored: Tue Nov 13 12:53:14 2018 +0200
Committer: Branimir Lambov <br...@datastax.com>
Committed: Tue Nov 13 13:00:39 2018 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../db/SinglePartitionReadCommand.java | 4 +-
.../db/compaction/CompactionManager.java | 2 +-
.../db/compaction/LeveledManifest.java | 5 +-
.../io/sstable/format/SSTableReader.java | 4 +-
.../tools/nodetool/GarbageCollect.java | 8 ++-
.../apache/cassandra/cql3/GcCompactionTest.java | 71 ++++++++++++++++++++
.../apache/cassandra/db/ReadCommandTest.java | 2 +-
.../LeveledCompactionStrategyTest.java | 33 +++++++++
9 files changed, 119 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b80f6c65/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 5fd28bc,83e8b08..c77e7ed
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,332 -1,5 +1,333 @@@
+4.0
+ * Lower default chunk_length_in_kb from 64kb to 16kb (CASSANDRA-13241)
+ * Startup checker should wait for count rather than percentage (CASSANDRA-14297)
+ * Fix incorrect sorting of replicas in SimpleStrategy.calculateNaturalReplicas (CASSANDRA-14862)
+ * Partitioned outbound internode TCP connections can occur when nodes restart (CASSANDRA-14358)
+ * Don't write to system_distributed.repair_history, system_traces.sessions, system_traces.events in mixed version 3.X/4.0 clusters (CASSANDRA-14841)
+ * Avoid running query to self through messaging service (CASSANDRA-14807)
+ * Allow using custom script for chronicle queue BinLog archival (CASSANDRA-14373)
+ * Transient->Full range movements mishandle consistency level upgrade (CASSANDRA-14759)
+ * ReplicaCollection follow-up (CASSANDRA-14726)
+ * Transient node receives full data requests (CASSANDRA-14762)
+ * Enable snapshot artifacts publish (CASSANDRA-12704)
+ * Introduce RangesAtEndpoint.unwrap to simplify StreamSession.addTransferRanges (CASSANDRA-14770)
+ * LOCAL_QUORUM may speculate to non-local nodes, resulting in Timeout instead of Unavailable (CASSANDRA-14735)
+ * Avoid creating empty compaction tasks after truncate (CASSANDRA-14780)
+ * Fail incremental repair prepare phase if it encounters sstables from un-finalized sessions (CASSANDRA-14763)
+ * Add a check for receiving digest response from transient node (CASSANDRA-14750)
+ * Fail query on transient replica if coordinator only expects full data (CASSANDRA-14704)
+ * Remove mentions of transient replication from repair path (CASSANDRA-14698)
+ * Fix handleRepairStatusChangedNotification to remove first then add (CASSANDRA-14720)
+ * Allow transient node to serve as a repair coordinator (CASSANDRA-14693)
+ * DecayingEstimatedHistogramReservoir.EstimatedHistogramReservoirSnapshot returns wrong value for size() and incorrectly calculates count (CASSANDRA-14696)
+ * AbstractReplicaCollection equals and hash code should throw due to conflict between order sensitive/insensitive uses (CASSANDRA-14700)
+ * Detect inconsistencies in repaired data on the read path (CASSANDRA-14145)
+ * Add checksumming to the native protocol (CASSANDRA-13304)
+ * Make AuthCache more easily extendable (CASSANDRA-14662)
+ * Extend RolesCache to include detailed role info (CASSANDRA-14497)
+ * Add fqltool compare (CASSANDRA-14619)
+ * Add fqltool replay (CASSANDRA-14618)
+ * Log keyspace in full query log (CASSANDRA-14656)
+ * Transient Replication and Cheap Quorums (CASSANDRA-14404)
+ * Log server-generated timestamp and nowInSeconds used by queries in FQL (CASSANDRA-14675)
+ * Add diagnostic events for read repairs (CASSANDRA-14668)
+ * Use consistent nowInSeconds and timestamps values within a request (CASSANDRA-14671)
+ * Add sampler for query time and expose with nodetool (CASSANDRA-14436)
+ * Clean up Message.Request implementations (CASSANDRA-14677)
+ * Disable old native protocol versions on demand (CASANDRA-14659)
+ * Allow specifying now-in-seconds in native protocol (CASSANDRA-14664)
+ * Improve BTree build performance by avoiding data copy (CASSANDRA-9989)
+ * Make monotonic read / read repair configurable (CASSANDRA-14635)
+ * Refactor CompactionStrategyManager (CASSANDRA-14621)
+ * Flush netty client messages immediately by default (CASSANDRA-13651)
+ * Improve read repair blocking behavior (CASSANDRA-10726)
+ * Add a virtual table to expose settings (CASSANDRA-14573)
+ * Fix up chunk cache handling of metrics (CASSANDRA-14628)
+ * Extend IAuthenticator to accept peer SSL certificates (CASSANDRA-14652)
+ * Incomplete handling of exceptions when decoding incoming messages (CASSANDRA-14574)
+ * Add diagnostic events for user audit logging (CASSANDRA-13668)
+ * Allow retrieving diagnostic events via JMX (CASSANDRA-14435)
+ * Add base classes for diagnostic events (CASSANDRA-13457)
+ * Clear view system metadata when dropping keyspace (CASSANDRA-14646)
+ * Allocate ReentrantLock on-demand in java11 AtomicBTreePartitionerBase (CASSANDRA-14637)
+ * Make all existing virtual tables use LocalPartitioner (CASSANDRA-14640)
+ * Revert 4.0 GC alg back to CMS (CASANDRA-14636)
+ * Remove hardcoded java11 jvm args in idea workspace files (CASSANDRA-14627)
+ * Update netty to 4.1.128 (CASSANDRA-14633)
+ * Add a virtual table to expose thread pools (CASSANDRA-14523)
+ * Add a virtual table to expose caches (CASSANDRA-14538, CASSANDRA-14626)
+ * Fix toDate function for timestamp arguments (CASSANDRA-14502)
+ * Revert running dtests by default in circleci (CASSANDRA-14614)
+ * Stream entire SSTables when possible (CASSANDRA-14556)
+ * Cell reconciliation should not depend on nowInSec (CASSANDRA-14592)
+ * Add experimental support for Java 11 (CASSANDRA-9608)
+ * Make PeriodicCommitLogService.blockWhenSyncLagsNanos configurable (CASSANDRA-14580)
+ * Improve logging in MessageInHandler's constructor (CASSANDRA-14576)
+ * Set broadcast address in internode messaging handshake (CASSANDRA-14579)
+ * Wait for schema agreement prior to building MVs (CASSANDRA-14571)
+ * Make all DDL statements idempotent and not dependent on global state (CASSANDRA-13426)
+ * Bump the hints messaging version to match the current one (CASSANDRA-14536)
+ * OffsetAwareConfigurationLoader doesn't set ssl storage port causing bind errors in CircleCI (CASSANDRA-14546)
+ * Report why native_transport_port fails to bind (CASSANDRA-14544)
+ * Optimize internode messaging protocol (CASSANDRA-14485)
+ * Internode messaging handshake sends wrong messaging version number (CASSANDRA-14540)
+ * Add a virtual table to expose active client connections (CASSANDRA-14458)
+ * Clean up and refactor client metrics (CASSANDRA-14524)
+ * Nodetool import row cache invalidation races with adding sstables to tracker (CASSANDRA-14529)
+ * Fix assertions in LWTs after TableMetadata was made immutable (CASSANDRA-14356)
+ * Abort compactions quicker (CASSANDRA-14397)
+ * Support light-weight transactions in cassandra-stress (CASSANDRA-13529)
+ * Make AsyncOneResponse use the correct timeout (CASSANDRA-14509)
+ * Add option to sanity check tombstones on reads/compactions (CASSANDRA-14467)
+ * Add a virtual table to expose all running sstable tasks (CASSANDRA-14457)
+ * Let nodetool import take a list of directories (CASSANDRA-14442)
+ * Avoid unneeded memory allocations / cpu for disabled log levels (CASSANDRA-14488)
+ * Implement virtual keyspace interface (CASSANDRA-7622)
+ * nodetool import cleanup and improvements (CASSANDRA-14417)
+ * Bump jackson version to >= 2.9.5 (CASSANDRA-14427)
+ * Allow nodetool toppartitions without specifying table (CASSANDRA-14360)
+ * Audit logging for database activity (CASSANDRA-12151)
+ * Clean up build artifacts in docs container (CASSANDRA-14432)
+ * Minor network authz improvements (Cassandra-14413)
+ * Automatic sstable upgrades (CASSANDRA-14197)
+ * Replace deprecated junit.framework.Assert usages with org.junit.Assert (CASSANDRA-14431)
+ * Cassandra-stress throws NPE if insert section isn't specified in user profile (CASSSANDRA-14426)
+ * List clients by protocol versions `nodetool clientstats --by-protocol` (CASSANDRA-14335)
+ * Improve LatencyMetrics performance by reducing write path processing (CASSANDRA-14281)
+ * Add network authz (CASSANDRA-13985)
+ * Use the correct IP/Port for Streaming when localAddress is left unbound (CASSANDRA-14389)
+ * nodetool listsnapshots is missing local system keyspace snapshots (CASSANDRA-14381)
+ * Remove StreamCoordinator.streamExecutor thread pool (CASSANDRA-14402)
+ * Rename nodetool --with-port to --print-port to disambiguate from --port (CASSANDRA-14392)
+ * Client TOPOLOGY_CHANGE messages have wrong port. (CASSANDRA-14398)
+ * Add ability to load new SSTables from a separate directory (CASSANDRA-6719)
+ * Eliminate background repair and probablistic read_repair_chance table options
+ (CASSANDRA-13910)
+ * Bind to correct local address in 4.0 streaming (CASSANDRA-14362)
+ * Use standard Amazon naming for datacenter and rack in Ec2Snitch (CASSANDRA-7839)
+ * Fix junit failure for SSTableReaderTest (CASSANDRA-14387)
+ * Abstract write path for pluggable storage (CASSANDRA-14118)
+ * nodetool describecluster should be more informative (CASSANDRA-13853)
+ * Compaction performance improvements (CASSANDRA-14261)
+ * Refactor Pair usage to avoid boxing ints/longs (CASSANDRA-14260)
+ * Add options to nodetool tablestats to sort and limit output (CASSANDRA-13889)
+ * Rename internals to reflect CQL vocabulary (CASSANDRA-14354)
+ * Add support for hybrid MIN(), MAX() speculative retry policies
+ (CASSANDRA-14293, CASSANDRA-14338, CASSANDRA-14352)
+ * Fix some regressions caused by 14058 (CASSANDRA-14353)
+ * Abstract repair for pluggable storage (CASSANDRA-14116)
+ * Add meaningful toString() impls (CASSANDRA-13653)
+ * Add sstableloader option to accept target keyspace name (CASSANDRA-13884)
+ * Move processing of EchoMessage response to gossip stage (CASSANDRA-13713)
+ * Add coordinator write metric per CF (CASSANDRA-14232)
+ * Correct and clarify SSLFactory.getSslContext method and call sites (CASSANDRA-14314)
+ * Handle static and partition deletion properly on ThrottledUnfilteredIterator (CASSANDRA-14315)
+ * NodeTool clientstats should show SSL Cipher (CASSANDRA-14322)
+ * Add ability to specify driver name and version (CASSANDRA-14275)
+ * Abstract streaming for pluggable storage (CASSANDRA-14115)
+ * Forced incremental repairs should promote sstables if they can (CASSANDRA-14294)
+ * Use Murmur3 for validation compactions (CASSANDRA-14002)
+ * Comma at the end of the seed list is interpretated as localhost (CASSANDRA-14285)
+ * Refactor read executor and response resolver, abstract read repair (CASSANDRA-14058)
+ * Add optional startup delay to wait until peers are ready (CASSANDRA-13993)
+ * Add a few options to nodetool verify (CASSANDRA-14201)
+ * CVE-2017-5929 Security vulnerability and redefine default log rotation policy (CASSANDRA-14183)
+ * Use JVM default SSL validation algorithm instead of custom default (CASSANDRA-13259)
+ * Better document in code InetAddressAndPort usage post 7544, incorporate port into UUIDGen node (CASSANDRA-14226)
+ * Fix sstablemetadata date string for minLocalDeletionTime (CASSANDRA-14132)
+ * Make it possible to change neverPurgeTombstones during runtime (CASSANDRA-14214)
+ * Remove GossipDigestSynVerbHandler#doSort() (CASSANDRA-14174)
+ * Add nodetool clientlist (CASSANDRA-13665)
+ * Revert ProtocolVersion changes from CASSANDRA-7544 (CASSANDRA-14211)
+ * Non-disruptive seed node list reload (CASSANDRA-14190)
+ * Nodetool tablehistograms to print statics for all the tables (CASSANDRA-14185)
+ * Migrate dtests to use pytest and python3 (CASSANDRA-14134)
+ * Allow storage port to be configurable per node (CASSANDRA-7544)
+ * Make sub-range selection for non-frozen collections return null instead of empty (CASSANDRA-14182)
+ * BloomFilter serialization format should not change byte ordering (CASSANDRA-9067)
+ * Remove unused on-heap BloomFilter implementation (CASSANDRA-14152)
+ * Delete temp test files on exit (CASSANDRA-14153)
+ * Make PartitionUpdate and Mutation immutable (CASSANDRA-13867)
+ * Fix CommitLogReplayer exception for CDC data (CASSANDRA-14066)
+ * Fix cassandra-stress startup failure (CASSANDRA-14106)
+ * Remove initialDirectories from CFS (CASSANDRA-13928)
+ * Fix trivial log format error (CASSANDRA-14015)
+ * Allow sstabledump to do a json object per partition (CASSANDRA-13848)
+ * Add option to optimise merkle tree comparison across replicas (CASSANDRA-3200)
+ * Remove unused and deprecated methods from AbstractCompactionStrategy (CASSANDRA-14081)
+ * Fix Distribution.average in cassandra-stress (CASSANDRA-14090)
+ * Support a means of logging all queries as they were invoked (CASSANDRA-13983)
+ * Presize collections (CASSANDRA-13760)
+ * Add GroupCommitLogService (CASSANDRA-13530)
+ * Parallelize initial materialized view build (CASSANDRA-12245)
+ * Fix flaky SecondaryIndexManagerTest.assert[Not]MarkedAsBuilt (CASSANDRA-13965)
+ * Make LWTs send resultset metadata on every request (CASSANDRA-13992)
+ * Fix flaky indexWithFailedInitializationIsNotQueryableAfterPartialRebuild (CASSANDRA-13963)
+ * Introduce leaf-only iterator (CASSANDRA-9988)
+ * Upgrade Guava to 23.3 and Airline to 0.8 (CASSANDRA-13997)
+ * Allow only one concurrent call to StatusLogger (CASSANDRA-12182)
+ * Refactoring to specialised functional interfaces (CASSANDRA-13982)
+ * Speculative retry should allow more friendly params (CASSANDRA-13876)
+ * Throw exception if we send/receive repair messages to incompatible nodes (CASSANDRA-13944)
+ * Replace usages of MessageDigest with Guava's Hasher (CASSANDRA-13291)
+ * Add nodetool cmd to print hinted handoff window (CASSANDRA-13728)
+ * Fix some alerts raised by static analysis (CASSANDRA-13799)
+ * Checksum sstable metadata (CASSANDRA-13321, CASSANDRA-13593)
+ * Add result set metadata to prepared statement MD5 hash calculation (CASSANDRA-10786)
+ * Refactor GcCompactionTest to avoid boxing (CASSANDRA-13941)
+ * Expose recent histograms in JmxHistograms (CASSANDRA-13642)
+ * Fix buffer length comparison when decompressing in netty-based streaming (CASSANDRA-13899)
+ * Properly close StreamCompressionInputStream to release any ByteBuf (CASSANDRA-13906)
+ * Add SERIAL and LOCAL_SERIAL support for cassandra-stress (CASSANDRA-13925)
+ * LCS needlessly checks for L0 STCS candidates multiple times (CASSANDRA-12961)
+ * Correctly close netty channels when a stream session ends (CASSANDRA-13905)
+ * Update lz4 to 1.4.0 (CASSANDRA-13741)
+ * Optimize Paxos prepare and propose stage for local requests (CASSANDRA-13862)
+ * Throttle base partitions during MV repair streaming to prevent OOM (CASSANDRA-13299)
+ * Use compaction threshold for STCS in L0 (CASSANDRA-13861)
+ * Fix problem with min_compress_ratio: 1 and disallow ratio < 1 (CASSANDRA-13703)
+ * Add extra information to SASI timeout exception (CASSANDRA-13677)
+ * Add incremental repair support for --hosts, --force, and subrange repair (CASSANDRA-13818)
+ * Rework CompactionStrategyManager.getScanners synchronization (CASSANDRA-13786)
+ * Add additional unit tests for batch behavior, TTLs, Timestamps (CASSANDRA-13846)
+ * Add keyspace and table name in schema validation exception (CASSANDRA-13845)
+ * Emit metrics whenever we hit tombstone failures and warn thresholds (CASSANDRA-13771)
+ * Make netty EventLoopGroups daemon threads (CASSANDRA-13837)
+ * Race condition when closing stream sessions (CASSANDRA-13852)
+ * NettyFactoryTest is failing in trunk on macOS (CASSANDRA-13831)
+ * Allow changing log levels via nodetool for related classes (CASSANDRA-12696)
+ * Add stress profile yaml with LWT (CASSANDRA-7960)
+ * Reduce memory copies and object creations when acting on ByteBufs (CASSANDRA-13789)
+ * Simplify mx4j configuration (Cassandra-13578)
+ * Fix trigger example on 4.0 (CASSANDRA-13796)
+ * Force minumum timeout value (CASSANDRA-9375)
+ * Use netty for streaming (CASSANDRA-12229)
+ * Use netty for internode messaging (CASSANDRA-8457)
+ * Add bytes repaired/unrepaired to nodetool tablestats (CASSANDRA-13774)
+ * Don't delete incremental repair sessions if they still have sstables (CASSANDRA-13758)
+ * Fix pending repair manager index out of bounds check (CASSANDRA-13769)
+ * Don't use RangeFetchMapCalculator when RF=1 (CASSANDRA-13576)
+ * Don't optimise trivial ranges in RangeFetchMapCalculator (CASSANDRA-13664)
+ * Use an ExecutorService for repair commands instead of new Thread(..).start() (CASSANDRA-13594)
+ * Fix race / ref leak in anticompaction (CASSANDRA-13688)
+ * Expose tasks queue length via JMX (CASSANDRA-12758)
+ * Fix race / ref leak in PendingRepairManager (CASSANDRA-13751)
+ * Enable ppc64le runtime as unsupported architecture (CASSANDRA-13615)
+ * Improve sstablemetadata output (CASSANDRA-11483)
+ * Support for migrating legacy users to roles has been dropped (CASSANDRA-13371)
+ * Introduce error metrics for repair (CASSANDRA-13387)
+ * Refactoring to primitive functional interfaces in AuthCache (CASSANDRA-13732)
+ * Update metrics to 3.1.5 (CASSANDRA-13648)
+ * batch_size_warn_threshold_in_kb can now be set at runtime (CASSANDRA-13699)
+ * Avoid always rebuilding secondary indexes at startup (CASSANDRA-13725)
+ * Upgrade JMH from 1.13 to 1.19 (CASSANDRA-13727)
+ * Upgrade SLF4J from 1.7.7 to 1.7.25 (CASSANDRA-12996)
+ * Default for start_native_transport now true if not set in config (CASSANDRA-13656)
+ * Don't add localhost to the graph when calculating where to stream from (CASSANDRA-13583)
+ * Make CDC availability more deterministic via hard-linking (CASSANDRA-12148)
+ * Allow skipping equality-restricted clustering columns in ORDER BY clause (CASSANDRA-10271)
+ * Use common nowInSec for validation compactions (CASSANDRA-13671)
+ * Improve handling of IR prepare failures (CASSANDRA-13672)
+ * Send IR coordinator messages synchronously (CASSANDRA-13673)
+ * Flush system.repair table before IR finalize promise (CASSANDRA-13660)
+ * Fix column filter creation for wildcard queries (CASSANDRA-13650)
+ * Add 'nodetool getbatchlogreplaythrottle' and 'nodetool setbatchlogreplaythrottle' (CASSANDRA-13614)
+ * fix race condition in PendingRepairManager (CASSANDRA-13659)
+ * Allow noop incremental repair state transitions (CASSANDRA-13658)
+ * Run repair with down replicas (CASSANDRA-10446)
+ * Added started & completed repair metrics (CASSANDRA-13598)
+ * Improve secondary index (re)build failure and concurrency handling (CASSANDRA-10130)
+ * Improve calculation of available disk space for compaction (CASSANDRA-13068)
+ * Change the accessibility of RowCacheSerializer for third party row cache plugins (CASSANDRA-13579)
+ * Allow sub-range repairs for a preview of repaired data (CASSANDRA-13570)
+ * NPE in IR cleanup when columnfamily has no sstables (CASSANDRA-13585)
+ * Fix Randomness of stress values (CASSANDRA-12744)
+ * Allow selecting Map values and Set elements (CASSANDRA-7396)
+ * Fast and garbage-free Streaming Histogram (CASSANDRA-13444)
+ * Update repairTime for keyspaces on completion (CASSANDRA-13539)
+ * Add configurable upper bound for validation executor threads (CASSANDRA-13521)
+ * Bring back maxHintTTL property (CASSANDRA-12982)
+ * Add testing guidelines (CASSANDRA-13497)
+ * Add more repair metrics (CASSANDRA-13531)
+ * RangeStreamer should be smarter when picking endpoints for streaming (CASSANDRA-4650)
+ * Avoid rewrapping an exception thrown for cache load functions (CASSANDRA-13367)
+ * Log time elapsed for each incremental repair phase (CASSANDRA-13498)
+ * Add multiple table operation support to cassandra-stress (CASSANDRA-8780)
+ * Fix incorrect cqlsh results when selecting same columns multiple times (CASSANDRA-13262)
+ * Fix WriteResponseHandlerTest is sensitive to test execution order (CASSANDRA-13421)
+ * Improve incremental repair logging (CASSANDRA-13468)
+ * Start compaction when incremental repair finishes (CASSANDRA-13454)
+ * Add repair streaming preview (CASSANDRA-13257)
+ * Cleanup isIncremental/repairedAt usage (CASSANDRA-13430)
+ * Change protocol to allow sending key space independent of query string (CASSANDRA-10145)
+ * Make gc_log and gc_warn settable at runtime (CASSANDRA-12661)
+ * Take number of files in L0 in account when estimating remaining compaction tasks (CASSANDRA-13354)
+ * Skip building views during base table streams on range movements (CASSANDRA-13065)
+ * Improve error messages for +/- operations on maps and tuples (CASSANDRA-13197)
+ * Remove deprecated repair JMX APIs (CASSANDRA-11530)
+ * Fix version check to enable streaming keep-alive (CASSANDRA-12929)
+ * Make it possible to monitor an ideal consistency level separate from actual consistency level (CASSANDRA-13289)
+ * Outbound TCP connections ignore internode authenticator (CASSANDRA-13324)
+ * Upgrade junit from 4.6 to 4.12 (CASSANDRA-13360)
+ * Cleanup ParentRepairSession after repairs (CASSANDRA-13359)
+ * Upgrade snappy-java to 1.1.2.6 (CASSANDRA-13336)
+ * Incremental repair not streaming correct sstables (CASSANDRA-13328)
+ * Upgrade the jna version to 4.3.0 (CASSANDRA-13300)
+ * Add the currentTimestamp, currentDate, currentTime and currentTimeUUID functions (CASSANDRA-13132)
+ * Remove config option index_interval (CASSANDRA-10671)
+ * Reduce lock contention for collection types and serializers (CASSANDRA-13271)
+ * Make it possible to override MessagingService.Verb ids (CASSANDRA-13283)
+ * Avoid synchronized on prepareForRepair in ActiveRepairService (CASSANDRA-9292)
+ * Adds the ability to use uncompressed chunks in compressed files (CASSANDRA-10520)
+ * Don't flush sstables when streaming for incremental repair (CASSANDRA-13226)
+ * Remove unused method (CASSANDRA-13227)
+ * Fix minor bugs related to #9143 (CASSANDRA-13217)
+ * Output warning if user increases RF (CASSANDRA-13079)
+ * Remove pre-3.0 streaming compatibility code for 4.0 (CASSANDRA-13081)
+ * Add support for + and - operations on dates (CASSANDRA-11936)
+ * Fix consistency of incrementally repaired data (CASSANDRA-9143)
+ * Increase commitlog version (CASSANDRA-13161)
+ * Make TableMetadata immutable, optimize Schema (CASSANDRA-9425)
+ * Refactor ColumnCondition (CASSANDRA-12981)
+ * Parallelize streaming of different keyspaces (CASSANDRA-4663)
+ * Improved compactions metrics (CASSANDRA-13015)
+ * Speed-up start-up sequence by avoiding un-needed flushes (CASSANDRA-13031)
+ * Use Caffeine (W-TinyLFU) for on-heap caches (CASSANDRA-10855)
+ * Thrift removal (CASSANDRA-11115)
+ * Remove pre-3.0 compatibility code for 4.0 (CASSANDRA-12716)
+ * Add column definition kind to dropped columns in schema (CASSANDRA-12705)
+ * Add (automate) Nodetool Documentation (CASSANDRA-12672)
+ * Update bundled cqlsh python driver to 3.7.0 (CASSANDRA-12736)
+ * Reject invalid replication settings when creating or altering a keyspace (CASSANDRA-12681)
+ * Clean up the SSTableReader#getScanner API wrt removal of RateLimiter (CASSANDRA-12422)
+ * Use new token allocation for non bootstrap case as well (CASSANDRA-13080)
+ * Avoid byte-array copy when key cache is disabled (CASSANDRA-13084)
+ * Require forceful decommission if number of nodes is less than replication factor (CASSANDRA-12510)
+ * Allow IN restrictions on column families with collections (CASSANDRA-12654)
+ * Log message size in trace message in OutboundTcpConnection (CASSANDRA-13028)
+ * Add timeUnit Days for cassandra-stress (CASSANDRA-13029)
+ * Add mutation size and batch metrics (CASSANDRA-12649)
+ * Add method to get size of endpoints to TokenMetadata (CASSANDRA-12999)
+ * Expose time spent waiting in thread pool queue (CASSANDRA-8398)
+ * Conditionally update index built status to avoid unnecessary flushes (CASSANDRA-12969)
+ * cqlsh auto completion: refactor definition of compaction strategy options (CASSANDRA-12946)
+ * Add support for arithmetic operators (CASSANDRA-11935)
+ * Add histogram for delay to deliver hints (CASSANDRA-13234)
+ * Fix cqlsh automatic protocol downgrade regression (CASSANDRA-13307)
+ * Changing `max_hint_window_in_ms` at runtime (CASSANDRA-11720)
+ * Trivial format error in StorageProxy (CASSANDRA-13551)
+ * Nodetool repair can hang forever if we lose the notification for the repair completing/failing (CASSANDRA-13480)
+ * Anticompaction can cause noisy log messages (CASSANDRA-13684)
+ * Switch to client init for sstabledump (CASSANDRA-13683)
+ * CQLSH: Don't pause when capturing data (CASSANDRA-13743)
+ * nodetool clearsnapshot requires --all to clear all snapshots (CASSANDRA-13391)
+ * Correctly count range tombstones in traces and tombstone thresholds (CASSANDRA-8527)
+ * cqlshrc.sample uses incorrect option for time formatting (CASSANDRA-14243)
+
+
3.11.4
+ * Correct sstable sorting for garbagecollect and levelled compaction (CASSANDRA-14870)
Merged from 3.0:
* Move TWCS message 'No compaction necessary for bucket size' to Trace level (CASSANDRA-14884)
* Sstable min/max metadata can cause data loss (CASSANDRA-14861)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b80f6c65/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index 205f80c,bee4961..aec1a54
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@@ -589,11 -694,10 +589,11 @@@ public class SinglePartitionReadComman
Tracing.trace("Acquiring sstable references");
ColumnFamilyStore.ViewFragment view = cfs.select(View.select(SSTableSet.LIVE, partitionKey()));
- Collections.sort(view.sstables, SSTableReader.maxTimestampComparator);
- List<UnfilteredRowIterator> iterators = new ArrayList<>(Iterables.size(view.memtables) + view.sstables.size());
++ Collections.sort(view.sstables, SSTableReader.maxTimestampDescending);
ClusteringIndexFilter filter = clusteringIndexFilter();
long minTimestamp = Long.MAX_VALUE;
-
+ long mostRecentPartitionTombstone = Long.MIN_VALUE;
+ InputCollector<UnfilteredRowIterator> inputCollector = iteratorsForPartition(view);
try
{
for (Memtable memtable : view.memtables)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b80f6c65/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b80f6c65/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b80f6c65/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b80f6c65/src/java/org/apache/cassandra/tools/nodetool/GarbageCollect.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b80f6c65/test/unit/org/apache/cassandra/cql3/GcCompactionTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b80f6c65/test/unit/org/apache/cassandra/db/ReadCommandTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/ReadCommandTest.java
index fba2bf2,774645e..3bb30d9
--- a/test/unit/org/apache/cassandra/db/ReadCommandTest.java
+++ b/test/unit/org/apache/cassandra/db/ReadCommandTest.java
@@@ -574,289 -416,22 +574,289 @@@ public class ReadCommandTes
}
}
- // deserialize, merge and check the results are all there
- List<UnfilteredPartitionIterator> iterators = new ArrayList<>();
+ assertEquals(1, cfs.metric.tombstoneScannedHistogram.cf.getSnapshot().getMax());
+ }
- for (ByteBuffer buffer : buffers)
+ @Test
+ public void testSinglePartitionSliceRepairedDataTracking() throws Exception
+ {
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF2);
+ ReadCommand readCommand = Util.cmd(cfs, Util.dk("key")).build();
+ testRepairedDataTracking(cfs, readCommand);
+ }
+
+ @Test
+ public void testPartitionRangeRepairedDataTracking() throws Exception
+ {
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF2);
+ ReadCommand readCommand = Util.cmd(cfs).build();
+ testRepairedDataTracking(cfs, readCommand);
+ }
+
+ @Test
+ public void testSinglePartitionNamesRepairedDataTracking() throws Exception
+ {
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF2);
+ ReadCommand readCommand = Util.cmd(cfs, Util.dk("key")).includeRow("cc").includeRow("dd").build();
+ testRepairedDataTracking(cfs, readCommand);
+ }
+
+ @Test
+ public void testSinglePartitionNamesSkipsOptimisationsIfTrackingRepairedData()
+ {
+ // when tracking, the optimizations of querying sstables in timestamp order and
+ // returning once all requested columns are found are not applied, so just assert
+ // that all sstables are read when performing such queries
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF2);
+ cfs.truncateBlocking();
+ cfs.disableAutoCompaction();
+
+ new RowUpdateBuilder(cfs.metadata(), 0, ByteBufferUtil.bytes("key"))
+ .clustering("dd")
+ .add("a", ByteBufferUtil.bytes("abcd"))
+ .build()
+ .apply();
+
+ cfs.forceBlockingFlush();
+
+ new RowUpdateBuilder(cfs.metadata(), 1, ByteBufferUtil.bytes("key"))
+ .clustering("dd")
+ .add("a", ByteBufferUtil.bytes("wxyz"))
+ .build()
+ .apply();
+
+ cfs.forceBlockingFlush();
+ List<SSTableReader> sstables = new ArrayList<>(cfs.getLiveSSTables());
+ assertEquals(2, sstables.size());
- Collections.sort(sstables, SSTableReader.maxTimestampComparator);
++ Collections.sort(sstables, SSTableReader.maxTimestampDescending);
+
+ ReadCommand readCommand = Util.cmd(cfs, Util.dk("key")).includeRow("dd").columns("a").build();
+
+ assertEquals(0, readCount(sstables.get(0)));
+ assertEquals(0, readCount(sstables.get(1)));
+ ReadCommand withTracking = readCommand.copy();
+ withTracking.trackRepairedStatus();
+ Util.getAll(withTracking);
+ assertEquals(1, readCount(sstables.get(0)));
+ assertEquals(1, readCount(sstables.get(1)));
+
+ // same command without tracking touches only the table with the higher timestamp
+ Util.getAll(readCommand.copy());
+ assertEquals(2, readCount(sstables.get(0)));
+ assertEquals(1, readCount(sstables.get(1)));
+ }
+
+ private long readCount(SSTableReader sstable)
+ {
+ return sstable.getReadMeter().count();
+ }
+
+ @Test
+ public void skipRowCacheIfTrackingRepairedData()
+ {
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF6);
+
+ cfs.truncateBlocking();
+ cfs.disableAutoCompaction();
+
+ new RowUpdateBuilder(cfs.metadata(), 0, ByteBufferUtil.bytes("key"))
+ .clustering("cc")
+ .add("a", ByteBufferUtil.bytes("abcd"))
+ .build()
+ .apply();
+
+ cfs.forceBlockingFlush();
+
+ ReadCommand readCommand = Util.cmd(cfs, Util.dk("key")).build();
+ assertTrue(cfs.isRowCacheEnabled());
+ // warm the cache
+ assertFalse(Util.getAll(readCommand).isEmpty());
+ long cacheHits = cfs.metric.rowCacheHit.getCount();
+
+ Util.getAll(readCommand);
+ assertTrue(cfs.metric.rowCacheHit.getCount() > cacheHits);
+ cacheHits = cfs.metric.rowCacheHit.getCount();
+
+ ReadCommand withRepairedInfo = readCommand.copy();
+ withRepairedInfo.trackRepairedStatus();
+ Util.getAll(withRepairedInfo);
+ assertEquals(cacheHits, cfs.metric.rowCacheHit.getCount());
+ }
+
+ @Test (expected = IllegalArgumentException.class)
+ public void copyFullAsTransientTest()
+ {
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF6);
+ ReadCommand readCommand = Util.cmd(cfs, Util.dk("key")).build();
+ readCommand.copyAsTransientQuery(ReplicaUtils.full(FBUtilities.getBroadcastAddressAndPort()));
+ }
+
+ @Test (expected = IllegalArgumentException.class)
+ public void copyTransientAsDigestQuery()
+ {
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF6);
+ ReadCommand readCommand = Util.cmd(cfs, Util.dk("key")).build();
+ readCommand.copyAsDigestQuery(ReplicaUtils.trans(FBUtilities.getBroadcastAddressAndPort()));
+ }
+
+ @Test (expected = IllegalArgumentException.class)
+ public void copyMultipleFullAsTransientTest()
+ {
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF6);
+ DecoratedKey key = Util.dk("key");
+ Token token = key.getToken();
+ // Address is unimportant for this test
+ InetAddressAndPort addr = FBUtilities.getBroadcastAddressAndPort();
+ ReadCommand readCommand = Util.cmd(cfs, key).build();
+ readCommand.copyAsTransientQuery(EndpointsForToken.of(token,
+ ReplicaUtils.trans(addr, token),
+ ReplicaUtils.full(addr, token)));
+ }
+
+ @Test (expected = IllegalArgumentException.class)
+ public void copyMultipleTransientAsDigestQuery()
+ {
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF6);
+ DecoratedKey key = Util.dk("key");
+ Token token = key.getToken();
+ // Address is unimportant for this test
+ InetAddressAndPort addr = FBUtilities.getBroadcastAddressAndPort();
+ ReadCommand readCommand = Util.cmd(cfs, key).build();
+ readCommand.copyAsDigestQuery(EndpointsForToken.of(token,
+ ReplicaUtils.trans(addr, token),
+ ReplicaUtils.full(addr, token)));
+ }
+
+ private void testRepairedDataTracking(ColumnFamilyStore cfs, ReadCommand readCommand) throws IOException
+ {
+ cfs.truncateBlocking();
+ cfs.disableAutoCompaction();
+
+ new RowUpdateBuilder(cfs.metadata(), 0, ByteBufferUtil.bytes("key"))
+ .clustering("cc")
+ .add("a", ByteBufferUtil.bytes("abcd"))
+ .build()
+ .apply();
+
+ cfs.forceBlockingFlush();
+
+ new RowUpdateBuilder(cfs.metadata(), 1, ByteBufferUtil.bytes("key"))
+ .clustering("dd")
+ .add("a", ByteBufferUtil.bytes("abcd"))
+ .build()
+ .apply();
+
+ cfs.forceBlockingFlush();
+ List<SSTableReader> sstables = new ArrayList<>(cfs.getLiveSSTables());
+ assertEquals(2, sstables.size());
+ sstables.forEach(sstable -> assertFalse(sstable.isRepaired() || sstable.isPendingRepair()));
+ SSTableReader sstable1 = sstables.get(0);
+ SSTableReader sstable2 = sstables.get(1);
+
+ int numPartitions = 1;
+ int rowsPerPartition = 2;
+
+ // Capture all the digest versions as we mutate the table's repaired status. Each time
+ // we make a change, we expect a different digest.
+ Set<ByteBuffer> digests = new HashSet<>();
+ // first time round, nothing has been marked repaired so we expect digest to be an empty buffer and to be marked conclusive
+ ByteBuffer digest = performReadAndVerifyRepairedInfo(readCommand, numPartitions, rowsPerPartition, true);
+ assertEquals(ByteBufferUtil.EMPTY_BYTE_BUFFER, digest);
+ digests.add(digest);
+
+ // add a pending repair session to table1, digest should remain the same but now we expect it to be marked inconclusive
+ UUID session1 = UUIDGen.getTimeUUID();
+ mutateRepaired(cfs, sstable1, ActiveRepairService.UNREPAIRED_SSTABLE, session1);
+ digests.add(performReadAndVerifyRepairedInfo(readCommand, numPartitions, rowsPerPartition, false));
+ assertEquals(1, digests.size());
+
+ // add a different pending session to table2, digest should remain the same and still consider it inconclusive
+ UUID session2 = UUIDGen.getTimeUUID();
+ mutateRepaired(cfs, sstable2, ActiveRepairService.UNREPAIRED_SSTABLE, session2);
+ digests.add(performReadAndVerifyRepairedInfo(readCommand, numPartitions, rowsPerPartition, false));
+ assertEquals(1, digests.size());
+
+ // mark one table repaired
+ mutateRepaired(cfs, sstable1, 111, null);
+ // this time, digest should not be empty, session2 still means that the result is inconclusive
+ digests.add(performReadAndVerifyRepairedInfo(readCommand, numPartitions, rowsPerPartition, false));
+ assertEquals(2, digests.size());
+
+ // mark the second table repaired
+ mutateRepaired(cfs, sstable2, 222, null);
+ // digest should be updated again and as there are no longer any pending sessions, it should be considered conclusive
+ digests.add(performReadAndVerifyRepairedInfo(readCommand, numPartitions, rowsPerPartition, true));
+ assertEquals(3, digests.size());
+
+ // insert a partition tombstone into the memtable, then re-check the repaired info.
+ // This is to ensure that the optimisations which skip reading from sstables
+ // when a newer partition tombstone has already been seen cause the digest to be marked
+ // as inconclusive.
+ // the exception to this case is for partition range reads, where we always read
+ // and generate digests for all sstables, so we only test this path for single partition reads
+ if (readCommand.isLimitedToOnePartition())
{
- try (DataInputBuffer in = new DataInputBuffer(buffer, true))
- {
- iterators.add(UnfilteredPartitionIterators.serializerForIntraNode().deserialize(in,
- MessagingService.current_version,
- cfs.metadata,
- columnFilter,
- SerializationHelper.Flag.LOCAL));
- }
+ new Mutation(PartitionUpdate.simpleBuilder(cfs.metadata(), ByteBufferUtil.bytes("key"))
+ .delete()
+ .build()).apply();
+ digest = performReadAndVerifyRepairedInfo(readCommand, 0, rowsPerPartition, false);
+ assertEquals(ByteBufferUtil.EMPTY_BYTE_BUFFER, digest);
+
+ // now flush so we have an unrepaired table with the deletion and repeat the check
+ cfs.forceBlockingFlush();
+ digest = performReadAndVerifyRepairedInfo(readCommand, 0, rowsPerPartition, false);
+ assertEquals(ByteBufferUtil.EMPTY_BYTE_BUFFER, digest);
}
+ }
- return iterators;
+ private void mutateRepaired(ColumnFamilyStore cfs, SSTableReader sstable, long repairedAt, UUID pendingSession) throws IOException
+ {
+ sstable.descriptor.getMetadataSerializer().mutateRepairMetadata(sstable.descriptor, repairedAt, pendingSession, false);
+ sstable.reloadSSTableMetadata();
+ if (pendingSession != null)
+ {
+ // setup a minimal repair session. This is necessary because we
+ // check for sessions which have exceeded timeout and been purged
+ Range<Token> range = new Range<>(cfs.metadata().partitioner.getMinimumToken(),
+ cfs.metadata().partitioner.getRandomToken());
+ ActiveRepairService.instance.registerParentRepairSession(pendingSession,
+ REPAIR_COORDINATOR,
+ Lists.newArrayList(cfs),
+ Sets.newHashSet(range),
+ true,
+ repairedAt,
+ true,
+ PreviewKind.NONE);
+
+ LocalSessionAccessor.prepareUnsafe(pendingSession, null, Sets.newHashSet(REPAIR_COORDINATOR));
+ }
+ }
+
+ private ByteBuffer performReadAndVerifyRepairedInfo(ReadCommand command,
+ int expectedPartitions,
+ int expectedRowsPerPartition,
+ boolean expectConclusive)
+ {
+ // perform equivalent read command multiple times and assert that
+ // the repaired data info is always consistent. Return the digest
+ // so we can verify that it changes when the repaired status of
+ // the queried tables does.
+ Set<ByteBuffer> digests = new HashSet<>();
+ for (int i = 0; i < 10; i++)
+ {
+ ReadCommand withRepairedInfo = command.copy();
+ withRepairedInfo.trackRepairedStatus();
+
+ List<FilteredPartition> partitions = Util.getAll(withRepairedInfo);
+ assertEquals(expectedPartitions, partitions.size());
+ partitions.forEach(p -> assertEquals(expectedRowsPerPartition, p.rowCount()));
+
+ ByteBuffer digest = withRepairedInfo.getRepairedDataDigest();
+ digests.add(digest);
+ assertEquals(1, digests.size());
+ assertEquals(expectConclusive, withRepairedInfo.isRepairedDataDigestConclusive());
+ }
+ return digests.iterator().next();
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b80f6c65/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
index 23e88fe,b1d467e..b1aaf8a
--- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@@ -457,4 -447,37 +457,37 @@@ public class LeveledCompactionStrategyT
// the 11 tables containing key1 should all compact to 1 table
assertEquals(1, cfs.getLiveSSTables().size());
}
+
+ @Test
+ public void testCompactionCandidateOrdering() throws Exception
+ {
+ // add some data
+ byte [] b = new byte[100 * 1024];
+ new Random().nextBytes(b);
+ ByteBuffer value = ByteBuffer.wrap(b);
+ int rows = 4;
+ int columns = 10;
+ // Just keep sstables in L0 for this test
+ cfs.disableAutoCompaction();
+ for (int r = 0; r < rows; r++)
+ {
- UpdateBuilder update = UpdateBuilder.create(cfs.metadata, String.valueOf(r));
++ UpdateBuilder update = UpdateBuilder.create(cfs.metadata(), String.valueOf(r));
+ for (int c = 0; c < columns; c++)
+ update.newRow("column" + c).add("val", value);
+ update.applyUnsafe();
+ cfs.forceBlockingFlush();
+ }
+ LeveledCompactionStrategy strategy = (LeveledCompactionStrategy) (cfs.getCompactionStrategyManager()).getStrategies().get(1).get(0);
+ // get readers for level 0 sstables
+ Collection<SSTableReader> sstables = strategy.manifest.getLevel(0);
+ Collection<SSTableReader> sortedCandidates = strategy.manifest.ageSortedSSTables(sstables);
+ assertTrue(String.format("More than 1 sstable required for test, found: %d .", sortedCandidates.size()), sortedCandidates.size() > 1);
+ long lastMaxTimeStamp = Long.MIN_VALUE;
+ for (SSTableReader sstable : sortedCandidates)
+ {
+ assertTrue(String.format("SStables not sorted into oldest to newest by maxTimestamp. Current sstable: %d , last sstable: %d", sstable.getMaxTimestamp(), lastMaxTimeStamp),
+ sstable.getMaxTimestamp() > lastMaxTimeStamp);
+ lastMaxTimeStamp = sstable.getMaxTimestamp();
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org
[2/3] cassandra git commit: Correct sstable sorting for
garbagecollect and levelled compaction
Posted by bl...@apache.org.
Correct sstable sorting for garbagecollect and levelled compaction
patch by Branimir Lambov and Vincent White; reviewed by Zhao Yang for CASSANDRA-14879
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a03424ef
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a03424ef
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a03424ef
Branch: refs/heads/trunk
Commit: a03424ef95559c9df2bb7f86e1ac1edca1436058
Parents: d17836d
Author: Branimir Lambov <br...@datastax.com>
Authored: Wed Nov 7 13:10:39 2018 +0200
Committer: Branimir Lambov <br...@datastax.com>
Committed: Tue Nov 13 12:50:08 2018 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../db/SinglePartitionReadCommand.java | 4 +-
.../db/compaction/CompactionManager.java | 2 +-
.../db/compaction/LeveledManifest.java | 5 +-
.../io/sstable/format/SSTableReader.java | 4 +-
.../tools/nodetool/GarbageCollect.java | 8 ++-
.../apache/cassandra/cql3/GcCompactionTest.java | 73 +++++++++++++++++++-
.../LeveledCompactionStrategyTest.java | 33 +++++++++
8 files changed, 119 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a03424ef/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e07099a..83e8b08 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.11.4
+ * Correct sstable sorting for garbagecollect and levelled compaction (CASSANDRA-14870)
Merged from 3.0:
* Move TWCS message 'No compaction necessary for bucket size' to Trace level (CASSANDRA-14884)
* Sstable min/max metadata can cause data loss (CASSANDRA-14861)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a03424ef/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index ed98e28..bee4961 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -728,7 +728,7 @@ public class SinglePartitionReadCommand extends ReadCommand
* In other words, iterating in maxTimestamp order allow to do our mostRecentPartitionTombstone elimination
* in one pass, and minimize the number of sstables for which we read a partition tombstone.
*/
- Collections.sort(view.sstables, SSTableReader.maxTimestampComparator);
+ Collections.sort(view.sstables, SSTableReader.maxTimestampDescending);
long mostRecentPartitionTombstone = Long.MIN_VALUE;
int nonIntersectingSSTables = 0;
List<SSTableReader> skippedSSTablesWithTombstones = null;
@@ -916,7 +916,7 @@ public class SinglePartitionReadCommand extends ReadCommand
}
/* add the SSTables on disk */
- Collections.sort(view.sstables, SSTableReader.maxTimestampComparator);
+ Collections.sort(view.sstables, SSTableReader.maxTimestampDescending);
boolean onlyUnrepaired = true;
// read sorted sstables
SSTableReadMetricsCollector metricsCollector = new SSTableReadMetricsCollector();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a03424ef/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 235fe2b..61da975 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -498,7 +498,7 @@ public class CompactionManager implements CompactionManagerMBean
if (cfStore.getCompactionStrategyManager().onlyPurgeRepairedTombstones())
originals = Iterables.filter(originals, SSTableReader::isRepaired);
List<SSTableReader> sortedSSTables = Lists.newArrayList(originals);
- Collections.sort(sortedSSTables, SSTableReader.maxTimestampComparator);
+ Collections.sort(sortedSSTables, SSTableReader.maxTimestampAscending);
return sortedSSTables;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a03424ef/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
index ceb3811..520b08d 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
@@ -746,10 +746,11 @@ public class LeveledManifest
return sstables;
}
- private List<SSTableReader> ageSortedSSTables(Collection<SSTableReader> candidates)
+ @VisibleForTesting
+ List<SSTableReader> ageSortedSSTables(Collection<SSTableReader> candidates)
{
List<SSTableReader> ageSortedCandidates = new ArrayList<>(candidates);
- Collections.sort(ageSortedCandidates, SSTableReader.maxTimestampComparator);
+ Collections.sort(ageSortedCandidates, SSTableReader.maxTimestampAscending);
return ageSortedCandidates;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a03424ef/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index 2f1af58..116d489 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -154,8 +154,8 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
}
private static final RateLimiter meterSyncThrottle = RateLimiter.create(100.0);
- // Descending order
- public static final Comparator<SSTableReader> maxTimestampComparator = (o1, o2) -> Long.compare(o2.getMaxTimestamp(), o1.getMaxTimestamp());
+ public static final Comparator<SSTableReader> maxTimestampDescending = (o1, o2) -> Long.compare(o2.getMaxTimestamp(), o1.getMaxTimestamp());
+ public static final Comparator<SSTableReader> maxTimestampAscending = (o1, o2) -> Long.compare(o1.getMaxTimestamp(), o2.getMaxTimestamp());
// it's just an object, which we use regular Object equality on; we introduce a special class just for easy recognition
public static final class UniqueIdentifier {}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a03424ef/src/java/org/apache/cassandra/tools/nodetool/GarbageCollect.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/GarbageCollect.java b/src/java/org/apache/cassandra/tools/nodetool/GarbageCollect.java
index 37daf09..baa245f 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/GarbageCollect.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/GarbageCollect.java
@@ -41,8 +41,10 @@ public class GarbageCollect extends NodeToolCmd
@Option(title = "jobs",
name = {"-j", "--jobs"},
- description = "Number of sstables to cleanup simultanously, set to 0 to use all available compaction threads")
- private int jobs = 2;
+ description = "Number of sstables to cleanup simultaneously, set to 0 to use all available compaction " +
+ "threads. Defaults to 1 so that collections of newer tables can see the data is deleted " +
+ "and also remove tombstones.")
+ private int jobs = 1;
@Override
public void execute(NodeProbe probe)
@@ -61,4 +63,4 @@ public class GarbageCollect extends NodeToolCmd
}
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a03424ef/test/unit/org/apache/cassandra/cql3/GcCompactionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/GcCompactionTest.java b/test/unit/org/apache/cassandra/cql3/GcCompactionTest.java
index 84a20de..548cdc1 100644
--- a/test/unit/org/apache/cassandra/cql3/GcCompactionTest.java
+++ b/test/unit/org/apache/cassandra/cql3/GcCompactionTest.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.cql3;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
@@ -34,6 +35,7 @@ import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.io.sstable.ISSTableScanner;
import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.schema.CompactionParams;
import org.apache.cassandra.utils.FBUtilities;
public class GcCompactionTest extends CQLTester
@@ -149,6 +151,75 @@ public class GcCompactionTest extends CQLTester
}
@Test
+ public void testGarbageCollectOrder() throws Throwable
+ {
+ // partition-level deletions, 0 gc_grace
+ createTable("CREATE TABLE %s(" +
+ " key int," +
+ " column int," +
+ " col2 int," +
+ " data int," +
+ " extra text," +
+ " PRIMARY KEY((key, column))" +
+ ") WITH gc_grace_seconds = 0;"
+ );
+
+ assertEquals(1, getCurrentColumnFamilyStore().gcBefore(1)); // make sure gc_grace is 0
+
+ for (int i = 0; i < KEY_COUNT; ++i)
+ for (int j = 0; j < CLUSTERING_COUNT; ++j)
+ execute("INSERT INTO %s (key, column, data, extra) VALUES (?, ?, ?, ?)", i, j, i+j, "" + i + ":" + j);
+
+
+ Set<SSTableReader> readers = new HashSet<>();
+ ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+
+ flush();
+ assertEquals(1, cfs.getLiveSSTables().size());
+ SSTableReader table0 = getNewTable(readers);
+ assertEquals(0, countTombstoneMarkers(table0));
+ int rowCount0 = countRows(table0);
+
+ deleteWithSomeInserts(3, 5, 10);
+ flush();
+ assertEquals(2, cfs.getLiveSSTables().size());
+ SSTableReader table1 = getNewTable(readers);
+ final int rowCount1 = countRows(table1);
+ assertTrue(rowCount1 > 0);
+ assertTrue(countTombstoneMarkers(table1) > 0);
+
+ deleteWithSomeInserts(2, 4, 0);
+ flush();
+ assertEquals(3, cfs.getLiveSSTables().size());
+ SSTableReader table2 = getNewTable(readers);
+ assertEquals(0, countRows(table2));
+ assertTrue(countTombstoneMarkers(table2) > 0);
+
+ // Wait a little to make sure nowInSeconds is greater than gcBefore
+ Thread.sleep(1000);
+
+ CompactionManager.AllSSTableOpStatus status =
+ CompactionManager.instance.performGarbageCollection(getCurrentColumnFamilyStore(), CompactionParams.TombstoneOption.ROW, 1);
+ assertEquals(CompactionManager.AllSSTableOpStatus.SUCCESSFUL, status);
+
+ SSTableReader[] tables = cfs.getLiveSSTables().toArray(new SSTableReader[0]);
+ Arrays.sort(tables, (o1, o2) -> Integer.compare(o1.descriptor.generation, o2.descriptor.generation)); // by order of compaction
+
+ // Make sure deleted data was removed
+ assertTrue(rowCount0 > countRows(tables[0]));
+ assertTrue(rowCount1 > countRows(tables[1]));
+
+ // Make sure all tombstones got purged
+ for (SSTableReader t : tables)
+ {
+ assertEquals("Table " + t + " has tombstones", 0, countTombstoneMarkers(t));
+ }
+
+ // The last table should have become empty and be removed
+ assertEquals(2, tables.length);
+ }
+
+ @Test
public void testGcCompactionCells() throws Throwable
{
createTable("CREATE TABLE %s(" +
@@ -387,4 +458,4 @@ public class GcCompactionTest extends CQLTester
}
return instances;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a03424ef/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
index de8efd7..b1d467e 100644
--- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@ -447,4 +447,37 @@ public class LeveledCompactionStrategyTest
// the 11 tables containing key1 should all compact to 1 table
assertEquals(1, cfs.getLiveSSTables().size());
}
+
+ @Test
+ public void testCompactionCandidateOrdering() throws Exception
+ {
+ // add some data
+ byte [] b = new byte[100 * 1024];
+ new Random().nextBytes(b);
+ ByteBuffer value = ByteBuffer.wrap(b);
+ int rows = 4;
+ int columns = 10;
+ // Just keep sstables in L0 for this test
+ cfs.disableAutoCompaction();
+ for (int r = 0; r < rows; r++)
+ {
+ UpdateBuilder update = UpdateBuilder.create(cfs.metadata, String.valueOf(r));
+ for (int c = 0; c < columns; c++)
+ update.newRow("column" + c).add("val", value);
+ update.applyUnsafe();
+ cfs.forceBlockingFlush();
+ }
+ LeveledCompactionStrategy strategy = (LeveledCompactionStrategy) (cfs.getCompactionStrategyManager()).getStrategies().get(1).get(0);
+ // get readers for level 0 sstables
+ Collection<SSTableReader> sstables = strategy.manifest.getLevel(0);
+ Collection<SSTableReader> sortedCandidates = strategy.manifest.ageSortedSSTables(sstables);
+ assertTrue(String.format("More than 1 sstable required for test, found: %d .", sortedCandidates.size()), sortedCandidates.size() > 1);
+ long lastMaxTimeStamp = Long.MIN_VALUE;
+ for (SSTableReader sstable : sortedCandidates)
+ {
+ assertTrue(String.format("SSTables not sorted from oldest to newest by maxTimestamp. Current sstable: %d , last sstable: %d", sstable.getMaxTimestamp(), lastMaxTimeStamp),
+ sstable.getMaxTimestamp() > lastMaxTimeStamp);
+ lastMaxTimeStamp = sstable.getMaxTimestamp();
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org