You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2015/05/22 10:46:33 UTC
[2/7] cassandra git commit: Extend Transactional API to sstable
lifecycle management
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/test/unit/org/apache/cassandra/db/lifecycle/HelpersTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/HelpersTest.java b/test/unit/org/apache/cassandra/db/lifecycle/HelpersTest.java
new file mode 100644
index 0000000..d53a830
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/lifecycle/HelpersTest.java
@@ -0,0 +1,158 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.db.lifecycle;
+
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+
+import org.junit.Test;
+
+import junit.framework.Assert;
+import org.apache.cassandra.MockSchema;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.big.BigTableReader;
+import org.apache.cassandra.utils.concurrent.Refs;
+
+public class HelpersTest
+{
+
+ static Set<Integer> a = set(1, 2, 3);
+ static Set<Integer> b = set(4, 5, 6);
+ static Set<Integer> c = set(7, 8, 9);
+ static Set<Integer> abc = set(1, 2, 3, 4, 5, 6, 7, 8, 9);
+
+ // this also tests orIn
+ @Test
+ public void testFilterIn()
+ {
+ check(Helpers.filterIn(abc, a), a);
+ check(Helpers.filterIn(abc, a, c), set(1, 2, 3, 7, 8, 9));
+ check(Helpers.filterIn(a, c), set());
+ }
+
+ // this also tests notIn
+ @Test
+ public void testFilterOut()
+ {
+ check(Helpers.filterOut(abc, a), set(4, 5, 6, 7, 8, 9));
+ check(Helpers.filterOut(abc, b), set(1, 2, 3, 7, 8, 9));
+ check(Helpers.filterOut(a, a), set());
+ }
+
+ @Test
+ public void testConcatUniq()
+ {
+ check(Helpers.concatUniq(a, b, a, c, b, a), abc);
+ }
+
+ @Test
+ public void testIdentityMap()
+ {
+ Integer one = new Integer(1);
+ Integer two = new Integer(2);
+ Integer three = new Integer(3);
+ Map<Integer, Integer> identity = Helpers.identityMap(set(one, two, three));
+ Assert.assertEquals(3, identity.size());
+ Assert.assertSame(one, identity.get(1));
+ Assert.assertSame(two, identity.get(2));
+ Assert.assertSame(three, identity.get(3));
+ }
+
+ @Test
+ public void testReplace()
+ {
+ boolean failure;
+ failure = false;
+ try
+ {
+ Helpers.replace(abc, a, c);
+ }
+ catch (AssertionError e)
+ {
+ failure = true;
+ }
+ Assert.assertTrue(failure);
+
+ failure = false;
+ try
+ {
+ Helpers.replace(a, abc, c);
+ }
+ catch (AssertionError e)
+ {
+ failure = true;
+ }
+ Assert.assertTrue(failure);
+
+ failure = false;
+ try
+ {
+ Map<Integer, Integer> notIdentity = ImmutableMap.of(1, new Integer(1), 2, 2, 3, 3);
+ Helpers.replace(notIdentity, a, b);
+ }
+ catch (AssertionError e)
+ {
+ failure = true;
+ }
+ Assert.assertTrue(failure);
+
+ // check it actually works when correct values provided
+ check(Helpers.replace(a, a, b), b);
+ }
+
+ private static Set<Integer> set(Integer ... contents)
+ {
+ return ImmutableSet.copyOf(contents);
+ }
+
+ private static void check(Iterable<Integer> check, Set<Integer> expected)
+ {
+ Assert.assertEquals(expected, ImmutableSet.copyOf(check));
+ }
+
+ @Test
+ public void testSetupDeletionNotification()
+ {
+ Iterable<SSTableReader> readers = Lists.newArrayList(MockSchema.sstable(1), MockSchema.sstable(2));
+ Throwable accumulate = Helpers.setReplaced(readers, null);
+ Assert.assertNull(accumulate);
+ for (SSTableReader reader : readers)
+ Assert.assertTrue(reader.isReplaced());
+ accumulate = Helpers.setReplaced(readers, null);
+ Assert.assertNotNull(accumulate);
+ }
+
+ @Test
+ public void testMarkObsolete()
+ {
+ Iterable<SSTableReader> readers = Lists.newArrayList(MockSchema.sstable(1), MockSchema.sstable(2));
+ Throwable accumulate = Helpers.markObsolete(readers, null);
+ Assert.assertNull(accumulate);
+ for (SSTableReader reader : readers)
+ Assert.assertTrue(reader.isMarkedCompacted());
+ accumulate = Helpers.markObsolete(readers, null);
+ Assert.assertNotNull(accumulate);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/test/unit/org/apache/cassandra/db/lifecycle/LifecycleTransactionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/LifecycleTransactionTest.java b/test/unit/org/apache/cassandra/db/lifecycle/LifecycleTransactionTest.java
new file mode 100644
index 0000000..3153ef1
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/lifecycle/LifecycleTransactionTest.java
@@ -0,0 +1,412 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.db.lifecycle;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import junit.framework.Assert;
+import org.apache.cassandra.MockSchema;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction.ReaderState;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction.ReaderState.Action;
+import org.apache.cassandra.io.sstable.SSTableDeletingTask;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.concurrent.AbstractTransactionalTest;
+import org.apache.cassandra.utils.concurrent.Transactional.AbstractTransactional.State;
+
+import static com.google.common.base.Predicates.in;
+import static com.google.common.collect.ImmutableList.copyOf;
+import static com.google.common.collect.ImmutableList.of;
+import static com.google.common.collect.Iterables.all;
+import static com.google.common.collect.Iterables.concat;
+import static com.google.common.collect.Iterables.size;
+import static org.apache.cassandra.db.lifecycle.Helpers.idIn;
+import static org.apache.cassandra.db.lifecycle.Helpers.orIn;
+import static org.apache.cassandra.db.lifecycle.Helpers.select;
+
+public class LifecycleTransactionTest extends AbstractTransactionalTest
+{
+ private boolean incrementalBackups;
+
+ @Before
+ public void disableIncrementalBackup()
+ {
+ incrementalBackups = DatabaseDescriptor.isIncrementalBackupsEnabled();
+ DatabaseDescriptor.setIncrementalBackupsEnabled(false);
+ }
+ @After
+ public void restoreIncrementalBackup()
+ {
+ DatabaseDescriptor.setIncrementalBackupsEnabled(incrementalBackups);
+ }
+
+ @Test
+ public void testUpdates() // (including obsoletion)
+ {
+ Tracker tracker = new Tracker(null, false);
+ SSTableReader[] readers = readersArray(0, 3);
+ SSTableReader[] readers2 = readersArray(0, 4);
+ SSTableReader[] readers3 = readersArray(0, 4);
+ tracker.addInitialSSTables(copyOf(readers));
+ LifecycleTransaction txn = tracker.tryModify(copyOf(readers), OperationType.UNKNOWN);
+
+ txn.update(readers2[0], true);
+ txn.obsolete(readers[1]);
+
+ Assert.assertTrue(txn.isObsolete(readers[1]));
+ Assert.assertFalse(txn.isObsolete(readers[0]));
+
+ testBadUpdate(txn, readers2[0], true); // same reader && instances
+ testBadUpdate(txn, readers2[1], true); // staged obsolete; cannot update
+ testBadUpdate(txn, readers3[0], true); // same reader, diff instances
+ testBadUpdate(txn, readers2[2], false); // incorrectly declared original status
+ testBadUpdate(txn, readers2[3], true); // incorrectly declared original status
+
+ testBadObsolete(txn, readers[1]); // staged obsolete; cannot obsolete again
+ testBadObsolete(txn, readers2[0]); // staged update; cannot obsolete
+
+ txn.update(readers2[3], false);
+
+ Assert.assertEquals(3, tracker.getView().compacting.size());
+ txn.checkpoint();
+ Assert.assertTrue(txn.isObsolete(readers[1]));
+ Assert.assertFalse(txn.isObsolete(readers[0]));
+ Assert.assertEquals(4, tracker.getView().compacting.size());
+ Assert.assertEquals(3, tracker.getView().sstables.size());
+ Assert.assertEquals(3, size(txn.current()));
+ Assert.assertTrue(all(of(readers2[0], readers[2], readers2[3]), idIn(tracker.getView().sstablesMap)));
+ Assert.assertTrue(all(txn.current(), idIn(tracker.getView().sstablesMap)));
+
+ testBadObsolete(txn, readers[1]); // logged obsolete; cannot obsolete again
+ testBadObsolete(txn, readers2[2]); // never seen instance; cannot obsolete
+ testBadObsolete(txn, readers2[3]); // non-original; cannot obsolete
+ testBadUpdate(txn, readers3[1], true); // logged obsolete; cannot update
+ testBadUpdate(txn, readers2[0], true); // same instance as logged update
+
+ txn.update(readers3[0], true); // same reader as logged update, different instance
+ txn.checkpoint();
+
+ Assert.assertEquals(4, tracker.getView().compacting.size());
+ Assert.assertEquals(3, tracker.getView().sstables.size());
+ Assert.assertEquals(3, size(txn.current()));
+ Assert.assertTrue(all(of(readers3[0], readers[2], readers2[3]), idIn(tracker.getView().sstablesMap)));
+ Assert.assertTrue(all(txn.current(), idIn(tracker.getView().sstablesMap)));
+
+ testBadObsolete(txn, readers2[0]); // not current version of sstable
+
+ txn.obsoleteOriginals();
+ txn.checkpoint();
+ Assert.assertEquals(1, tracker.getView().sstables.size());
+ txn.obsoleteOriginals(); // should be no-op
+ txn.checkpoint();
+ Assert.assertEquals(1, tracker.getView().sstables.size());
+ Assert.assertEquals(4, tracker.getView().compacting.size());
+ }
+
+ @Test
+ public void testCancellation()
+ {
+ Tracker tracker = new Tracker(null, false);
+ List<SSTableReader> readers = readers(0, 3);
+ tracker.addInitialSSTables(readers);
+ LifecycleTransaction txn = tracker.tryModify(readers, OperationType.UNKNOWN);
+
+ SSTableReader cancel = readers.get(0);
+ SSTableReader update = readers(1, 2).get(0);
+ SSTableReader fresh = readers(3, 4).get(0);
+ SSTableReader notPresent = readers(4, 5).get(0);
+
+ txn.cancel(cancel);
+ txn.update(update, true);
+ txn.update(fresh, false);
+
+ testBadCancel(txn, cancel);
+ testBadCancel(txn, update);
+ testBadCancel(txn, fresh);
+ testBadCancel(txn, notPresent);
+ Assert.assertEquals(2, txn.originals().size());
+ Assert.assertEquals(2, tracker.getView().compacting.size());
+ Assert.assertTrue(all(readers.subList(1, 3), idIn(tracker.getView().compacting)));
+
+ txn.checkpoint();
+
+ testBadCancel(txn, cancel);
+ testBadCancel(txn, update);
+ testBadCancel(txn, fresh);
+ testBadCancel(txn, notPresent);
+ Assert.assertEquals(2, txn.originals().size());
+ Assert.assertEquals(3, tracker.getView().compacting.size());
+ Assert.assertEquals(3, size(txn.current()));
+ Assert.assertTrue(all(concat(readers.subList(1, 3), of(fresh)), idIn(tracker.getView().compacting)));
+
+ txn.cancel(readers.get(2));
+ Assert.assertEquals(1, txn.originals().size());
+ Assert.assertEquals(2, tracker.getView().compacting.size());
+ Assert.assertEquals(2, size(txn.current()));
+ Assert.assertTrue(all(of(readers.get(1), fresh), idIn(tracker.getView().compacting)));
+ }
+
+ @Test
+ public void testSplit()
+ {
+ Tracker tracker = new Tracker(null, false);
+ List<SSTableReader> readers = readers(0, 3);
+ tracker.addInitialSSTables(readers);
+ LifecycleTransaction txn = tracker.tryModify(readers, OperationType.UNKNOWN);
+ LifecycleTransaction txn2 = txn.split(readers.subList(0, 1));
+ Assert.assertEquals(2, txn.originals().size());
+ Assert.assertTrue(all(readers.subList(1, 3), in(txn.originals())));
+ Assert.assertEquals(1, txn2.originals().size());
+ Assert.assertTrue(all(readers.subList(0, 1), in(txn2.originals())));
+ txn.update(readers(1, 2).get(0), true);
+ boolean failed = false;
+ try
+ {
+ txn.split(readers.subList(2, 3));
+ }
+ catch (Throwable t)
+ {
+ failed = true;
+ }
+ Assert.assertTrue(failed);
+ }
+
+ private static void testBadUpdate(LifecycleTransaction txn, SSTableReader update, boolean original)
+ {
+ boolean failed = false;
+ try
+ {
+ txn.update(update, original);
+ }
+ catch (Throwable t)
+ {
+ failed = true;
+ }
+ Assert.assertTrue(failed);
+ }
+
+ private static void testBadObsolete(LifecycleTransaction txn, SSTableReader update)
+ {
+ boolean failed = false;
+ try
+ {
+ txn.obsolete(update);
+ }
+ catch (Throwable t)
+ {
+ failed = true;
+ }
+ Assert.assertTrue(failed);
+ }
+
+ private static void testBadCancel(LifecycleTransaction txn, SSTableReader cancel)
+ {
+ boolean failed = false;
+ try
+ {
+ txn.cancel(cancel);
+ }
+ catch (Throwable t)
+ {
+ failed = true;
+ }
+ Assert.assertTrue(failed);
+ }
+
+ protected TestableTransaction newTest()
+ {
+ SSTableDeletingTask.waitForDeletions();
+ SSTableReader.resetTidying();
+ return new TxnTest();
+ }
+
+ private static final class TxnTest extends TestableTransaction
+ {
+ final List<SSTableReader> originals;
+ final List<SSTableReader> untouchedOriginals;
+ final List<SSTableReader> loggedUpdate;
+ final List<SSTableReader> loggedObsolete;
+ final List<SSTableReader> stagedObsolete;
+ final List<SSTableReader> loggedNew;
+ final List<SSTableReader> stagedNew;
+ final Tracker tracker;
+ final LifecycleTransaction txn;
+
+ private static Tracker tracker(List<SSTableReader> readers)
+ {
+ Tracker tracker = new Tracker(MockSchema.cfs, false);
+ tracker.addInitialSSTables(readers);
+ return tracker;
+ }
+
+ private TxnTest()
+ {
+ this(readers(0, 8));
+ }
+
+ private TxnTest(List<SSTableReader> readers)
+ {
+ this(tracker(readers), readers);
+ }
+
+ private TxnTest(Tracker tracker, List<SSTableReader> readers)
+ {
+ this(tracker, readers, tracker.tryModify(readers, OperationType.UNKNOWN));
+ }
+
+ private TxnTest(Tracker tracker, List<SSTableReader> readers, LifecycleTransaction txn)
+ {
+ super(txn);
+ this.tracker = tracker;
+ this.originals = readers;
+ this.txn = txn;
+ update(txn, loggedUpdate = readers(0, 2), true);
+ obsolete(txn, loggedObsolete = readers.subList(2, 4));
+ update(txn, loggedNew = readers(8, 10), false);
+ txn.checkpoint();
+ update(txn, stagedNew = readers(10, 12), false);
+ obsolete(txn, stagedObsolete = copyOf(concat(loggedUpdate, originals.subList(4, 6))));
+ untouchedOriginals = originals.subList(6, 8);
+ }
+
+ private ReaderState state(SSTableReader reader, State state)
+ {
+ SSTableReader original = select(reader, originals);
+ boolean isOriginal = original != null;
+
+ switch (state)
+ {
+ case ABORTED:
+ {
+ return new ReaderState(Action.NONE, Action.NONE, original, original, isOriginal);
+ }
+
+ case READY_TO_COMMIT:
+ {
+ ReaderState prev = state(reader, State.IN_PROGRESS);
+ Action logged;
+ SSTableReader visible;
+ if (prev.staged == Action.NONE)
+ {
+ logged = prev.logged;
+ visible = prev.currentlyVisible;
+ }
+ else
+ {
+ logged = prev.staged;
+ visible = prev.nextVisible;
+ }
+ return new ReaderState(logged, Action.NONE, visible, visible, isOriginal);
+ }
+
+ case IN_PROGRESS:
+ {
+ Action logged = Action.get(loggedUpdate.contains(reader) || loggedNew.contains(reader), loggedObsolete.contains(reader));
+ Action staged = Action.get(stagedNew.contains(reader), stagedObsolete.contains(reader));
+ SSTableReader currentlyVisible = ReaderState.visible(reader, in(loggedObsolete), loggedNew, loggedUpdate, originals);
+ SSTableReader nextVisible = ReaderState.visible(reader, orIn(stagedObsolete, loggedObsolete), stagedNew, loggedNew, loggedUpdate, originals);
+ return new ReaderState(logged, staged, currentlyVisible, nextVisible, isOriginal);
+ }
+ }
+ throw new IllegalStateException();
+ }
+
+ private List<Pair<SSTableReader, ReaderState>> states(State state)
+ {
+ List<Pair<SSTableReader, ReaderState>> result = new ArrayList<>();
+ for (SSTableReader reader : concat(originals, loggedNew, stagedNew))
+ result.add(Pair.create(reader, state(reader, state)));
+ return result;
+ }
+
+ protected void doAssert(State state)
+ {
+ for (Pair<SSTableReader, ReaderState> pair : states(state))
+ {
+ SSTableReader reader = pair.left;
+ ReaderState readerState = pair.right;
+
+ Assert.assertEquals(readerState, txn.state(reader));
+ Assert.assertEquals(readerState.currentlyVisible, tracker.getView().sstablesMap.get(reader));
+ if (readerState.currentlyVisible == null && readerState.nextVisible == null && !readerState.original)
+ Assert.assertTrue(reader.selfRef().globalCount() == 0);
+ }
+ }
+
+ protected void assertInProgress() throws Exception
+ {
+ doAssert(State.IN_PROGRESS);
+ }
+
+ protected void assertPrepared() throws Exception
+ {
+ doAssert(State.READY_TO_COMMIT);
+ }
+
+ protected void assertAborted() throws Exception
+ {
+ doAssert(State.ABORTED);
+ Assert.assertEquals(0, tracker.getView().compacting.size());
+ Assert.assertEquals(8, tracker.getView().sstables.size());
+ for (SSTableReader reader : concat(loggedNew, stagedNew))
+ Assert.assertTrue(reader.selfRef().globalCount() == 0);
+ }
+
+ protected void assertCommitted() throws Exception
+ {
+ doAssert(State.READY_TO_COMMIT);
+ Assert.assertEquals(0, tracker.getView().compacting.size());
+ Assert.assertEquals(6, tracker.getView().sstables.size());
+ for (SSTableReader reader : concat(loggedObsolete, stagedObsolete))
+ Assert.assertTrue(reader.selfRef().globalCount() == 0);
+ }
+ }
+
+ private static SSTableReader[] readersArray(int lb, int ub)
+ {
+ return readers(lb, ub).toArray(new SSTableReader[0]);
+ }
+
+ private static List<SSTableReader> readers(int lb, int ub)
+ {
+ List<SSTableReader> readers = new ArrayList<>();
+ for (int i = lb ; i < ub ; i++)
+ readers.add(MockSchema.sstable(i, i, true));
+ return copyOf(readers);
+ }
+
+ private static void update(LifecycleTransaction txn, Iterable<SSTableReader> readers, boolean originals)
+ {
+ for (SSTableReader reader : readers)
+ txn.update(reader, originals);
+ }
+
+ private static void obsolete(LifecycleTransaction txn, Iterable<SSTableReader> readers)
+ {
+ for (SSTableReader reader : readers)
+ txn.obsolete(reader);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java b/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
new file mode 100644
index 0000000..1eef7b0
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
@@ -0,0 +1,342 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.db.lifecycle;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.annotation.Nullable;
+
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import org.junit.Test;
+
+import junit.framework.Assert;
+import org.apache.cassandra.MockSchema;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Memtable;
+import org.apache.cassandra.db.commitlog.ReplayPosition;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.io.sstable.SSTableDeletingTask;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.notifications.*;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+
+import static com.google.common.collect.ImmutableSet.copyOf;
+import static java.util.Collections.singleton;
+
+public class TrackerTest
+{
+
+ private static final class MockListener implements INotificationConsumer
+ {
+ final boolean throwException;
+ final List<INotification> received = new ArrayList<>();
+ final List<Object> senders = new ArrayList<>();
+
+ private MockListener(boolean throwException)
+ {
+ this.throwException = throwException;
+ }
+
+ public void handleNotification(INotification notification, Object sender)
+ {
+ if (throwException)
+ throw new RuntimeException();
+ received.add(notification);
+ senders.add(sender);
+ }
+ }
+
+ @Test
+ public void testTryModify()
+ {
+ Tracker tracker = new Tracker(MockSchema.cfs, false);
+ List<SSTableReader> readers = ImmutableList.of(MockSchema.sstable(0), MockSchema.sstable(1), MockSchema.sstable(2));
+ tracker.addInitialSSTables(copyOf(readers));
+ try (LifecycleTransaction txn = tracker.tryModify(readers.get(0), OperationType.COMPACTION);)
+ {
+ Assert.assertNotNull(txn);
+ Assert.assertNull(tracker.tryModify(readers.get(0), OperationType.COMPACTION));
+ Assert.assertEquals(1, txn.originals().size());
+ Assert.assertTrue(txn.originals().contains(readers.get(0)));
+ }
+ try (LifecycleTransaction txn = tracker.tryModify(Collections.<SSTableReader>emptyList(), OperationType.COMPACTION);)
+ {
+ Assert.assertNotNull(txn);
+ Assert.assertEquals(0, txn.originals().size());
+ }
+ }
+
+ @Test
+ public void testApply()
+ {
+ final Tracker tracker = new Tracker(null, false);
+ final View resultView = ViewTest.fakeView(0, 0);
+ final AtomicInteger count = new AtomicInteger();
+ tracker.apply(new Predicate<View>()
+ {
+ public boolean apply(View view)
+ {
+ // confound the CAS by swapping the view, and check we retry
+ if (count.incrementAndGet() < 3)
+ tracker.view.set(ViewTest.fakeView(0, 0));
+ return true;
+ }
+ }, new Function<View, View>()
+ {
+ @Nullable
+ public View apply(View view)
+ {
+ return resultView;
+ }
+ });
+ Assert.assertEquals(3, count.get());
+ Assert.assertEquals(resultView, tracker.getView());
+
+ count.set(0);
+ // check that if the predicate returns false, we stop immediately and return null
+ Assert.assertNull(tracker.apply(new Predicate<View>()
+ {
+ public boolean apply(View view)
+ {
+ count.incrementAndGet();
+ return false;
+ }
+ }, null));
+ Assert.assertEquals(1, count.get());
+ Assert.assertEquals(resultView, tracker.getView());
+ }
+
+ @Test
+ public void testAddInitialSSTables()
+ {
+ ColumnFamilyStore cfs = MockSchema.newCFS();
+ Tracker tracker = new Tracker(cfs, false);
+ List<SSTableReader> readers = ImmutableList.of(MockSchema.sstable(0, 17), MockSchema.sstable(1, 121), MockSchema.sstable(2, 9));
+ tracker.addInitialSSTables(copyOf(readers));
+
+ Assert.assertEquals(3, tracker.view.get().sstables.size());
+
+ for (SSTableReader reader : readers)
+ Assert.assertTrue(reader.isDeleteNotificationSetup());
+
+ Assert.assertEquals(17 + 121 + 9, cfs.metric.liveDiskSpaceUsed.getCount());
+ }
+
+ @Test
+ public void testAddSSTables()
+ {
+ boolean backups = DatabaseDescriptor.isIncrementalBackupsEnabled();
+ DatabaseDescriptor.setIncrementalBackupsEnabled(false);
+ Tracker tracker = new Tracker(MockSchema.cfs, false);
+ MockListener listener = new MockListener(false);
+ tracker.subscribe(listener);
+ List<SSTableReader> readers = ImmutableList.of(MockSchema.sstable(0, 17), MockSchema.sstable(1, 121), MockSchema.sstable(2, 9));
+ tracker.addSSTables(copyOf(readers));
+
+ Assert.assertEquals(3, tracker.view.get().sstables.size());
+
+ for (SSTableReader reader : readers)
+ Assert.assertTrue(reader.isDeleteNotificationSetup());
+
+ Assert.assertEquals(17 + 121 + 9, MockSchema.cfs.metric.liveDiskSpaceUsed.getCount());
+ Assert.assertEquals(3, listener.senders.size());
+ Assert.assertEquals(tracker, listener.senders.get(0));
+ Assert.assertTrue(listener.received.get(0) instanceof SSTableAddedNotification);
+ DatabaseDescriptor.setIncrementalBackupsEnabled(backups);
+ }
+
+ @Test
+ public void testDropSSTables()
+ {
+ testDropSSTables(false);
+ SSTableDeletingTask.waitForDeletions();
+ testDropSSTables(true);
+ SSTableDeletingTask.waitForDeletions();
+ }
+
+ private void testDropSSTables(boolean invalidate)
+ {
+ ColumnFamilyStore cfs = MockSchema.newCFS();
+ Tracker tracker = cfs.getTracker();
+ MockListener listener = new MockListener(false);
+ tracker.subscribe(listener);
+ final List<SSTableReader> readers = ImmutableList.of(MockSchema.sstable(0, 9, true), MockSchema.sstable(1, 15, true), MockSchema.sstable(2, 71, true));
+ tracker.addInitialSSTables(copyOf(readers));
+ try (LifecycleTransaction txn = tracker.tryModify(readers.get(0), OperationType.COMPACTION);)
+ {
+ SSTableDeletingTask.pauseDeletions(true);
+ if (invalidate)
+ cfs.invalidate(false);
+ else
+ tracker.dropSSTables();
+ Assert.assertEquals(95, cfs.metric.totalDiskSpaceUsed.getCount());
+ Assert.assertEquals(9, cfs.metric.liveDiskSpaceUsed.getCount());
+ Assert.assertEquals(1, tracker.getView().sstables.size());
+ SSTableDeletingTask.pauseDeletions(false);
+ }
+ if (!invalidate)
+ {
+ Assert.assertEquals(1, tracker.getView().sstables.size());
+ Assert.assertEquals(readers.get(0), Iterables.getFirst(tracker.getView().sstables, null));
+ Assert.assertEquals(1, readers.get(0).selfRef().globalCount());
+ Assert.assertFalse(readers.get(0).isMarkedCompacted());
+ for (SSTableReader reader : readers.subList(1, 3))
+ {
+ Assert.assertEquals(0, reader.selfRef().globalCount());
+ Assert.assertTrue(reader.isMarkedCompacted());
+ }
+ Assert.assertNull(tracker.dropSSTables(new Predicate<SSTableReader>()
+ {
+ public boolean apply(SSTableReader reader)
+ {
+ return reader != readers.get(0);
+ }
+ }, OperationType.UNKNOWN, null));
+
+ Assert.assertEquals(1, tracker.getView().sstables.size());
+ Assert.assertEquals(1, listener.received.size());
+ Assert.assertEquals(tracker, listener.senders.get(0));
+ Assert.assertEquals(2, ((SSTableListChangedNotification) listener.received.get(0)).removed.size());
+ Assert.assertEquals(0, ((SSTableListChangedNotification) listener.received.get(0)).added.size());
+ Assert.assertEquals(9, cfs.metric.liveDiskSpaceUsed.getCount());
+ readers.get(0).selfRef().release();
+ }
+ else
+ {
+ Assert.assertEquals(0, tracker.getView().sstables.size());
+ Assert.assertEquals(0, cfs.metric.liveDiskSpaceUsed.getCount());
+ for (SSTableReader reader : readers)
+ Assert.assertTrue(reader.isMarkedCompacted());
+ }
+ }
+
+ @Test
+ public void testMemtableReplacement()
+ {
+ boolean backups = DatabaseDescriptor.isIncrementalBackupsEnabled();
+ DatabaseDescriptor.setIncrementalBackupsEnabled(false);
+ ColumnFamilyStore cfs = MockSchema.newCFS();
+ MockListener listener = new MockListener(false);
+ Tracker tracker = cfs.getTracker();
+ tracker.subscribe(listener);
+
+ Memtable prev1 = tracker.switchMemtable(true);
+ OpOrder.Group write1 = cfs.keyspace.writeOrder.getCurrent();
+ OpOrder.Barrier barrier1 = cfs.keyspace.writeOrder.newBarrier();
+ prev1.setDiscarding(barrier1, new AtomicReference<ReplayPosition>());
+ barrier1.issue();
+ Memtable prev2 = tracker.switchMemtable(false);
+ OpOrder.Group write2 = cfs.keyspace.writeOrder.getCurrent();
+ OpOrder.Barrier barrier2 = cfs.keyspace.writeOrder.newBarrier();
+ prev2.setDiscarding(barrier2, new AtomicReference<ReplayPosition>());
+ barrier2.issue();
+ Memtable cur = tracker.getView().getCurrentMemtable();
+ OpOrder.Group writecur = cfs.keyspace.writeOrder.getCurrent();
+ Assert.assertEquals(prev1, tracker.getMemtableFor(write1, ReplayPosition.NONE));
+ Assert.assertEquals(prev2, tracker.getMemtableFor(write2, ReplayPosition.NONE));
+ Assert.assertEquals(cur, tracker.getMemtableFor(writecur, ReplayPosition.NONE));
+ Assert.assertEquals(1, listener.received.size());
+ Assert.assertTrue(listener.received.get(0) instanceof MemtableRenewedNotification);
+ listener.received.clear();
+
+ tracker.markFlushing(prev2);
+ Assert.assertEquals(1, tracker.getView().flushingMemtables.size());
+ Assert.assertTrue(tracker.getView().flushingMemtables.contains(prev2));
+
+ tracker.markFlushing(prev1);
+ Assert.assertTrue(tracker.getView().flushingMemtables.contains(prev1));
+ Assert.assertEquals(2, tracker.getView().flushingMemtables.size());
+
+ tracker.replaceFlushed(prev1, null);
+ Assert.assertEquals(1, tracker.getView().flushingMemtables.size());
+ Assert.assertTrue(tracker.getView().flushingMemtables.contains(prev2));
+
+ SSTableReader reader = MockSchema.sstable(0, 10, false, cfs);
+ tracker.replaceFlushed(prev2, reader);
+ Assert.assertEquals(1, tracker.getView().sstables.size());
+ Assert.assertEquals(1, listener.received.size());
+ Assert.assertEquals(reader, ((SSTableAddedNotification) listener.received.get(0)).added);
+ listener.received.clear();
+ Assert.assertTrue(reader.isDeleteNotificationSetup());
+ Assert.assertEquals(10, cfs.metric.liveDiskSpaceUsed.getCount());
+
+ // test invalidated CFS
+ cfs = MockSchema.newCFS();
+ tracker = cfs.getTracker();
+ listener = new MockListener(false);
+ tracker.subscribe(listener);
+ prev1 = tracker.switchMemtable(false);
+ tracker.markFlushing(prev1);
+ reader = MockSchema.sstable(0, 10, true, cfs);
+ cfs.invalidate(false);
+ tracker.replaceFlushed(prev1, reader);
+ Assert.assertEquals(0, tracker.getView().sstables.size());
+ Assert.assertEquals(0, tracker.getView().flushingMemtables.size());
+ Assert.assertEquals(0, cfs.metric.liveDiskSpaceUsed.getCount());
+ Assert.assertEquals(reader, ((SSTableAddedNotification) listener.received.get(0)).added);
+ Assert.assertEquals(1, ((SSTableListChangedNotification) listener.received.get(1)).removed.size());
+ DatabaseDescriptor.setIncrementalBackupsEnabled(backups);
+ }
+
+ @Test
+ public void testNotifications()
+ {
+ SSTableReader r1 = MockSchema.sstable(0), r2 = MockSchema.sstable(1);
+ Tracker tracker = new Tracker(null, false);
+ MockListener listener = new MockListener(false);
+ tracker.subscribe(listener);
+ tracker.notifyAdded(r1);
+ Assert.assertEquals(r1, ((SSTableAddedNotification) listener.received.get(0)).added);
+ listener.received.clear();
+ tracker.notifyDeleting(r1);
+ Assert.assertEquals(r1, ((SSTableDeletingNotification) listener.received.get(0)).deleting);
+ listener.received.clear();
+ Assert.assertNull(tracker.notifySSTablesChanged(singleton(r1), singleton(r2), OperationType.COMPACTION, null));
+ Assert.assertEquals(singleton(r1), ((SSTableListChangedNotification) listener.received.get(0)).removed);
+ Assert.assertEquals(singleton(r2), ((SSTableListChangedNotification) listener.received.get(0)).added);
+ listener.received.clear();
+ tracker.notifySSTableRepairedStatusChanged(singleton(r1));
+ Assert.assertEquals(singleton(r1), ((SSTableRepairStatusChanged) listener.received.get(0)).sstable);
+ listener.received.clear();
+ Memtable memtable = MockSchema.memtable();
+ tracker.notifyRenewed(memtable);
+ Assert.assertEquals(memtable, ((MemtableRenewedNotification) listener.received.get(0)).renewed);
+ listener.received.clear();
+ tracker.unsubscribe(listener);
+ MockListener failListener = new MockListener(true);
+ tracker.subscribe(failListener);
+ tracker.subscribe(listener);
+ Assert.assertNotNull(tracker.notifyAdded(r1, null));
+ Assert.assertEquals(r1, ((SSTableAddedNotification) listener.received.get(0)).added);
+ listener.received.clear();
+ Assert.assertNotNull(tracker.notifySSTablesChanged(singleton(r1), singleton(r2), OperationType.COMPACTION, null));
+ Assert.assertEquals(singleton(r1), ((SSTableListChangedNotification) listener.received.get(0)).removed);
+ Assert.assertEquals(singleton(r2), ((SSTableListChangedNotification) listener.received.get(0)).added);
+ listener.received.clear();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java b/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java
new file mode 100644
index 0000000..811e025
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java
@@ -0,0 +1,202 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.db.lifecycle;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import com.google.common.base.Function;
+import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import org.junit.Test;
+
+import junit.framework.Assert;
+import org.apache.cassandra.MockSchema;
+import org.apache.cassandra.db.Memtable;
+import org.apache.cassandra.db.RowPosition;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+
+import static com.google.common.collect.ImmutableSet.copyOf;
+import static com.google.common.collect.ImmutableSet.of;
+import static com.google.common.collect.Iterables.concat;
+import static org.apache.cassandra.db.lifecycle.Helpers.emptySet;
+
+public class ViewTest
+{
+
+ @Test
+ public void testSSTablesInBounds()
+ {
+ View initialView = fakeView(0, 5);
+ for (int i = 0 ; i < 5 ; i++)
+ {
+ for (int j = i ; j < 5 ; j++)
+ {
+ RowPosition min = MockSchema.readerBounds(i);
+ RowPosition max = MockSchema.readerBounds(j);
+ for (boolean minInc : new boolean[] { true, false} )
+ {
+ for (boolean maxInc : new boolean[] { true, false} )
+ {
+ if (i == j && !(minInc && maxInc))
+ continue;
+ List<SSTableReader> r = initialView.sstablesInBounds(AbstractBounds.bounds(min, minInc, max, maxInc));
+ Assert.assertEquals(String.format("%d(%s) %d(%s)", i, minInc, j, maxInc), j - i + (minInc ? 0 : -1) + (maxInc ? 1 : 0), r.size());
+ }
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testCompaction()
+ {
+ View initialView = fakeView(0, 5);
+ View cur = initialView;
+ List<SSTableReader> readers = ImmutableList.copyOf(initialView.sstables);
+ Assert.assertTrue(View.permitCompacting(readers).apply(cur));
+ // check we permit compacting duplicates in the predicate, so we don't spin infinitely if there is a screw up
+ Assert.assertTrue(View.permitCompacting(ImmutableList.copyOf(concat(readers, readers))).apply(cur));
+ // check we fail in the application in the presence of duplicates
+ testFailure(View.updateCompacting(emptySet(), concat(readers.subList(0, 1), readers.subList(0, 1))), cur);
+
+ // do lots of trivial checks that the compacting set and related methods behave properly for a simple update
+ cur = View.updateCompacting(emptySet(), readers.subList(0, 2)).apply(cur);
+ Assert.assertTrue(View.permitCompacting(readers.subList(2, 5)).apply(cur));
+ Assert.assertFalse(View.permitCompacting(readers.subList(0, 2)).apply(cur));
+ Assert.assertFalse(View.permitCompacting(readers.subList(0, 1)).apply(cur));
+ Assert.assertFalse(View.permitCompacting(readers.subList(1, 2)).apply(cur));
+ Assert.assertTrue(readers.subList(2, 5).containsAll(copyOf(cur.getUncompacting(readers))));
+ Assert.assertEquals(3, copyOf(cur.getUncompacting(readers)).size());
+ Assert.assertTrue(cur.nonCompactingSStables().containsAll(readers.subList(2, 5)));
+ Assert.assertEquals(3, cur.nonCompactingSStables().size());
+
+ // check marking already compacting readers fails with an exception
+ testFailure(View.updateCompacting(emptySet(), readers.subList(0, 1)), cur);
+ testFailure(View.updateCompacting(emptySet(), readers.subList(1, 2)), cur);
+ testFailure(View.updateCompacting(copyOf(readers.subList(0, 1)), readers.subList(1, 2)), cur);
+
+ // make equivalents of readers.subList(0, 3) that are different instances
+ SSTableReader r0 = MockSchema.sstable(0), r1 = MockSchema.sstable(1), r2 = MockSchema.sstable(2);
+ // attempt to mark compacting a version not in the live set
+ testFailure(View.updateCompacting(emptySet(), of(r2)), cur);
+ // update one compacting, one non-compacting, of the liveset to another instance of the same readers;
+ // confirm liveset changes but compacting does not
+ cur = View.updateLiveSet(copyOf(readers.subList(1, 3)), of(r1, r2)).apply(cur);
+ Assert.assertSame(readers.get(0), cur.sstablesMap.get(r0));
+ Assert.assertSame(r1, cur.sstablesMap.get(r1));
+ Assert.assertSame(r2, cur.sstablesMap.get(r2));
+ testFailure(View.updateCompacting(emptySet(), readers.subList(2, 3)), cur);
+ Assert.assertSame(readers.get(1), Iterables.getFirst(Iterables.filter(cur.compacting, Predicates.equalTo(r1)), null));
+
+ // unmark compacting, and check our methods are all correctly updated
+ cur = View.updateCompacting(copyOf(readers.subList(0, 1)), emptySet()).apply(cur);
+ Assert.assertTrue(View.permitCompacting(concat(readers.subList(0, 1), of(r2), readers.subList(3, 5))).apply(cur));
+ Assert.assertFalse(View.permitCompacting(readers.subList(1, 2)).apply(cur));
+ testFailure(View.updateCompacting(emptySet(), readers.subList(1, 2)), cur);
+ testFailure(View.updateCompacting(copyOf(readers.subList(0, 2)), emptySet()), cur);
+ Assert.assertTrue(copyOf(concat(readers.subList(0, 1), readers.subList(2, 5))).containsAll(copyOf(cur.getUncompacting(readers))));
+ Assert.assertEquals(4, copyOf(cur.getUncompacting(readers)).size());
+ Assert.assertTrue(cur.nonCompactingSStables().containsAll(readers.subList(2, 5)));
+ Assert.assertTrue(cur.nonCompactingSStables().containsAll(readers.subList(0, 1)));
+ Assert.assertEquals(4, cur.nonCompactingSStables().size());
+ }
+
+ private static void testFailure(Function<View, ?> function, View view)
+ {
+ boolean failed = true;
+ try
+ {
+ function.apply(view);
+ failed = false;
+ }
+ catch (Throwable t)
+ {
+ }
+ Assert.assertTrue(failed);
+ }
+
+ @Test
+ public void testFlushing()
+ {
+ View initialView = fakeView(1, 0);
+ View cur = initialView;
+ Memtable memtable1 = initialView.getCurrentMemtable();
+ Memtable memtable2 = MockSchema.memtable();
+
+ cur = View.switchMemtable(memtable2).apply(cur);
+ Assert.assertEquals(2, cur.liveMemtables.size());
+ Assert.assertEquals(memtable1, cur.liveMemtables.get(0));
+ Assert.assertEquals(memtable2, cur.getCurrentMemtable());
+
+ Memtable memtable3 = MockSchema.memtable();
+ cur = View.switchMemtable(memtable3).apply(cur);
+ Assert.assertEquals(3, cur.liveMemtables.size());
+ Assert.assertEquals(0, cur.flushingMemtables.size());
+ Assert.assertEquals(memtable1, cur.liveMemtables.get(0));
+ Assert.assertEquals(memtable2, cur.liveMemtables.get(1));
+ Assert.assertEquals(memtable3, cur.getCurrentMemtable());
+
+ testFailure(View.replaceFlushed(memtable2, null), cur);
+
+ cur = View.markFlushing(memtable2).apply(cur);
+ Assert.assertTrue(cur.flushingMemtables.contains(memtable2));
+ Assert.assertEquals(2, cur.liveMemtables.size());
+ Assert.assertEquals(1, cur.flushingMemtables.size());
+ Assert.assertEquals(memtable2, cur.flushingMemtables.get(0));
+ Assert.assertEquals(memtable1, cur.liveMemtables.get(0));
+ Assert.assertEquals(memtable3, cur.getCurrentMemtable());
+
+ cur = View.markFlushing(memtable1).apply(cur);
+ Assert.assertEquals(1, cur.liveMemtables.size());
+ Assert.assertEquals(2, cur.flushingMemtables.size());
+ Assert.assertEquals(memtable1, cur.flushingMemtables.get(0));
+ Assert.assertEquals(memtable2, cur.flushingMemtables.get(1));
+ Assert.assertEquals(memtable3, cur.getCurrentMemtable());
+
+ cur = View.replaceFlushed(memtable2, null).apply(cur);
+ Assert.assertEquals(1, cur.liveMemtables.size());
+ Assert.assertEquals(1, cur.flushingMemtables.size());
+ Assert.assertEquals(memtable1, cur.flushingMemtables.get(0));
+ Assert.assertEquals(memtable3, cur.getCurrentMemtable());
+
+ SSTableReader sstable = MockSchema.sstable(1);
+ cur = View.replaceFlushed(memtable1, sstable).apply(cur);
+ Assert.assertEquals(0, cur.flushingMemtables.size());
+ Assert.assertEquals(1, cur.liveMemtables.size());
+ Assert.assertEquals(memtable3, cur.getCurrentMemtable());
+ Assert.assertEquals(1, cur.sstables.size());
+ Assert.assertEquals(sstable, cur.sstablesMap.get(sstable));
+ }
+
+ static View fakeView(int memtableCount, int sstableCount)
+ {
+ List<Memtable> memtables = new ArrayList<>();
+ List<SSTableReader> sstables = new ArrayList<>();
+ for (int i = 0 ; i < memtableCount ; i++)
+ memtables.add(MockSchema.memtable());
+ for (int i = 0 ; i < sstableCount ; i++)
+ sstables.add(MockSchema.sstable(i));
+ return new View(ImmutableList.copyOf(memtables), Collections.<Memtable>emptyList(), Helpers.identityMap(sstables),
+ Collections.<SSTableReader>emptySet(), SSTableIntervalTree.build(sstables));
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
index eac6094..518d80e 100644
--- a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
@@ -41,13 +41,15 @@ import org.apache.cassandra.cache.CachingOptions;
import org.apache.cassandra.config.KSMetaData;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.locator.SimpleStrategy;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.metrics.RestorableMeter;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.concurrent.OpOrder;
+import static com.google.common.collect.ImmutableMap.of;
+import static java.util.Arrays.asList;
import static org.apache.cassandra.io.sstable.Downsampling.BASE_SAMPLING_LEVEL;
import static org.apache.cassandra.io.sstable.IndexSummaryManager.DOWNSAMPLE_THESHOLD;
import static org.apache.cassandra.io.sstable.IndexSummaryManager.UPSAMPLE_THRESHOLD;
@@ -121,12 +123,15 @@ public class IndexSummaryManagerTest
return total;
}
- private static List<SSTableReader> resetSummaries(List<SSTableReader> sstables, long originalOffHeapSize) throws IOException
+ private static List<SSTableReader> resetSummaries(ColumnFamilyStore cfs, List<SSTableReader> sstables, long originalOffHeapSize) throws IOException
{
for (SSTableReader sstable : sstables)
sstable.overrideReadMeter(new RestorableMeter(100.0, 100.0));
- sstables = redistributeSummaries(Collections.EMPTY_LIST, sstables, originalOffHeapSize * sstables.size());
+ try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN))
+ {
+ sstables = redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), originalOffHeapSize * sstables.size());
+ }
for (SSTableReader sstable : sstables)
assertEquals(BASE_SAMPLING_LEVEL, sstable.getIndexSummarySamplingLevel());
@@ -230,7 +235,11 @@ public class IndexSummaryManagerTest
cfs.metadata.minIndexInterval(originalMinIndexInterval / 2);
SSTableReader sstable = cfs.getSSTables().iterator().next();
long summarySpace = sstable.getIndexSummaryOffHeapSize();
- IndexSummaryManager.redistributeSummaries(Collections.EMPTY_LIST, Arrays.asList(sstable), summarySpace);
+ try (LifecycleTransaction txn = cfs.getTracker().tryModify(asList(sstable), OperationType.UNKNOWN))
+ {
+ redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), summarySpace);
+ }
+
sstable = cfs.getSSTables().iterator().next();
assertEquals(originalMinIndexInterval, sstable.getEffectiveIndexInterval(), 0.001);
assertEquals(numRows / originalMinIndexInterval, sstable.getIndexSummarySize());
@@ -238,7 +247,10 @@ public class IndexSummaryManagerTest
// keep the min_index_interval the same, but now give the summary enough space to grow by 50%
double previousInterval = sstable.getEffectiveIndexInterval();
int previousSize = sstable.getIndexSummarySize();
- IndexSummaryManager.redistributeSummaries(Collections.EMPTY_LIST, Arrays.asList(sstable), (long) Math.ceil(summarySpace * 1.5));
+ try (LifecycleTransaction txn = cfs.getTracker().tryModify(asList(sstable), OperationType.UNKNOWN))
+ {
+ redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (long) Math.ceil(summarySpace * 1.5));
+ }
sstable = cfs.getSSTables().iterator().next();
assertEquals(previousSize * 1.5, (double) sstable.getIndexSummarySize(), 1);
assertEquals(previousInterval * (1.0 / 1.5), sstable.getEffectiveIndexInterval(), 0.001);
@@ -246,7 +258,10 @@ public class IndexSummaryManagerTest
// return min_index_interval to it's original value (double it), but only give the summary enough space
// to have an effective index interval of twice the new min
cfs.metadata.minIndexInterval(originalMinIndexInterval);
- IndexSummaryManager.redistributeSummaries(Collections.EMPTY_LIST, Arrays.asList(sstable), (long) Math.ceil(summarySpace / 2.0));
+ try (LifecycleTransaction txn = cfs.getTracker().tryModify(asList(sstable), OperationType.UNKNOWN))
+ {
+ redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (long) Math.ceil(summarySpace / 2.0));
+ }
sstable = cfs.getSSTables().iterator().next();
assertEquals(originalMinIndexInterval * 2, sstable.getEffectiveIndexInterval(), 0.001);
assertEquals(numRows / (originalMinIndexInterval * 2), sstable.getIndexSummarySize());
@@ -256,7 +271,10 @@ public class IndexSummaryManagerTest
// result in an effective interval above the new max)
cfs.metadata.minIndexInterval(originalMinIndexInterval * 4);
cfs.metadata.maxIndexInterval(originalMinIndexInterval * 4);
- IndexSummaryManager.redistributeSummaries(Collections.EMPTY_LIST, Arrays.asList(sstable), 10);
+ try (LifecycleTransaction txn = cfs.getTracker().tryModify(asList(sstable), OperationType.UNKNOWN))
+ {
+ redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), 10);
+ }
sstable = cfs.getSSTables().iterator().next();
assertEquals(cfs.metadata.getMinIndexInterval(), sstable.getEffectiveIndexInterval(), 0.001);
}
@@ -276,14 +294,20 @@ public class IndexSummaryManagerTest
for (SSTableReader sstable : sstables)
sstable.overrideReadMeter(new RestorableMeter(100.0, 100.0));
- IndexSummaryManager.redistributeSummaries(Collections.EMPTY_LIST, sstables, 1);
+ try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN))
+ {
+ redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), 10);
+ }
sstables = new ArrayList<>(cfs.getSSTables());
for (SSTableReader sstable : sstables)
assertEquals(cfs.metadata.getMaxIndexInterval(), sstable.getEffectiveIndexInterval(), 0.01);
// halve the max_index_interval
cfs.metadata.maxIndexInterval(cfs.metadata.getMaxIndexInterval() / 2);
- IndexSummaryManager.redistributeSummaries(Collections.EMPTY_LIST, sstables, 1);
+ try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN))
+ {
+ redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), 1);
+ }
sstables = new ArrayList<>(cfs.getSSTables());
for (SSTableReader sstable : sstables)
{
@@ -293,7 +317,10 @@ public class IndexSummaryManagerTest
// return max_index_interval to its original value
cfs.metadata.maxIndexInterval(cfs.metadata.getMaxIndexInterval() * 2);
- IndexSummaryManager.redistributeSummaries(Collections.EMPTY_LIST, sstables, 1);
+ try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN))
+ {
+ redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), 1);
+ }
for (SSTableReader sstable : cfs.getSSTables())
{
assertEquals(cfs.metadata.getMaxIndexInterval(), sstable.getEffectiveIndexInterval(), 0.01);
@@ -321,7 +348,10 @@ public class IndexSummaryManagerTest
long singleSummaryOffHeapSpace = sstables.get(0).getIndexSummaryOffHeapSize();
// there should be enough space to not downsample anything
- sstables = redistributeSummaries(Collections.EMPTY_LIST, sstables, (singleSummaryOffHeapSpace * numSSTables));
+ try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN))
+ {
+ sstables = redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * numSSTables));
+ }
for (SSTableReader sstable : sstables)
assertEquals(BASE_SAMPLING_LEVEL, sstable.getIndexSummarySamplingLevel());
assertEquals(singleSummaryOffHeapSpace * numSSTables, totalOffHeapSize(sstables));
@@ -329,26 +359,38 @@ public class IndexSummaryManagerTest
// everything should get cut in half
assert sstables.size() == 4;
- sstables = redistributeSummaries(Collections.EMPTY_LIST, sstables, (singleSummaryOffHeapSpace * (numSSTables / 2)));
+ try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN))
+ {
+ sstables = redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * (numSSTables / 2)));
+ }
for (SSTableReader sstable : sstables)
assertEquals(BASE_SAMPLING_LEVEL / 2, sstable.getIndexSummarySamplingLevel());
validateData(cfs, numRows);
// everything should get cut to a quarter
- sstables = redistributeSummaries(Collections.EMPTY_LIST, sstables, (singleSummaryOffHeapSpace * (numSSTables / 4)));
+ try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN))
+ {
+ sstables = redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * (numSSTables / 4)));
+ }
for (SSTableReader sstable : sstables)
assertEquals(BASE_SAMPLING_LEVEL / 4, sstable.getIndexSummarySamplingLevel());
validateData(cfs, numRows);
// upsample back up to half
- sstables = redistributeSummaries(Collections.EMPTY_LIST, sstables,(singleSummaryOffHeapSpace * (numSSTables / 2) + 4));
+ try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN))
+ {
+ sstables = redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * (numSSTables / 2) + 4));
+ }
assert sstables.size() == 4;
for (SSTableReader sstable : sstables)
assertEquals(BASE_SAMPLING_LEVEL / 2, sstable.getIndexSummarySamplingLevel());
validateData(cfs, numRows);
// upsample back up to the original index summary
- sstables = redistributeSummaries(Collections.EMPTY_LIST, sstables, (singleSummaryOffHeapSpace * numSSTables));
+ try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN))
+ {
+ sstables = redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * numSSTables));
+ }
for (SSTableReader sstable : sstables)
assertEquals(BASE_SAMPLING_LEVEL, sstable.getIndexSummarySamplingLevel());
validateData(cfs, numRows);
@@ -357,7 +399,10 @@ public class IndexSummaryManagerTest
// so the two cold sstables should get downsampled to be half of their original size
sstables.get(0).overrideReadMeter(new RestorableMeter(50.0, 50.0));
sstables.get(1).overrideReadMeter(new RestorableMeter(50.0, 50.0));
- sstables = redistributeSummaries(Collections.EMPTY_LIST, sstables, (singleSummaryOffHeapSpace * 3));
+ try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN))
+ {
+ sstables = redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * 3));
+ }
Collections.sort(sstables, hotnessComparator);
assertEquals(BASE_SAMPLING_LEVEL / 2, sstables.get(0).getIndexSummarySamplingLevel());
assertEquals(BASE_SAMPLING_LEVEL / 2, sstables.get(1).getIndexSummarySamplingLevel());
@@ -370,7 +415,10 @@ public class IndexSummaryManagerTest
double higherRate = 50.0 * (UPSAMPLE_THRESHOLD - (UPSAMPLE_THRESHOLD * 0.10));
sstables.get(0).overrideReadMeter(new RestorableMeter(lowerRate, lowerRate));
sstables.get(1).overrideReadMeter(new RestorableMeter(higherRate, higherRate));
- sstables = redistributeSummaries(Collections.EMPTY_LIST, sstables, (singleSummaryOffHeapSpace * 3));
+ try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN))
+ {
+ sstables = redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * 3));
+ }
Collections.sort(sstables, hotnessComparator);
assertEquals(BASE_SAMPLING_LEVEL / 2, sstables.get(0).getIndexSummarySamplingLevel());
assertEquals(BASE_SAMPLING_LEVEL / 2, sstables.get(1).getIndexSummarySamplingLevel());
@@ -379,13 +427,16 @@ public class IndexSummaryManagerTest
validateData(cfs, numRows);
// reset, and then this time, leave enough space for one of the cold sstables to not get downsampled
- sstables = resetSummaries(sstables, singleSummaryOffHeapSpace);
+ sstables = resetSummaries(cfs, sstables, singleSummaryOffHeapSpace);
sstables.get(0).overrideReadMeter(new RestorableMeter(1.0, 1.0));
sstables.get(1).overrideReadMeter(new RestorableMeter(2.0, 2.0));
sstables.get(2).overrideReadMeter(new RestorableMeter(1000.0, 1000.0));
sstables.get(3).overrideReadMeter(new RestorableMeter(1000.0, 1000.0));
- sstables = redistributeSummaries(Collections.EMPTY_LIST, sstables, (singleSummaryOffHeapSpace * 3) + 50);
+ try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN))
+ {
+ sstables = redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * 3) + 50);
+ }
Collections.sort(sstables, hotnessComparator);
if (sstables.get(0).getIndexSummarySamplingLevel() == minSamplingLevel)
@@ -406,7 +457,10 @@ public class IndexSummaryManagerTest
sstables.get(1).overrideReadMeter(new RestorableMeter(0.0, 0.0));
sstables.get(2).overrideReadMeter(new RestorableMeter(92, 92));
sstables.get(3).overrideReadMeter(new RestorableMeter(128.0, 128.0));
- sstables = redistributeSummaries(Collections.EMPTY_LIST, sstables, (long) (singleSummaryOffHeapSpace + (singleSummaryOffHeapSpace * (92.0 / BASE_SAMPLING_LEVEL))));
+ try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN))
+ {
+ sstables = redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (long) (singleSummaryOffHeapSpace + (singleSummaryOffHeapSpace * (92.0 / BASE_SAMPLING_LEVEL))));
+ }
Collections.sort(sstables, hotnessComparator);
assertEquals(1, sstables.get(0).getIndexSummarySize()); // at the min sampling level
assertEquals(1, sstables.get(0).getIndexSummarySize()); // at the min sampling level
@@ -416,7 +470,10 @@ public class IndexSummaryManagerTest
validateData(cfs, numRows);
// Don't leave enough space for even the minimal index summaries
- sstables = redistributeSummaries(Collections.EMPTY_LIST, sstables, 10);
+ try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN))
+ {
+ sstables = redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), 10);
+ }
for (SSTableReader sstable : sstables)
assertEquals(1, sstable.getIndexSummarySize()); // at the min sampling level
validateData(cfs, numRows);
@@ -449,19 +506,19 @@ public class IndexSummaryManagerTest
SSTableReader original = sstables.get(0);
SSTableReader sstable = original;
- for (int samplingLevel = 1; samplingLevel < BASE_SAMPLING_LEVEL; samplingLevel++)
+ try (LifecycleTransaction txn = cfs.getTracker().tryModify(asList(sstable), OperationType.UNKNOWN))
{
- SSTableReader prev = sstable;
- sstable = sstable.cloneWithNewSummarySamplingLevel(cfs, samplingLevel);
- assertEquals(samplingLevel, sstable.getIndexSummarySamplingLevel());
- int expectedSize = (numRows * samplingLevel) / (sstable.metadata.getMinIndexInterval() * BASE_SAMPLING_LEVEL);
- assertEquals(expectedSize, sstable.getIndexSummarySize(), 1);
- if (prev != original)
- prev.selfRef().release();
+ for (int samplingLevel = 1; samplingLevel < BASE_SAMPLING_LEVEL; samplingLevel++)
+ {
+ sstable = sstable.cloneWithNewSummarySamplingLevel(cfs, samplingLevel);
+ assertEquals(samplingLevel, sstable.getIndexSummarySamplingLevel());
+ int expectedSize = (numRows * samplingLevel) / (sstable.metadata.getMinIndexInterval() * BASE_SAMPLING_LEVEL);
+ assertEquals(expectedSize, sstable.getIndexSummarySize(), 1);
+ txn.update(sstable, true);
+ txn.checkpoint();
+ }
+ txn.finish();
}
-
- // don't leave replaced SSTRs around to break other tests
- cfs.getDataTracker().replaceWithNewInstances(Collections.singleton(original), Collections.singleton(sstable));
}
@Test
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
index 82cb8d5..682d999 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
@@ -59,7 +59,9 @@ import org.apache.cassandra.db.Row;
import org.apache.cassandra.db.RowPosition;
import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.db.composites.Composites;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.dht.LocalPartitioner;
import org.apache.cassandra.dht.LocalPartitioner.LocalToken;
import org.apache.cassandra.dht.Range;
@@ -498,8 +500,13 @@ public class SSTableReaderTest
}));
}
- SSTableReader replacement = sstable.cloneWithNewSummarySamplingLevel(store, 1);
- store.getDataTracker().replaceWithNewInstances(Arrays.asList(sstable), Arrays.asList(replacement));
+ SSTableReader replacement;
+ try (LifecycleTransaction txn = store.getTracker().tryModify(Arrays.asList(sstable), OperationType.UNKNOWN))
+ {
+ replacement = sstable.cloneWithNewSummarySamplingLevel(store, 1);
+ txn.update(replacement, true);
+ txn.finish();
+ }
for (Future future : futures)
future.get();