You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2015/07/24 18:32:11 UTC
[2/5] cassandra git commit: Introduce safer durable sstable
membership management (and simplify cleanup of compaction leftovers)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/test/long/org/apache/cassandra/io/sstable/CQLSSTableWriterLongTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/io/sstable/CQLSSTableWriterLongTest.java b/test/long/org/apache/cassandra/io/sstable/CQLSSTableWriterLongTest.java
index fcec40d..b49055d 100644
--- a/test/long/org/apache/cassandra/io/sstable/CQLSSTableWriterLongTest.java
+++ b/test/long/org/apache/cassandra/io/sstable/CQLSSTableWriterLongTest.java
@@ -30,8 +30,6 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.config.Config;
-import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.service.StorageService;
public class CQLSSTableWriterLongTest
@@ -39,8 +37,7 @@ public class CQLSSTableWriterLongTest
@BeforeClass
public static void setup() throws Exception
{
- SchemaLoader.cleanupAndLeaveDirs();
- Keyspace.setInitialized();
+ SchemaLoader.prepareServer();
StorageService.instance.initServer();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/test/unit/org/apache/cassandra/MockSchema.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/MockSchema.java b/test/unit/org/apache/cassandra/MockSchema.java
index 104e88f..d9c7e8b 100644
--- a/test/unit/org/apache/cassandra/MockSchema.java
+++ b/test/unit/org/apache/cassandra/MockSchema.java
@@ -60,7 +60,7 @@ public class MockSchema
private static final AtomicInteger id = new AtomicInteger();
public static final Keyspace ks = Keyspace.mockKS(KeyspaceMetadata.create("mockks", KeyspaceParams.simpleTransient(1)));
- private static final IndexSummary indexSummary;
+ public static final IndexSummary indexSummary;
private static final SegmentedFile segmentedFile = new BufferedSegmentedFile(new ChannelProxy(temp("mocksegmentedfile")), 0);
public static Memtable memtable(ColumnFamilyStore cfs)
@@ -88,8 +88,7 @@ public class MockSchema
Descriptor descriptor = new Descriptor(cfs.directories.getDirectoryForNewSSTables(),
cfs.keyspace.getName(),
cfs.getColumnFamilyName(),
- generation,
- Descriptor.Type.FINAL);
+ generation);
Set<Component> components = ImmutableSet.of(Component.DATA, Component.PRIMARY_INDEX, Component.FILTER, Component.TOC);
for (Component component : components)
{
@@ -132,8 +131,13 @@ public class MockSchema
public static ColumnFamilyStore newCFS()
{
+ return newCFS(ks.getName());
+ }
+
+ public static ColumnFamilyStore newCFS(String ksname)
+ {
String cfname = "mockcf" + (id.incrementAndGet());
- CFMetaData metadata = newCFMetaData(ks.getName(), cfname);
+ CFMetaData metadata = newCFMetaData(ksname, cfname);
return new ColumnFamilyStore(ks, cfname, Murmur3Partitioner.instance, 0, metadata, new Directories(metadata), false, false);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/test/unit/org/apache/cassandra/Util.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java
index 654b8c6..c828de9 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -53,12 +53,9 @@ import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.gms.VersionedValue;
-import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.IndexSummary;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.AlwaysPresentFilter;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.CounterId;
import org.apache.cassandra.utils.FBUtilities;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/test/unit/org/apache/cassandra/cql3/CQLTester.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java
index 06b5ceb..ddc41c7 100644
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@ -282,7 +282,7 @@ public abstract class CQLTester
schemaChange(String.format("DROP TYPE IF EXISTS %s.%s", KEYSPACE, typesToDrop.get(i)));
// Dropping doesn't delete the sstables. It's not a huge deal but it's cleaner to cleanup after us
- // Thas said, we shouldn't delete blindly before the SSTableDeletingTask for the table we drop
+ // That said, we shouldn't delete blindly before the TransactionLogs.SSTableTidier for the table we drop
// have run or they will be unhappy. Since those tasks are scheduled on StorageService.tasks and that's
// mono-threaded, just push a task on the queue to find when it's empty. Not perfect but good enough.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
index 9da4876..2a15bdf 100644
--- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
+++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
@@ -49,7 +49,7 @@ import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.WrappedRunnable;
-
+import static junit.framework.Assert.assertNotNull;
@RunWith(OrderedJUnit4ClassRunner.class)
public class ColumnFamilyStoreTest
{
@@ -349,8 +349,8 @@ public class ColumnFamilyStoreTest
for (int version = 1; version <= 2; ++version)
{
- Descriptor existing = new Descriptor(cfs.directories.getDirectoryForNewSSTables(), KEYSPACE2, CF_STANDARD1, version, Descriptor.Type.FINAL);
- Descriptor desc = new Descriptor(Directories.getBackupsDirectory(existing), KEYSPACE2, CF_STANDARD1, version, Descriptor.Type.FINAL);
+ Descriptor existing = new Descriptor(cfs.directories.getDirectoryForNewSSTables(), KEYSPACE2, CF_STANDARD1, version);
+ Descriptor desc = new Descriptor(Directories.getBackupsDirectory(existing), KEYSPACE2, CF_STANDARD1, version);
for (Component c : new Component[]{ Component.DATA, Component.PRIMARY_INDEX, Component.FILTER, Component.STATS })
assertTrue("Cannot find backed-up file:" + desc.filenameFor(c), new File(desc.filenameFor(c)).exists());
}
@@ -389,126 +389,10 @@ public class ColumnFamilyStoreTest
// assertEquals(ByteBufferUtil.bytes("B"), cfSliced.getColumn(CellNames.compositeDense(superColName, ByteBufferUtil.bytes("b"))).value());
// }
- // TODO: fix once SSTableSimpleWriter's back in
-// @Test
-// public void testRemoveUnfinishedCompactionLeftovers() throws Throwable
-// {
-// String ks = KEYSPACE1;
-// String cf = CF_STANDARD3; // should be empty
-//
-// final CFMetaData cfmeta = Schema.instance.getCFMetaData(ks, cf);
-// Directories dir = new Directories(cfmeta);
-// ByteBuffer key = bytes("key");
-//
-// // 1st sstable
-// SSTableSimpleWriter writer = new SSTableSimpleWriter(dir.getDirectoryForNewSSTables(), cfmeta, StorageService.getPartitioner());
-// writer.newRow(key);
-// writer.addColumn(bytes("col"), bytes("val"), 1);
-// writer.close();
-//
-// Map<Descriptor, Set<Component>> sstables = dir.sstableLister().list();
-// assertEquals(1, sstables.size());
-//
-// Map.Entry<Descriptor, Set<Component>> sstableToOpen = sstables.entrySet().iterator().next();
-// final SSTableReader sstable1 = SSTableReader.open(sstableToOpen.getKey());
-//
-// // simulate incomplete compaction
-// writer = new SSTableSimpleWriter(dir.getDirectoryForNewSSTables(),
-// cfmeta, StorageService.getPartitioner())
-// {
-// protected SSTableWriter getWriter()
-// {
-// MetadataCollector collector = new MetadataCollector(cfmeta.comparator);
-// collector.addAncestor(sstable1.descriptor.generation); // add ancestor from previously written sstable
-// return SSTableWriter.create(createDescriptor(directory, metadata.ksName, metadata.cfName, DatabaseDescriptor.getSSTableFormat()),
-// 0L,
-// ActiveRepairService.UNREPAIRED_SSTABLE,
-// metadata,
-// DatabaseDescriptor.getPartitioner(),
-// collector);
-// }
-// };
-// writer.newRow(key);
-// writer.addColumn(bytes("col"), bytes("val"), 1);
-// writer.close();
-//
-// // should have 2 sstables now
-// sstables = dir.sstableLister().list();
-// assertEquals(2, sstables.size());
-//
-// SSTableReader sstable2 = SSTableReader.open(sstable1.descriptor);
-// UUID compactionTaskID = SystemKeyspace.startCompaction(
-// Keyspace.open(ks).getColumnFamilyStore(cf),
-// Collections.singleton(sstable2));
-//
-// Map<Integer, UUID> unfinishedCompaction = new HashMap<>();
-// unfinishedCompaction.put(sstable1.descriptor.generation, compactionTaskID);
-// ColumnFamilyStore.removeUnfinishedCompactionLeftovers(cfmeta, unfinishedCompaction);
-//
-// // 2nd sstable should be removed (only 1st sstable exists in set of size 1)
-// sstables = dir.sstableLister().list();
-// assertEquals(1, sstables.size());
-// assertTrue(sstables.containsKey(sstable1.descriptor));
-//
-// Map<Pair<String, String>, Map<Integer, UUID>> unfinished = SystemKeyspace.getUnfinishedCompactions();
-// assertTrue(unfinished.isEmpty());
-// sstable1.selfRef().release();
-// sstable2.selfRef().release();
-// }
// TODO: Fix once SSTableSimpleWriter's back in
// @see <a href="https://issues.apache.org/jira/browse/CASSANDRA-6086">CASSANDRA-6086</a>
-// @Test
-// public void testFailedToRemoveUnfinishedCompactionLeftovers() throws Throwable
-// {
-// final String ks = KEYSPACE1;
-// final String cf = CF_STANDARD4; // should be empty
-//
-// final CFMetaData cfmeta = Schema.instance.getCFMetaData(ks, cf);
-// Directories dir = new Directories(cfmeta);
-// ByteBuffer key = bytes("key");
-//
-// // Write SSTable generation 3 that has ancestors 1 and 2
-// final Set<Integer> ancestors = Sets.newHashSet(1, 2);
-// SSTableSimpleWriter writer = new SSTableSimpleWriter(dir.getDirectoryForNewSSTables(),
-// cfmeta, StorageService.getPartitioner())
-// {
-// protected SSTableWriter getWriter()
-// {
-// MetadataCollector collector = new MetadataCollector(cfmeta.comparator);
-// for (int ancestor : ancestors)
-// collector.addAncestor(ancestor);
-// String file = new Descriptor(directory, ks, cf, 3, Descriptor.Type.TEMP).filenameFor(Component.DATA);
-// return SSTableWriter.create(Descriptor.fromFilename(file),
-// 0L,
-// ActiveRepairService.UNREPAIRED_SSTABLE,
-// metadata,
-// StorageService.getPartitioner(),
-// collector);
-// }
-// };
-// writer.newRow(key);
-// writer.addColumn(bytes("col"), bytes("val"), 1);
-// writer.close();
-//
-// Map<Descriptor, Set<Component>> sstables = dir.sstableLister().list();
-// assert sstables.size() == 1;
-//
-// Map.Entry<Descriptor, Set<Component>> sstableToOpen = sstables.entrySet().iterator().next();
-// final SSTableReader sstable1 = SSTableReader.open(sstableToOpen.getKey());
-//
-// // simulate we don't have generation in compaction_history
-// Map<Integer, UUID> unfinishedCompactions = new HashMap<>();
-// UUID compactionTaskID = UUID.randomUUID();
-// for (Integer ancestor : ancestors)
-// unfinishedCompactions.put(ancestor, compactionTaskID);
-// ColumnFamilyStore.removeUnfinishedCompactionLeftovers(cfmeta, unfinishedCompactions);
-//
-// // SSTable should not be deleted
-// sstables = dir.sstableLister().list();
-// assert sstables.size() == 1;
-// assert sstables.containsKey(sstable1.descriptor);
-// }
+
// TODO: Fix once SSTableSimpleWriter's back in
// @Test
@@ -625,4 +509,30 @@ public class ColumnFamilyStoreTest
}
assertEquals(count, found);
}
+
+ @Test
+ public void testScrubDataDirectories() throws Throwable
+ {
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1);
+
+ ColumnFamilyStore.scrubDataDirectories(cfs.metadata);
+
+ new RowUpdateBuilder(cfs.metadata, 2, "key").clustering("name").add("val", "2").build().applyUnsafe();
+ cfs.forceBlockingFlush();
+
+ // Nuke the metadata and reload that sstable
+ Collection<SSTableReader> ssTables = cfs.getSSTables();
+ assertEquals(1, ssTables.size());
+ SSTableReader ssTable = ssTables.iterator().next();
+
+ String dataFileName = ssTable.descriptor.filenameFor(Component.DATA);
+ String tmpDataFileName = ssTable.descriptor.tmpFilenameFor(Component.DATA);
+ new File(dataFileName).renameTo(new File(tmpDataFileName));
+
+ ColumnFamilyStore.scrubDataDirectories(cfs.metadata);
+
+ List<File> ssTableFiles = new Directories(cfs.metadata).sstableLister().listFiles();
+ assertNotNull(ssTableFiles);
+ assertEquals(0, ssTableFiles.size());
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/test/unit/org/apache/cassandra/db/DirectoriesTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/DirectoriesTest.java b/test/unit/org/apache/cassandra/db/DirectoriesTest.java
index 6f3ccc9..14db2d1 100644
--- a/test/unit/org/apache/cassandra/db/DirectoriesTest.java
+++ b/test/unit/org/apache/cassandra/db/DirectoriesTest.java
@@ -95,22 +95,22 @@ public class DirectoriesTest
File dir = cfDir(cfm);
dir.mkdirs();
- createFakeSSTable(dir, cfm.cfName, 1, false, fs);
- createFakeSSTable(dir, cfm.cfName, 2, true, fs);
+ createFakeSSTable(dir, cfm.cfName, 1, fs);
+ createFakeSSTable(dir, cfm.cfName, 2, fs);
File backupDir = new File(dir, Directories.BACKUPS_SUBDIR);
backupDir.mkdir();
- createFakeSSTable(backupDir, cfm.cfName, 1, false, fs);
+ createFakeSSTable(backupDir, cfm.cfName, 1, fs);
File snapshotDir = new File(dir, Directories.SNAPSHOT_SUBDIR + File.separator + "42");
snapshotDir.mkdirs();
- createFakeSSTable(snapshotDir, cfm.cfName, 1, false, fs);
+ createFakeSSTable(snapshotDir, cfm.cfName, 1, fs);
}
}
- private static void createFakeSSTable(File dir, String cf, int gen, boolean temp, List<File> addTo) throws IOException
+ private static void createFakeSSTable(File dir, String cf, int gen, List<File> addTo) throws IOException
{
- Descriptor desc = new Descriptor(dir, KS, cf, gen, temp ? Descriptor.Type.TEMP : Descriptor.Type.FINAL);
+ Descriptor desc = new Descriptor(dir, KS, cf, gen);
for (Component c : new Component[]{ Component.DATA, Component.PRIMARY_INDEX, Component.FILTER })
{
File f = new File(desc.filenameFor(c));
@@ -145,7 +145,7 @@ public class DirectoriesTest
Directories directories = new Directories(cfm);
assertEquals(cfDir(cfm), directories.getDirectoryForNewSSTables());
- Descriptor desc = new Descriptor(cfDir(cfm), KS, cfm.cfName, 1, Descriptor.Type.FINAL);
+ Descriptor desc = new Descriptor(cfDir(cfm), KS, cfm.cfName, 1);
File snapshotDir = new File(cfDir(cfm), File.separator + Directories.SNAPSHOT_SUBDIR + File.separator + "42");
assertEquals(snapshotDir, Directories.getSnapshotDirectory(desc, "42"));
@@ -173,8 +173,8 @@ public class DirectoriesTest
{
assertEquals(cfDir(INDEX_CFM), dir);
}
- Descriptor parentDesc = new Descriptor(parentDirectories.getDirectoryForNewSSTables(), KS, PARENT_CFM.cfName, 0, Descriptor.Type.FINAL);
- Descriptor indexDesc = new Descriptor(indexDirectories.getDirectoryForNewSSTables(), KS, INDEX_CFM.cfName, 0, Descriptor.Type.FINAL);
+ Descriptor parentDesc = new Descriptor(parentDirectories.getDirectoryForNewSSTables(), KS, PARENT_CFM.cfName, 0);
+ Descriptor indexDesc = new Descriptor(indexDirectories.getDirectoryForNewSSTables(), KS, INDEX_CFM.cfName, 0);
// snapshot dir should be created under its parent's
File parentSnapshotDirectory = Directories.getSnapshotDirectory(parentDesc, "test");
@@ -191,9 +191,9 @@ public class DirectoriesTest
indexDirectories.snapshotCreationTime("test"));
// check true snapshot size
- Descriptor parentSnapshot = new Descriptor(parentSnapshotDirectory, KS, PARENT_CFM.cfName, 0, Descriptor.Type.FINAL);
+ Descriptor parentSnapshot = new Descriptor(parentSnapshotDirectory, KS, PARENT_CFM.cfName, 0);
createFile(parentSnapshot.filenameFor(Component.DATA), 30);
- Descriptor indexSnapshot = new Descriptor(indexSnapshotDirectory, KS, INDEX_CFM.cfName, 0, Descriptor.Type.FINAL);
+ Descriptor indexSnapshot = new Descriptor(indexSnapshotDirectory, KS, INDEX_CFM.cfName, 0);
createFile(indexSnapshot.filenameFor(Component.DATA), 40);
assertEquals(30, parentDirectories.trueSnapshotsSize());
@@ -311,7 +311,7 @@ public class DirectoriesTest
final String n = Long.toString(System.nanoTime());
Callable<File> directoryGetter = new Callable<File>() {
public File call() throws Exception {
- Descriptor desc = new Descriptor(cfDir(cfm), KS, cfm.cfName, 1, Descriptor.Type.FINAL);
+ Descriptor desc = new Descriptor(cfDir(cfm), KS, cfm.cfName, 1);
return Directories.getSnapshotDirectory(desc, n);
}
};
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/test/unit/org/apache/cassandra/db/ScrubTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ScrubTest.java b/test/unit/org/apache/cassandra/db/ScrubTest.java
index 47bfa0c..181b4e0 100644
--- a/test/unit/org/apache/cassandra/db/ScrubTest.java
+++ b/test/unit/org/apache/cassandra/db/ScrubTest.java
@@ -65,6 +65,7 @@ import org.apache.cassandra.OrderedJUnit4ClassRunner;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.OutputHandler;
import static org.junit.Assert.*;
import static org.junit.Assume.assumeTrue;
@@ -315,7 +316,7 @@ public class ScrubTest
* Code used to generate an outOfOrder sstable. The test for out-of-order key in BigTableWriter must also be commented out.
* The test also assumes an ordered partitioner.
List<String> keys = Arrays.asList("t", "a", "b", "z", "c", "y", "d");
- String filename = cfs.getTempSSTablePath(new File(System.getProperty("corrupt-sstable-root")));
+ SSTableWriter writer = new SSTableWriter(cfs.getTempSSTablePath(new File(System.getProperty("corrupt-sstable-root"))),
SSTableWriter writer = SSTableWriter.create(cfs.metadata,
Descriptor.fromFilename(filename),
keys.size(),
@@ -341,7 +342,7 @@ public class ScrubTest
File rootDir = new File(root);
assert rootDir.isDirectory();
- Descriptor desc = new Descriptor("la", rootDir, KEYSPACE, columnFamily, 1, Descriptor.Type.FINAL, SSTableFormat.Type.BIG);
+ Descriptor desc = new Descriptor("la", rootDir, KEYSPACE, columnFamily, 1, SSTableFormat.Type.BIG);
CFMetaData metadata = Schema.instance.getCFMetaData(desc.ksname, desc.cfname);
try
@@ -366,7 +367,7 @@ public class ScrubTest
sstable.last = sstable.first;
try (LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.SCRUB, sstable);
- Scrubber scrubber = new Scrubber(cfs, txn, false, true, true);)
+ Scrubber scrubber = new Scrubber(cfs, txn, false, new OutputHandler.LogOutput(), true, true, true))
{
scrubber.scrub();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/test/unit/org/apache/cassandra/db/SystemKeyspaceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/SystemKeyspaceTest.java b/test/unit/org/apache/cassandra/db/SystemKeyspaceTest.java
index 9ea7f93..d58985c 100644
--- a/test/unit/org/apache/cassandra/db/SystemKeyspaceTest.java
+++ b/test/unit/org/apache/cassandra/db/SystemKeyspaceTest.java
@@ -17,11 +17,15 @@
*/
package org.apache.cassandra.db;
+import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
import java.util.*;
+import org.apache.commons.io.FileUtils;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -30,6 +34,7 @@ import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.dht.ByteOrderedPartitioner.BytesToken;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.CassandraVersion;
@@ -39,6 +44,8 @@ import static org.junit.Assert.assertTrue;
public class SystemKeyspaceTest
{
+ public static final String MIGRATION_SSTABLES_ROOT = "migration-sstable-root";
+
@BeforeClass
public static void prepSnapshotTracker()
{
@@ -145,6 +152,47 @@ public class SystemKeyspaceTest
Keyspace.clearSnapshot(null, SystemKeyspace.NAME);
}
+ @Test
+ public void testMigrateDataDirs() throws IOException
+ {
+ Path migrationSSTableRoot = Paths.get(System.getProperty(MIGRATION_SSTABLES_ROOT), "2.2");
+ Path dataDir = Paths.get(DatabaseDescriptor.getAllDataFileLocations()[0]);
+
+ FileUtils.copyDirectory(migrationSSTableRoot.toFile(), dataDir.toFile());
+
+ assertEquals(5, numLegacyFiles()); // see test data
+
+ SystemKeyspace.migrateDataDirs();
+
+ assertEquals(0, numLegacyFiles());
+ }
+
+ private static int numLegacyFiles()
+ {
+ int ret = 0;
+ Iterable<String> dirs = Arrays.asList(DatabaseDescriptor.getAllDataFileLocations());
+ for (String dataDir : dirs)
+ {
+ File dir = new File(dataDir);
+ for (File ksdir : dir.listFiles((d, n) -> d.isDirectory()))
+ {
+ for (File cfdir : ksdir.listFiles((d, n) -> d.isDirectory()))
+ {
+ if (Descriptor.isLegacyFile(cfdir.getName()))
+ {
+ ret++;
+ }
+ else
+ {
+ File[] legacyFiles = cfdir.listFiles((d, n) -> Descriptor.isLegacyFile(n));
+ ret += legacyFiles.length;
+ }
+ }
+ }
+ }
+ return ret;
+ }
+
private String getOlderVersionString()
{
String version = FBUtilities.getReleaseVersionString();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
index b18e67b..9de33db 100644
--- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
@@ -156,9 +156,9 @@ public class AntiCompactionTest
private SSTableReader writeFile(ColumnFamilyStore cfs, int count)
{
File dir = cfs.directories.getDirectoryForNewSSTables();
- String filename = cfs.getTempSSTablePath(dir);
+ String filename = cfs.getSSTablePath(dir);
- try (SSTableWriter writer = SSTableWriter.create(filename, 0, 0, new SerializationHeader(cfm, cfm.partitionColumns(), EncodingStats.NO_STATS)))
+ try (SSTableTxnWriter writer = SSTableTxnWriter.create(filename, 0, 0, new SerializationHeader(cfm, cfm.partitionColumns(), EncodingStats.NO_STATS)))
{
for (int i = 0; i < count; i++)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java
index 0bcd603..30f14fc 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java
@@ -78,7 +78,7 @@ public class CompactionAwareWriterTest extends CQLTester
populate(rowCount);
LifecycleTransaction txn = cfs.getTracker().tryModify(cfs.getSSTables(), OperationType.COMPACTION);
long beforeSize = txn.originals().iterator().next().onDiskLength();
- CompactionAwareWriter writer = new DefaultCompactionWriter(cfs, txn, txn.originals(), false, OperationType.COMPACTION);
+ CompactionAwareWriter writer = new DefaultCompactionWriter(cfs, txn, txn.originals(), false);
int rows = compact(cfs, txn, writer);
assertEquals(1, cfs.getSSTables().size());
assertEquals(rowCount, rows);
@@ -97,7 +97,7 @@ public class CompactionAwareWriterTest extends CQLTester
LifecycleTransaction txn = cfs.getTracker().tryModify(cfs.getSSTables(), OperationType.COMPACTION);
long beforeSize = txn.originals().iterator().next().onDiskLength();
int sstableSize = (int)beforeSize/10;
- CompactionAwareWriter writer = new MaxSSTableSizeWriter(cfs, txn, txn.originals(), sstableSize, 0, false, OperationType.COMPACTION);
+ CompactionAwareWriter writer = new MaxSSTableSizeWriter(cfs, txn, txn.originals(), sstableSize, 0, false);
int rows = compact(cfs, txn, writer);
assertEquals(10, cfs.getSSTables().size());
assertEquals(rowCount, rows);
@@ -114,7 +114,7 @@ public class CompactionAwareWriterTest extends CQLTester
populate(rowCount);
LifecycleTransaction txn = cfs.getTracker().tryModify(cfs.getSSTables(), OperationType.COMPACTION);
long beforeSize = txn.originals().iterator().next().onDiskLength();
- CompactionAwareWriter writer = new SplittingSizeTieredCompactionWriter(cfs, txn, txn.originals(), OperationType.COMPACTION, 0);
+ CompactionAwareWriter writer = new SplittingSizeTieredCompactionWriter(cfs, txn, txn.originals(), 0);
int rows = compact(cfs, txn, writer);
long expectedSize = beforeSize / 2;
List<SSTableReader> sortedSSTables = new ArrayList<>(cfs.getSSTables());
@@ -150,7 +150,7 @@ public class CompactionAwareWriterTest extends CQLTester
LifecycleTransaction txn = cfs.getTracker().tryModify(cfs.getSSTables(), OperationType.COMPACTION);
long beforeSize = txn.originals().iterator().next().onDiskLength();
int sstableSize = (int)beforeSize/targetSSTableCount;
- CompactionAwareWriter writer = new MajorLeveledCompactionWriter(cfs, txn, txn.originals(), sstableSize, false, OperationType.COMPACTION);
+ CompactionAwareWriter writer = new MajorLeveledCompactionWriter(cfs, txn, txn.originals(), sstableSize, false);
int rows = compact(cfs, txn, writer);
assertEquals(targetSSTableCount, cfs.getSSTables().size());
int [] levelCounts = new int[5];
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
index 2fa8488..7beb405 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
@@ -21,6 +21,11 @@ package org.apache.cassandra.db.compaction;
import java.util.*;
import java.util.concurrent.TimeUnit;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
import org.apache.cassandra.OrderedJUnit4ClassRunner;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
@@ -34,11 +39,6 @@ import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-
import static org.junit.Assert.*;
@RunWith(OrderedJUnit4ClassRunner.class)
@@ -394,21 +394,25 @@ public class CompactionsTest
cf.addColumn(Util.column("a", "a", 3));
cf.deletionInfo().add(new RangeTombstone(Util.cellname("0"), Util.cellname("b"), 2, (int) (System.currentTimeMillis()/1000)),cfmeta.comparator);
- SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getTempSSTablePath(dir.getDirectoryForNewSSTables())), 0, 0, 0);
-
-
- writer.append(Util.dk("0"), cf);
- writer.append(Util.dk("1"), cf);
- writer.append(Util.dk("3"), cf);
+ Descriptor desc = Descriptor.fromFilename(cfs.getSSTablePath(dir.getDirectoryForNewSSTables()));
+ try(SSTableTxnWriter writer = SSTableTxnWriter.create(desc, 0, 0, 0))
+ {
+ writer.append(Util.dk("0"), cf);
+ writer.append(Util.dk("1"), cf);
+ writer.append(Util.dk("3"), cf);
- cfs.addSSTable(writer.closeAndOpenReader());
- writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getTempSSTablePath(dir.getDirectoryForNewSSTables())), 0, 0, 0);
+ cfs.addSSTable(writer.closeAndOpenReader());
+ }
- writer.append(Util.dk("0"), cf);
- writer.append(Util.dk("1"), cf);
- writer.append(Util.dk("2"), cf);
- writer.append(Util.dk("3"), cf);
- cfs.addSSTable(writer.closeAndOpenReader());
+ desc = Descriptor.fromFilename(cfs.getSSTablePath(dir.getDirectoryForNewSSTables()));
+ try (SSTableTxnWriter writer = SSTableTxnWriter.create(desc, 0, 0, 0))
+ {
+ writer.append(Util.dk("0"), cf);
+ writer.append(Util.dk("1"), cf);
+ writer.append(Util.dk("2"), cf);
+ writer.append(Util.dk("3"), cf);
+ cfs.addSSTable(writer.closeAndOpenReader());
+ }
Collection<SSTableReader> toCompact = cfs.getSSTables();
assert toCompact.size() == 2;
@@ -439,35 +443,6 @@ public class CompactionsTest
assertEquals(keys, k);
}
- @Test
- public void testCompactionLog() throws Exception
- {
- SystemKeyspace.discardCompactionsInProgress();
-
- String cf = "Standard4";
- ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(cf);
- SchemaLoader.insertData(KEYSPACE1, cf, 0, 1);
- cfs.forceBlockingFlush();
-
- Collection<SSTableReader> sstables = cfs.getSSTables();
- assertFalse(sstables.isEmpty());
- Set<Integer> generations = Sets.newHashSet(Iterables.transform(sstables, new Function<SSTableReader, Integer>()
- {
- public Integer apply(SSTableReader sstable)
- {
- return sstable.descriptor.generation;
- }
- }));
- UUID taskId = SystemKeyspace.startCompaction(cfs, sstables);
- Map<Pair<String, String>, Map<Integer, UUID>> compactionLogs = SystemKeyspace.getUnfinishedCompactions();
- Set<Integer> unfinishedCompactions = compactionLogs.get(Pair.create(KEYSPACE1, cf)).keySet();
- assertTrue(unfinishedCompactions.containsAll(generations));
-
- SystemKeyspace.finishCompaction(taskId);
- compactionLogs = SystemKeyspace.getUnfinishedCompactions();
- assertFalse(compactionLogs.containsKey(Pair.create(KEYSPACE1, cf)));
- }
-
private void testDontPurgeAccidentaly(String k, String cfname) throws InterruptedException
{
// This test catches the regression of CASSANDRA-2786
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
index ab84555..a869a7f 100644
--- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@ -190,6 +190,9 @@ public class LeveledCompactionStrategyTest
// L0 is the lowest priority, so when that's done, we know everything is done
while (strategy.getSSTableCountPerLevel()[0] > 1)
Thread.sleep(100);
+
+ // in AbstractCompactionStrategy.replaceSSTables() we first remove and then add sstables, so wait a little bit longer
+ Thread.sleep(10);
}
@Test
@@ -211,7 +214,7 @@ public class LeveledCompactionStrategyTest
}
waitForLeveling(cfs);
- LeveledCompactionStrategy strategy = (LeveledCompactionStrategy) ( cfs.getCompactionStrategyManager()).getStrategies().get(1);
+ LeveledCompactionStrategy strategy = (LeveledCompactionStrategy) (cfs.getCompactionStrategyManager()).getStrategies().get(1);
assert strategy.getLevelSize(1) > 0;
// get LeveledScanner for level 1 sstables
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/test/unit/org/apache/cassandra/db/lifecycle/HelpersTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/HelpersTest.java b/test/unit/org/apache/cassandra/db/lifecycle/HelpersTest.java
index 18bce10..e9c903e 100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/HelpersTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/HelpersTest.java
@@ -18,6 +18,8 @@
*/
package org.apache.cassandra.db.lifecycle;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -32,11 +34,16 @@ import junit.framework.Assert;
import org.apache.cassandra.MockSchema;
import org.apache.cassandra.Util;
import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.format.big.BigTableReader;
+import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.concurrent.Refs;
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertNotNull;
+
public class HelpersTest
{
@@ -150,19 +157,29 @@ public class HelpersTest
for (SSTableReader reader : readers)
Assert.assertTrue(reader.isReplaced());
accumulate = Helpers.setReplaced(readers, null);
- Assert.assertNotNull(accumulate);
+ assertNotNull(accumulate);
}
@Test
public void testMarkObsolete()
{
ColumnFamilyStore cfs = MockSchema.newCFS();
+ TransactionLogs txnLogs = new TransactionLogs(OperationType.UNKNOWN, cfs.metadata);
Iterable<SSTableReader> readers = Lists.newArrayList(MockSchema.sstable(1, cfs), MockSchema.sstable(2, cfs));
- Throwable accumulate = Helpers.markObsolete(null, readers, null);
+
+ List<TransactionLogs.Obsoletion> obsoletions = new ArrayList<>();
+ Assert.assertNull(Helpers.prepareForObsoletion(readers, txnLogs, obsoletions, null));
+ assertNotNull(obsoletions);
+ assertEquals(2, obsoletions.size());
+
+ Throwable accumulate = Helpers.markObsolete(obsoletions, null);
Assert.assertNull(accumulate);
for (SSTableReader reader : readers)
Assert.assertTrue(reader.isMarkedCompacted());
- accumulate = Helpers.markObsolete(null, readers, null);
- Assert.assertNotNull(accumulate);
+
+ accumulate = Helpers.markObsolete(obsoletions, null);
+ assertNotNull(accumulate);
+
+ txnLogs.finish();
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/test/unit/org/apache/cassandra/db/lifecycle/LifecycleTransactionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/LifecycleTransactionTest.java b/test/unit/org/apache/cassandra/db/lifecycle/LifecycleTransactionTest.java
index d6af447..a876891 100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/LifecycleTransactionTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/LifecycleTransactionTest.java
@@ -33,7 +33,6 @@ import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction.ReaderState;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction.ReaderState.Action;
-import org.apache.cassandra.io.sstable.SSTableDeletingTask;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.concurrent.AbstractTransactionalTest;
@@ -249,7 +248,7 @@ public class LifecycleTransactionTest extends AbstractTransactionalTest
protected TestableTransaction newTest()
{
- SSTableDeletingTask.waitForDeletions();
+ TransactionLogs.waitForDeletions();
SSTableReader.resetTidying();
return new TxnTest();
}
@@ -399,6 +398,12 @@ public class LifecycleTransactionTest extends AbstractTransactionalTest
for (SSTableReader reader : concat(loggedObsolete, stagedObsolete))
Assert.assertTrue(reader.selfRef().globalCount() == 0);
}
+
+ @Override
+ protected boolean commitCanThrow()
+ {
+ return true;
+ }
}
private static SSTableReader[] readersArray(int lb, int ub, ColumnFamilyStore cfs)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java b/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java
new file mode 100644
index 0000000..5291baa
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.lifecycle;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.StringUtils;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import junit.framework.Assert;
+import org.apache.cassandra.MockSchema;
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
+import org.apache.cassandra.db.compaction.CompactionController;
+import org.apache.cassandra.db.compaction.CompactionIterator;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.io.sstable.CQLSSTableWriter;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableRewriter;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests to simulate real transactions such as compactions and flushing
+ * using SSTableRewriter, ColumnFamilyStore, LifecycleTransaction, TransactionLogs, etc
+ */
+public class RealTransactionsTest extends SchemaLoader
+{
+ private static final String KEYSPACE = "TransactionLogsTest";
+ private static final String REWRITE_FINISHED_CF = "RewriteFinished";
+ private static final String REWRITE_ABORTED_CF = "RewriteAborted";
+ private static final String FLUSH_CF = "Flush";
+
+ @BeforeClass
+ public static void setUp()
+ {
+ MockSchema.cleanup();
+
+ SchemaLoader.prepareServer();
+ SchemaLoader.createKeyspace(KEYSPACE,
+ KeyspaceParams.simple(1),
+ SchemaLoader.standardCFMD(KEYSPACE, REWRITE_FINISHED_CF),
+ SchemaLoader.standardCFMD(KEYSPACE, REWRITE_ABORTED_CF),
+ SchemaLoader.standardCFMD(KEYSPACE, FLUSH_CF));
+ }
+
+ @Test
+ public void testRewriteFinished() throws IOException
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(REWRITE_FINISHED_CF);
+
+ SSTableReader oldSSTable = getSSTable(cfs, 1);
+ LifecycleTransaction txn = cfs.getTracker().tryModify(oldSSTable, OperationType.COMPACTION);
+ SSTableReader newSSTable = replaceSSTable(cfs, txn, false);
+ TransactionLogs.waitForDeletions();
+
+ assertFiles(txn.logs().getDataFolder(), new HashSet<>(newSSTable.getAllFilePaths()));
+ assertFiles(txn.logs().getLogsFolder(), Collections.<String>emptySet());
+ }
+
+ @Test
+ public void testRewriteAborted() throws IOException
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(REWRITE_ABORTED_CF);
+
+ SSTableReader oldSSTable = getSSTable(cfs, 1);
+ LifecycleTransaction txn = cfs.getTracker().tryModify(oldSSTable, OperationType.COMPACTION);
+
+ replaceSSTable(cfs, txn, true);
+ TransactionLogs.waitForDeletions();
+
+ assertFiles(txn.logs().getDataFolder(), new HashSet<>(oldSSTable.getAllFilePaths()));
+ assertFiles(txn.logs().getLogsFolder(), Collections.<String>emptySet());
+ }
+
+ @Test
+ public void testFlush() throws IOException
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(FLUSH_CF);
+
+ SSTableReader ssTableReader = getSSTable(cfs, 100);
+
+ String dataFolder = cfs.getSSTables().iterator().next().descriptor.directory.getPath();
+ String transactionLogsFolder = StringUtils.join(dataFolder, File.separator, Directories.TRANSACTIONS_SUBDIR);
+
+ assertTrue(new File(transactionLogsFolder).exists());
+ assertFiles(transactionLogsFolder, Collections.<String>emptySet());
+
+ assertFiles(dataFolder, new HashSet<>(ssTableReader.getAllFilePaths()));
+ }
+
+ private SSTableReader getSSTable(ColumnFamilyStore cfs, int numPartitions) throws IOException
+ {
+ createSSTable(cfs, numPartitions);
+
+ Set<SSTableReader> sstables = new HashSet<>(cfs.getSSTables());
+ assertEquals(1, sstables.size());
+ return sstables.iterator().next();
+ }
+
+ private void createSSTable(ColumnFamilyStore cfs, int numPartitions) throws IOException
+ {
+ cfs.truncateBlocking();
+
+ String schema = "CREATE TABLE %s.%s (key ascii, name ascii, val ascii, val1 ascii, PRIMARY KEY (key, name))";
+ String query = "INSERT INTO %s.%s (key, name, val) VALUES (?, ?, ?)";
+
+ try (CQLSSTableWriter writer = CQLSSTableWriter.builder()
+ .withPartitioner(StorageService.getPartitioner())
+ .inDirectory(cfs.directories.getDirectoryForNewSSTables())
+ .forTable(String.format(schema, cfs.keyspace.getName(), cfs.name))
+ .using(String.format(query, cfs.keyspace.getName(), cfs.name))
+ .build())
+ {
+ for (int j = 0; j < numPartitions; j ++)
+ writer.addRow(String.format("key%d", j), "col1", "0");
+ }
+
+ cfs.loadNewSSTables();
+ }
+
+ private SSTableReader replaceSSTable(ColumnFamilyStore cfs, LifecycleTransaction txn, boolean fail)
+ {
+ List<SSTableReader> newsstables = null;
+ int nowInSec = FBUtilities.nowInSeconds();
+ try (CompactionController controller = new CompactionController(cfs, txn.originals(), cfs.gcBefore(FBUtilities.nowInSeconds())))
+ {
+ try (SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, false);
+ AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategyManager().getScanners(txn.originals());
+ CompactionIterator ci = new CompactionIterator(txn.opType(), scanners.scanners, controller, nowInSec, txn.opId())
+ )
+ {
+ long lastCheckObsoletion = System.nanoTime();
+ File directory = txn.originals().iterator().next().descriptor.directory;
+ Descriptor desc = Descriptor.fromFilename(cfs.getSSTablePath(directory));
+ CFMetaData metadata = Schema.instance.getCFMetaData(desc);
+ rewriter.switchWriter(SSTableWriter.create(metadata,
+ desc,
+ 0,
+ 0,
+ 0,
+ DatabaseDescriptor.getPartitioner(),
+ SerializationHeader.make(cfs.metadata, txn.originals()),
+ txn));
+ while (ci.hasNext())
+ {
+ rewriter.append(ci.next());
+
+ if (System.nanoTime() - lastCheckObsoletion > TimeUnit.MINUTES.toNanos(1L))
+ {
+ controller.maybeRefreshOverlaps();
+ lastCheckObsoletion = System.nanoTime();
+ }
+ }
+
+ if (!fail)
+ newsstables = rewriter.finish();
+ else
+ rewriter.abort();
+ }
+ }
+
+ assertTrue(fail || newsstables != null);
+
+ if (newsstables != null)
+ {
+ Assert.assertEquals(1, newsstables.size());
+ return newsstables.iterator().next();
+ }
+
+ return null;
+ }
+
+ private void assertFiles(String dirPath, Set<String> expectedFiles)
+ {
+ File dir = new File(dirPath);
+ for (File file : dir.listFiles())
+ {
+ if (file.isDirectory())
+ continue;
+
+ String filePath = file.getPath();
+ assertTrue(filePath, expectedFiles.contains(filePath));
+ expectedFiles.remove(filePath);
+ }
+
+ assertTrue(expectedFiles.isEmpty());
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java b/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
index dbd5287..89924a5 100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
@@ -40,7 +40,6 @@ import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Memtable;
import org.apache.cassandra.db.commitlog.ReplayPosition;
import org.apache.cassandra.db.compaction.OperationType;
-import org.apache.cassandra.io.sstable.SSTableDeletingTask;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.notifications.*;
import org.apache.cassandra.utils.concurrent.OpOrder;
@@ -152,6 +151,9 @@ public class TrackerTest
Assert.assertEquals(3, tracker.view.get().sstables.size());
+ for (SSTableReader reader : readers)
+ Assert.assertTrue(reader.isKeyCacheSetup());
+
Assert.assertEquals(17 + 121 + 9, cfs.metric.liveDiskSpaceUsed.getCount());
}
@@ -171,6 +173,9 @@ public class TrackerTest
Assert.assertEquals(3, tracker.view.get().sstables.size());
+ for (SSTableReader reader : readers)
+ Assert.assertTrue(reader.isKeyCacheSetup());
+
Assert.assertEquals(17 + 121 + 9, cfs.metric.liveDiskSpaceUsed.getCount());
Assert.assertEquals(3, listener.senders.size());
Assert.assertEquals(tracker, listener.senders.get(0));
@@ -182,9 +187,9 @@ public class TrackerTest
public void testDropSSTables()
{
testDropSSTables(false);
- SSTableDeletingTask.waitForDeletions();
+ TransactionLogs.waitForDeletions();
testDropSSTables(true);
- SSTableDeletingTask.waitForDeletions();
+ TransactionLogs.waitForDeletions();
}
private void testDropSSTables(boolean invalidate)
@@ -200,7 +205,7 @@ public class TrackerTest
try
{
- SSTableDeletingTask.pauseDeletions(true);
+ TransactionLogs.pauseDeletions(true);
try (LifecycleTransaction txn = tracker.tryModify(readers.get(0), OperationType.COMPACTION))
{
if (invalidate)
@@ -222,19 +227,19 @@ public class TrackerTest
Assert.assertEquals(0, reader.selfRef().globalCount());
Assert.assertTrue(reader.isMarkedCompacted());
}
- Assert.assertNull(tracker.dropSSTables(new Predicate<SSTableReader>() {
- public boolean apply(SSTableReader reader)
- {
- return reader != readers.get(0);
- }
- },
- OperationType.UNKNOWN,
- null));
+
+ Assert.assertNull(tracker.dropSSTables(reader -> reader != readers.get(0), OperationType.UNKNOWN, null));
+
Assert.assertEquals(1, tracker.getView().sstables.size());
Assert.assertEquals(3, listener.received.size());
Assert.assertEquals(tracker, listener.senders.get(0));
- Assert.assertEquals(2, ((SSTableListChangedNotification) listener.received.get(0)).removed.size());
- Assert.assertEquals(0, ((SSTableListChangedNotification) listener.received.get(0)).added.size());
+ Assert.assertTrue(listener.received.get(0) instanceof SSTableDeletingNotification);
+ Assert.assertTrue(listener.received.get(1) instanceof SSTableDeletingNotification);
+ Assert.assertTrue(listener.received.get(2) instanceof SSTableListChangedNotification);
+ Assert.assertEquals(readers.get(1), ((SSTableDeletingNotification) listener.received.get(0)).deleting);
+ Assert.assertEquals(readers.get(2), ((SSTableDeletingNotification)listener.received.get(1)).deleting);
+ Assert.assertEquals(2, ((SSTableListChangedNotification) listener.received.get(2)).removed.size());
+ Assert.assertEquals(0, ((SSTableListChangedNotification) listener.received.get(2)).added.size());
Assert.assertEquals(9, cfs.metric.liveDiskSpaceUsed.getCount());
readers.get(0).selfRef().release();
}
@@ -248,7 +253,7 @@ public class TrackerTest
}
finally
{
- SSTableDeletingTask.pauseDeletions(false);
+ TransactionLogs.pauseDeletions(false);
}
}
@@ -299,6 +304,7 @@ public class TrackerTest
Assert.assertEquals(1, listener.received.size());
Assert.assertEquals(reader, ((SSTableAddedNotification) listener.received.get(0)).added);
listener.received.clear();
+ Assert.assertTrue(reader.isKeyCacheSetup());
Assert.assertEquals(10, cfs.metric.liveDiskSpaceUsed.getCount());
// test invalidated CFS
@@ -314,8 +320,10 @@ public class TrackerTest
Assert.assertEquals(0, tracker.getView().sstables.size());
Assert.assertEquals(0, tracker.getView().flushingMemtables.size());
Assert.assertEquals(0, cfs.metric.liveDiskSpaceUsed.getCount());
+ Assert.assertEquals(3, listener.received.size());
Assert.assertEquals(reader, ((SSTableAddedNotification) listener.received.get(0)).added);
- Assert.assertEquals(1, ((SSTableListChangedNotification) listener.received.get(1)).removed.size());
+ Assert.assertTrue(listener.received.get(1) instanceof SSTableDeletingNotification);
+ Assert.assertEquals(1, ((SSTableListChangedNotification) listener.received.get(2)).removed.size());
DatabaseDescriptor.setIncrementalBackupsEnabled(backups);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/test/unit/org/apache/cassandra/db/lifecycle/TransactionLogsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/TransactionLogsTest.java b/test/unit/org/apache/cassandra/db/lifecycle/TransactionLogsTest.java
new file mode 100644
index 0000000..3150087
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/lifecycle/TransactionLogsTest.java
@@ -0,0 +1,558 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.lifecycle;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.util.*;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static junit.framework.Assert.assertNotNull;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import junit.framework.Assert;
+import org.apache.cassandra.MockSchema;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.compaction.*;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.io.sstable.metadata.MetadataType;
+import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
+import org.apache.cassandra.io.util.BufferedSegmentedFile;
+import org.apache.cassandra.io.util.ChannelProxy;
+import org.apache.cassandra.io.util.SegmentedFile;
+import org.apache.cassandra.utils.AlwaysPresentFilter;
+import org.apache.cassandra.utils.concurrent.AbstractTransactionalTest;
+import org.apache.cassandra.utils.concurrent.Transactional;
+
+public class TransactionLogsTest extends AbstractTransactionalTest
+{
+ private static final String KEYSPACE = "TransactionLogsTest";
+
+ @BeforeClass
+ public static void setUp()
+ {
+ MockSchema.cleanup();
+ }
+
+ protected AbstractTransactionalTest.TestableTransaction newTest() throws Exception
+ {
+ TransactionLogs.waitForDeletions();
+ SSTableReader.resetTidying();
+ return new TxnTest();
+ }
+
+ private static final class TxnTest extends TestableTransaction
+ {
+ private final static class Transaction extends Transactional.AbstractTransactional implements Transactional
+ {
+ final ColumnFamilyStore cfs;
+ final TransactionLogs txnLogs;
+ final SSTableReader sstableOld;
+ final SSTableReader sstableNew;
+ final TransactionLogs.SSTableTidier tidier;
+
+ public Transaction(ColumnFamilyStore cfs, TransactionLogs txnLogs) throws IOException
+ {
+ this.cfs = cfs;
+ this.txnLogs = txnLogs;
+ this.sstableOld = sstable(cfs, 0, 128);
+ this.sstableNew = sstable(cfs, 1, 128);
+
+ assertNotNull(txnLogs);
+ assertNotNull(txnLogs.getId());
+ Assert.assertEquals(OperationType.COMPACTION, txnLogs.getType());
+
+ txnLogs.trackNew(sstableNew);
+ tidier = txnLogs.obsoleted(sstableOld);
+ assertNotNull(tidier);
+ }
+
+ protected Throwable doCommit(Throwable accumulate)
+ {
+ sstableOld.markObsolete(tidier);
+ sstableOld.selfRef().release();
+ TransactionLogs.waitForDeletions();
+
+ Throwable ret = txnLogs.commit(accumulate);
+
+ sstableNew.selfRef().release();
+ return ret;
+ }
+
+ protected Throwable doAbort(Throwable accumulate)
+ {
+ tidier.abort();
+ TransactionLogs.waitForDeletions();
+
+ Throwable ret = txnLogs.abort(accumulate);
+
+ sstableNew.selfRef().release();
+ sstableOld.selfRef().release();
+ return ret;
+ }
+
+ protected void doPrepare()
+ {
+ txnLogs.prepareToCommit();
+ }
+
+ protected void assertInProgress() throws Exception
+ {
+ assertFiles(txnLogs.getDataFolder(), Sets.newHashSet(Iterables.concat(sstableNew.getAllFilePaths(),
+ sstableOld.getAllFilePaths())));
+ assertFiles(txnLogs.getLogsFolder(), Sets.newHashSet(txnLogs.getData().oldLog().file.getPath(),
+ txnLogs.getData().newLog().file.getPath()));
+ assertEquals(2, TransactionLogs.getLogFiles(cfs.metadata).size());
+ }
+
+ protected void assertPrepared() throws Exception
+ {
+ }
+
+ protected void assertAborted() throws Exception
+ {
+ assertFiles(txnLogs.getDataFolder(), new HashSet<>(sstableOld.getAllFilePaths()));
+ assertFiles(txnLogs.getLogsFolder(), Collections.<String>emptySet());
+ assertEquals(0, TransactionLogs.getLogFiles(cfs.metadata).size());
+ }
+
+ protected void assertCommitted() throws Exception
+ {
+ assertFiles(txnLogs.getDataFolder(), new HashSet<>(sstableNew.getAllFilePaths()));
+ assertFiles(txnLogs.getLogsFolder(), Collections.<String>emptySet());
+ assertEquals(0, TransactionLogs.getLogFiles(cfs.metadata).size());
+ }
+ }
+
+ final Transaction txn;
+
+ private TxnTest() throws IOException
+ {
+ this(MockSchema.newCFS(KEYSPACE));
+ }
+
+ private TxnTest(ColumnFamilyStore cfs) throws IOException
+ {
+ this(cfs, new TransactionLogs(OperationType.COMPACTION, cfs.metadata));
+ }
+
+ private TxnTest(ColumnFamilyStore cfs, TransactionLogs txnLogs) throws IOException
+ {
+ this(new Transaction(cfs, txnLogs));
+ }
+
+ private TxnTest(Transaction txn)
+ {
+ super(txn);
+ this.txn = txn;
+ }
+
+ protected void assertInProgress() throws Exception
+ {
+ txn.assertInProgress();
+ }
+
+ protected void assertPrepared() throws Exception
+ {
+ txn.assertPrepared();
+ }
+
+ protected void assertAborted() throws Exception
+ {
+ txn.assertAborted();
+ }
+
+ protected void assertCommitted() throws Exception
+ {
+ txn.assertCommitted();
+ }
+ }
+
+ @Test
+ public void testUntrack() throws Throwable
+ {
+ ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
+ SSTableReader sstableNew = sstable(cfs, 1, 128);
+
+ // complete a transaction without keeping the new files, since they were untracked
+ TransactionLogs transactionLogs = new TransactionLogs(OperationType.COMPACTION, cfs.metadata);
+ assertNotNull(transactionLogs);
+
+ transactionLogs.trackNew(sstableNew);
+ transactionLogs.untrackNew(sstableNew);
+
+ transactionLogs.finish();
+
+ assertFiles(transactionLogs.getDataFolder(), Collections.<String>emptySet());
+ assertFiles(transactionLogs.getLogsFolder(), Collections.<String>emptySet());
+ assertEquals(0, TransactionLogs.getLogFiles(cfs.metadata).size());
+
+ sstableNew.selfRef().release();
+ }
+
+ @Test
+ public void testCommitSameDesc() throws Throwable
+ {
+ ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
+ SSTableReader sstableOld1 = sstable(cfs, 0, 128);
+ SSTableReader sstableOld2 = sstable(cfs, 0, 256);
+ SSTableReader sstableNew = sstable(cfs, 1, 128);
+
+ TransactionLogs transactionLogs = new TransactionLogs(OperationType.COMPACTION, cfs.metadata);
+ assertNotNull(transactionLogs);
+
+ transactionLogs.trackNew(sstableNew);
+
+ sstableOld1.setReplaced();
+
+ TransactionLogs.SSTableTidier tidier = transactionLogs.obsoleted(sstableOld2);
+ assertNotNull(tidier);
+
+ transactionLogs.finish();
+
+ sstableOld2.markObsolete(tidier);
+
+ sstableOld1.selfRef().release();
+ sstableOld2.selfRef().release();
+
+ TransactionLogs.waitForDeletions();
+
+ assertFiles(transactionLogs.getDataFolder(), new HashSet<>(sstableNew.getAllFilePaths()));
+ assertFiles(transactionLogs.getLogsFolder(), Collections.<String>emptySet());
+ assertEquals(0, TransactionLogs.getLogFiles(cfs.metadata).size());
+
+ sstableNew.selfRef().release();
+ }
+
+ @Test
+ public void testCommitOnlyNew() throws Throwable
+ {
+ ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
+ SSTableReader sstable = sstable(cfs, 0, 128);
+
+ TransactionLogs transactionLogs = new TransactionLogs(OperationType.COMPACTION, cfs.metadata);
+ assertNotNull(transactionLogs);
+
+ transactionLogs.trackNew(sstable);
+ transactionLogs.finish();
+
+ assertFiles(transactionLogs.getDataFolder(), new HashSet<>(sstable.getAllFilePaths()));
+ assertFiles(transactionLogs.getLogsFolder(), Collections.<String>emptySet());
+ assertEquals(0, TransactionLogs.getLogFiles(cfs.metadata).size());
+
+ sstable.selfRef().release();
+ }
+
+ @Test
+ public void testCommitOnlyOld() throws Throwable
+ {
+ ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
+ SSTableReader sstable = sstable(cfs, 0, 128);
+
+ TransactionLogs transactionLogs = new TransactionLogs(OperationType.COMPACTION, cfs.metadata);
+ assertNotNull(transactionLogs);
+
+ TransactionLogs.SSTableTidier tidier = transactionLogs.obsoleted(sstable);
+ assertNotNull(tidier);
+
+ transactionLogs.finish();
+ sstable.markObsolete(tidier);
+ sstable.selfRef().release();
+
+ TransactionLogs.waitForDeletions();
+
+ assertFiles(transactionLogs.getDataFolder(), new HashSet<>());
+ assertFiles(transactionLogs.getLogsFolder(), Collections.<String>emptySet());
+ assertEquals(0, TransactionLogs.getLogFiles(cfs.metadata).size());
+ }
+
+ @Test
+ public void testAbortOnlyNew() throws Throwable
+ {
+ ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
+ SSTableReader sstable = sstable(cfs, 0, 128);
+
+ TransactionLogs transactionLogs = new TransactionLogs(OperationType.COMPACTION, cfs.metadata);
+ assertNotNull(transactionLogs);
+
+ transactionLogs.trackNew(sstable);
+ transactionLogs.abort();
+
+ sstable.selfRef().release();
+
+ assertFiles(transactionLogs.getDataFolder(), new HashSet<>());
+ assertFiles(transactionLogs.getLogsFolder(), Collections.<String>emptySet());
+ assertEquals(0, TransactionLogs.getLogFiles(cfs.metadata).size());
+ }
+
+ @Test
+ public void testAbortOnlyOld() throws Throwable
+ {
+ ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
+ SSTableReader sstable = sstable(cfs, 0, 128);
+
+ TransactionLogs transactionLogs = new TransactionLogs(OperationType.COMPACTION, cfs.metadata);
+ assertNotNull(transactionLogs);
+
+ TransactionLogs.SSTableTidier tidier = transactionLogs.obsoleted(sstable);
+ assertNotNull(tidier);
+
+ tidier.abort();
+ transactionLogs.abort();
+
+ sstable.selfRef().release();
+
+ assertFiles(transactionLogs.getDataFolder(), new HashSet<>(sstable.getAllFilePaths()));
+ assertFiles(transactionLogs.getLogsFolder(), Collections.<String>emptySet());
+ assertEquals(0, TransactionLogs.getLogFiles(cfs.metadata).size());
+ }
+
+ private File copyToTmpFile(File file) throws IOException
+ {
+ File ret = File.createTempFile(file.getName(), ".tmp");
+ ret.deleteOnExit();
+ Files.copy(file.toPath(), ret.toPath(), StandardCopyOption.REPLACE_EXISTING);
+ return ret;
+ }
+
+ @Test
+ public void testRemoveUnfinishedLeftovers_newLogFound() throws Throwable
+ {
+ ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
+ SSTableReader sstableOld = sstable(cfs, 0, 128);
+ SSTableReader sstableNew = sstable(cfs, 1, 128);
+
+ // simulate tracking sstables with a failed transaction (new log file NOT deleted)
+ TransactionLogs transactionLogs = new TransactionLogs(OperationType.COMPACTION, cfs.metadata);
+ assertNotNull(transactionLogs);
+
+ transactionLogs.trackNew(sstableNew);
+ TransactionLogs.SSTableTidier tidier = transactionLogs.obsoleted(sstableOld);
+
+ File tmpNewLog = copyToTmpFile(transactionLogs.getData().newLog().file);
+ File tmpOldLog = copyToTmpFile(transactionLogs.getData().oldLog().file);
+
+ Set<File> tmpFiles = new HashSet<>(TransactionLogs.getLogFiles(cfs.metadata));
+ for (String p : sstableNew.getAllFilePaths())
+ tmpFiles.add(new File(p));
+
+ sstableNew.selfRef().release();
+ sstableOld.selfRef().release();
+
+ Assert.assertEquals(tmpFiles, TransactionLogs.getTemporaryFiles(cfs.metadata, sstableNew.descriptor.directory));
+
+ // normally called at startup
+ TransactionLogs.removeUnfinishedLeftovers(cfs.metadata);
+
+ // the old sstable should not have been removed because the new log was found
+ Directories directories = new Directories(cfs.metadata);
+ Map<Descriptor, Set<Component>> sstables = directories.sstableLister().list();
+ assertEquals(1, sstables.size());
+
+ assertFiles(transactionLogs.getDataFolder(), new HashSet<>(sstableOld.getAllFilePaths()));
+ assertFiles(transactionLogs.getLogsFolder(), Collections.<String>emptySet());
+
+ tidier.run();
+
+ // move the saved copies of the transaction log files back, or TransactionLogs will throw assertion errors
+ Files.move(tmpNewLog.toPath(), transactionLogs.getData().newLog().file.toPath());
+ Files.move(tmpOldLog.toPath(), transactionLogs.getData().oldLog().file.toPath());
+
+ transactionLogs.close();
+ }
+
+ /**
+ * Checks {@code TransactionLogs.removeUnfinishedLeftovers} for a transaction whose new log file
+ * has been deleted before the call: afterwards the data folder is expected to contain only the
+ * files of the new sstable and the logs folder is expected to be empty.
+ */
+ @Test
+ public void testRemoveUnfinishedLeftovers_oldLogFound() throws Throwable
+ {
+ ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
+ SSTableReader sstableOld = sstable(cfs, 0, 128);
+ SSTableReader sstableNew = sstable(cfs, 1, 128);
+
+ // simulate tracking sstables with a committed transaction (new log file deleted)
+ TransactionLogs transactionLogs = new TransactionLogs(OperationType.COMPACTION, cfs.metadata);
+ assertNotNull(transactionLogs);
+
+ transactionLogs.trackNew(sstableNew);
+ TransactionLogs.SSTableTidier tidier = transactionLogs.obsoleted(sstableOld);
+
+ // keep copies of both log files so that they can be put back before close() at the end of the test
+ File tmpNewLog = copyToTmpFile(transactionLogs.getData().newLog().file);
+ File tmpOldLog = copyToTmpFile(transactionLogs.getData().oldLog().file);
+
+ transactionLogs.getData().newLog().delete(false);
+
+ // expected temporary files: the log files still listed plus every file of the old sstable
+ Set<File> tmpFiles = new HashSet<>(TransactionLogs.getLogFiles(cfs.metadata));
+ for (String p : sstableOld.getAllFilePaths())
+ tmpFiles.add(new File(p));
+
+ sstableNew.selfRef().release();
+ sstableOld.selfRef().release();
+
+ Assert.assertEquals(tmpFiles, TransactionLogs.getTemporaryFiles(cfs.metadata, sstableOld.descriptor.directory));
+
+ // normally called at startup
+ TransactionLogs.removeUnfinishedLeftovers(cfs.metadata);
+
+ // the old sstable should have been removed because there was no new log; only the new sstable remains
+ Directories directories = new Directories(cfs.metadata);
+ Map<Descriptor, Set<Component>> sstables = directories.sstableLister().list();
+ assertEquals(1, sstables.size());
+
+ assertFiles(transactionLogs.getDataFolder(), new HashSet<>(sstableNew.getAllFilePaths()));
+ assertFiles(transactionLogs.getLogsFolder(), Collections.<String>emptySet());
+
+ tidier.run();
+
+ // move the saved copies of the transaction log files back or transactionlogs will throw assertions
+ Files.move(tmpNewLog.toPath(), transactionLogs.getData().newLog().file.toPath());
+ Files.move(tmpOldLog.toPath(), transactionLogs.getData().oldLog().file.toPath());
+
+ transactionLogs.close();
+ }
+
+    /**
+     * Exercises {@code TransactionLogs.getTemporaryFiles} and the sstable lister's
+     * {@code skipTemporary} option over the life of a transaction:
+     * <ul>
+     *   <li>before anything is tracked, no file is reported as temporary;</li>
+     *   <li>once a second sstable is tracked as new, its files plus the two transaction log
+     *       files are reported as temporary and are left out of the skip-temporary listing;</li>
+     *   <li>after {@code finish()}, nothing is temporary and the files show up in the
+     *       skip-temporary listing.</li>
+     * </ul>
+     */
+    @Test
+    public void testGetTemporaryFiles() throws IOException
+    {
+        ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
+        SSTableReader sstable1 = sstable(cfs, 0, 128);
+
+        File dataFolder = sstable1.descriptor.directory;
+
+        Set<File> tmpFiles = TransactionLogs.getTemporaryFiles(cfs.metadata, dataFolder);
+        assertNotNull(tmpFiles);
+        assertEquals(0, tmpFiles.size());
+
+        TransactionLogs transactionLogs = new TransactionLogs(OperationType.WRITE, cfs.metadata);
+        Directories directories = new Directories(cfs.metadata);
+
+        // snapshot of the plain files in the data folder before the second sstable is created
+        File[] beforeSecondSSTable = dataFolder.listFiles(pathname -> !pathname.isDirectory());
+
+        SSTableReader sstable2 = sstable(cfs, 1, 128);
+        transactionLogs.trackNew(sstable2);
+
+        Map<Descriptor, Set<Component>> sstables = directories.sstableLister().list();
+        assertEquals(2, sstables.size());
+
+        File[] afterSecondSSTable = dataFolder.listFiles(pathname -> !pathname.isDirectory());
+        int numNewFiles = afterSecondSSTable.length - beforeSecondSSTable.length;
+        // assertEquals rather than assertTrue(a == b) so that a failure reports both values
+        assertEquals(sstable2.getAllFilePaths().size(), numNewFiles);
+
+        tmpFiles = TransactionLogs.getTemporaryFiles(cfs.metadata, dataFolder);
+        assertNotNull(tmpFiles);
+        assertEquals(numNewFiles + 2, tmpFiles.size()); //the extra files are the transaction log files
+
+        File ssTable2DataFile = new File(sstable2.descriptor.filenameFor(Component.DATA));
+        File ssTable2IndexFile = new File(sstable2.descriptor.filenameFor(Component.PRIMARY_INDEX));
+
+        assertTrue(tmpFiles.contains(ssTable2DataFile));
+        assertTrue(tmpFiles.contains(ssTable2IndexFile));
+
+        List<File> files = directories.sstableLister().listFiles();
+        List<File> filesNoTmp = directories.sstableLister().skipTemporary(true).listFiles();
+        assertNotNull(files);
+        assertNotNull(filesNoTmp);
+
+        // the unfiltered listing sees the tracked files, the skip-temporary listing must not
+        assertTrue(files.contains(ssTable2DataFile));
+        assertTrue(files.contains(ssTable2IndexFile));
+
+        assertFalse(filesNoTmp.contains(ssTable2DataFile));
+        assertFalse(filesNoTmp.contains(ssTable2IndexFile));
+
+        transactionLogs.finish();
+
+        //Now it should be empty since the transaction has finished
+        tmpFiles = TransactionLogs.getTemporaryFiles(cfs.metadata, dataFolder);
+        assertNotNull(tmpFiles);
+        assertEquals(0, tmpFiles.size());
+
+        filesNoTmp = directories.sstableLister().skipTemporary(true).listFiles();
+        assertNotNull(filesNoTmp);
+        assertTrue(filesNoTmp.contains(ssTable2DataFile));
+        assertTrue(filesNoTmp.contains(ssTable2IndexFile));
+
+        sstable1.selfRef().release();
+        sstable2.selfRef().release();
+    }
+
+    /**
+     * Builds a stub sstable for tests: creates the DATA, PRIMARY_INDEX, FILTER and TOC component
+     * files for {@code generation} in the directory for new sstables, sets each file's length to
+     * {@code size} bytes, and opens a reader over them with {@code SSTableReader.internalOpen}.
+     *
+     * @param cfs        the column family store whose metadata, keyspace and table name are used
+     * @param generation the sstable generation; also passed to {@code MockSchema.readerBounds}
+     *                   to set the reader's first and last keys
+     * @param size       the length, in bytes, given to every component file
+     * @return the opened reader; the tests in this file release it through {@code selfRef().release()}
+     * @throws IOException if a component file cannot be created or resized
+     */
+    public static SSTableReader sstable(ColumnFamilyStore cfs, int generation, int size) throws IOException
+    {
+        Directories dir = new Directories(cfs.metadata);
+        Descriptor descriptor = new Descriptor(dir.getDirectoryForNewSSTables(), cfs.keyspace.getName(), cfs.getColumnFamilyName(), generation);
+        Set<Component> components = ImmutableSet.of(Component.DATA, Component.PRIMARY_INDEX, Component.FILTER, Component.TOC);
+        for (Component component : components)
+        {
+            File file = new File(descriptor.filenameFor(component));
+            file.createNewFile();
+            try (RandomAccessFile raf = new RandomAccessFile(file, "rw"))
+            {
+                raf.setLength(size);
+            }
+        }
+
+        SegmentedFile dFile = new BufferedSegmentedFile(new ChannelProxy(new File(descriptor.filenameFor(Component.DATA))), 0);
+        SegmentedFile iFile = new BufferedSegmentedFile(new ChannelProxy(new File(descriptor.filenameFor(Component.PRIMARY_INDEX))), 0);
+
+        // Collections.emptyList() instead of the raw Collections.EMPTY_LIST: same empty list, no unchecked warning
+        SerializationHeader header = SerializationHeader.make(cfs.metadata, Collections.emptyList());
+        StatsMetadata metadata = (StatsMetadata) new MetadataCollector(cfs.metadata.comparator)
+                                                 .finalizeMetadata(Murmur3Partitioner.instance.getClass().getCanonicalName(), 0.01f, -1, header)
+                                                 .get(MetadataType.STATS);
+        SSTableReader reader = SSTableReader.internalOpen(descriptor,
+                                                          components,
+                                                          cfs.metadata,
+                                                          Murmur3Partitioner.instance,
+                                                          dFile,
+                                                          iFile,
+                                                          MockSchema.indexSummary.sharedCopy(),
+                                                          new AlwaysPresentFilter(),
+                                                          1L,
+                                                          metadata,
+                                                          SSTableReader.OpenReason.NORMAL,
+                                                          header);
+        reader.first = reader.last = MockSchema.readerBounds(generation);
+        return reader;
+    }
+
+    /**
+     * Asserts that the non-directory entries directly inside {@code dirPath} are exactly
+     * {@code expectedFiles}: every file found must be in the set, and every path in the set must
+     * be found. Sub-directories are ignored.
+     *
+     * @param dirPath       path of the directory to inspect
+     * @param expectedFiles the expected file paths, as returned by {@link File#getPath()};
+     *                      the set is not modified
+     */
+    private static void assertFiles(String dirPath, Set<String> expectedFiles)
+    {
+        // listFiles() returns null when the path is not a directory or cannot be read
+        File[] children = new File(dirPath).listFiles();
+        assertNotNull("Unable to list files in " + dirPath, children);
+
+        // tick paths off a private copy so that the caller's set is left untouched
+        Set<String> notYetSeen = new HashSet<>(expectedFiles);
+        for (File file : children)
+        {
+            if (file.isDirectory())
+                continue;
+
+            String filePath = file.getPath();
+            assertTrue("Unexpected file: " + filePath, notYetSeen.remove(filePath));
+        }
+
+        assertTrue("Expected files not found: " + notYetSeen, notYetSeen.isEmpty());
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java b/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java
index 29875d5..08de62f 100644
--- a/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java
@@ -61,19 +61,19 @@ public class BigTableWriterTest extends AbstractTransactionalTest
{
final File file;
final Descriptor descriptor;
- final SSTableWriter writer;
+ final SSTableTxnWriter writer;
private TestableBTW() throws IOException
{
- this(cfs.getTempSSTablePath(cfs.directories.getDirectoryForNewSSTables()));
+ this(cfs.getSSTablePath(cfs.directories.getDirectoryForNewSSTables()));
}
private TestableBTW(String file) throws IOException
{
- this(file, SSTableWriter.create(file, 0, 0, new SerializationHeader(cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS)));
+ this(file, SSTableTxnWriter.create(file, 0, 0, new SerializationHeader(cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS)));
}
- private TestableBTW(String file, SSTableWriter sw) throws IOException
+ private TestableBTW(String file, SSTableTxnWriter sw) throws IOException
{
super(sw);
this.file = new File(file);
@@ -91,22 +91,19 @@ public class BigTableWriterTest extends AbstractTransactionalTest
protected void assertInProgress() throws Exception
{
- assertExists(Descriptor.Type.TEMP, Component.DATA, Component.PRIMARY_INDEX);
- assertNotExists(Descriptor.Type.TEMP, Component.FILTER, Component.SUMMARY);
- assertNotExists(Descriptor.Type.FINAL, Component.DATA, Component.PRIMARY_INDEX, Component.FILTER, Component.SUMMARY);
+ assertExists(Component.DATA, Component.PRIMARY_INDEX);
+ assertNotExists(Component.FILTER, Component.SUMMARY);
Assert.assertTrue(file.length() > 0);
}
protected void assertPrepared() throws Exception
{
- assertNotExists(Descriptor.Type.TEMP, Component.DATA, Component.PRIMARY_INDEX, Component.FILTER, Component.SUMMARY);
- assertExists(Descriptor.Type.FINAL, Component.DATA, Component.PRIMARY_INDEX, Component.FILTER, Component.SUMMARY);
+ assertExists(Component.DATA, Component.PRIMARY_INDEX, Component.FILTER, Component.SUMMARY);
}
protected void assertAborted() throws Exception
{
- assertNotExists(Descriptor.Type.TEMP, Component.DATA, Component.PRIMARY_INDEX, Component.FILTER, Component.SUMMARY);
- assertNotExists(Descriptor.Type.FINAL, Component.DATA, Component.PRIMARY_INDEX, Component.FILTER, Component.SUMMARY);
+ assertNotExists(Component.DATA, Component.PRIMARY_INDEX, Component.FILTER, Component.SUMMARY);
Assert.assertFalse(file.exists());
}
@@ -115,15 +112,15 @@ public class BigTableWriterTest extends AbstractTransactionalTest
assertPrepared();
}
- private void assertExists(Descriptor.Type type, Component ... components)
+ private void assertExists(Component ... components)
{
for (Component component : components)
- Assert.assertTrue(new File(descriptor.asType(type).filenameFor(component)).exists());
+ Assert.assertTrue(new File(descriptor.filenameFor(component)).exists());
}
- private void assertNotExists(Descriptor.Type type, Component ... components)
+ private void assertNotExists(Component ... components)
{
for (Component component : components)
- Assert.assertFalse(type.toString() + " " + component.toString(), new File(descriptor.asType(type).filenameFor(component)).exists());
+ Assert.assertFalse(component.toString(), new File(descriptor.filenameFor(component)).exists());
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java
index dab88c8..9a558f1 100644
--- a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java
@@ -28,10 +28,13 @@ import org.junit.Before;
import org.junit.Test;
import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Directories;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.io.util.FileUtils;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
public class CQLSSTableWriterClientTest
{
@@ -97,5 +100,11 @@ public class CQLSSTableWriterClientTest
File[] dataFiles = this.testDirectory.listFiles(filter);
assertEquals(2, dataFiles.length);
+ File transactionsFolder = Directories.getTransactionsDirectory(testDirectory);
+ assertTrue(transactionsFolder.exists());
+
+ File[] opFiles = transactionsFolder.listFiles();
+ assertEquals(0, opFiles.length);
+
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/test/unit/org/apache/cassandra/io/sstable/DescriptorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/DescriptorTest.java b/test/unit/org/apache/cassandra/io/sstable/DescriptorTest.java
index ceb0f9c..19eca40 100644
--- a/test/unit/org/apache/cassandra/io/sstable/DescriptorTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/DescriptorTest.java
@@ -77,24 +77,19 @@ public class DescriptorTest
private void testFromFilenameFor(File dir)
{
// normal
- checkFromFilename(new Descriptor(dir, ksname, cfname, 1, Descriptor.Type.FINAL), false);
+ checkFromFilename(new Descriptor(dir, ksname, cfname, 1), false);
// skip component (for streaming lock file)
- checkFromFilename(new Descriptor(dir, ksname, cfname, 2, Descriptor.Type.FINAL), true);
- // tmp
- checkFromFilename(new Descriptor(dir, ksname, cfname, 3, Descriptor.Type.TEMP), false);
+ checkFromFilename(new Descriptor(dir, ksname, cfname, 2), true);
+
// secondary index
String idxName = "myidx";
File idxDir = new File(dir.getAbsolutePath() + File.separator + Directories.SECONDARY_INDEX_NAME_SEPARATOR + idxName);
- checkFromFilename(new Descriptor(idxDir, ksname, cfname + Directories.SECONDARY_INDEX_NAME_SEPARATOR + idxName, 4, Descriptor.Type.FINAL), false);
- // secondary index tmp
- checkFromFilename(new Descriptor(idxDir, ksname, cfname + Directories.SECONDARY_INDEX_NAME_SEPARATOR + idxName, 5, Descriptor.Type.TEMP), false);
+ checkFromFilename(new Descriptor(idxDir, ksname, cfname + Directories.SECONDARY_INDEX_NAME_SEPARATOR + idxName, 4), false);
// legacy version
- checkFromFilename(new Descriptor("ja", dir, ksname, cfname, 1, Descriptor.Type.FINAL, SSTableFormat.Type.LEGACY), false);
- // legacy tmp
- checkFromFilename(new Descriptor("ja", dir, ksname, cfname, 2, Descriptor.Type.TEMP, SSTableFormat.Type.LEGACY), false);
+ checkFromFilename(new Descriptor("ja", dir, ksname, cfname, 1, SSTableFormat.Type.LEGACY), false);
// legacy secondary index
- checkFromFilename(new Descriptor("ja", dir, ksname, cfname + Directories.SECONDARY_INDEX_NAME_SEPARATOR + idxName, 3, Descriptor.Type.FINAL, SSTableFormat.Type.LEGACY), false);
+ checkFromFilename(new Descriptor("ja", dir, ksname, cfname + Directories.SECONDARY_INDEX_NAME_SEPARATOR + idxName, 3, SSTableFormat.Type.LEGACY), false);
}
private void checkFromFilename(Descriptor original, boolean skipComponent)
@@ -109,7 +104,6 @@ public class DescriptorTest
assertEquals(original.cfname, desc.cfname);
assertEquals(original.version, desc.version);
assertEquals(original.generation, desc.generation);
- assertEquals(original.type, desc.type);
if (skipComponent)
{