You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2015/08/17 11:20:40 UTC
[4/7] cassandra git commit: Improve transaction log under FS
corruption
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5726625a/test/unit/org/apache/cassandra/db/DirectoriesTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/DirectoriesTest.java b/test/unit/org/apache/cassandra/db/DirectoriesTest.java
index 14db2d1..e0786f9 100644
--- a/test/unit/org/apache/cassandra/db/DirectoriesTest.java
+++ b/test/unit/org/apache/cassandra/db/DirectoriesTest.java
@@ -23,7 +23,6 @@ import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import com.google.common.collect.Lists;
import org.apache.commons.lang3.StringUtils;
import org.junit.AfterClass;
@@ -236,7 +235,7 @@ public class DirectoriesTest
Set<File> listed;
// List all but no snapshot, backup
- lister = directories.sstableLister();
+ lister = directories.sstableLister(Directories.OnTxnErr.THROW);
listed = new HashSet<>(lister.listFiles());
for (File f : files.get(cfm.cfName))
{
@@ -247,7 +246,7 @@ public class DirectoriesTest
}
// List all but including backup (but no snapshot)
- lister = directories.sstableLister().includeBackups(true);
+ lister = directories.sstableLister(Directories.OnTxnErr.THROW).includeBackups(true);
listed = new HashSet<>(lister.listFiles());
for (File f : files.get(cfm.cfName))
{
@@ -258,7 +257,7 @@ public class DirectoriesTest
}
// Skip temporary and compacted
- lister = directories.sstableLister().skipTemporary(true);
+ lister = directories.sstableLister(Directories.OnTxnErr.THROW).skipTemporary(true);
listed = new HashSet<>(lister.listFiles());
for (File f : files.get(cfm.cfName))
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5726625a/test/unit/org/apache/cassandra/db/KeyCacheTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/KeyCacheTest.java b/test/unit/org/apache/cassandra/db/KeyCacheTest.java
index 30812d2..aedba0e 100644
--- a/test/unit/org/apache/cassandra/db/KeyCacheTest.java
+++ b/test/unit/org/apache/cassandra/db/KeyCacheTest.java
@@ -37,6 +37,7 @@ import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.lifecycle.TransactionLog;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.schema.KeyspaceParams;
@@ -163,10 +164,7 @@ public class KeyCacheTest
refs.release();
- while (ScheduledExecutors.nonPeriodicTasks.getActiveCount() + ScheduledExecutors.nonPeriodicTasks.getQueue().size() > 0)
- {
- Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);;
- }
+ TransactionLog.waitForDeletions();
// after releasing the reference this should drop to 2
assertKeyCacheSize(2, KEYSPACE1, COLUMN_FAMILY1);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5726625a/test/unit/org/apache/cassandra/db/ScrubTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ScrubTest.java b/test/unit/org/apache/cassandra/db/ScrubTest.java
index cc4038d..8889488 100644
--- a/test/unit/org/apache/cassandra/db/ScrubTest.java
+++ b/test/unit/org/apache/cassandra/db/ScrubTest.java
@@ -43,7 +43,7 @@ import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.db.compaction.Scrubber;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
-import org.apache.cassandra.db.lifecycle.TransactionLogs;
+import org.apache.cassandra.db.lifecycle.TransactionLog;
import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.db.marshal.UUIDType;
import org.apache.cassandra.db.partitions.Partition;
@@ -371,7 +371,7 @@ public class ScrubTest
{
scrubber.scrub();
}
- TransactionLogs.waitForDeletions();
+ TransactionLog.waitForDeletions();
cfs.loadNewSSTables();
assertOrderedAll(cfs, 7);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5726625a/test/unit/org/apache/cassandra/db/lifecycle/HelpersTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/HelpersTest.java b/test/unit/org/apache/cassandra/db/lifecycle/HelpersTest.java
index e9c903e..7f1b2bd 100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/HelpersTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/HelpersTest.java
@@ -32,14 +32,9 @@ import org.junit.Test;
import junit.framework.Assert;
import org.apache.cassandra.MockSchema;
-import org.apache.cassandra.Util;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.compaction.OperationType;
-import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.io.sstable.format.big.BigTableReader;
-import org.apache.cassandra.utils.Pair;
-import org.apache.cassandra.utils.concurrent.Refs;
import static junit.framework.Assert.assertEquals;
import static junit.framework.Assert.assertNotNull;
@@ -164,10 +159,10 @@ public class HelpersTest
public void testMarkObsolete()
{
ColumnFamilyStore cfs = MockSchema.newCFS();
- TransactionLogs txnLogs = new TransactionLogs(OperationType.UNKNOWN, cfs.metadata);
+ TransactionLog txnLogs = new TransactionLog(OperationType.UNKNOWN, cfs.metadata);
Iterable<SSTableReader> readers = Lists.newArrayList(MockSchema.sstable(1, cfs), MockSchema.sstable(2, cfs));
- List<TransactionLogs.Obsoletion> obsoletions = new ArrayList<>();
+ List<TransactionLog.Obsoletion> obsoletions = new ArrayList<>();
Assert.assertNull(Helpers.prepareForObsoletion(readers, txnLogs, obsoletions, null));
assertNotNull(obsoletions);
assertEquals(2, obsoletions.size());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5726625a/test/unit/org/apache/cassandra/db/lifecycle/LifecycleTransactionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/LifecycleTransactionTest.java b/test/unit/org/apache/cassandra/db/lifecycle/LifecycleTransactionTest.java
index a376a61..db27662 100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/LifecycleTransactionTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/LifecycleTransactionTest.java
@@ -249,7 +249,7 @@ public class LifecycleTransactionTest extends AbstractTransactionalTest
protected TestableTransaction newTest()
{
- TransactionLogs.waitForDeletions();
+ TransactionLog.waitForDeletions();
SSTableReader.resetTidying();
return new TxnTest();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5726625a/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java b/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java
index 44f4d30..309e35a 100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java
@@ -89,10 +89,9 @@ public class RealTransactionsTest extends SchemaLoader
SSTableReader oldSSTable = getSSTable(cfs, 1);
LifecycleTransaction txn = cfs.getTracker().tryModify(oldSSTable, OperationType.COMPACTION);
SSTableReader newSSTable = replaceSSTable(cfs, txn, false);
- TransactionLogs.waitForDeletions();
+ TransactionLog.waitForDeletions();
- assertFiles(txn.logs().getDataFolder(), new HashSet<>(newSSTable.getAllFilePaths()));
- assertFiles(txn.logs().getLogsFolder(), Collections.<String>emptySet());
+ assertFiles(txn.log().getDataFolder(), new HashSet<>(newSSTable.getAllFilePaths()));
}
@Test
@@ -105,10 +104,9 @@ public class RealTransactionsTest extends SchemaLoader
LifecycleTransaction txn = cfs.getTracker().tryModify(oldSSTable, OperationType.COMPACTION);
replaceSSTable(cfs, txn, true);
- TransactionLogs.waitForDeletions();
+ TransactionLog.waitForDeletions();
- assertFiles(txn.logs().getDataFolder(), new HashSet<>(oldSSTable.getAllFilePaths()));
- assertFiles(txn.logs().getLogsFolder(), Collections.<String>emptySet());
+ assertFiles(txn.log().getDataFolder(), new HashSet<>(oldSSTable.getAllFilePaths()));
}
@Test
@@ -120,11 +118,6 @@ public class RealTransactionsTest extends SchemaLoader
SSTableReader ssTableReader = getSSTable(cfs, 100);
String dataFolder = cfs.getLiveSSTables().iterator().next().descriptor.directory.getPath();
- String transactionLogsFolder = StringUtils.join(dataFolder, File.separator, Directories.TRANSACTIONS_SUBDIR);
-
- assertTrue(new File(transactionLogsFolder).exists());
- assertFiles(transactionLogsFolder, Collections.<String>emptySet());
-
assertFiles(dataFolder, new HashSet<>(ssTableReader.getAllFilePaths()));
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5726625a/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java b/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
index ea0d9a8..3a943c4 100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
@@ -187,9 +187,9 @@ public class TrackerTest
public void testDropSSTables()
{
testDropSSTables(false);
- TransactionLogs.waitForDeletions();
+ TransactionLog.waitForDeletions();
testDropSSTables(true);
- TransactionLogs.waitForDeletions();
+ TransactionLog.waitForDeletions();
}
private void testDropSSTables(boolean invalidate)
@@ -203,62 +203,54 @@ public class TrackerTest
MockSchema.sstable(2, 71, true, cfs));
tracker.addInitialSSTables(copyOf(readers));
- try
+ try (LifecycleTransaction txn = tracker.tryModify(readers.get(0), OperationType.COMPACTION))
{
- // TransactionLogs.pauseDeletions(true);
- try (LifecycleTransaction txn = tracker.tryModify(readers.get(0), OperationType.COMPACTION))
+ if (invalidate)
{
- if (invalidate)
- {
- cfs.invalidate(false);
- }
- else
- {
- tracker.dropSSTables();
- TransactionLogs.waitForDeletions();
- }
- Assert.assertEquals(9, cfs.metric.totalDiskSpaceUsed.getCount());
- Assert.assertEquals(9, cfs.metric.liveDiskSpaceUsed.getCount());
- Assert.assertEquals(1, tracker.getView().sstables.size());
+ cfs.invalidate(false);
}
- if (!invalidate)
+ else
{
- Assert.assertEquals(1, tracker.getView().sstables.size());
- Assert.assertEquals(readers.get(0), Iterables.getFirst(tracker.getView().sstables, null));
- Assert.assertEquals(1, readers.get(0).selfRef().globalCount());
- Assert.assertFalse(readers.get(0).isMarkedCompacted());
- for (SSTableReader reader : readers.subList(1, 3))
- {
- Assert.assertEquals(0, reader.selfRef().globalCount());
- Assert.assertTrue(reader.isMarkedCompacted());
- }
-
- Assert.assertNull(tracker.dropSSTables(reader -> reader != readers.get(0), OperationType.UNKNOWN, null));
-
- Assert.assertEquals(1, tracker.getView().sstables.size());
- Assert.assertEquals(3, listener.received.size());
- Assert.assertEquals(tracker, listener.senders.get(0));
- Assert.assertTrue(listener.received.get(0) instanceof SSTableDeletingNotification);
- Assert.assertTrue(listener.received.get(1) instanceof SSTableDeletingNotification);
- Assert.assertTrue(listener.received.get(2) instanceof SSTableListChangedNotification);
- Assert.assertEquals(readers.get(1), ((SSTableDeletingNotification) listener.received.get(0)).deleting);
- Assert.assertEquals(readers.get(2), ((SSTableDeletingNotification)listener.received.get(1)).deleting);
- Assert.assertEquals(2, ((SSTableListChangedNotification) listener.received.get(2)).removed.size());
- Assert.assertEquals(0, ((SSTableListChangedNotification) listener.received.get(2)).added.size());
- Assert.assertEquals(9, cfs.metric.liveDiskSpaceUsed.getCount());
- readers.get(0).selfRef().release();
+ tracker.dropSSTables();
+ TransactionLog.waitForDeletions();
}
- else
+ Assert.assertEquals(9, cfs.metric.totalDiskSpaceUsed.getCount());
+ Assert.assertEquals(9, cfs.metric.liveDiskSpaceUsed.getCount());
+ Assert.assertEquals(1, tracker.getView().sstables.size());
+ }
+ if (!invalidate)
+ {
+ Assert.assertEquals(1, tracker.getView().sstables.size());
+ Assert.assertEquals(readers.get(0), Iterables.getFirst(tracker.getView().sstables, null));
+ Assert.assertEquals(1, readers.get(0).selfRef().globalCount());
+ Assert.assertFalse(readers.get(0).isMarkedCompacted());
+ for (SSTableReader reader : readers.subList(1, 3))
{
- Assert.assertEquals(0, tracker.getView().sstables.size());
- Assert.assertEquals(0, cfs.metric.liveDiskSpaceUsed.getCount());
- for (SSTableReader reader : readers)
- Assert.assertTrue(reader.isMarkedCompacted());
+ Assert.assertEquals(0, reader.selfRef().globalCount());
+ Assert.assertTrue(reader.isMarkedCompacted());
}
+
+ Assert.assertNull(tracker.dropSSTables(reader -> reader != readers.get(0), OperationType.UNKNOWN, null));
+
+ Assert.assertEquals(1, tracker.getView().sstables.size());
+ Assert.assertEquals(3, listener.received.size());
+ Assert.assertEquals(tracker, listener.senders.get(0));
+ Assert.assertTrue(listener.received.get(0) instanceof SSTableDeletingNotification);
+ Assert.assertTrue(listener.received.get(1) instanceof SSTableDeletingNotification);
+ Assert.assertTrue(listener.received.get(2) instanceof SSTableListChangedNotification);
+ Assert.assertEquals(readers.get(1), ((SSTableDeletingNotification) listener.received.get(0)).deleting);
+ Assert.assertEquals(readers.get(2), ((SSTableDeletingNotification)listener.received.get(1)).deleting);
+ Assert.assertEquals(2, ((SSTableListChangedNotification) listener.received.get(2)).removed.size());
+ Assert.assertEquals(0, ((SSTableListChangedNotification) listener.received.get(2)).added.size());
+ Assert.assertEquals(9, cfs.metric.liveDiskSpaceUsed.getCount());
+ readers.get(0).selfRef().release();
}
- finally
+ else
{
- // TransactionLogs.pauseDeletions(false);
+ Assert.assertEquals(0, tracker.getView().sstables.size());
+ Assert.assertEquals(0, cfs.metric.liveDiskSpaceUsed.getCount());
+ for (SSTableReader reader : readers)
+ Assert.assertTrue(reader.isMarkedCompacted());
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5726625a/test/unit/org/apache/cassandra/db/lifecycle/TransactionLogTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/TransactionLogTest.java b/test/unit/org/apache/cassandra/db/lifecycle/TransactionLogTest.java
new file mode 100644
index 0000000..7739163
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/lifecycle/TransactionLogTest.java
@@ -0,0 +1,791 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.lifecycle;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.*;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static junit.framework.Assert.assertNotNull;
+import static junit.framework.Assert.assertNull;
+import static junit.framework.Assert.fail;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import junit.framework.Assert;
+import org.apache.cassandra.MockSchema;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.compaction.*;
+import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.io.sstable.metadata.MetadataType;
+import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
+import org.apache.cassandra.io.util.BufferedSegmentedFile;
+import org.apache.cassandra.io.util.ChannelProxy;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.io.util.SegmentedFile;
+import org.apache.cassandra.utils.AlwaysPresentFilter;
+import org.apache.cassandra.utils.concurrent.AbstractTransactionalTest;
+import org.apache.cassandra.utils.concurrent.Transactional;
+
+public class TransactionLogTest extends AbstractTransactionalTest
+{
+ private static final String KEYSPACE = "TransactionLogsTest";
+
+ @BeforeClass
+ public static void setUp()
+ {
+ MockSchema.cleanup();
+ }
+
+ protected AbstractTransactionalTest.TestableTransaction newTest() throws Exception
+ {
+ TransactionLog.waitForDeletions();
+ SSTableReader.resetTidying();
+ return new TxnTest();
+ }
+
+ private static final class TxnTest extends TestableTransaction
+ {
+ private final static class Transaction extends Transactional.AbstractTransactional implements Transactional
+ {
+ final ColumnFamilyStore cfs;
+ final TransactionLog txnLogs;
+ final SSTableReader sstableOld;
+ final SSTableReader sstableNew;
+ final TransactionLog.SSTableTidier tidier;
+
+ public Transaction(ColumnFamilyStore cfs, TransactionLog txnLogs) throws IOException
+ {
+ this.cfs = cfs;
+ this.txnLogs = txnLogs;
+ this.sstableOld = sstable(cfs, 0, 128);
+ this.sstableNew = sstable(cfs, 1, 128);
+
+ assertNotNull(txnLogs);
+ assertNotNull(txnLogs.getId());
+ Assert.assertEquals(OperationType.COMPACTION, txnLogs.getType());
+
+ txnLogs.trackNew(sstableNew);
+ tidier = txnLogs.obsoleted(sstableOld);
+ assertNotNull(tidier);
+ }
+
+ protected Throwable doCommit(Throwable accumulate)
+ {
+ sstableOld.markObsolete(tidier);
+ sstableOld.selfRef().release();
+ TransactionLog.waitForDeletions();
+
+ Throwable ret = txnLogs.commit(accumulate);
+
+ sstableNew.selfRef().release();
+ return ret;
+ }
+
+ protected Throwable doAbort(Throwable accumulate)
+ {
+ tidier.abort();
+ TransactionLog.waitForDeletions();
+
+ Throwable ret = txnLogs.abort(accumulate);
+
+ sstableNew.selfRef().release();
+ sstableOld.selfRef().release();
+ return ret;
+ }
+
+ protected void doPrepare()
+ {
+ txnLogs.prepareToCommit();
+ }
+
+ protected void assertInProgress() throws Exception
+ {
+ assertFiles(txnLogs.getDataFolder(), Sets.newHashSet(Iterables.concat(sstableNew.getAllFilePaths(),
+ sstableOld.getAllFilePaths(),
+ Collections.singleton(txnLogs.getData().getLogFile().file.getPath()))));
+ }
+
+ protected void assertPrepared() throws Exception
+ {
+ }
+
+ protected void assertAborted() throws Exception
+ {
+ assertFiles(txnLogs.getDataFolder(), new HashSet<>(sstableOld.getAllFilePaths()));
+ }
+
+ protected void assertCommitted() throws Exception
+ {
+ assertFiles(txnLogs.getDataFolder(), new HashSet<>(sstableNew.getAllFilePaths()));
+ }
+ }
+
+ final Transaction txn;
+
+ private TxnTest() throws IOException
+ {
+ this(MockSchema.newCFS(KEYSPACE));
+ }
+
+ private TxnTest(ColumnFamilyStore cfs) throws IOException
+ {
+ this(cfs, new TransactionLog(OperationType.COMPACTION, cfs.metadata));
+ }
+
+ private TxnTest(ColumnFamilyStore cfs, TransactionLog txnLogs) throws IOException
+ {
+ this(new Transaction(cfs, txnLogs));
+ }
+
+ private TxnTest(Transaction txn)
+ {
+ super(txn);
+ this.txn = txn;
+ }
+
+ protected void assertInProgress() throws Exception
+ {
+ txn.assertInProgress();
+ }
+
+ protected void assertPrepared() throws Exception
+ {
+ txn.assertPrepared();
+ }
+
+ protected void assertAborted() throws Exception
+ {
+ txn.assertAborted();
+ }
+
+ protected void assertCommitted() throws Exception
+ {
+ txn.assertCommitted();
+ }
+ }
+
+ @Test
+ public void testUntrack() throws Throwable
+ {
+ ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
+ SSTableReader sstableNew = sstable(cfs, 1, 128);
+
+ // complete a transaction without keep the new files since they were untracked
+ TransactionLog transactionLog = new TransactionLog(OperationType.COMPACTION, cfs.metadata);
+ assertNotNull(transactionLog);
+
+ transactionLog.trackNew(sstableNew);
+ transactionLog.untrackNew(sstableNew);
+
+ transactionLog.finish();
+
+ sstableNew.selfRef().release();
+ Thread.sleep(1);
+ TransactionLog.waitForDeletions();
+
+ assertFiles(transactionLog.getDataFolder(), Collections.<String>emptySet());
+ }
+
+ @Test
+ public void testCommitSameDesc() throws Throwable
+ {
+ ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
+ SSTableReader sstableOld1 = sstable(cfs, 0, 128);
+ SSTableReader sstableOld2 = sstable(cfs, 0, 256);
+ SSTableReader sstableNew = sstable(cfs, 1, 128);
+
+ TransactionLog transactionLog = new TransactionLog(OperationType.COMPACTION, cfs.metadata);
+ assertNotNull(transactionLog);
+
+ transactionLog.trackNew(sstableNew);
+
+ sstableOld1.setReplaced();
+
+ TransactionLog.SSTableTidier tidier = transactionLog.obsoleted(sstableOld2);
+ assertNotNull(tidier);
+
+ transactionLog.finish();
+
+ sstableOld2.markObsolete(tidier);
+
+ sstableOld1.selfRef().release();
+ sstableOld2.selfRef().release();
+
+ assertFiles(transactionLog.getDataFolder(), new HashSet<>(sstableNew.getAllFilePaths()));
+
+ sstableNew.selfRef().release();
+ }
+
+ @Test
+ public void testCommitOnlyNew() throws Throwable
+ {
+ ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
+ SSTableReader sstable = sstable(cfs, 0, 128);
+
+ TransactionLog transactionLog = new TransactionLog(OperationType.COMPACTION, cfs.metadata);
+ assertNotNull(transactionLog);
+
+ transactionLog.trackNew(sstable);
+ transactionLog.finish();
+
+ assertFiles(transactionLog.getDataFolder(), new HashSet<>(sstable.getAllFilePaths()));
+
+ sstable.selfRef().release();
+ }
+
+ @Test
+ public void testCommitOnlyOld() throws Throwable
+ {
+ ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
+ SSTableReader sstable = sstable(cfs, 0, 128);
+
+ TransactionLog transactionLog = new TransactionLog(OperationType.COMPACTION, cfs.metadata);
+ assertNotNull(transactionLog);
+
+ TransactionLog.SSTableTidier tidier = transactionLog.obsoleted(sstable);
+ assertNotNull(tidier);
+
+ transactionLog.finish();
+ sstable.markObsolete(tidier);
+ sstable.selfRef().release();
+
+ assertFiles(transactionLog.getDataFolder(), new HashSet<>());
+ }
+
+ @Test
+ public void testAbortOnlyNew() throws Throwable
+ {
+ ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
+ SSTableReader sstable = sstable(cfs, 0, 128);
+
+ TransactionLog transactionLog = new TransactionLog(OperationType.COMPACTION, cfs.metadata);
+ assertNotNull(transactionLog);
+
+ transactionLog.trackNew(sstable);
+ transactionLog.abort();
+
+ sstable.selfRef().release();
+
+ assertFiles(transactionLog.getDataFolder(), new HashSet<>());
+ }
+
+ @Test
+ public void testAbortOnlyOld() throws Throwable
+ {
+ ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
+ SSTableReader sstable = sstable(cfs, 0, 128);
+
+ TransactionLog transactionLog = new TransactionLog(OperationType.COMPACTION, cfs.metadata);
+ assertNotNull(transactionLog);
+
+ TransactionLog.SSTableTidier tidier = transactionLog.obsoleted(sstable);
+ assertNotNull(tidier);
+
+ tidier.abort();
+ transactionLog.abort();
+
+ sstable.selfRef().release();
+
+ assertFiles(transactionLog.getDataFolder(), new HashSet<>(sstable.getAllFilePaths()));
+ }
+
+ @Test
+ public void testRemoveUnfinishedLeftovers_abort() throws Throwable
+ {
+ ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
+ SSTableReader sstableOld = sstable(cfs, 0, 128);
+ SSTableReader sstableNew = sstable(cfs, 1, 128);
+
+ // simulate tracking sstables with a failed transaction (new log file NOT deleted)
+ TransactionLog transactionLog = new TransactionLog(OperationType.COMPACTION, cfs.metadata);
+ assertNotNull(transactionLog);
+
+ transactionLog.trackNew(sstableNew);
+ TransactionLog.SSTableTidier tidier = transactionLog.obsoleted(sstableOld);
+
+ Set<File> tmpFiles = Sets.newHashSet(Iterables.concat(sstableNew.getAllFilePaths().stream().map(p -> new File(p)).collect(Collectors.toList()),
+ Collections.singleton(transactionLog.getData().getLogFile().file)));
+
+ sstableNew.selfRef().release();
+ sstableOld.selfRef().release();
+
+ Assert.assertEquals(tmpFiles, TransactionLog.getTemporaryFiles(cfs.metadata, sstableNew.descriptor.directory));
+
+ // normally called at startup
+ TransactionLog.removeUnfinishedLeftovers(cfs.metadata);
+
+ // sstableOld should be only table left
+ Directories directories = new Directories(cfs.metadata);
+ Map<Descriptor, Set<Component>> sstables = directories.sstableLister(Directories.OnTxnErr.THROW).list();
+ assertEquals(1, sstables.size());
+
+ assertFiles(transactionLog.getDataFolder(), new HashSet<>(sstableOld.getAllFilePaths()));
+
+ tidier.run();
+
+ // complete the transaction to avoid LEAK errors
+ transactionLog.close();
+ }
+
+ @Test
+ public void testRemoveUnfinishedLeftovers_commit() throws Throwable
+ {
+ ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
+ SSTableReader sstableOld = sstable(cfs, 0, 128);
+ SSTableReader sstableNew = sstable(cfs, 1, 128);
+
+ // simulate tracking sstables with a committed transaction (new log file deleted)
+ TransactionLog transactionLog = new TransactionLog(OperationType.COMPACTION, cfs.metadata);
+ assertNotNull(transactionLog);
+
+ transactionLog.trackNew(sstableNew);
+ TransactionLog.SSTableTidier tidier = transactionLog.obsoleted(sstableOld);
+
+ //Fake a commit
+ transactionLog.getData().getLogFile().commit();
+
+ Set<File> tmpFiles = Sets.newHashSet(Iterables.concat(sstableOld.getAllFilePaths().stream().map(p -> new File(p)).collect(Collectors.toList()),
+ Collections.singleton(transactionLog.getData().getLogFile().file)));
+
+ sstableNew.selfRef().release();
+ sstableOld.selfRef().release();
+
+ Assert.assertEquals(tmpFiles, TransactionLog.getTemporaryFiles(cfs.metadata, sstableOld.descriptor.directory));
+
+ // normally called at startup
+ TransactionLog.removeUnfinishedLeftovers(cfs.metadata);
+
+ // sstableNew should be only table left
+ Directories directories = new Directories(cfs.metadata);
+ Map<Descriptor, Set<Component>> sstables = directories.sstableLister(Directories.OnTxnErr.THROW).list();
+ assertEquals(1, sstables.size());
+
+ assertFiles(transactionLog.getDataFolder(), new HashSet<>(sstableNew.getAllFilePaths()));
+
+ tidier.run();
+
+ // complete the transaction to avoid LEAK errors
+ assertNull(transactionLog.complete(null));
+ }
+
+ @Test
+ public void testGetTemporaryFiles() throws IOException
+ {
+ ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
+ SSTableReader sstable1 = sstable(cfs, 0, 128);
+
+ File dataFolder = sstable1.descriptor.directory;
+
+ Set<File> tmpFiles = TransactionLog.getTemporaryFiles(cfs.metadata, dataFolder);
+ assertNotNull(tmpFiles);
+ assertEquals(0, tmpFiles.size());
+
+ TransactionLog transactionLog = new TransactionLog(OperationType.WRITE, cfs.metadata);
+ Directories directories = new Directories(cfs.metadata);
+
+ File[] beforeSecondSSTable = dataFolder.listFiles(pathname -> !pathname.isDirectory());
+
+ SSTableReader sstable2 = sstable(cfs, 1, 128);
+ transactionLog.trackNew(sstable2);
+
+ Map<Descriptor, Set<Component>> sstables = directories.sstableLister(Directories.OnTxnErr.THROW).list();
+ assertEquals(2, sstables.size());
+
+ // this should contain sstable1, sstable2 and the transaction log file
+ File[] afterSecondSSTable = dataFolder.listFiles(pathname -> !pathname.isDirectory());
+
+ int numNewFiles = afterSecondSSTable.length - beforeSecondSSTable.length;
+ assertEquals(numNewFiles - 1, sstable2.getAllFilePaths().size()); // new files except for transaction log file
+
+ tmpFiles = TransactionLog.getTemporaryFiles(cfs.metadata, dataFolder);
+ assertNotNull(tmpFiles);
+ assertEquals(numNewFiles, tmpFiles.size());
+
+ File ssTable2DataFile = new File(sstable2.descriptor.filenameFor(Component.DATA));
+ File ssTable2IndexFile = new File(sstable2.descriptor.filenameFor(Component.PRIMARY_INDEX));
+
+ assertTrue(tmpFiles.contains(ssTable2DataFile));
+ assertTrue(tmpFiles.contains(ssTable2IndexFile));
+
+ List<File> files = directories.sstableLister(Directories.OnTxnErr.THROW).listFiles();
+ List<File> filesNoTmp = directories.sstableLister(Directories.OnTxnErr.THROW).skipTemporary(true).listFiles();
+ assertNotNull(files);
+ assertNotNull(filesNoTmp);
+
+ assertTrue(files.contains(ssTable2DataFile));
+ assertTrue(files.contains(ssTable2IndexFile));
+
+ assertFalse(filesNoTmp.contains(ssTable2DataFile));
+ assertFalse(filesNoTmp.contains(ssTable2IndexFile));
+
+ transactionLog.finish();
+
+ //Now it should be empty since the transaction has finished
+ tmpFiles = TransactionLog.getTemporaryFiles(cfs.metadata, dataFolder);
+ assertNotNull(tmpFiles);
+ assertEquals(0, tmpFiles.size());
+
+ filesNoTmp = directories.sstableLister(Directories.OnTxnErr.THROW).skipTemporary(true).listFiles();
+ assertNotNull(filesNoTmp);
+ assertTrue(filesNoTmp.contains(ssTable2DataFile));
+ assertTrue(filesNoTmp.contains(ssTable2IndexFile));
+
+ sstable1.selfRef().release();
+ sstable2.selfRef().release();
+ }
+
+ @Test
+ public void testWrongChecksumLastLine() throws IOException
+ {
+ testCorruptRecord((t, s) ->
+ { // Fake a commit with invalid checksum
+ FileUtils.append(t.getData().getLogFile().file,
+ String.format("commit:[%d,0,0][%d]",
+ System.currentTimeMillis(),
+ 12345678L));
+ },
+ true);
+ }
+
+ @Test
+ public void testWrongChecksumSecondFromLastLine() throws IOException
+ {
+ testCorruptRecord((t, s) ->
+ { // Fake two lines with invalid checksum
+ FileUtils.append(t.getData().getLogFile().file,
+ String.format("add:[ma-3-big,%d,4][%d]",
+ System.currentTimeMillis(),
+ 12345678L));
+
+ FileUtils.append(t.getData().getLogFile().file,
+ String.format("commit:[%d,0,0][%d]",
+ System.currentTimeMillis(),
+ 12345678L));
+ },
+ false);
+ }
+
+ @Test
+ public void testWrongChecksumLastLineMissingFile() throws IOException
+ {
+ testCorruptRecord((t, s) ->
+ { // Fake a commit with invalid checksum and also delete one of the old files
+ for (String filePath : s.getAllFilePaths())
+ {
+ if (filePath.endsWith("Data.db"))
+ {
+ FileUtils.delete(filePath);
+ break;
+ }
+ }
+
+ FileUtils.append(t.getData().getLogFile().file,
+ String.format("commit:[%d,0,0][%d]",
+ System.currentTimeMillis(),
+ 12345678L));
+ },
+ false);
+ }
+
+ @Test
+ public void testWrongChecksumLastLineWrongRecordFormat() throws IOException
+ {
+ testCorruptRecord((t, s) ->
+ { // Fake a commit with invalid checksum and a wrong record format (extra spaces)
+ FileUtils.append(t.getData().getLogFile().file,
+ String.format("commit:[%d ,0 ,0 ][%d]",
+ System.currentTimeMillis(),
+ 12345678L));
+ },
+ true);
+ }
+
+ @Test
+ public void testMissingChecksumLastLine() throws IOException
+ {
+ testCorruptRecord((t, s) ->
+ {
+ // Fake a commit without a checksum
+ FileUtils.append(t.getData().getLogFile().file,
+ String.format("commit:[%d,0,0]",
+ System.currentTimeMillis()));
+ },
+ true);
+ }
+
+ @Test
+ public void testMissingChecksumSecondFromLastLine() throws IOException
+ {
+ testCorruptRecord((t, s) ->
+ { // Fake two lines without a checksum
+ FileUtils.append(t.getData().getLogFile().file,
+ String.format("add:[ma-3-big,%d,4]",
+ System.currentTimeMillis()));
+
+ FileUtils.append(t.getData().getLogFile().file,
+ String.format("commit:[%d,0,0]",
+ System.currentTimeMillis()));
+ },
+ false);
+ }
+
+ private void testCorruptRecord(BiConsumer<TransactionLog, SSTableReader> modifier, boolean isRecoverable) throws IOException
+ {
+ ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
+ SSTableReader sstableOld = sstable(cfs, 0, 128);
+ SSTableReader sstableNew = sstable(cfs, 1, 128);
+
+ File dataFolder = sstableOld.descriptor.directory;
+
+ // simulate tracking sstables with a committed transaction except the checksum will be wrong
+ TransactionLog transactionLog = new TransactionLog(OperationType.COMPACTION, cfs.metadata);
+ assertNotNull(transactionLog);
+
+ transactionLog.trackNew(sstableNew);
+ transactionLog.obsoleted(sstableOld);
+
+ //Modify the transaction log in some way
+ modifier.accept(transactionLog, sstableOld);
+
+ String txnFilePath = transactionLog.getData().getLogFile().file.getPath();
+
+ transactionLog.complete(null);
+
+ sstableOld.selfRef().release();
+ sstableNew.selfRef().release();
+
+ if (isRecoverable)
+ { // the corruption is recoverable, we assume there is a commit record
+
+ //This should return the old files and the tx log
+ assertFiles(Iterables.concat(sstableOld.getAllFilePaths(), Collections.singleton(txnFilePath)),
+ TransactionLog.getTemporaryFiles(cfs.metadata, dataFolder));
+
+ //This should remove old files
+ TransactionLog.removeUnfinishedLeftovers(cfs.metadata);
+
+ assertFiles(dataFolder.getPath(), Sets.newHashSet(sstableNew.getAllFilePaths()));
+ }
+ else
+ { // if an intermediate line was modified, we cannot tell,
+ // it should just throw and handle the exception with a log message
+
+ //This should not return any files
+ assertEquals(Collections.emptyList(), new TransactionLog.FileLister(dataFolder.toPath(),
+ (file, type) -> type != Directories.FileType.FINAL,
+ Directories.OnTxnErr.IGNORE).list());
+
+ try
+ {
+ //This should throw a RuntimeException
+ new TransactionLog.FileLister(dataFolder.toPath(),
+ (file, type) -> type != Directories.FileType.FINAL,
+ Directories.OnTxnErr.THROW).list();
+ fail("Expected exception");
+ }
+ catch (RuntimeException ex)
+ {
+ // pass
+ ex.printStackTrace();
+ }
+
+ //This should not remove any files
+ TransactionLog.removeUnfinishedLeftovers(cfs.metadata);
+
+ assertFiles(dataFolder.getPath(), Sets.newHashSet(Iterables.concat(sstableNew.getAllFilePaths(),
+ sstableOld.getAllFilePaths(),
+ Collections.singleton(txnFilePath))),
+ true);
+ }
+ }
+
+ @Test
+ public void testObsoletedDataFileUpdateTimeChanged() throws IOException
+ {
+ testObsoletedFilesChanged(sstable ->
+ {
+ // increase the modification time of the Data file
+ for (String filePath : sstable.getAllFilePaths())
+ {
+ if (filePath.endsWith("Data.db"))
+ assertTrue(new File(filePath).setLastModified(System.currentTimeMillis() + 60000)); //one minute later
+ }
+ });
+ }
+
+ private void testObsoletedFilesChanged(Consumer<SSTableReader> modifier) throws IOException
+ {
+ ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
+ SSTableReader sstableOld = sstable(cfs, 0, 128);
+ SSTableReader sstableNew = sstable(cfs, 1, 128);
+
+ // simulate tracking sstables with a committed transaction except the checksum will be wrong
+ TransactionLog transactionLog = new TransactionLog(OperationType.COMPACTION, cfs.metadata);
+ assertNotNull(transactionLog);
+
+ transactionLog.trackNew(sstableNew);
+ /*TransactionLog.SSTableTidier tidier =*/ transactionLog.obsoleted(sstableOld);
+
+ //modify the old sstable files
+ modifier.accept(sstableOld);
+
+ //Fake a commit
+ transactionLog.getData().getLogFile().commit();
+
+ //This should not remove the old files
+ TransactionLog.removeUnfinishedLeftovers(cfs.metadata);
+
+ assertFiles(transactionLog.getDataFolder(), Sets.newHashSet(Iterables.concat(
+ sstableNew.getAllFilePaths(),
+ sstableOld.getAllFilePaths(),
+ Collections.singleton(transactionLog.getData().getLogFile().file.getPath()))));
+
+ sstableOld.selfRef().release();
+ sstableNew.selfRef().release();
+
+ // complete the transaction to avoid LEAK errors
+ assertNull(transactionLog.complete(null));
+
+ assertFiles(transactionLog.getDataFolder(), Sets.newHashSet(Iterables.concat(
+ sstableNew.getAllFilePaths(),
+ sstableOld.getAllFilePaths(),
+ Collections.singleton(transactionLog.getData().getLogFile().file.getPath()))));
+ }
+
+ @Test
+ public void testGetTemporaryFilesSafeAfterObsoletion() throws Throwable
+ {
+ ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
+ SSTableReader sstable = sstable(cfs, 0, 128);
+ File dataFolder = sstable.descriptor.directory;
+
+ TransactionLog transactionLogs = new TransactionLog(OperationType.COMPACTION, cfs.metadata);
+ assertNotNull(transactionLogs);
+
+ TransactionLog.SSTableTidier tidier = transactionLogs.obsoleted(sstable);
+
+ transactionLogs.finish();
+ sstable.markObsolete(tidier);
+ sstable.selfRef().release();
+
+ for (int i = 0; i < 1000; i++)
+ {
+ // This should race with the asynchronous deletion of txn log files
+ // It doesn't matter what it returns but it should not throw
+ TransactionLog.getTemporaryFiles(cfs.metadata, dataFolder);
+ }
+ }
+
+ private static SSTableReader sstable(ColumnFamilyStore cfs, int generation, int size) throws IOException
+ {
+ Directories dir = new Directories(cfs.metadata);
+ Descriptor descriptor = new Descriptor(dir.getDirectoryForNewSSTables(), cfs.keyspace.getName(), cfs.getColumnFamilyName(), generation);
+ Set<Component> components = ImmutableSet.of(Component.DATA, Component.PRIMARY_INDEX, Component.FILTER, Component.TOC);
+ for (Component component : components)
+ {
+ File file = new File(descriptor.filenameFor(component));
+ file.createNewFile();
+ try (RandomAccessFile raf = new RandomAccessFile(file, "rw"))
+ {
+ raf.setLength(size);
+ }
+ }
+
+ SegmentedFile dFile = new BufferedSegmentedFile(new ChannelProxy(new File(descriptor.filenameFor(Component.DATA))), RandomAccessReader.DEFAULT_BUFFER_SIZE, 0);
+ SegmentedFile iFile = new BufferedSegmentedFile(new ChannelProxy(new File(descriptor.filenameFor(Component.PRIMARY_INDEX))), RandomAccessReader.DEFAULT_BUFFER_SIZE, 0);
+
+ SerializationHeader header = SerializationHeader.make(cfs.metadata, Collections.EMPTY_LIST);
+ StatsMetadata metadata = (StatsMetadata) new MetadataCollector(cfs.metadata.comparator)
+ .finalizeMetadata(cfs.metadata.partitioner.getClass().getCanonicalName(), 0.01f, -1, header)
+ .get(MetadataType.STATS);
+ SSTableReader reader = SSTableReader.internalOpen(descriptor,
+ components,
+ cfs.metadata,
+ dFile,
+ iFile,
+ MockSchema.indexSummary.sharedCopy(),
+ new AlwaysPresentFilter(),
+ 1L,
+ metadata,
+ SSTableReader.OpenReason.NORMAL,
+ header);
+ reader.first = reader.last = MockSchema.readerBounds(generation);
+ return reader;
+ }
+
+ private static void assertFiles(String dirPath, Set<String> expectedFiles)
+ {
+ assertFiles(dirPath, expectedFiles, false);
+ }
+
+ private static void assertFiles(String dirPath, Set<String> expectedFiles, boolean excludeNonExistingFiles)
+ {
+ TransactionLog.waitForDeletions();
+
+ File dir = new File(dirPath);
+ for (File file : dir.listFiles())
+ {
+ if (file.isDirectory())
+ continue;
+
+ String filePath = file.getPath();
+ assertTrue(filePath, expectedFiles.contains(filePath));
+ expectedFiles.remove(filePath);
+ }
+
+ if (excludeNonExistingFiles)
+ {
+ for (String filePath : expectedFiles)
+ {
+ File file = new File(filePath);
+ if (!file.exists())
+ expectedFiles.remove(filePath);
+ }
+ }
+
+ assertTrue(expectedFiles.toString(), expectedFiles.isEmpty());
+ }
+
+ private static void assertFiles(Iterable<String> filePaths, Set<File> expectedFiles)
+ {
+ for (String filePath : filePaths)
+ {
+ File file = new File(filePath);
+ assertTrue(filePath, expectedFiles.contains(file));
+ expectedFiles.remove(file);
+ }
+
+ assertTrue(expectedFiles.isEmpty());
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5726625a/test/unit/org/apache/cassandra/db/lifecycle/TransactionLogsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/TransactionLogsTest.java b/test/unit/org/apache/cassandra/db/lifecycle/TransactionLogsTest.java
deleted file mode 100644
index 991eed3..0000000
--- a/test/unit/org/apache/cassandra/db/lifecycle/TransactionLogsTest.java
+++ /dev/null
@@ -1,581 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.lifecycle;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.file.Files;
-import java.nio.file.StandardCopyOption;
-import java.util.*;
-
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Sets;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import static junit.framework.Assert.assertNotNull;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import junit.framework.Assert;
-import org.apache.cassandra.MockSchema;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.Directories;
-import org.apache.cassandra.db.SerializationHeader;
-import org.apache.cassandra.db.compaction.*;
-import org.apache.cassandra.io.sstable.*;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
-import org.apache.cassandra.io.sstable.metadata.MetadataType;
-import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
-import org.apache.cassandra.io.util.BufferedSegmentedFile;
-import org.apache.cassandra.io.util.ChannelProxy;
-import org.apache.cassandra.io.util.RandomAccessReader;
-import org.apache.cassandra.io.util.SegmentedFile;
-import org.apache.cassandra.utils.AlwaysPresentFilter;
-import org.apache.cassandra.utils.concurrent.AbstractTransactionalTest;
-import org.apache.cassandra.utils.concurrent.Transactional;
-
-public class TransactionLogsTest extends AbstractTransactionalTest
-{
- private static final String KEYSPACE = "TransactionLogsTest";
-
- @BeforeClass
- public static void setUp()
- {
- MockSchema.cleanup();
- }
-
- protected AbstractTransactionalTest.TestableTransaction newTest() throws Exception
- {
- TransactionLogs.waitForDeletions();
- SSTableReader.resetTidying();
- return new TxnTest();
- }
-
- private static final class TxnTest extends TestableTransaction
- {
- private final static class Transaction extends Transactional.AbstractTransactional implements Transactional
- {
- final ColumnFamilyStore cfs;
- final TransactionLogs txnLogs;
- final SSTableReader sstableOld;
- final SSTableReader sstableNew;
- final TransactionLogs.SSTableTidier tidier;
-
- public Transaction(ColumnFamilyStore cfs, TransactionLogs txnLogs) throws IOException
- {
- this.cfs = cfs;
- this.txnLogs = txnLogs;
- this.sstableOld = sstable(cfs, 0, 128);
- this.sstableNew = sstable(cfs, 1, 128);
-
- assertNotNull(txnLogs);
- assertNotNull(txnLogs.getId());
- Assert.assertEquals(OperationType.COMPACTION, txnLogs.getType());
-
- txnLogs.trackNew(sstableNew);
- tidier = txnLogs.obsoleted(sstableOld);
- assertNotNull(tidier);
- }
-
- protected Throwable doCommit(Throwable accumulate)
- {
- sstableOld.markObsolete(tidier);
- sstableOld.selfRef().release();
- TransactionLogs.waitForDeletions();
-
- Throwable ret = txnLogs.commit(accumulate);
-
- sstableNew.selfRef().release();
- return ret;
- }
-
- protected Throwable doAbort(Throwable accumulate)
- {
- tidier.abort();
- TransactionLogs.waitForDeletions();
-
- Throwable ret = txnLogs.abort(accumulate);
-
- sstableNew.selfRef().release();
- sstableOld.selfRef().release();
- return ret;
- }
-
- protected void doPrepare()
- {
- txnLogs.prepareToCommit();
- }
-
- protected void assertInProgress() throws Exception
- {
- assertFiles(txnLogs.getDataFolder(), Sets.newHashSet(Iterables.concat(sstableNew.getAllFilePaths(),
- sstableOld.getAllFilePaths())));
- assertFiles(txnLogs.getLogsFolder(), Sets.newHashSet(txnLogs.getData().oldLog().file.getPath(),
- txnLogs.getData().newLog().file.getPath()));
- assertEquals(2, TransactionLogs.getLogFiles(cfs.metadata).size());
- }
-
- protected void assertPrepared() throws Exception
- {
- }
-
- protected void assertAborted() throws Exception
- {
- assertFiles(txnLogs.getDataFolder(), new HashSet<>(sstableOld.getAllFilePaths()));
- assertFiles(txnLogs.getLogsFolder(), Collections.<String>emptySet());
- assertEquals(0, TransactionLogs.getLogFiles(cfs.metadata).size());
- }
-
- protected void assertCommitted() throws Exception
- {
- assertFiles(txnLogs.getDataFolder(), new HashSet<>(sstableNew.getAllFilePaths()));
- assertFiles(txnLogs.getLogsFolder(), Collections.<String>emptySet());
- assertEquals(0, TransactionLogs.getLogFiles(cfs.metadata).size());
- }
- }
-
- final Transaction txn;
-
- private TxnTest() throws IOException
- {
- this(MockSchema.newCFS(KEYSPACE));
- }
-
- private TxnTest(ColumnFamilyStore cfs) throws IOException
- {
- this(cfs, new TransactionLogs(OperationType.COMPACTION, cfs.metadata));
- }
-
- private TxnTest(ColumnFamilyStore cfs, TransactionLogs txnLogs) throws IOException
- {
- this(new Transaction(cfs, txnLogs));
- }
-
- private TxnTest(Transaction txn)
- {
- super(txn);
- this.txn = txn;
- }
-
- protected void assertInProgress() throws Exception
- {
- txn.assertInProgress();
- }
-
- protected void assertPrepared() throws Exception
- {
- txn.assertPrepared();
- }
-
- protected void assertAborted() throws Exception
- {
- txn.assertAborted();
- }
-
- protected void assertCommitted() throws Exception
- {
- txn.assertCommitted();
- }
- }
-
- @Test
- public void testUntrack() throws Throwable
- {
- ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
- SSTableReader sstableNew = sstable(cfs, 1, 128);
-
- // complete a transaction without keep the new files since they were untracked
- TransactionLogs transactionLogs = new TransactionLogs(OperationType.COMPACTION, cfs.metadata);
- assertNotNull(transactionLogs);
-
- transactionLogs.trackNew(sstableNew);
- transactionLogs.untrackNew(sstableNew);
-
- transactionLogs.finish();
-
- assertFiles(transactionLogs.getDataFolder(), Collections.<String>emptySet());
- assertFiles(transactionLogs.getLogsFolder(), Collections.<String>emptySet());
- assertEquals(0, TransactionLogs.getLogFiles(cfs.metadata).size());
-
- sstableNew.selfRef().release();
- }
-
- @Test
- public void testCommitSameDesc() throws Throwable
- {
- ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
- SSTableReader sstableOld1 = sstable(cfs, 0, 128);
- SSTableReader sstableOld2 = sstable(cfs, 0, 256);
- SSTableReader sstableNew = sstable(cfs, 1, 128);
-
- TransactionLogs transactionLogs = new TransactionLogs(OperationType.COMPACTION, cfs.metadata);
- assertNotNull(transactionLogs);
-
- transactionLogs.trackNew(sstableNew);
-
- sstableOld1.setReplaced();
-
- TransactionLogs.SSTableTidier tidier = transactionLogs.obsoleted(sstableOld2);
- assertNotNull(tidier);
-
- transactionLogs.finish();
-
- sstableOld2.markObsolete(tidier);
-
- sstableOld1.selfRef().release();
- sstableOld2.selfRef().release();
-
- TransactionLogs.waitForDeletions();
-
- assertFiles(transactionLogs.getDataFolder(), new HashSet<>(sstableNew.getAllFilePaths()));
- assertFiles(transactionLogs.getLogsFolder(), Collections.<String>emptySet());
- assertEquals(0, TransactionLogs.getLogFiles(cfs.metadata).size());
-
- sstableNew.selfRef().release();
- }
-
- @Test
- public void testCommitOnlyNew() throws Throwable
- {
- ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
- SSTableReader sstable = sstable(cfs, 0, 128);
-
- TransactionLogs transactionLogs = new TransactionLogs(OperationType.COMPACTION, cfs.metadata);
- assertNotNull(transactionLogs);
-
- transactionLogs.trackNew(sstable);
- transactionLogs.finish();
-
- assertFiles(transactionLogs.getDataFolder(), new HashSet<>(sstable.getAllFilePaths()));
- assertFiles(transactionLogs.getLogsFolder(), Collections.<String>emptySet());
- assertEquals(0, TransactionLogs.getLogFiles(cfs.metadata).size());
-
- sstable.selfRef().release();
- }
-
- @Test
- public void testCommitOnlyOld() throws Throwable
- {
- ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
- SSTableReader sstable = sstable(cfs, 0, 128);
-
- TransactionLogs transactionLogs = new TransactionLogs(OperationType.COMPACTION, cfs.metadata);
- assertNotNull(transactionLogs);
-
- TransactionLogs.SSTableTidier tidier = transactionLogs.obsoleted(sstable);
- assertNotNull(tidier);
-
- transactionLogs.finish();
- sstable.markObsolete(tidier);
- sstable.selfRef().release();
-
- TransactionLogs.waitForDeletions();
-
- assertFiles(transactionLogs.getDataFolder(), new HashSet<>());
- assertFiles(transactionLogs.getLogsFolder(), Collections.<String>emptySet());
- assertEquals(0, TransactionLogs.getLogFiles(cfs.metadata).size());
- }
-
- @Test
- public void testAbortOnlyNew() throws Throwable
- {
- ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
- SSTableReader sstable = sstable(cfs, 0, 128);
-
- TransactionLogs transactionLogs = new TransactionLogs(OperationType.COMPACTION, cfs.metadata);
- assertNotNull(transactionLogs);
-
- transactionLogs.trackNew(sstable);
- transactionLogs.abort();
-
- sstable.selfRef().release();
-
- assertFiles(transactionLogs.getDataFolder(), new HashSet<>());
- assertFiles(transactionLogs.getLogsFolder(), Collections.<String>emptySet());
- assertEquals(0, TransactionLogs.getLogFiles(cfs.metadata).size());
- }
-
- @Test
- public void testAbortOnlyOld() throws Throwable
- {
- ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
- SSTableReader sstable = sstable(cfs, 0, 128);
-
- TransactionLogs transactionLogs = new TransactionLogs(OperationType.COMPACTION, cfs.metadata);
- assertNotNull(transactionLogs);
-
- TransactionLogs.SSTableTidier tidier = transactionLogs.obsoleted(sstable);
- assertNotNull(tidier);
-
- tidier.abort();
- transactionLogs.abort();
-
- sstable.selfRef().release();
-
- assertFiles(transactionLogs.getDataFolder(), new HashSet<>(sstable.getAllFilePaths()));
- assertFiles(transactionLogs.getLogsFolder(), Collections.<String>emptySet());
- assertEquals(0, TransactionLogs.getLogFiles(cfs.metadata).size());
- }
-
- private File copyToTmpFile(File file) throws IOException
- {
- File ret = File.createTempFile(file.getName(), ".tmp");
- ret.deleteOnExit();
- Files.copy(file.toPath(), ret.toPath(), StandardCopyOption.REPLACE_EXISTING);
- return ret;
- }
-
- @Test
- public void testRemoveUnfinishedLeftovers_newLogFound() throws Throwable
- {
- ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
- SSTableReader sstableOld = sstable(cfs, 0, 128);
- SSTableReader sstableNew = sstable(cfs, 1, 128);
-
- // simulate tracking sstables with a failed transaction (new log file NOT deleted)
- TransactionLogs transactionLogs = new TransactionLogs(OperationType.COMPACTION, cfs.metadata);
- assertNotNull(transactionLogs);
-
- transactionLogs.trackNew(sstableNew);
- TransactionLogs.SSTableTidier tidier = transactionLogs.obsoleted(sstableOld);
-
- File tmpNewLog = copyToTmpFile(transactionLogs.getData().newLog().file);
- File tmpOldLog = copyToTmpFile(transactionLogs.getData().oldLog().file);
-
- Set<File> tmpFiles = new HashSet<>(TransactionLogs.getLogFiles(cfs.metadata));
- for (String p : sstableNew.getAllFilePaths())
- tmpFiles.add(new File(p));
-
- sstableNew.selfRef().release();
- sstableOld.selfRef().release();
-
- Assert.assertEquals(tmpFiles, TransactionLogs.getTemporaryFiles(cfs.metadata, sstableNew.descriptor.directory));
-
- // normally called at startup
- TransactionLogs.removeUnfinishedLeftovers(cfs.metadata);
-
- // sstable should not have been removed because the new log was found
- Directories directories = new Directories(cfs.metadata);
- Map<Descriptor, Set<Component>> sstables = directories.sstableLister().list();
- assertEquals(1, sstables.size());
-
- assertFiles(transactionLogs.getDataFolder(), new HashSet<>(sstableOld.getAllFilePaths()));
- assertFiles(transactionLogs.getLogsFolder(), Collections.<String>emptySet());
-
- tidier.run();
-
- // copy old transaction files contents back or transactionlogs will throw assertions
- Files.move(tmpNewLog.toPath(), transactionLogs.getData().newLog().file.toPath());
- Files.move(tmpOldLog.toPath(), transactionLogs.getData().oldLog().file.toPath());
-
- transactionLogs.close();
- }
-
- @Test
- public void testRemoveUnfinishedLeftovers_oldLogFound() throws Throwable
- {
- ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
- SSTableReader sstableOld = sstable(cfs, 0, 128);
- SSTableReader sstableNew = sstable(cfs, 1, 128);
-
- // simulate tracking sstables with a committed transaction (new log file deleted)
- TransactionLogs transactionLogs = new TransactionLogs(OperationType.COMPACTION, cfs.metadata);
- assertNotNull(transactionLogs);
-
- transactionLogs.trackNew(sstableNew);
- TransactionLogs.SSTableTidier tidier = transactionLogs.obsoleted(sstableOld);
-
- File tmpNewLog = copyToTmpFile(transactionLogs.getData().newLog().file);
- File tmpOldLog = copyToTmpFile(transactionLogs.getData().oldLog().file);
-
- transactionLogs.getData().newLog().delete(false);
-
- Set<File> tmpFiles = new HashSet<>(TransactionLogs.getLogFiles(cfs.metadata));
- for (String p : sstableOld.getAllFilePaths())
- tmpFiles.add(new File(p));
-
- sstableNew.selfRef().release();
- sstableOld.selfRef().release();
-
- Assert.assertEquals(tmpFiles, TransactionLogs.getTemporaryFiles(cfs.metadata, sstableOld.descriptor.directory));
-
- // normally called at startup
- TransactionLogs.removeUnfinishedLeftovers(cfs.metadata);
-
- // sstable should have been removed because there was no new log.
- Directories directories = new Directories(cfs.metadata);
- Map<Descriptor, Set<Component>> sstables = directories.sstableLister().list();
- assertEquals(1, sstables.size());
-
- assertFiles(transactionLogs.getDataFolder(), new HashSet<>(sstableNew.getAllFilePaths()));
- assertFiles(transactionLogs.getLogsFolder(), Collections.<String>emptySet());
-
- tidier.run();
-
- // copy old transaction files contents back or transactionlogs will throw assertions
- Files.move(tmpNewLog.toPath(), transactionLogs.getData().newLog().file.toPath());
- Files.move(tmpOldLog.toPath(), transactionLogs.getData().oldLog().file.toPath());
-
- transactionLogs.close();
- }
-
- @Test
- public void testGetTemporaryFiles() throws IOException
- {
- ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
- SSTableReader sstable1 = sstable(cfs, 0, 128);
-
- File dataFolder = sstable1.descriptor.directory;
-
- Set<File> tmpFiles = TransactionLogs.getTemporaryFiles(cfs.metadata, dataFolder);
- assertNotNull(tmpFiles);
- assertEquals(0, tmpFiles.size());
-
- TransactionLogs transactionLogs = new TransactionLogs(OperationType.WRITE, cfs.metadata);
- Directories directories = new Directories(cfs.metadata);
-
- File[] beforeSecondSSTable = dataFolder.listFiles(pathname -> !pathname.isDirectory());
-
- SSTableReader sstable2 = sstable(cfs, 1, 128);
- transactionLogs.trackNew(sstable2);
-
- Map<Descriptor, Set<Component>> sstables = directories.sstableLister().list();
- assertEquals(2, sstables.size());
-
- File[] afterSecondSSTable = dataFolder.listFiles(pathname -> !pathname.isDirectory());
- int numNewFiles = afterSecondSSTable.length - beforeSecondSSTable.length;
- assertTrue(numNewFiles == sstable2.getAllFilePaths().size());
-
- tmpFiles = TransactionLogs.getTemporaryFiles(cfs.metadata, dataFolder);
- assertNotNull(tmpFiles);
- assertEquals(numNewFiles + 2, tmpFiles.size()); //the extra files are the transaction log files
-
- File ssTable2DataFile = new File(sstable2.descriptor.filenameFor(Component.DATA));
- File ssTable2IndexFile = new File(sstable2.descriptor.filenameFor(Component.PRIMARY_INDEX));
-
- assertTrue(tmpFiles.contains(ssTable2DataFile));
- assertTrue(tmpFiles.contains(ssTable2IndexFile));
-
- List<File> files = directories.sstableLister().listFiles();
- List<File> filesNoTmp = directories.sstableLister().skipTemporary(true).listFiles();
- assertNotNull(files);
- assertNotNull(filesNoTmp);
-
- assertTrue(files.contains(ssTable2DataFile));
- assertTrue(files.contains(ssTable2IndexFile));
-
- assertFalse(filesNoTmp.contains(ssTable2DataFile));
- assertFalse(filesNoTmp.contains(ssTable2IndexFile));
-
- transactionLogs.finish();
-
- //Now it should be empty since the transaction has finished
- tmpFiles = TransactionLogs.getTemporaryFiles(cfs.metadata, dataFolder);
- assertNotNull(tmpFiles);
- assertEquals(0, tmpFiles.size());
-
- filesNoTmp = directories.sstableLister().skipTemporary(true).listFiles();
- assertNotNull(filesNoTmp);
- assertTrue(filesNoTmp.contains(ssTable2DataFile));
- assertTrue(filesNoTmp.contains(ssTable2IndexFile));
-
- sstable1.selfRef().release();
- sstable2.selfRef().release();
- }
-
- @Test
- public void testGetTemporaryFilesSafeAfterObsoletion() throws Throwable
- {
- ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
- SSTableReader sstable = sstable(cfs, 0, 128);
- File dataFolder = sstable.descriptor.directory;
-
- TransactionLogs transactionLogs = new TransactionLogs(OperationType.COMPACTION, cfs.metadata);
- assertNotNull(transactionLogs);
-
- TransactionLogs.SSTableTidier tidier = transactionLogs.obsoleted(sstable);
-
- transactionLogs.finish();
- sstable.markObsolete(tidier);
- sstable.selfRef().release();
-
- for (int i = 0; i < 1000; i++)
- {
- // This should race with the asynchronous deletion of txn log files
- // It doesn't matter what it returns but it should not throw
- TransactionLogs.getTemporaryFiles(cfs.metadata, dataFolder);
- }
- }
-
- private static SSTableReader sstable(ColumnFamilyStore cfs, int generation, int size) throws IOException
- {
- Directories dir = new Directories(cfs.metadata);
- Descriptor descriptor = new Descriptor(dir.getDirectoryForNewSSTables(), cfs.keyspace.getName(), cfs.getColumnFamilyName(), generation);
- Set<Component> components = ImmutableSet.of(Component.DATA, Component.PRIMARY_INDEX, Component.FILTER, Component.TOC);
- for (Component component : components)
- {
- File file = new File(descriptor.filenameFor(component));
- file.createNewFile();
- try (RandomAccessFile raf = new RandomAccessFile(file, "rw"))
- {
- raf.setLength(size);
- }
- }
-
- SegmentedFile dFile = new BufferedSegmentedFile(new ChannelProxy(new File(descriptor.filenameFor(Component.DATA))), RandomAccessReader.DEFAULT_BUFFER_SIZE, 0);
- SegmentedFile iFile = new BufferedSegmentedFile(new ChannelProxy(new File(descriptor.filenameFor(Component.PRIMARY_INDEX))), RandomAccessReader.DEFAULT_BUFFER_SIZE, 0);
-
- SerializationHeader header = SerializationHeader.make(cfs.metadata, Collections.EMPTY_LIST);
- StatsMetadata metadata = (StatsMetadata) new MetadataCollector(cfs.metadata.comparator)
- .finalizeMetadata(cfs.metadata.partitioner.getClass().getCanonicalName(), 0.01f, -1, header)
- .get(MetadataType.STATS);
- SSTableReader reader = SSTableReader.internalOpen(descriptor,
- components,
- cfs.metadata,
- dFile,
- iFile,
- MockSchema.indexSummary.sharedCopy(),
- new AlwaysPresentFilter(),
- 1L,
- metadata,
- SSTableReader.OpenReason.NORMAL,
- header);
- reader.first = reader.last = MockSchema.readerBounds(generation);
- return reader;
- }
-
- private static void assertFiles(String dirPath, Set<String> expectedFiles)
- {
- File dir = new File(dirPath);
- for (File file : dir.listFiles())
- {
- if (file.isDirectory())
- continue;
-
- String filePath = file.getPath();
- assertTrue(filePath, expectedFiles.contains(filePath));
- expectedFiles.remove(filePath);
- }
-
- assertTrue(expectedFiles.isEmpty());
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5726625a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java
index 9a558f1..ceeb369 100644
--- a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java
@@ -99,12 +99,5 @@ public class CQLSSTableWriterClientTest
File[] dataFiles = this.testDirectory.listFiles(filter);
assertEquals(2, dataFiles.length);
-
- File transactionsFolder = Directories.getTransactionsDirectory(testDirectory);
- assertTrue(transactionsFolder.exists());
-
- File[] opFiles = transactionsFolder.listFiles();
- assertEquals(0, opFiles.length);
-
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5726625a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
index 2e9768e..e7cf51c 100644
--- a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
@@ -45,7 +45,6 @@ import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.OutputHandler;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
public class CQLSSTableWriterTest
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5726625a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index fd801ad..d9516cb 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -44,7 +44,7 @@ import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.RowUpdateBuilder;
import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.db.rows.EncodingStats;
-import org.apache.cassandra.db.lifecycle.TransactionLogs;
+import org.apache.cassandra.db.lifecycle.TransactionLog;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
import org.apache.cassandra.db.compaction.CompactionController;
@@ -109,7 +109,7 @@ public class SSTableRewriterTest extends SchemaLoader
Keyspace keyspace = Keyspace.open(KEYSPACE);
ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF);
store.truncateBlocking();
- TransactionLogs.waitForDeletions();
+ TransactionLog.waitForDeletions();
}
@Test
@@ -145,7 +145,7 @@ public class SSTableRewriterTest extends SchemaLoader
}
writer.finish();
}
- TransactionLogs.waitForDeletions();
+ TransactionLog.waitForDeletions();
validateCFS(cfs);
int filecounts = assertFileCounts(sstables.iterator().next().descriptor.directory.list());
assertEquals(1, filecounts);
@@ -177,7 +177,7 @@ public class SSTableRewriterTest extends SchemaLoader
}
writer.finish();
}
- TransactionLogs.waitForDeletions();
+ TransactionLog.waitForDeletions();
validateCFS(cfs);
int filecounts = assertFileCounts(sstables.iterator().next().descriptor.directory.list());
assertEquals(1, filecounts);
@@ -232,7 +232,7 @@ public class SSTableRewriterTest extends SchemaLoader
assertTrue(checked);
writer.finish();
}
- TransactionLogs.waitForDeletions();
+ TransactionLog.waitForDeletions();
validateCFS(cfs);
int filecounts = assertFileCounts(sstables.iterator().next().descriptor.directory.list());
assertEquals(1, filecounts);
@@ -277,12 +277,12 @@ public class SSTableRewriterTest extends SchemaLoader
// open till .abort() is called (via the builder)
if (!FBUtilities.isWindows())
{
- TransactionLogs.waitForDeletions();
+ TransactionLog.waitForDeletions();
assertFileCounts(dir.list());
}
writer.abort();
txn.abort();
- TransactionLogs.waitForDeletions();
+ TransactionLog.waitForDeletions();
int datafiles = assertFileCounts(dir.list());
assertEquals(datafiles, 0);
validateCFS(cfs);
@@ -328,7 +328,7 @@ public class SSTableRewriterTest extends SchemaLoader
sstables = rewriter.finish();
}
- TransactionLogs.waitForDeletions();
+ TransactionLog.waitForDeletions();
long sum = 0;
for (SSTableReader x : cfs.getLiveSSTables())
@@ -337,7 +337,7 @@ public class SSTableRewriterTest extends SchemaLoader
assertEquals(startStorageMetricsLoad - sBytesOnDisk + sum, StorageMetrics.load.getCount());
assertEquals(files, sstables.size());
assertEquals(files, cfs.getLiveSSTables().size());
- TransactionLogs.waitForDeletions();
+ TransactionLog.waitForDeletions();
// tmplink and tmp files should be gone:
assertEquals(sum, cfs.metric.totalDiskSpaceUsed.getCount());
@@ -382,7 +382,7 @@ public class SSTableRewriterTest extends SchemaLoader
assertEquals(files, sstables.size());
assertEquals(files, cfs.getLiveSSTables().size());
- TransactionLogs.waitForDeletions();
+ TransactionLog.waitForDeletions();
assertFileCounts(s.descriptor.directory.list());
validateCFS(cfs);
@@ -519,7 +519,7 @@ public class SSTableRewriterTest extends SchemaLoader
test.run(scanner, controller, s, cfs, rewriter, txn);
}
- TransactionLogs.waitForDeletions();
+ TransactionLog.waitForDeletions();
assertEquals(startSize, cfs.metric.liveDiskSpaceUsed.getCount());
assertEquals(1, cfs.getLiveSSTables().size());
@@ -567,7 +567,7 @@ public class SSTableRewriterTest extends SchemaLoader
}
}
- TransactionLogs.waitForDeletions();
+ TransactionLog.waitForDeletions();
assertEquals(files - 1, cfs.getLiveSSTables().size()); // we never wrote anything to the last file
assertFileCounts(s.descriptor.directory.list());
@@ -609,7 +609,7 @@ public class SSTableRewriterTest extends SchemaLoader
sstables = rewriter.finish();
}
- TransactionLogs.waitForDeletions();
+ TransactionLog.waitForDeletions();
assertFileCounts(s.descriptor.directory.list());
validateCFS(cfs);
}
@@ -650,7 +650,7 @@ public class SSTableRewriterTest extends SchemaLoader
}
assertEquals(files, sstables.size());
assertEquals(files, cfs.getLiveSSTables().size());
- TransactionLogs.waitForDeletions();
+ TransactionLog.waitForDeletions();
assertFileCounts(s.descriptor.directory.list());
validateCFS(cfs);
@@ -670,7 +670,7 @@ public class SSTableRewriterTest extends SchemaLoader
splitter.split();
assertFileCounts(s.descriptor.directory.list());
- TransactionLogs.waitForDeletions();
+ TransactionLog.waitForDeletions();
for (File f : s.descriptor.directory.listFiles())
{
@@ -746,7 +746,7 @@ public class SSTableRewriterTest extends SchemaLoader
s.selfRef().release();
}
- TransactionLogs.waitForDeletions();
+ TransactionLog.waitForDeletions();
int filecount = assertFileCounts(s.descriptor.directory.list());
assertEquals(filecount, 1);
@@ -825,7 +825,7 @@ public class SSTableRewriterTest extends SchemaLoader
rewriter.finish();
}
validateKeys(keyspace);
- TransactionLogs.waitForDeletions();
+ TransactionLog.waitForDeletions();
validateCFS(cfs);
truncate(cfs);
}
@@ -923,7 +923,7 @@ public class SSTableRewriterTest extends SchemaLoader
public static void truncate(ColumnFamilyStore cfs)
{
cfs.truncateBlocking();
- TransactionLogs.waitForDeletions();
+ TransactionLog.waitForDeletions();
Uninterruptibles.sleepUninterruptibly(10L, TimeUnit.MILLISECONDS);
assertEquals(0, cfs.metric.liveDiskSpaceUsed.getCount());
assertEquals(0, cfs.metric.totalDiskSpaceUsed.getCount());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5726625a/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java b/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
index c763932..0898515 100644
--- a/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
@@ -51,17 +51,10 @@ public class MetadataSerializerTest
CFMetaData cfm = SchemaLoader.standardCFMD("ks1", "cf1");
-
ReplayPosition rp = new ReplayPosition(11L, 12);
-
MetadataCollector collector = new MetadataCollector(cfm.comparator).replayPosition(rp);
-
- Set<Integer> ancestors = Sets.newHashSet(1, 2, 3, 4);
- for (int i : ancestors)
- collector.addAncestor(i);
-
String partitioner = RandomPartitioner.class.getCanonicalName();
double bfFpChance = 0.1;
Map<MetadataType, MetadataComponent> originalMetadata = collector.finalizeMetadata(partitioner, bfFpChance, 0, SerializationHeader.make(cfm, Collections.EMPTY_LIST));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5726625a/test/unit/org/apache/cassandra/schema/DefsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/schema/DefsTest.java b/test/unit/org/apache/cassandra/schema/DefsTest.java
index 680c016..98a954c 100644
--- a/test/unit/org/apache/cassandra/schema/DefsTest.java
+++ b/test/unit/org/apache/cassandra/schema/DefsTest.java
@@ -39,8 +39,9 @@ import org.apache.cassandra.config.Schema;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.lifecycle.TransactionLogs;
+import org.apache.cassandra.db.lifecycle.TransactionLog;
import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.exceptions.ConfigurationException;
@@ -205,7 +206,7 @@ public class DefsTest
ColumnFamilyStore store = Keyspace.open(cfm.ksName).getColumnFamilyStore(cfm.cfName);
assertNotNull(store);
store.forceBlockingFlush();
- assertTrue(store.directories.sstableLister().list().size() > 0);
+ assertTrue(store.directories.sstableLister(Directories.OnTxnErr.THROW).list().size() > 0);
MigrationManager.announceColumnFamilyDrop(ks.name, cfm.cfName);
@@ -227,7 +228,7 @@ public class DefsTest
// verify that the files are gone.
Supplier<Object> lambda = () -> {
- for (File file : store.directories.sstableLister().listFiles())
+ for (File file : store.directories.sstableLister(Directories.OnTxnErr.THROW).listFiles())
{
if (file.getPath().endsWith("Data.db") && !new File(file.getPath().replace("Data.db", "Compacted")).exists())
return false;
@@ -276,7 +277,7 @@ public class DefsTest
ColumnFamilyStore cfs = Keyspace.open(cfm.ksName).getColumnFamilyStore(cfm.cfName);
assertNotNull(cfs);
cfs.forceBlockingFlush();
- assertTrue(!cfs.directories.sstableLister().list().isEmpty());
+ assertTrue(!cfs.directories.sstableLister(Directories.OnTxnErr.THROW).list().isEmpty());
MigrationManager.announceKeyspaceDrop(ks.name);
@@ -521,7 +522,7 @@ public class DefsTest
// check
assertTrue(cfs.indexManager.getIndexes().isEmpty());
- TransactionLogs.waitForDeletions();
+ TransactionLog.waitForDeletions();
assertFalse(new File(desc.filenameFor(Component.DATA)).exists());
}