You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by aw...@apache.org on 2018/09/01 01:35:58 UTC
[06/18] cassandra git commit: Transient Replication and Cheap Quorums
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/db/CleanupTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CleanupTest.java b/test/unit/org/apache/cassandra/db/CleanupTest.java
index a096c78..46c0afd 100644
--- a/test/unit/org/apache/cassandra/db/CleanupTest.java
+++ b/test/unit/org/apache/cassandra/db/CleanupTest.java
@@ -107,7 +107,6 @@ public class CleanupTest
SchemaLoader.compositeIndexCFMD(KEYSPACE2, CF_INDEXED2, true));
}
- /*
@Test
public void testCleanup() throws ExecutionException, InterruptedException
{
@@ -116,7 +115,6 @@ public class CleanupTest
Keyspace keyspace = Keyspace.open(KEYSPACE1);
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1);
- UnfilteredPartitionIterator iter;
// insert data and verify we get it back w/ range query
fillCF(cfs, "val", LOOPS);
@@ -124,8 +122,7 @@ public class CleanupTest
// record max timestamps of the sstables pre-cleanup
List<Long> expectedMaxTimestamps = getMaxTimestampList(cfs);
- iter = Util.getRangeSlice(cfs);
- assertEquals(LOOPS, Iterators.size(iter));
+ assertEquals(LOOPS, Util.getAll(Util.cmd(cfs).build()).size());
// with one token in the ring, owned by the local node, cleanup should be a no-op
CompactionManager.instance.performCleanup(cfs, 2);
@@ -134,10 +131,8 @@ public class CleanupTest
assert expectedMaxTimestamps.equals(getMaxTimestampList(cfs));
// check data is still there
- iter = Util.getRangeSlice(cfs);
- assertEquals(LOOPS, Iterators.size(iter));
+ assertEquals(LOOPS, Util.getAll(Util.cmd(cfs).build()).size());
}
- */
@Test
public void testCleanupWithIndexes() throws IOException, ExecutionException, InterruptedException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/db/CleanupTransientTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CleanupTransientTest.java b/test/unit/org/apache/cassandra/db/CleanupTransientTest.java
new file mode 100644
index 0000000..9789183
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/CleanupTransientTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.cassandra.locator.RangesAtEndpoint;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.partitions.FilteredPartition;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.RandomPartitioner;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.locator.AbstractNetworkTopologySnitch;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.service.PendingRangeCalculatorService;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class CleanupTransientTest
+{
+ private static final IPartitioner partitioner = RandomPartitioner.instance;
+ private static IPartitioner oldPartitioner;
+
+ public static final int LOOPS = 200;
+ public static final String KEYSPACE1 = "CleanupTest1";
+ public static final String CF_INDEXED1 = "Indexed1";
+ public static final String CF_STANDARD1 = "Standard1";
+
+ public static final String KEYSPACE2 = "CleanupTestMultiDc";
+ public static final String CF_INDEXED2 = "Indexed2";
+ public static final String CF_STANDARD2 = "Standard2";
+
+ public static final ByteBuffer COLUMN = ByteBufferUtil.bytes("birthdate");
+ public static final ByteBuffer VALUE = ByteBuffer.allocate(8);
+ static
+ {
+ VALUE.putLong(20101229);
+ VALUE.flip();
+ }
+
+ @BeforeClass
+ public static void setup() throws Exception
+ {
+ DatabaseDescriptor.daemonInitialization();
+ DatabaseDescriptor.setTransientReplicationEnabledUnsafe(true);
+ oldPartitioner = StorageService.instance.setPartitionerUnsafe(partitioner);
+ SchemaLoader.prepareServer();
+ SchemaLoader.createKeyspace(KEYSPACE1,
+ KeyspaceParams.simple("2/1"),
+ SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD1),
+ SchemaLoader.compositeIndexCFMD(KEYSPACE1, CF_INDEXED1, true));
+
+ StorageService ss = StorageService.instance;
+ final int RING_SIZE = 2;
+
+ TokenMetadata tmd = ss.getTokenMetadata();
+ tmd.clearUnsafe();
+ ArrayList<Token> endpointTokens = new ArrayList<>();
+ ArrayList<Token> keyTokens = new ArrayList<>();
+ List<InetAddressAndPort> hosts = new ArrayList<>();
+ List<UUID> hostIds = new ArrayList<>();
+
+ endpointTokens.add(RandomPartitioner.MINIMUM);
+ endpointTokens.add(RandomPartitioner.instance.midpoint(RandomPartitioner.MINIMUM, new RandomPartitioner.BigIntegerToken(RandomPartitioner.MAXIMUM)));
+
+ Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, RING_SIZE);
+ PendingRangeCalculatorService.instance.blockUntilFinished();
+
+
+ DatabaseDescriptor.setEndpointSnitch(new AbstractNetworkTopologySnitch()
+ {
+ @Override
+ public String getRack(InetAddressAndPort endpoint)
+ {
+ return "RC1";
+ }
+
+ @Override
+ public String getDatacenter(InetAddressAndPort endpoint)
+ {
+ return "DC1";
+ }
+ });
+ }
+
+ @Test
+ public void testCleanup() throws Exception
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE1);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1);
+
+
+ // insert data and verify we get it back w/ range query
+ fillCF(cfs, "val", LOOPS);
+
+ // record max timestamps of the sstables pre-cleanup
+ List<Long> expectedMaxTimestamps = getMaxTimestampList(cfs);
+
+ assertEquals(LOOPS, Util.getAll(Util.cmd(cfs).build()).size());
+
+ // with two tokens RF=2/1 and the sstable not repaired this should do nothing
+ CompactionManager.instance.performCleanup(cfs, 2);
+
+ // ensure max timestamp of the sstables are retained post-cleanup
+ assert expectedMaxTimestamps.equals(getMaxTimestampList(cfs));
+
+ // check data is still there
+ assertEquals(LOOPS, Util.getAll(Util.cmd(cfs).build()).size());
+
+ //Get an exact count of how many partitions are in the fully replicated range and should
+ //be retained
+ int fullCount = 0;
+ RangesAtEndpoint localRanges = StorageService.instance.getLocalReplicas(keyspace.getName()).filter(Replica::isFull);
+ for (FilteredPartition partition : Util.getAll(Util.cmd(cfs).build()))
+ {
+ Token token = partition.partitionKey().getToken();
+ for (Replica r : localRanges)
+ {
+ if (r.range().contains(token))
+ fullCount++;
+ }
+ }
+
+ SSTableReader sstable = cfs.getLiveSSTables().iterator().next();
+ sstable.descriptor.getMetadataSerializer().mutateRepairMetadata(sstable.descriptor, 1, null, false);
+ sstable.reloadSSTableMetadata();
+
+ // This should remove approximately 50% of the data, specifically whatever was transiently replicated
+ CompactionManager.instance.performCleanup(cfs, 2);
+
+ // ensure max timestamp of the sstables are retained post-cleanup
+ assert expectedMaxTimestamps.equals(getMaxTimestampList(cfs));
+
+ // check less data is there, all transient data should be gone since the table was repaired
+ assertEquals(fullCount, Util.getAll(Util.cmd(cfs).build()).size());
+ }
+
+ protected void fillCF(ColumnFamilyStore cfs, String colName, int rowsPerSSTable)
+ {
+ CompactionManager.instance.disableAutoCompaction();
+
+ for (int i = 0; i < rowsPerSSTable; i++)
+ {
+ String key = String.valueOf(i);
+ // create a row and update the birthdate value, test that the index query fetches the new version
+ new RowUpdateBuilder(cfs.metadata(), System.currentTimeMillis(), ByteBufferUtil.bytes(key))
+ .clustering(COLUMN)
+ .add(colName, VALUE)
+ .build()
+ .applyUnsafe();
+ }
+
+ cfs.forceBlockingFlush();
+ }
+
+ protected List<Long> getMaxTimestampList(ColumnFamilyStore cfs)
+ {
+ List<Long> list = new LinkedList<Long>();
+ for (SSTableReader sstable : cfs.getLiveSSTables())
+ list.add(sstable.getMaxTimestamp());
+ return list;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/db/ImportTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ImportTest.java b/test/unit/org/apache/cassandra/db/ImportTest.java
index 66bbff3..5ceb233 100644
--- a/test/unit/org/apache/cassandra/db/ImportTest.java
+++ b/test/unit/org/apache/cassandra/db/ImportTest.java
@@ -174,7 +174,7 @@ public class ImportTest extends CQLTester
Set<SSTableReader> sstables = getCurrentColumnFamilyStore().getLiveSSTables();
getCurrentColumnFamilyStore().clearUnsafe();
for (SSTableReader sstable : sstables)
- sstable.descriptor.getMetadataSerializer().mutateRepaired(sstable.descriptor, 111, null);
+ sstable.descriptor.getMetadataSerializer().mutateRepairMetadata(sstable.descriptor, 111, null, false);
File backupdir = moveToBackupDir(sstables);
assertEquals(0, execute("select * from %s").size());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/db/RepairedDataTombstonesTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RepairedDataTombstonesTest.java b/test/unit/org/apache/cassandra/db/RepairedDataTombstonesTest.java
index e01088d..a864786 100644
--- a/test/unit/org/apache/cassandra/db/RepairedDataTombstonesTest.java
+++ b/test/unit/org/apache/cassandra/db/RepairedDataTombstonesTest.java
@@ -308,7 +308,7 @@ public class RepairedDataTombstonesTest extends CQLTester
public static void repair(ColumnFamilyStore cfs, SSTableReader sstable) throws IOException
{
- sstable.descriptor.getMetadataSerializer().mutateRepaired(sstable.descriptor, 1, null);
+ sstable.descriptor.getMetadataSerializer().mutateRepairMetadata(sstable.descriptor, 1, null, false);
sstable.reloadSSTableMetadata();
cfs.getTracker().notifySSTableRepairedStatusChanged(Collections.singleton(sstable));
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/db/RowUpdateBuilder.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RowUpdateBuilder.java b/test/unit/org/apache/cassandra/db/RowUpdateBuilder.java
index 706b274..1ac5440 100644
--- a/test/unit/org/apache/cassandra/db/RowUpdateBuilder.java
+++ b/test/unit/org/apache/cassandra/db/RowUpdateBuilder.java
@@ -70,6 +70,12 @@ public class RowUpdateBuilder
this.updateBuilder.nowInSec(localDeletionTime);
}
+ public RowUpdateBuilder timestamp(long ts)
+ {
+ updateBuilder.timestamp(ts);
+ return this;
+ }
+
private Row.SimpleBuilder rowBuilder()
{
// Normally, rowBuilder is created by the call to clustering(), but we allow skipping that call for an empty
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/db/ScrubTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ScrubTest.java b/test/unit/org/apache/cassandra/db/ScrubTest.java
index 36af54f..b58909b 100644
--- a/test/unit/org/apache/cassandra/db/ScrubTest.java
+++ b/test/unit/org/apache/cassandra/db/ScrubTest.java
@@ -615,7 +615,7 @@ public class ScrubTest
{
SerializationHeader header = new SerializationHeader(true, metadata.get(), metadata.get().regularAndStaticColumns(), EncodingStats.NO_STATS);
MetadataCollector collector = new MetadataCollector(metadata.get().comparator).sstableLevel(0);
- return new TestMultiWriter(new TestWriter(descriptor, keyCount, 0, null, metadata, collector, header, txn), txn);
+ return new TestMultiWriter(new TestWriter(descriptor, keyCount, 0, null, false, metadata, collector, header, txn), txn);
}
private static class TestMultiWriter extends SimpleSSTableMultiWriter
@@ -631,10 +631,10 @@ public class ScrubTest
*/
private static class TestWriter extends BigTableWriter
{
- TestWriter(Descriptor descriptor, long keyCount, long repairedAt, UUID pendingRepair, TableMetadataRef metadata,
+ TestWriter(Descriptor descriptor, long keyCount, long repairedAt, UUID pendingRepair, boolean isTransient, TableMetadataRef metadata,
MetadataCollector collector, SerializationHeader header, LifecycleTransaction txn)
{
- super(descriptor, keyCount, repairedAt, pendingRepair, metadata, collector, header, Collections.emptySet(), txn);
+ super(descriptor, keyCount, repairedAt, pendingRepair, isTransient, metadata, collector, header, Collections.emptySet(), txn);
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/db/SystemKeyspaceMigrator40Test.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/SystemKeyspaceMigrator40Test.java b/test/unit/org/apache/cassandra/db/SystemKeyspaceMigrator40Test.java
index 1c051f5..a14db00 100644
--- a/test/unit/org/apache/cassandra/db/SystemKeyspaceMigrator40Test.java
+++ b/test/unit/org/apache/cassandra/db/SystemKeyspaceMigrator40Test.java
@@ -34,6 +34,8 @@ import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.db.marshal.UUIDType;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
import org.apache.cassandra.utils.UUIDGen;
import static org.junit.Assert.assertEquals;
@@ -189,4 +191,28 @@ public class SystemKeyspaceMigrator40Test extends CQLTester
}
assertEquals(1, rowCount);
}
+
+ @Test
+ public void testMigrateAvailableRanges() throws Throwable
+ {
+ Range<Token> testRange = new Range<>(DatabaseDescriptor.getPartitioner().getRandomToken(), DatabaseDescriptor.getPartitioner().getRandomToken());
+ String insert = String.format("INSERT INTO %s ("
+ + "keyspace_name, "
+ + "ranges) "
+ + " values ( ?, ? )",
+ SystemKeyspaceMigrator40.legacyAvailableRangesName);
+ execute(insert,
+ "foo",
+ ImmutableSet.of(SystemKeyspace.rangeToBytes(testRange)));
+ SystemKeyspaceMigrator40.migrate();
+
+ int rowCount = 0;
+ for (UntypedResultSet.Row row : execute(String.format("SELECT * FROM %s", SystemKeyspaceMigrator40.availableRangesName)))
+ {
+ rowCount++;
+ assertEquals("foo", row.getString("keyspace_name"));
+ assertEquals(ImmutableSet.of(testRange), SystemKeyspace.rawRangesToRangeSet(row.getSet("full_ranges", BytesType.instance), DatabaseDescriptor.getPartitioner()));
+ }
+ assertEquals(1, rowCount);
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/db/TableCQLHelperTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/TableCQLHelperTest.java b/test/unit/org/apache/cassandra/db/TableCQLHelperTest.java
index 32fa4e4..fff567b 100644
--- a/test/unit/org/apache/cassandra/db/TableCQLHelperTest.java
+++ b/test/unit/org/apache/cassandra/db/TableCQLHelperTest.java
@@ -26,6 +26,7 @@ import java.util.*;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.Files;
+import org.apache.cassandra.service.reads.NeverSpeculativeRetryPolicy;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -255,6 +256,7 @@ public class TableCQLHelperTest extends CQLTester
.maxIndexInterval(7)
.memtableFlushPeriod(8)
.speculativeRetry(AlwaysSpeculativeRetryPolicy.INSTANCE)
+ .speculativeWriteThreshold(NeverSpeculativeRetryPolicy.INSTANCE)
.extensions(ImmutableMap.of("ext1", ByteBuffer.wrap("val1".getBytes())))
.recordColumnDrop(ColumnMetadata.regularColumn(keyspace, table, "reg1", AsciiType.instance),
FBUtilities.timestampMicros());
@@ -272,6 +274,7 @@ public class TableCQLHelperTest extends CQLTester
"\tAND max_index_interval = 7\n" +
"\tAND memtable_flush_period_in_ms = 8\n" +
"\tAND speculative_retry = 'ALWAYS'\n" +
+ "\tAND speculative_write_threshold = 'NEVER'\n" +
"\tAND comment = 'comment'\n" +
"\tAND caching = { 'keys': 'ALL', 'rows_per_partition': 'NONE' }\n" +
"\tAND compaction = { 'max_threshold': '32', 'min_threshold': '4', 'sstable_size_in_mb': '1', 'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy' }\n" +
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/db/VerifyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/VerifyTest.java b/test/unit/org/apache/cassandra/db/VerifyTest.java
index 0632274..df2acb4 100644
--- a/test/unit/org/apache/cassandra/db/VerifyTest.java
+++ b/test/unit/org/apache/cassandra/db/VerifyTest.java
@@ -421,7 +421,7 @@ public class VerifyTest
// make the sstable repaired:
SSTableReader sstable = cfs.getLiveSSTables().iterator().next();
- sstable.descriptor.getMetadataSerializer().mutateRepaired(sstable.descriptor, System.currentTimeMillis(), sstable.getSSTableMetadata().pendingRepair);
+ sstable.descriptor.getMetadataSerializer().mutateRepairMetadata(sstable.descriptor, System.currentTimeMillis(), sstable.getPendingRepair(), sstable.isTransient());
sstable.reloadSSTableMetadata();
// break the sstable:
@@ -487,7 +487,7 @@ public class VerifyTest
fillCF(cfs, 2);
SSTableReader sstable = cfs.getLiveSSTables().iterator().next();
- sstable.descriptor.getMetadataSerializer().mutateRepaired(sstable.descriptor, 1, sstable.getSSTableMetadata().pendingRepair);
+ sstable.descriptor.getMetadataSerializer().mutateRepairMetadata(sstable.descriptor, 1, sstable.getPendingRepair(), sstable.isTransient());
sstable.reloadSSTableMetadata();
cfs.getTracker().notifySSTableRepairedStatusChanged(Collections.singleton(sstable));
assertTrue(sstable.isRepaired());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/db/compaction/AbstractPendingRepairTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/AbstractPendingRepairTest.java b/test/unit/org/apache/cassandra/db/compaction/AbstractPendingRepairTest.java
index a320248..4d62894 100644
--- a/test/unit/org/apache/cassandra/db/compaction/AbstractPendingRepairTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/AbstractPendingRepairTest.java
@@ -109,17 +109,16 @@ public class AbstractPendingRepairTest extends AbstractRepairTest
SSTableReader sstable = diff.iterator().next();
if (orphan)
{
- Iterables.any(csm.getUnrepaired(), s -> s.getSSTables().contains(sstable));
- csm.getUnrepaired().forEach(s -> s.removeSSTable(sstable));
+ csm.getUnrepairedUnsafe().allStrategies().forEach(acs -> acs.removeSSTable(sstable));
}
return sstable;
}
- protected static void mutateRepaired(SSTableReader sstable, long repairedAt, UUID pendingRepair)
+ protected static void mutateRepaired(SSTableReader sstable, long repairedAt, UUID pendingRepair, boolean isTransient)
{
try
{
- sstable.descriptor.getMetadataSerializer().mutateRepaired(sstable.descriptor, repairedAt, pendingRepair);
+ sstable.descriptor.getMetadataSerializer().mutateRepairMetadata(sstable.descriptor, repairedAt, pendingRepair, isTransient);
sstable.reloadSSTableMetadata();
}
catch (IOException e)
@@ -130,11 +129,11 @@ public class AbstractPendingRepairTest extends AbstractRepairTest
protected static void mutateRepaired(SSTableReader sstable, long repairedAt)
{
- mutateRepaired(sstable, repairedAt, ActiveRepairService.NO_PENDING_REPAIR);
+ mutateRepaired(sstable, repairedAt, ActiveRepairService.NO_PENDING_REPAIR, false);
}
- protected static void mutateRepaired(SSTableReader sstable, UUID pendingRepair)
+ protected static void mutateRepaired(SSTableReader sstable, UUID pendingRepair, boolean isTransient)
{
- mutateRepaired(sstable, ActiveRepairService.UNREPAIRED_SSTABLE, pendingRepair);
+ mutateRepaired(sstable, ActiveRepairService.UNREPAIRED_SSTABLE, pendingRepair, isTransient);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
index 366c18e..f514ea6 100644
--- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
@@ -27,6 +27,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.UUID;
+import java.util.function.Predicate;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
@@ -39,6 +40,8 @@ import org.junit.Test;
import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.RangesAtEndpoint;
+import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.schema.MockSchema;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.TableMetadata;
@@ -50,7 +53,6 @@ import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.dht.ByteOrderedPartitioner.BytesToken;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.sstable.*;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.schema.KeyspaceParams;
@@ -58,6 +60,7 @@ import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.UUIDGen;
import org.apache.cassandra.utils.concurrent.Refs;
import org.apache.cassandra.UpdateBuilder;
@@ -77,16 +80,21 @@ public class AntiCompactionTest
{
private static final String KEYSPACE1 = "AntiCompactionTest";
private static final String CF = "AntiCompactionTest";
+ private static final Collection<Range<Token>> NO_RANGES = Collections.emptyList();
+
private static TableMetadata metadata;
private static ColumnFamilyStore cfs;
+ private static InetAddressAndPort local;
+
@BeforeClass
- public static void defineSchema() throws ConfigurationException
+ public static void defineSchema() throws Throwable
{
SchemaLoader.prepareServer();
metadata = SchemaLoader.standardCFMD(KEYSPACE1, CF).build();
SchemaLoader.createKeyspace(KEYSPACE1, KeyspaceParams.simple(1), metadata);
cfs = Schema.instance.getColumnFamilyStoreInstance(metadata.id);
+ local = InetAddressAndPort.getByName("127.0.0.1");
}
@After
@@ -97,56 +105,86 @@ public class AntiCompactionTest
store.truncateBlocking();
}
- private void registerParentRepairSession(UUID sessionID, Collection<Range<Token>> ranges, long repairedAt, UUID pendingRepair) throws IOException
+ private void registerParentRepairSession(UUID sessionID, Iterable<Range<Token>> ranges, long repairedAt, UUID pendingRepair) throws IOException
{
ActiveRepairService.instance.registerParentRepairSession(sessionID,
InetAddressAndPort.getByName("10.0.0.1"),
- Lists.newArrayList(cfs), ranges,
+ Lists.newArrayList(cfs), ImmutableSet.copyOf(ranges),
pendingRepair != null || repairedAt != UNREPAIRED_SSTABLE,
repairedAt, true, PreviewKind.NONE);
}
- private void antiCompactOne(long repairedAt, UUID pendingRepair) throws Exception
+ private static RangesAtEndpoint atEndpoint(Collection<Range<Token>> full, Collection<Range<Token>> trans)
{
- assert repairedAt != UNREPAIRED_SSTABLE || pendingRepair != null;
+ RangesAtEndpoint.Builder builder = RangesAtEndpoint.builder(local);
+ for (Range<Token> range : full)
+ builder.add(new Replica(local, range, true));
- ColumnFamilyStore store = prepareColumnFamilyStore();
- Collection<SSTableReader> sstables = getUnrepairedSSTables(store);
- assertEquals(store.getLiveSSTables().size(), sstables.size());
- Range<Token> range = new Range<Token>(new BytesToken("0".getBytes()), new BytesToken("4".getBytes()));
- List<Range<Token>> ranges = Arrays.asList(range);
+ for (Range<Token> range : trans)
+ builder.add(new Replica(local, range, false));
+
+ return builder.build();
+ }
+
+ private static Collection<Range<Token>> range(int l, int r)
+ {
+ return Collections.singleton(new Range<>(new BytesToken(Integer.toString(l).getBytes()), new BytesToken(Integer.toString(r).getBytes())));
+ }
- int repairedKeys = 0;
+ private static class SSTableStats
+ {
+ int numLiveSSTables = 0;
int pendingKeys = 0;
- int nonRepairedKeys = 0;
+ int transKeys = 0;
+ int unrepairedKeys = 0;
+ }
+
+ private SSTableStats antiCompactRanges(ColumnFamilyStore store, RangesAtEndpoint ranges) throws IOException
+ {
+ UUID sessionID = UUID.randomUUID();
+ Collection<SSTableReader> sstables = getUnrepairedSSTables(store);
try (LifecycleTransaction txn = store.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION);
Refs<SSTableReader> refs = Refs.ref(sstables))
{
if (txn == null)
throw new IllegalStateException();
- UUID parentRepairSession = pendingRepair == null ? UUID.randomUUID() : pendingRepair;
- registerParentRepairSession(parentRepairSession, ranges, repairedAt, pendingRepair);
- CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, repairedAt, pendingRepair, parentRepairSession);
+ registerParentRepairSession(sessionID, ranges.ranges(), FBUtilities.nowInSeconds(), sessionID);
+ CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, sessionID);
}
- assertEquals(2, store.getLiveSSTables().size());
+ SSTableStats stats = new SSTableStats();
+ stats.numLiveSSTables = store.getLiveSSTables().size();
+
+ Predicate<Token> fullContains = t -> Iterables.any(ranges.fullRanges(), r -> r.contains(t));
+ Predicate<Token> transContains = t -> Iterables.any(ranges.transientRanges(), r -> r.contains(t));
for (SSTableReader sstable : store.getLiveSSTables())
{
+ assertFalse(sstable.isRepaired());
+ assertEquals(sstable.isPendingRepair() ? sessionID : NO_PENDING_REPAIR, sstable.getPendingRepair());
try (ISSTableScanner scanner = sstable.getScanner())
{
while (scanner.hasNext())
{
UnfilteredRowIterator row = scanner.next();
- if (sstable.isRepaired() || sstable.isPendingRepair())
+ Token token = row.partitionKey().getToken();
+ if (sstable.isPendingRepair() && !sstable.isTransient())
{
- assertTrue(range.contains(row.partitionKey().getToken()));
- repairedKeys += sstable.isRepaired() ? 1 : 0;
- pendingKeys += sstable.isPendingRepair() ? 1 : 0;
+ assertTrue(fullContains.test(token));
+ assertFalse(transContains.test(token));
+ stats.pendingKeys++;
+ }
+ else if (sstable.isPendingRepair() && sstable.isTransient())
+ {
+
+ assertTrue(transContains.test(token));
+ assertFalse(fullContains.test(token));
+ stats.transKeys++;
}
else
{
- assertFalse(range.contains(row.partitionKey().getToken()));
- nonRepairedKeys++;
+ assertFalse(fullContains.test(token));
+ assertFalse(transContains.test(token));
+ stats.unrepairedKeys++;
}
}
}
@@ -157,21 +195,40 @@ public class AntiCompactionTest
assertEquals(1, sstable.selfRef().globalCount());
}
assertEquals(0, store.getTracker().getCompacting().size());
- assertEquals(repairedKeys, repairedAt != UNREPAIRED_SSTABLE ? 4 : 0);
- assertEquals(pendingKeys, pendingRepair != NO_PENDING_REPAIR ? 4 : 0);
- assertEquals(nonRepairedKeys, 6);
+ return stats;
}
@Test
- public void antiCompactOneRepairedAt() throws Exception
+ public void antiCompactOneFull() throws Exception
{
- antiCompactOne(1000, NO_PENDING_REPAIR);
+ ColumnFamilyStore store = prepareColumnFamilyStore();
+ SSTableStats stats = antiCompactRanges(store, atEndpoint(range(0, 4), NO_RANGES));
+ assertEquals(2, stats.numLiveSSTables);
+ assertEquals(stats.pendingKeys, 4);
+ assertEquals(stats.transKeys, 0);
+ assertEquals(stats.unrepairedKeys, 6);
+ }
+
+ @Test
+ public void antiCompactOneMixed() throws Exception
+ {
+ ColumnFamilyStore store = prepareColumnFamilyStore();
+ SSTableStats stats = antiCompactRanges(store, atEndpoint(range(0, 4), range(4, 8)));
+ assertEquals(3, stats.numLiveSSTables);
+ assertEquals(stats.pendingKeys, 4);
+ assertEquals(stats.transKeys, 4);
+ assertEquals(stats.unrepairedKeys, 2);
}
@Test
- public void antiCompactOnePendingRepair() throws Exception
+ public void antiCompactOneTransOnly() throws Exception
{
- antiCompactOne(UNREPAIRED_SSTABLE, UUIDGen.getTimeUUID());
+ ColumnFamilyStore store = prepareColumnFamilyStore();
+ SSTableStats stats = antiCompactRanges(store, atEndpoint(NO_RANGES, range(0, 4)));
+ assertEquals(2, stats.numLiveSSTables);
+ assertEquals(stats.pendingKeys, 0);
+ assertEquals(stats.transKeys, 4);
+ assertEquals(stats.unrepairedKeys, 6);
}
@Test
@@ -190,7 +247,7 @@ public class AntiCompactionTest
try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION);
Refs<SSTableReader> refs = Refs.ref(sstables))
{
- CompactionManager.instance.performAnticompaction(cfs, ranges, refs, txn, 12345, NO_PENDING_REPAIR, parentRepairSession);
+ CompactionManager.instance.performAnticompaction(cfs, atEndpoint(ranges, NO_RANGES), refs, txn, parentRepairSession);
}
long sum = 0;
long rows = 0;
@@ -208,7 +265,7 @@ public class AntiCompactionTest
File dir = cfs.getDirectories().getDirectoryForNewSSTables();
Descriptor desc = cfs.newSSTableDescriptor(dir);
- try (SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, desc, 0, 0, NO_PENDING_REPAIR, new SerializationHeader(true, cfs.metadata(), cfs.metadata().regularAndStaticColumns(), EncodingStats.NO_STATS)))
+ try (SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, desc, 0, 0, NO_PENDING_REPAIR, false, new SerializationHeader(true, cfs.metadata(), cfs.metadata().regularAndStaticColumns(), EncodingStats.NO_STATS)))
{
for (int i = 0; i < count; i++)
{
@@ -240,7 +297,7 @@ public class AntiCompactionTest
}
@Test
- public void antiCompactTen() throws InterruptedException, IOException
+ public void antiCompactTenFull() throws InterruptedException, IOException
{
Keyspace keyspace = Keyspace.open(KEYSPACE1);
ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF);
@@ -250,56 +307,59 @@ public class AntiCompactionTest
{
generateSStable(store,Integer.toString(table));
}
- Collection<SSTableReader> sstables = getUnrepairedSSTables(store);
- assertEquals(store.getLiveSSTables().size(), sstables.size());
+ SSTableStats stats = antiCompactRanges(store, atEndpoint(range(0, 4), NO_RANGES));
+ /*
+ Anticompaction will be anti-compacting 10 SSTables but will be doing this two at a time
+ so there will be no net change in the number of sstables
+ */
+ assertEquals(10, stats.numLiveSSTables);
+ assertEquals(stats.pendingKeys, 40);
+ assertEquals(stats.transKeys, 0);
+ assertEquals(stats.unrepairedKeys, 60);
+ }
- Range<Token> range = new Range<Token>(new BytesToken("0".getBytes()), new BytesToken("4".getBytes()));
- List<Range<Token>> ranges = Arrays.asList(range);
+ @Test
+ public void antiCompactTenTrans() throws InterruptedException, IOException
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE1);
+ ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF);
+ store.disableAutoCompaction();
- long repairedAt = 1000;
- UUID parentRepairSession = UUID.randomUUID();
- registerParentRepairSession(parentRepairSession, ranges, repairedAt, null);
- try (LifecycleTransaction txn = store.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION);
- Refs<SSTableReader> refs = Refs.ref(sstables))
+ for (int table = 0; table < 10; table++)
{
- CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, repairedAt, NO_PENDING_REPAIR, parentRepairSession);
+ generateSStable(store,Integer.toString(table));
}
+ SSTableStats stats = antiCompactRanges(store, atEndpoint(NO_RANGES, range(0, 4)));
/*
Anticompaction will be anti-compacting 10 SSTables but will be doing this two at a time
so there will be no net change in the number of sstables
*/
- assertEquals(10, store.getLiveSSTables().size());
- int repairedKeys = 0;
- int nonRepairedKeys = 0;
- for (SSTableReader sstable : store.getLiveSSTables())
+ assertEquals(10, stats.numLiveSSTables);
+ assertEquals(stats.pendingKeys, 0);
+ assertEquals(stats.transKeys, 40);
+ assertEquals(stats.unrepairedKeys, 60);
+ }
+
+ @Test
+ public void antiCompactTenMixed() throws InterruptedException, IOException
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE1);
+ ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF);
+ store.disableAutoCompaction();
+
+ for (int table = 0; table < 10; table++)
{
- try (ISSTableScanner scanner = sstable.getScanner())
- {
- while (scanner.hasNext())
- {
- try (UnfilteredRowIterator row = scanner.next())
- {
- if (sstable.isRepaired())
- {
- assertTrue(range.contains(row.partitionKey().getToken()));
- assertEquals(repairedAt, sstable.getSSTableMetadata().repairedAt);
- repairedKeys++;
- }
- else
- {
- assertFalse(range.contains(row.partitionKey().getToken()));
- assertEquals(ActiveRepairService.UNREPAIRED_SSTABLE, sstable.getSSTableMetadata().repairedAt);
- nonRepairedKeys++;
- }
- }
- }
- }
+ generateSStable(store,Integer.toString(table));
}
- assertEquals(repairedKeys, 40);
- assertEquals(nonRepairedKeys, 60);
+ SSTableStats stats = antiCompactRanges(store, atEndpoint(range(0, 4), range(4, 8)));
+ assertEquals(15, stats.numLiveSSTables);
+ assertEquals(stats.pendingKeys, 40);
+ assertEquals(stats.transKeys, 40);
+ assertEquals(stats.unrepairedKeys, 20);
}
- private void shouldMutate(long repairedAt, UUID pendingRepair) throws InterruptedException, IOException
+ @Test
+ public void shouldMutatePendingRepair() throws InterruptedException, IOException
{
ColumnFamilyStore store = prepareColumnFamilyStore();
Collection<SSTableReader> sstables = getUnrepairedSSTables(store);
@@ -307,35 +367,23 @@ public class AntiCompactionTest
// the sstables start at "0".getBytes() = 48, we need to include that first token, with "/".getBytes() = 47
Range<Token> range = new Range<Token>(new BytesToken("/".getBytes()), new BytesToken("9999".getBytes()));
List<Range<Token>> ranges = Arrays.asList(range);
- UUID parentRepairSession = pendingRepair == null ? UUID.randomUUID() : pendingRepair;
- registerParentRepairSession(parentRepairSession, ranges, repairedAt, pendingRepair);
+ UUID pendingRepair = UUID.randomUUID();
+ registerParentRepairSession(pendingRepair, ranges, UNREPAIRED_SSTABLE, pendingRepair);
try (LifecycleTransaction txn = store.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION);
Refs<SSTableReader> refs = Refs.ref(sstables))
{
- CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, repairedAt, pendingRepair, parentRepairSession);
+ CompactionManager.instance.performAnticompaction(store, atEndpoint(ranges, NO_RANGES), refs, txn, pendingRepair);
}
assertThat(store.getLiveSSTables().size(), is(1));
- assertThat(Iterables.get(store.getLiveSSTables(), 0).isRepaired(), is(repairedAt != UNREPAIRED_SSTABLE));
- assertThat(Iterables.get(store.getLiveSSTables(), 0).isPendingRepair(), is(pendingRepair != NO_PENDING_REPAIR));
+ assertThat(Iterables.get(store.getLiveSSTables(), 0).isRepaired(), is(false));
+ assertThat(Iterables.get(store.getLiveSSTables(), 0).isPendingRepair(), is(true));
assertThat(Iterables.get(store.getLiveSSTables(), 0).selfRef().globalCount(), is(1));
assertThat(store.getTracker().getCompacting().size(), is(0));
}
@Test
- public void shouldMutateRepairedAt() throws InterruptedException, IOException
- {
- shouldMutate(1, NO_PENDING_REPAIR);
- }
-
- @Test
- public void shouldMutatePendingRepair() throws InterruptedException, IOException
- {
- shouldMutate(UNREPAIRED_SSTABLE, UUIDGen.getTimeUUID());
- }
-
- @Test
public void shouldSkipAntiCompactionForNonIntersectingRange() throws InterruptedException, IOException
{
Keyspace keyspace = Keyspace.open(KEYSPACE1);
@@ -358,7 +406,7 @@ public class AntiCompactionTest
try (LifecycleTransaction txn = store.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION);
Refs<SSTableReader> refs = Refs.ref(sstables))
{
- CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, 1, NO_PENDING_REPAIR, parentRepairSession);
+ CompactionManager.instance.performAnticompaction(store, atEndpoint(ranges, NO_RANGES), refs, txn, parentRepairSession);
}
catch (IllegalStateException e)
{
@@ -428,7 +476,7 @@ public class AntiCompactionTest
Assert.assertFalse(refs.isEmpty());
try
{
- CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, 1, missingRepairSession, missingRepairSession);
+ CompactionManager.instance.performAnticompaction(store, atEndpoint(ranges, NO_RANGES), refs, txn, missingRepairSession);
Assert.fail("expected RuntimeException");
}
catch (RuntimeException e)
@@ -484,8 +532,7 @@ public class AntiCompactionTest
Range<Token> r = new Range<>(t(9), t(100)); // sstable is not intersecting and should not be included
- Iterator<SSTableReader> sstableIterator = sstables.iterator();
- CompactionManager.findSSTablesToAnticompact(sstableIterator, Collections.singletonList(r), UUID.randomUUID());
+ CompactionManager.validateSSTableBoundsForAnticompaction(UUID.randomUUID(), sstables, atEndpoint(Collections.singletonList(r), NO_RANGES));
}
@Test(expected = IllegalStateException.class)
@@ -500,8 +547,7 @@ public class AntiCompactionTest
Range<Token> r = new Range<>(t(10), t(11)); // no sstable included, throw
- Iterator<SSTableReader> sstableIterator = sstables.iterator();
- CompactionManager.findSSTablesToAnticompact(sstableIterator, Collections.singletonList(r), UUID.randomUUID());
+ CompactionManager.validateSSTableBoundsForAnticompaction(UUID.randomUUID(), sstables, atEndpoint(Collections.singletonList(r), NO_RANGES));
}
@Test
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerPendingRepairTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerPendingRepairTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerPendingRepairTest.java
index c7f1ae8..267c2e4 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerPendingRepairTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerPendingRepairTest.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.db.compaction;
-import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
@@ -42,29 +41,34 @@ import org.apache.cassandra.utils.FBUtilities;
public class CompactionStrategyManagerPendingRepairTest extends AbstractPendingRepairTest
{
- private static boolean strategiesContain(Collection<AbstractCompactionStrategy> strategies, SSTableReader sstable)
+ private boolean transientContains(SSTableReader sstable)
{
- return Iterables.any(strategies, strategy -> strategy.getSSTables().contains(sstable));
- }
-
- private boolean pendingContains(UUID id, SSTableReader sstable)
- {
- return Iterables.any(csm.getPendingRepairManagers(), p -> p.get(id) != null && p.get(id).getSSTables().contains(sstable));
+ return csm.getTransientRepairsUnsafe().containsSSTable(sstable);
}
private boolean pendingContains(SSTableReader sstable)
{
- return Iterables.any(csm.getPendingRepairManagers(), p -> strategiesContain(p.getStrategies(), sstable));
+ return csm.getPendingRepairsUnsafe().containsSSTable(sstable);
}
private boolean repairedContains(SSTableReader sstable)
{
- return strategiesContain(csm.getRepaired(), sstable);
+ return csm.getRepairedUnsafe().containsSSTable(sstable);
}
private boolean unrepairedContains(SSTableReader sstable)
{
- return strategiesContain(csm.getUnrepaired(), sstable);
+ return csm.getUnrepairedUnsafe().containsSSTable(sstable);
+ }
+
+ private boolean hasPendingStrategiesFor(UUID sessionID)
+ {
+ return !Iterables.isEmpty(csm.getPendingRepairsUnsafe().getStrategiesFor(sessionID));
+ }
+
+ private boolean hasTransientStrategiesFor(UUID sessionID)
+ {
+ return !Iterables.isEmpty(csm.getTransientRepairsUnsafe().getStrategiesFor(sessionID));
}
/**
@@ -75,23 +79,25 @@ public class CompactionStrategyManagerPendingRepairTest extends AbstractPendingR
{
UUID repairID = registerSession(cfs, true, true);
LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS);
- Assert.assertTrue(csm.pendingRepairs().isEmpty());
+ Assert.assertTrue(Iterables.isEmpty(csm.getPendingRepairsUnsafe().allStrategies()));
SSTableReader sstable = makeSSTable(true);
Assert.assertFalse(sstable.isRepaired());
Assert.assertFalse(sstable.isPendingRepair());
- mutateRepaired(sstable, repairID);
+ mutateRepaired(sstable, repairID, false);
Assert.assertFalse(sstable.isRepaired());
Assert.assertTrue(sstable.isPendingRepair());
- csm.getForPendingRepair(repairID).forEach(Assert::assertNull);
+ Assert.assertFalse(hasPendingStrategiesFor(repairID));
+ Assert.assertFalse(hasTransientStrategiesFor(repairID));
// add the sstable
csm.handleNotification(new SSTableAddedNotification(Collections.singleton(sstable), null), cfs.getTracker());
Assert.assertFalse(repairedContains(sstable));
Assert.assertFalse(unrepairedContains(sstable));
- csm.getForPendingRepair(repairID).forEach(Assert::assertNotNull);
- Assert.assertTrue(pendingContains(repairID, sstable));
+ Assert.assertTrue(pendingContains(sstable));
+ Assert.assertTrue(hasPendingStrategiesFor(repairID));
+ Assert.assertFalse(hasTransientStrategiesFor(repairID));
}
@Test
@@ -101,16 +107,17 @@ public class CompactionStrategyManagerPendingRepairTest extends AbstractPendingR
LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS);
SSTableReader sstable1 = makeSSTable(true);
- mutateRepaired(sstable1, repairID);
+ mutateRepaired(sstable1, repairID, false);
SSTableReader sstable2 = makeSSTable(true);
- mutateRepaired(sstable2, repairID);
+ mutateRepaired(sstable2, repairID, false);
Assert.assertFalse(repairedContains(sstable1));
Assert.assertFalse(unrepairedContains(sstable1));
Assert.assertFalse(repairedContains(sstable2));
Assert.assertFalse(unrepairedContains(sstable2));
- csm.getForPendingRepair(repairID).forEach(Assert::assertNull);
+ Assert.assertFalse(hasPendingStrategiesFor(repairID));
+ Assert.assertFalse(hasTransientStrategiesFor(repairID));
// add only
SSTableListChangedNotification notification;
@@ -119,13 +126,14 @@ public class CompactionStrategyManagerPendingRepairTest extends AbstractPendingR
OperationType.COMPACTION);
csm.handleNotification(notification, cfs.getTracker());
- csm.getForPendingRepair(repairID).forEach(Assert::assertNotNull);
Assert.assertFalse(repairedContains(sstable1));
Assert.assertFalse(unrepairedContains(sstable1));
- Assert.assertTrue(pendingContains(repairID, sstable1));
+ Assert.assertTrue(pendingContains(sstable1));
Assert.assertFalse(repairedContains(sstable2));
Assert.assertFalse(unrepairedContains(sstable2));
- Assert.assertFalse(pendingContains(repairID, sstable2));
+ Assert.assertFalse(pendingContains(sstable2));
+ Assert.assertTrue(hasPendingStrategiesFor(repairID));
+ Assert.assertFalse(hasTransientStrategiesFor(repairID));
// remove and add
notification = new SSTableListChangedNotification(Collections.singleton(sstable2),
@@ -135,10 +143,10 @@ public class CompactionStrategyManagerPendingRepairTest extends AbstractPendingR
Assert.assertFalse(repairedContains(sstable1));
Assert.assertFalse(unrepairedContains(sstable1));
- Assert.assertFalse(pendingContains(repairID, sstable1));
+ Assert.assertFalse(pendingContains(sstable1));
Assert.assertFalse(repairedContains(sstable2));
Assert.assertFalse(unrepairedContains(sstable2));
- Assert.assertTrue(pendingContains(repairID, sstable2));
+ Assert.assertTrue(pendingContains(sstable2));
}
@Test
@@ -151,18 +159,20 @@ public class CompactionStrategyManagerPendingRepairTest extends AbstractPendingR
SSTableReader sstable = makeSSTable(false);
Assert.assertTrue(unrepairedContains(sstable));
Assert.assertFalse(repairedContains(sstable));
- csm.getForPendingRepair(repairID).forEach(Assert::assertNull);
+ Assert.assertFalse(hasPendingStrategiesFor(repairID));
+ Assert.assertFalse(hasTransientStrategiesFor(repairID));
SSTableRepairStatusChanged notification;
// change to pending repaired
- mutateRepaired(sstable, repairID);
+ mutateRepaired(sstable, repairID, false);
notification = new SSTableRepairStatusChanged(Collections.singleton(sstable));
csm.handleNotification(notification, cfs.getTracker());
Assert.assertFalse(unrepairedContains(sstable));
Assert.assertFalse(repairedContains(sstable));
- csm.getForPendingRepair(repairID).forEach(Assert::assertNotNull);
- Assert.assertTrue(pendingContains(repairID, sstable));
+ Assert.assertTrue(hasPendingStrategiesFor(repairID));
+ Assert.assertFalse(hasTransientStrategiesFor(repairID));
+ Assert.assertTrue(pendingContains(sstable));
// change to repaired
mutateRepaired(sstable, System.currentTimeMillis());
@@ -170,7 +180,7 @@ public class CompactionStrategyManagerPendingRepairTest extends AbstractPendingR
csm.handleNotification(notification, cfs.getTracker());
Assert.assertFalse(unrepairedContains(sstable));
Assert.assertTrue(repairedContains(sstable));
- Assert.assertFalse(pendingContains(repairID, sstable));
+ Assert.assertFalse(pendingContains(sstable));
}
@Test
@@ -180,14 +190,14 @@ public class CompactionStrategyManagerPendingRepairTest extends AbstractPendingR
LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS);
SSTableReader sstable = makeSSTable(true);
- mutateRepaired(sstable, repairID);
+ mutateRepaired(sstable, repairID, false);
csm.handleNotification(new SSTableAddedNotification(Collections.singleton(sstable), null), cfs.getTracker());
- Assert.assertTrue(pendingContains(repairID, sstable));
+ Assert.assertTrue(pendingContains(sstable));
// delete sstable
SSTableDeletingNotification notification = new SSTableDeletingNotification(sstable);
csm.handleNotification(notification, cfs.getTracker());
- Assert.assertFalse(pendingContains(repairID, sstable));
+ Assert.assertFalse(pendingContains(sstable));
Assert.assertFalse(unrepairedContains(sstable));
Assert.assertFalse(repairedContains(sstable));
}
@@ -209,7 +219,7 @@ public class CompactionStrategyManagerPendingRepairTest extends AbstractPendingR
Assert.assertTrue(strategies.get(2).isEmpty());
SSTableReader sstable = makeSSTable(true);
- mutateRepaired(sstable, repairID);
+ mutateRepaired(sstable, repairID, false);
csm.handleNotification(new SSTableAddedNotification(Collections.singleton(sstable), null), cfs.getTracker());
strategies = csm.getStrategies();
@@ -227,11 +237,12 @@ public class CompactionStrategyManagerPendingRepairTest extends AbstractPendingR
UUID repairID = registerSession(cfs, true, true);
LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS);
SSTableReader sstable = makeSSTable(true);
- mutateRepaired(sstable, repairID);
+ mutateRepaired(sstable, repairID, false);
csm.handleNotification(new SSTableAddedNotification(Collections.singleton(sstable), null), cfs.getTracker());
LocalSessionAccessor.finalizeUnsafe(repairID);
- csm.getForPendingRepair(repairID).forEach(Assert::assertNotNull);
- Assert.assertNotNull(pendingContains(repairID, sstable));
+ Assert.assertTrue(hasPendingStrategiesFor(repairID));
+ Assert.assertFalse(hasTransientStrategiesFor(repairID));
+ Assert.assertTrue(pendingContains(sstable));
Assert.assertTrue(sstable.isPendingRepair());
Assert.assertFalse(sstable.isRepaired());
@@ -245,7 +256,9 @@ public class CompactionStrategyManagerPendingRepairTest extends AbstractPendingR
Assert.assertTrue(repairedContains(sstable));
Assert.assertFalse(unrepairedContains(sstable));
- csm.getForPendingRepair(repairID).forEach(Assert::assertNull);
+ Assert.assertFalse(pendingContains(sstable));
+ Assert.assertFalse(hasPendingStrategiesFor(repairID));
+ Assert.assertFalse(hasTransientStrategiesFor(repairID));
// sstable should have pendingRepair cleared, and repairedAt set correctly
long expectedRepairedAt = ActiveRepairService.instance.getParentRepairSession(repairID).repairedAt;
@@ -264,12 +277,13 @@ public class CompactionStrategyManagerPendingRepairTest extends AbstractPendingR
UUID repairID = registerSession(cfs, true, true);
LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS);
SSTableReader sstable = makeSSTable(true);
- mutateRepaired(sstable, repairID);
+ mutateRepaired(sstable, repairID, false);
csm.handleNotification(new SSTableAddedNotification(Collections.singleton(sstable), null), cfs.getTracker());
LocalSessionAccessor.failUnsafe(repairID);
- csm.getForPendingRepair(repairID).forEach(Assert::assertNotNull);
- Assert.assertNotNull(pendingContains(repairID, sstable));
+ Assert.assertTrue(hasPendingStrategiesFor(repairID));
+ Assert.assertFalse(hasTransientStrategiesFor(repairID));
+ Assert.assertTrue(pendingContains(sstable));
Assert.assertTrue(sstable.isPendingRepair());
Assert.assertFalse(sstable.isRepaired());
@@ -283,11 +297,78 @@ public class CompactionStrategyManagerPendingRepairTest extends AbstractPendingR
Assert.assertFalse(repairedContains(sstable));
Assert.assertTrue(unrepairedContains(sstable));
- csm.getForPendingRepair(repairID).forEach(Assert::assertNull);
+ Assert.assertFalse(hasPendingStrategiesFor(repairID));
+ Assert.assertFalse(hasTransientStrategiesFor(repairID));
// sstable should have pendingRepair cleared, and repairedAt set correctly
Assert.assertFalse(sstable.isPendingRepair());
Assert.assertFalse(sstable.isRepaired());
Assert.assertEquals(ActiveRepairService.UNREPAIRED_SSTABLE, sstable.getSSTableMetadata().repairedAt);
}
+
+ @Test
+ public void finalizedSessionTransientCleanup()
+ {
+ Assert.assertTrue(cfs.getLiveSSTables().isEmpty());
+ UUID repairID = registerSession(cfs, true, true);
+ LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS);
+ SSTableReader sstable = makeSSTable(true);
+ mutateRepaired(sstable, repairID, true);
+ csm.handleNotification(new SSTableAddedNotification(Collections.singleton(sstable), null), cfs.getTracker());
+ LocalSessionAccessor.finalizeUnsafe(repairID);
+
+ Assert.assertFalse(hasPendingStrategiesFor(repairID));
+ Assert.assertTrue(hasTransientStrategiesFor(repairID));
+ Assert.assertTrue(transientContains(sstable));
+ Assert.assertFalse(pendingContains(sstable));
+ Assert.assertFalse(repairedContains(sstable));
+ Assert.assertFalse(unrepairedContains(sstable));
+
+ cfs.getCompactionStrategyManager().enable(); // enable compaction to fetch next background task
+ AbstractCompactionTask compactionTask = csm.getNextBackgroundTask(FBUtilities.nowInSeconds());
+ Assert.assertNotNull(compactionTask);
+ Assert.assertSame(PendingRepairManager.RepairFinishedCompactionTask.class, compactionTask.getClass());
+
+ // run the compaction
+ compactionTask.execute(null);
+
+ Assert.assertTrue(cfs.getLiveSSTables().isEmpty());
+ Assert.assertFalse(hasPendingStrategiesFor(repairID));
+ Assert.assertFalse(hasTransientStrategiesFor(repairID));
+ }
+
+ @Test
+ public void failedSessionTransientCleanup()
+ {
+ Assert.assertTrue(cfs.getLiveSSTables().isEmpty());
+ UUID repairID = registerSession(cfs, true, true);
+ LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS);
+ SSTableReader sstable = makeSSTable(true);
+ mutateRepaired(sstable, repairID, true);
+ csm.handleNotification(new SSTableAddedNotification(Collections.singleton(sstable), null), cfs.getTracker());
+ LocalSessionAccessor.failUnsafe(repairID);
+
+ Assert.assertFalse(hasPendingStrategiesFor(repairID));
+ Assert.assertTrue(hasTransientStrategiesFor(repairID));
+ Assert.assertTrue(transientContains(sstable));
+ Assert.assertFalse(pendingContains(sstable));
+ Assert.assertFalse(repairedContains(sstable));
+ Assert.assertFalse(unrepairedContains(sstable));
+
+ cfs.getCompactionStrategyManager().enable(); // enable compaction to fetch next background task
+ AbstractCompactionTask compactionTask = csm.getNextBackgroundTask(FBUtilities.nowInSeconds());
+ Assert.assertNotNull(compactionTask);
+ Assert.assertSame(PendingRepairManager.RepairFinishedCompactionTask.class, compactionTask.getClass());
+
+ // run the compaction
+ compactionTask.execute(null);
+
+ Assert.assertFalse(cfs.getLiveSSTables().isEmpty());
+ Assert.assertFalse(hasPendingStrategiesFor(repairID));
+ Assert.assertFalse(hasTransientStrategiesFor(repairID));
+ Assert.assertFalse(transientContains(sstable));
+ Assert.assertFalse(pendingContains(sstable));
+ Assert.assertFalse(repairedContains(sstable));
+ Assert.assertTrue(unrepairedContains(sstable));
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java
index eeaaf5b..73e6852 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java
@@ -129,12 +129,12 @@ public class CompactionStrategyManagerTest
if (i % 3 == 0)
{
//make 1 third of sstables repaired
- cfs.getCompactionStrategyManager().mutateRepaired(newSSTables, System.currentTimeMillis(), null);
+ cfs.getCompactionStrategyManager().mutateRepaired(newSSTables, System.currentTimeMillis(), null, false);
}
else if (i % 3 == 1)
{
//make 1 third of sstables pending repair
- cfs.getCompactionStrategyManager().mutateRepaired(newSSTables, 0, UUIDGen.getTimeUUID());
+ cfs.getCompactionStrategyManager().mutateRepaired(newSSTables, 0, UUIDGen.getTimeUUID(), false);
}
previousSSTables = currentSSTables;
}
@@ -272,19 +272,19 @@ public class CompactionStrategyManagerTest
DatabaseDescriptor.setAutomaticSSTableUpgradeEnabled(false);
}
- private static void assertHolderExclusivity(boolean isRepaired, boolean isPendingRepair, Class<? extends AbstractStrategyHolder> expectedType)
+ private static void assertHolderExclusivity(boolean isRepaired, boolean isPendingRepair, boolean isTransient, Class<? extends AbstractStrategyHolder> expectedType)
{
ColumnFamilyStore cfs = Keyspace.open(KS_PREFIX).getColumnFamilyStore(TABLE_PREFIX);
CompactionStrategyManager csm = cfs.getCompactionStrategyManager();
- AbstractStrategyHolder holder = csm.getHolder(isRepaired, isPendingRepair);
+ AbstractStrategyHolder holder = csm.getHolder(isRepaired, isPendingRepair, isTransient);
assertNotNull(holder);
assertSame(expectedType, holder.getClass());
int matches = 0;
for (AbstractStrategyHolder other : csm.getHolders())
{
- if (other.managesRepairedGroup(isRepaired, isPendingRepair))
+ if (other.managesRepairedGroup(isRepaired, isPendingRepair, isTransient))
{
assertSame("holder assignment should be mutually exclusive", holder, other);
matches++;
@@ -293,13 +293,13 @@ public class CompactionStrategyManagerTest
assertEquals(1, matches);
}
- private static void assertInvalieHolderConfig(boolean isRepaired, boolean isPendingRepair)
+ private static void assertInvalieHolderConfig(boolean isRepaired, boolean isPendingRepair, boolean isTransient)
{
ColumnFamilyStore cfs = Keyspace.open(KS_PREFIX).getColumnFamilyStore(TABLE_PREFIX);
CompactionStrategyManager csm = cfs.getCompactionStrategyManager();
try
{
- csm.getHolder(isRepaired, isPendingRepair);
+ csm.getHolder(isRepaired, isPendingRepair, isTransient);
fail("Expected IllegalArgumentException");
}
catch (IllegalArgumentException e)
@@ -315,10 +315,14 @@ public class CompactionStrategyManagerTest
@Test
public void testMutualExclusiveHolderClassification() throws Exception
{
- assertHolderExclusivity(false, false, CompactionStrategyHolder.class);
- assertHolderExclusivity(true, false, CompactionStrategyHolder.class);
- assertHolderExclusivity(false, true, PendingRepairHolder.class);
- assertInvalieHolderConfig(true, true);
+ assertHolderExclusivity(false, false, false, CompactionStrategyHolder.class);
+ assertHolderExclusivity(true, false, false, CompactionStrategyHolder.class);
+ assertHolderExclusivity(false, true, false, PendingRepairHolder.class);
+ assertHolderExclusivity(false, true, true, PendingRepairHolder.class);
+ assertInvalieHolderConfig(true, true, false);
+ assertInvalieHolderConfig(true, true, true);
+ assertInvalieHolderConfig(false, false, true);
+ assertInvalieHolderConfig(true, false, true);
}
PartitionPosition forKey(int key)
@@ -337,20 +341,23 @@ public class CompactionStrategyManagerTest
ColumnFamilyStore cfs = createJBODMockCFS(numDir);
Keyspace.open(cfs.keyspace.getName()).getColumnFamilyStore(cfs.name).disableAutoCompaction();
assertTrue(cfs.getLiveSSTables().isEmpty());
- List<SSTableReader> unrepaired = new ArrayList<>();
+ List<SSTableReader> transientRepairs = new ArrayList<>();
List<SSTableReader> pendingRepair = new ArrayList<>();
+ List<SSTableReader> unrepaired = new ArrayList<>();
List<SSTableReader> repaired = new ArrayList<>();
for (int i = 0; i < numDir; i++)
{
int key = 100 * i;
- unrepaired.add(createSSTableWithKey(cfs.keyspace.getName(), cfs.name, key++));
+ transientRepairs.add(createSSTableWithKey(cfs.keyspace.getName(), cfs.name, key++));
pendingRepair.add(createSSTableWithKey(cfs.keyspace.getName(), cfs.name, key++));
+ unrepaired.add(createSSTableWithKey(cfs.keyspace.getName(), cfs.name, key++));
repaired.add(createSSTableWithKey(cfs.keyspace.getName(), cfs.name, key++));
}
- cfs.getCompactionStrategyManager().mutateRepaired(pendingRepair, 0, UUID.randomUUID());
- cfs.getCompactionStrategyManager().mutateRepaired(repaired, 1000, null);
+ cfs.getCompactionStrategyManager().mutateRepaired(transientRepairs, 0, UUID.randomUUID(), true);
+ cfs.getCompactionStrategyManager().mutateRepaired(pendingRepair, 0, UUID.randomUUID(), false);
+ cfs.getCompactionStrategyManager().mutateRepaired(repaired, 1000, null, false);
DiskBoundaries boundaries = new DiskBoundaries(cfs.getDirectories().getWriteableLocations(),
Lists.newArrayList(forKey(100), forKey(200), forKey(300)),
@@ -358,7 +365,7 @@ public class CompactionStrategyManagerTest
CompactionStrategyManager csm = new CompactionStrategyManager(cfs, () -> boundaries, true);
- List<GroupedSSTableContainer> grouped = csm.groupSSTables(Iterables.concat(repaired, pendingRepair, unrepaired));
+ List<GroupedSSTableContainer> grouped = csm.groupSSTables(Iterables.concat( transientRepairs, pendingRepair, repaired, unrepaired));
for (int x=0; x<grouped.size(); x++)
{
@@ -372,7 +379,16 @@ public class CompactionStrategyManagerTest
if (sstable.isRepaired())
expected = repaired.get(y);
else if (sstable.isPendingRepair())
- expected = pendingRepair.get(y);
+ {
+ if (sstable.isTransient())
+ {
+ expected = transientRepairs.get(y);
+ }
+ else
+ {
+ expected = pendingRepair.get(y);
+ }
+ }
else
expected = unrepaired.get(y);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/db/compaction/CompactionTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionTaskTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionTaskTest.java
index 599fc74..5370f33 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionTaskTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionTaskTest.java
@@ -96,9 +96,9 @@ public class CompactionTaskTest
Assert.assertEquals(Transactional.AbstractTransactional.State.ABORTED, txn.state());
}
- private static void mutateRepaired(SSTableReader sstable, long repairedAt, UUID pendingRepair) throws IOException
+ private static void mutateRepaired(SSTableReader sstable, long repairedAt, UUID pendingRepair, boolean isTransient) throws IOException
{
- sstable.descriptor.getMetadataSerializer().mutateRepaired(sstable.descriptor, repairedAt, pendingRepair);
+ sstable.descriptor.getMetadataSerializer().mutateRepairMetadata(sstable.descriptor, repairedAt, pendingRepair, isTransient);
sstable.reloadSSTableMetadata();
}
@@ -127,9 +127,9 @@ public class CompactionTaskTest
SSTableReader pending1 = sstables.get(2);
SSTableReader pending2 = sstables.get(3);
- mutateRepaired(repaired, FBUtilities.nowInSeconds(), ActiveRepairService.NO_PENDING_REPAIR);
- mutateRepaired(pending1, ActiveRepairService.UNREPAIRED_SSTABLE, UUIDGen.getTimeUUID());
- mutateRepaired(pending2, ActiveRepairService.UNREPAIRED_SSTABLE, UUIDGen.getTimeUUID());
+ mutateRepaired(repaired, FBUtilities.nowInSeconds(), ActiveRepairService.NO_PENDING_REPAIR, false);
+ mutateRepaired(pending1, ActiveRepairService.UNREPAIRED_SSTABLE, UUIDGen.getTimeUUID(), false);
+ mutateRepaired(pending2, ActiveRepairService.UNREPAIRED_SSTABLE, UUIDGen.getTimeUUID(), false);
LifecycleTransaction txn = null;
List<SSTableReader> toCompact = new ArrayList<>(sstables);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
index c91d2fe..857fa32 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
@@ -411,7 +411,7 @@ public class CompactionsCQLTest extends CQLTester
cfs.forceBlockingFlush();
}
assertEquals(50, cfs.getLiveSSTables().size());
- LeveledCompactionStrategy lcs = (LeveledCompactionStrategy) cfs.getCompactionStrategyManager().getUnrepaired().get(0);
+ LeveledCompactionStrategy lcs = (LeveledCompactionStrategy) cfs.getCompactionStrategyManager().getUnrepairedUnsafe().first();
AbstractCompactionTask act = lcs.getNextBackgroundTask(0);
// we should be compacting all 50 sstables:
assertEquals(50, act.transaction.originals().size());
@@ -445,7 +445,7 @@ public class CompactionsCQLTest extends CQLTester
// mark the L1 sstable as compacting to make sure we trigger STCS in L0:
LifecycleTransaction txn = cfs.getTracker().tryModify(l1sstable, OperationType.COMPACTION);
- LeveledCompactionStrategy lcs = (LeveledCompactionStrategy) cfs.getCompactionStrategyManager().getUnrepaired().get(0);
+ LeveledCompactionStrategy lcs = (LeveledCompactionStrategy) cfs.getCompactionStrategyManager().getUnrepairedUnsafe().first();
AbstractCompactionTask act = lcs.getNextBackgroundTask(0);
// note that max_threshold is 60 (more than the amount of L0 sstables), but MAX_COMPACTING_L0 is 32, which means we will trigger STCS with at most max_threshold sstables
assertEquals(50, act.transaction.originals().size());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
index 9ebe326..23e88fe 100644
--- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@ -364,7 +364,7 @@ public class LeveledCompactionStrategyTest
SSTableReader sstable1 = unrepaired.manifest.generations[2].get(0);
SSTableReader sstable2 = unrepaired.manifest.generations[1].get(0);
- sstable1.descriptor.getMetadataSerializer().mutateRepaired(sstable1.descriptor, System.currentTimeMillis(), null);
+ sstable1.descriptor.getMetadataSerializer().mutateRepairMetadata(sstable1.descriptor, System.currentTimeMillis(), null, false);
sstable1.reloadSSTableMetadata();
assertTrue(sstable1.isRepaired());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/db/compaction/PendingRepairManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/PendingRepairManagerTest.java b/test/unit/org/apache/cassandra/db/compaction/PendingRepairManagerTest.java
index 2b88c9c..d83e063 100644
--- a/test/unit/org/apache/cassandra/db/compaction/PendingRepairManagerTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/PendingRepairManagerTest.java
@@ -45,7 +45,7 @@ public class PendingRepairManagerTest extends AbstractPendingRepairTest
UUID repairID = registerSession(cfs, true, true);
LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS);
SSTableReader sstable = makeSSTable(true);
- mutateRepaired(sstable, repairID);
+ mutateRepaired(sstable, repairID, false);
prm.addSSTable(sstable);
Assert.assertNotNull(prm.get(repairID));
@@ -63,7 +63,7 @@ public class PendingRepairManagerTest extends AbstractPendingRepairTest
UUID repairID = registerSession(cfs, true, true);
LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS);
SSTableReader sstable = makeSSTable(true);
- mutateRepaired(sstable, repairID);
+ mutateRepaired(sstable, repairID, false);
prm.addSSTable(sstable);
Assert.assertNotNull(prm.get(repairID));
LocalSessionAccessor.finalizeUnsafe(repairID);
@@ -82,7 +82,7 @@ public class PendingRepairManagerTest extends AbstractPendingRepairTest
UUID repairID = registerSession(cfs, true, true);
LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS);
SSTableReader sstable = makeSSTable(true);
- mutateRepaired(sstable, repairID);
+ mutateRepaired(sstable, repairID, false);
prm.addSSTable(sstable);
Assert.assertNotNull(prm.get(repairID));
LocalSessionAccessor.failUnsafe(repairID);
@@ -94,7 +94,7 @@ public class PendingRepairManagerTest extends AbstractPendingRepairTest
public void needsCleanupNoSession()
{
UUID fakeID = UUIDGen.getTimeUUID();
- PendingRepairManager prm = new PendingRepairManager(cfs, null);
+ PendingRepairManager prm = new PendingRepairManager(cfs, null, false);
Assert.assertTrue(prm.canCleanup(fakeID));
}
@@ -106,7 +106,7 @@ public class PendingRepairManagerTest extends AbstractPendingRepairTest
UUID repairID = registerSession(cfs, true, true);
LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS);
SSTableReader sstable = makeSSTable(true);
- mutateRepaired(sstable, repairID);
+ mutateRepaired(sstable, repairID, false);
prm.addSSTable(sstable);
Assert.assertNotNull(prm.get(repairID));
@@ -122,7 +122,7 @@ public class PendingRepairManagerTest extends AbstractPendingRepairTest
UUID repairID = registerSession(cfs, true, true);
LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS);
SSTableReader sstable = makeSSTable(true);
- mutateRepaired(sstable, repairID);
+ mutateRepaired(sstable, repairID, false);
prm.addSSTable(sstable);
Assert.assertNotNull(prm.get(repairID));
Assert.assertNotNull(prm.get(repairID));
@@ -140,13 +140,13 @@ public class PendingRepairManagerTest extends AbstractPendingRepairTest
UUID repairID = registerSession(cfs, true, true);
LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS);
SSTableReader sstable = makeSSTable(true);
- mutateRepaired(sstable, repairID);
+ mutateRepaired(sstable, repairID, false);
prm.addSSTable(sstable);
repairID = registerSession(cfs, true, true);
LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS);
sstable = makeSSTable(true);
- mutateRepaired(sstable, repairID);
+ mutateRepaired(sstable, repairID, false);
prm.addSSTable(sstable);
LocalSessionAccessor.finalizeUnsafe(repairID);
@@ -184,7 +184,7 @@ public class PendingRepairManagerTest extends AbstractPendingRepairTest
LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS);
SSTableReader sstable = makeSSTable(true);
- mutateRepaired(sstable, repairID);
+ mutateRepaired(sstable, repairID, false);
prm.addSSTable(sstable);
Assert.assertNotNull(prm.get(repairID));
Assert.assertNotNull(prm.get(repairID));
@@ -202,7 +202,7 @@ public class PendingRepairManagerTest extends AbstractPendingRepairTest
UUID repairID = registerSession(cfs, true, true);
LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS);
SSTableReader sstable = makeSSTable(true);
- mutateRepaired(sstable, repairID);
+ mutateRepaired(sstable, repairID, false);
prm.addSSTable(sstable);
Assert.assertNotNull(prm.get(repairID));
Assert.assertNotNull(prm.get(repairID));
@@ -225,7 +225,7 @@ public class PendingRepairManagerTest extends AbstractPendingRepairTest
PendingRepairManager prm = csm.getPendingRepairManagers().get(0);
UUID repairId = registerSession(cfs, true, true);
SSTableReader sstable = makeSSTable(true);
- mutateRepaired(sstable, repairId);
+ mutateRepaired(sstable, repairId, false);
prm.addSSTable(sstable);
List<AbstractCompactionTask> tasks = csm.getUserDefinedTasks(Collections.singleton(sstable), 100);
try
@@ -247,8 +247,8 @@ public class PendingRepairManagerTest extends AbstractPendingRepairTest
SSTableReader sstable = makeSSTable(true);
SSTableReader sstable2 = makeSSTable(true);
- mutateRepaired(sstable, repairId);
- mutateRepaired(sstable2, repairId2);
+ mutateRepaired(sstable, repairId, false);
+ mutateRepaired(sstable2, repairId2, false);
prm.addSSTable(sstable);
prm.addSSTable(sstable2);
List<AbstractCompactionTask> tasks = csm.getUserDefinedTasks(Lists.newArrayList(sstable, sstable2), 100);
@@ -296,7 +296,7 @@ public class PendingRepairManagerTest extends AbstractPendingRepairTest
Assert.assertFalse(prm.hasDataForSession(repairID));
SSTableReader sstable = makeSSTable(true);
- mutateRepaired(sstable, repairID);
+ mutateRepaired(sstable, repairID, false);
prm.addSSTable(sstable);
Assert.assertTrue(prm.hasDataForSession(repairID));
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/db/compaction/SingleSSTableLCSTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/SingleSSTableLCSTaskTest.java b/test/unit/org/apache/cassandra/db/compaction/SingleSSTableLCSTaskTest.java
index 6428ab7..1292b7e 100644
--- a/test/unit/org/apache/cassandra/db/compaction/SingleSSTableLCSTaskTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/SingleSSTableLCSTaskTest.java
@@ -56,7 +56,7 @@ public class SingleSSTableLCSTaskTest extends CQLTester
assertEquals(1, cfs.getLiveSSTables().size());
cfs.getLiveSSTables().forEach(s -> assertEquals(2, s.getSSTableLevel()));
// make sure compaction strategy is notified:
- LeveledCompactionStrategy lcs = (LeveledCompactionStrategy) cfs.getCompactionStrategyManager().getUnrepaired().iterator().next();
+ LeveledCompactionStrategy lcs = (LeveledCompactionStrategy) cfs.getCompactionStrategyManager().getUnrepairedUnsafe().first();
for (int i = 0; i < lcs.manifest.getLevelCount(); i++)
{
if (i == 2)
@@ -98,7 +98,7 @@ public class SingleSSTableLCSTaskTest extends CQLTester
cfs.forceBlockingFlush();
}
// now we have a bunch of data in L0, first compaction will be a normal one, containing all sstables:
- LeveledCompactionStrategy lcs = (LeveledCompactionStrategy) cfs.getCompactionStrategyManager().getUnrepaired().get(0);
+ LeveledCompactionStrategy lcs = (LeveledCompactionStrategy) cfs.getCompactionStrategyManager().getUnrepairedUnsafe().first();
AbstractCompactionTask act = lcs.getNextBackgroundTask(0);
act.execute(null);
@@ -148,7 +148,7 @@ public class SingleSSTableLCSTaskTest extends CQLTester
assertEquals(1, cfs.getLiveSSTables().size());
for (SSTableReader sst : cfs.getLiveSSTables())
assertEquals(0, sst.getSSTableMetadata().sstableLevel);
- LeveledCompactionStrategy lcs = (LeveledCompactionStrategy) cfs.getCompactionStrategyManager().getUnrepaired().iterator().next();
+ LeveledCompactionStrategy lcs = (LeveledCompactionStrategy) cfs.getCompactionStrategyManager().getUnrepairedUnsafe().first();
assertEquals(1, lcs.getLevelSize(0));
assertTrue(cfs.getTracker().getCompacting().isEmpty());
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java b/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java
index 5694e86..e5ff138 100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java
@@ -1183,7 +1183,7 @@ public class LogTransactionTest extends AbstractTransactionalTest
SerializationHeader header = SerializationHeader.make(cfs.metadata(), Collections.emptyList());
StatsMetadata metadata = (StatsMetadata) new MetadataCollector(cfs.metadata().comparator)
- .finalizeMetadata(cfs.metadata().partitioner.getClass().getCanonicalName(), 0.01f, -1, null, header)
+ .finalizeMetadata(cfs.metadata().partitioner.getClass().getCanonicalName(), 0.01f, -1, null, false, header)
.get(MetadataType.STATS);
SSTableReader reader = SSTableReader.internalOpen(descriptor,
components,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java b/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java
index 5cda2ad..b7b7d4a 100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java
@@ -164,6 +164,7 @@ public class RealTransactionsTest extends SchemaLoader
0,
0,
null,
+ false,
0,
SerializationHeader.make(cfs.metadata(), txn.originals()),
cfs.indexManager.listIndexes(),
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/db/repair/CompactionManagerGetSSTablesForValidationTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/repair/CompactionManagerGetSSTablesForValidationTest.java b/test/unit/org/apache/cassandra/db/repair/CompactionManagerGetSSTablesForValidationTest.java
index 76ebfd8..3b29cc5 100644
--- a/test/unit/org/apache/cassandra/db/repair/CompactionManagerGetSSTablesForValidationTest.java
+++ b/test/unit/org/apache/cassandra/db/repair/CompactionManagerGetSSTablesForValidationTest.java
@@ -119,11 +119,11 @@ public class CompactionManagerGetSSTablesForValidationTest
Iterator<SSTableReader> iter = cfs.getLiveSSTables().iterator();
repaired = iter.next();
- repaired.descriptor.getMetadataSerializer().mutateRepaired(repaired.descriptor, System.currentTimeMillis(), null);
+ repaired.descriptor.getMetadataSerializer().mutateRepairMetadata(repaired.descriptor, System.currentTimeMillis(), null, false);
repaired.reloadSSTableMetadata();
pendingRepair = iter.next();
- pendingRepair.descriptor.getMetadataSerializer().mutateRepaired(pendingRepair.descriptor, ActiveRepairService.UNREPAIRED_SSTABLE, sessionID);
+ pendingRepair.descriptor.getMetadataSerializer().mutateRepairMetadata(pendingRepair.descriptor, ActiveRepairService.UNREPAIRED_SSTABLE, sessionID, false);
pendingRepair.reloadSSTableMetadata();
unrepaired = iter.next();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org