You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2015/05/22 10:46:35 UTC
[4/7] cassandra git commit: Extend Transactional API to sstable lifecycle management
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
new file mode 100644
index 0000000..acc9747
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
@@ -0,0 +1,511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.lifecycle;
+
+import java.util.*;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.collect.*;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableReader.UniqueIdentifier;
+import org.apache.cassandra.utils.concurrent.Transactional;
+
+import static com.google.common.base.Functions.compose;
+import static com.google.common.base.Predicates.*;
+import static com.google.common.collect.ImmutableSet.copyOf;
+import static com.google.common.collect.Iterables.*;
+import static java.util.Collections.singleton;
+import static org.apache.cassandra.db.lifecycle.Helpers.*;
+import static org.apache.cassandra.db.lifecycle.View.updateCompacting;
+import static org.apache.cassandra.db.lifecycle.View.updateLiveSet;
+import static org.apache.cassandra.utils.Throwables.maybeFail;
+import static org.apache.cassandra.utils.concurrent.Refs.release;
+import static org.apache.cassandra.utils.concurrent.Refs.selfRefs;
+
+/**
+ * A transaction over a set of original sstable readers that it guards (no other transaction may operate over
+ * them concurrently). Changes are staged with update()/obsolete(), made visible in the Tracker atomically by
+ * checkpoint(), and finally either committed (obsoleting the obsoleted readers) or aborted (restoring the
+ * originals to their starting state).
+ */
+public class LifecycleTransaction extends Transactional.AbstractTransactional
+{
+    private static final Logger logger = LoggerFactory.getLogger(LifecycleTransaction.class);
+
+    /**
+     * a class that represents accumulated modifications to the Tracker.
+     * has two instances, one containing modifications that are "staged" (i.e. invisible)
+     * and one containing those "logged" that have been made visible through a call to checkpoint()
+     */
+    private static class State
+    {
+        // readers that are either brand new, update a previous new reader, or update one of the original readers
+        final Set<SSTableReader> update = new HashSet<>();
+        // disjoint from update, represents a subset of originals that is no longer needed
+        final Set<SSTableReader> obsolete = new HashSet<>();
+
+        /**
+         * fold the provided staged changes into this state. staged updates are removed before being re-added
+         * because HashSet.add() keeps an already-present equal element: removing first guarantees that the
+         * staged instance, not a previously recorded instance that compares equal, is the one retained.
+         */
+        void log(State staged)
+        {
+            update.removeAll(staged.obsolete);
+            update.removeAll(staged.update);
+            update.addAll(staged.update);
+            obsolete.addAll(staged.obsolete);
+        }
+
+        /** @return true if the reader is recorded as either updated or obsoleted in this state */
+        boolean contains(SSTableReader reader)
+        {
+            return update.contains(reader) || obsolete.contains(reader);
+        }
+
+        /** @return true if no updates and no obsoletions are recorded */
+        boolean isEmpty()
+        {
+            return update.isEmpty() && obsolete.isEmpty();
+        }
+
+        void clear()
+        {
+            update.clear();
+            obsolete.clear();
+        }
+    }
+
+    public final Tracker tracker;
+    private final OperationType operationType;
+    // the original readers this transaction was opened over, and that it guards
+    // (no other transactions may operate over these readers concurrently)
+    private final Set<SSTableReader> originals = new HashSet<>();
+    // the set of readers we've marked as compacting: added to on creation and in checkpoint(),
+    // removed from by cancel() and split(); unmarked in bulk by doPostCleanup()
+    private final Set<SSTableReader> marked = new HashSet<>();
+    // the identity set of readers we've ever encountered; used to ensure we don't accidentally revisit the
+    // same version of a reader. potentially a dangerous property if there are reference counting bugs
+    // as they won't be caught until the transaction's lifespan is over.
+    private final Set<UniqueIdentifier> identities = Collections.newSetFromMap(new IdentityHashMap<UniqueIdentifier, Boolean>());
+
+    // changes that have been made visible
+    private final State logged = new State();
+    // changes that are pending
+    private final State staged = new State();
+
+    /**
+     * construct a Transaction for use in an offline operation over a single reader
+     */
+    public static LifecycleTransaction offline(OperationType operationType, SSTableReader reader)
+    {
+        return offline(operationType, singleton(reader));
+    }
+
+    /**
+     * construct a Transaction for use in an offline operation. the readers are registered with a private
+     * dummy Tracker (which has no ColumnFamilyStore) and marked as compacting there before the transaction
+     * is opened over them.
+     */
+    public static LifecycleTransaction offline(OperationType operationType, Iterable<SSTableReader> readers)
+    {
+        // if offline, for simplicity we just use a dummy tracker
+        Tracker dummy = new Tracker(null, false);
+        dummy.addInitialSSTables(readers);
+        dummy.apply(updateCompacting(emptySet(), readers));
+        return new LifecycleTransaction(dummy, operationType, readers);
+    }
+
+    /**
+     * callers are expected to have already marked the readers as compacting in the tracker
+     * (as Tracker.tryModify() and offline() do); this constructor only records them.
+     */
+    LifecycleTransaction(Tracker tracker, OperationType operationType, Iterable<SSTableReader> readers)
+    {
+        this.tracker = tracker;
+        this.operationType = operationType;
+        for (SSTableReader reader : readers)
+        {
+            originals.add(reader);
+            marked.add(reader);
+            identities.add(reader.instanceId);
+        }
+    }
+
+    /**
+     * prepare to commit by making every staged change visible
+     */
+    public void doPrepare()
+    {
+        // note for future: in anticompaction two different operations use the same Transaction, and both prepareToCommit()
+        // separately: the second prepareToCommit is ignored as a "redundant" transition. since it is only a checkpoint
+        // (and these happen anyway) this is fine but if more logic gets inserted here than is performed in a checkpoint,
+        // it may break this use case, and care is needed
+        checkpoint();
+    }
+
+    /**
+     * point of no return: commit all changes, but leave all readers marked as compacting
+     * (they are unmarked afterwards by doPostCleanup())
+     */
+    public Throwable doCommit(Throwable accumulate)
+    {
+        assert staged.isEmpty() : "must be no actions introduced between prepareToCommit and a commit";
+
+        // staged is necessarily empty at this point, so report the logged (visible) changes being committed
+        logger.debug("Committing update:{}, obsolete:{}", logged.update, logged.obsolete);
+
+        // this is now the point of no return; we cannot safely rollback, so we ignore exceptions until we're done
+        // we restore state by obsoleting our obsolete files, releasing our references to them, and updating our size
+        // and notification status for the obsolete and new files
+        accumulate = setupDeleteNotification(logged.update, tracker, accumulate);
+        accumulate = markObsolete(logged.obsolete, accumulate);
+        accumulate = tracker.updateSizeTracking(logged.obsolete, logged.update, accumulate);
+        accumulate = release(selfRefs(logged.obsolete), accumulate);
+        accumulate = tracker.notifySSTablesChanged(originals, logged.update, operationType, accumulate);
+        return accumulate;
+    }
+
+    /**
+     * undo all of the changes made by this transaction, resetting the state to its original form
+     */
+    public Throwable doAbort(Throwable accumulate)
+    {
+        if (logger.isDebugEnabled())
+            logger.debug("Aborting transaction over {}, with ({},{}) logged and ({},{}) staged", originals, logged.update, logged.obsolete, staged.update, staged.obsolete);
+
+        if (logged.isEmpty() && staged.isEmpty())
+            return accumulate;
+
+        // mark obsolete all readers that are not versions of those present in the original set
+        Iterable<SSTableReader> obsolete = filterOut(concatUniq(staged.update, logged.update), originals);
+        logger.debug("Obsoleting {}", obsolete);
+        accumulate = markObsolete(obsolete, accumulate);
+
+        // replace all updated readers with a version restored to its original state
+        accumulate = tracker.apply(updateLiveSet(logged.update, restoreUpdatedOriginals()), accumulate);
+        // setReplaced immediately preceding versions that have not been obsoleted
+        accumulate = setReplaced(logged.update, accumulate);
+        // we have replaced all of logged.update and never made visible staged.update,
+        // and the files we have logged as obsolete we clone fresh versions of, so they are no longer needed either
+        // any _staged_ obsoletes should either be in staged.update already, and dealt with there,
+        // or is still in its original form (so left as is); in either case no extra action is needed
+        accumulate = release(selfRefs(concat(staged.update, logged.update, logged.obsolete)), accumulate);
+        logged.clear();
+        staged.clear();
+        return accumulate;
+    }
+
+    /**
+     * unmark every reader we still have marked as compacting, whether we committed or aborted
+     */
+    @Override
+    protected Throwable doPostCleanup(Throwable accumulate)
+    {
+        return unmarkCompacting(marked, accumulate);
+    }
+
+    /**
+     * re-declared public, presumably to widen the visibility of the inherited method so that callers such as
+     * the anticompaction case described in doPrepare() can use it — NOTE(review): confirm against AbstractTransactional
+     */
+    @Override
+    public void permitRedundantTransitions()
+    {
+        super.permitRedundantTransitions();
+    }
+
+    /**
+     * call when a consistent batch of changes is ready to be made atomically visible
+     * these will be exposed in the Tracker atomically, or an exception will be thrown; in this case
+     * the transaction should be rolled back
+     */
+    public void checkpoint()
+    {
+        maybeFail(checkpoint(null));
+    }
+
+    private Throwable checkpoint(Throwable accumulate)
+    {
+        if (logger.isDebugEnabled())
+            logger.debug("Checkpointing update:{}, obsolete:{}", staged.update, staged.obsolete);
+
+        if (staged.isEmpty())
+            return accumulate;
+
+        Set<SSTableReader> toUpdate = toUpdate();
+        Set<SSTableReader> fresh = copyOf(fresh());
+
+        // check the current versions of the readers we're replacing haven't somehow been replaced by someone else
+        checkNotReplaced(filterIn(toUpdate, staged.update));
+
+        // ensure any new readers are in the compacting set, since we aren't done with them yet
+        // and don't want anyone else messing with them
+        // apply atomically along with updating the live set of readers
+        tracker.apply(compose(updateCompacting(emptySet(), fresh),
+                              updateLiveSet(toUpdate, staged.update)));
+
+        // log the staged changes and our newly marked readers
+        marked.addAll(fresh);
+        logged.log(staged);
+
+        // setup our tracker, and mark our prior versions replaced, also releasing our references to them
+        // we do not replace/release obsoleted readers, since we may need to restore them on rollback
+        accumulate = setReplaced(filterOut(toUpdate, staged.obsolete), accumulate);
+        accumulate = release(selfRefs(filterOut(toUpdate, staged.obsolete)), accumulate);
+
+        staged.clear();
+        return accumulate;
+    }
+
+    /**
+     * update a reader: if !original, this is a reader that is being introduced by this transaction;
+     * otherwise it must be in the originals() set, i.e. a reader guarded by this transaction.
+     * each reader instance may be provided only once, and may not be an obsoleted reader.
+     */
+    public void update(SSTableReader reader, boolean original)
+    {
+        assert !staged.update.contains(reader) : "each reader may only be updated once per checkpoint: " + reader;
+        assert !identities.contains(reader.instanceId) : "each reader instance may only be provided as an update once: " + reader;
+        // check it isn't obsolete, and that it matches the original flag
+        assert !(logged.obsolete.contains(reader) || staged.obsolete.contains(reader)) : "may not update a reader that has been obsoleted";
+        assert original == originals.contains(reader) : String.format("the 'original' indicator was incorrect (%s provided): %s", original, reader);
+        staged.update.add(reader);
+        identities.add(reader.instanceId);
+        reader.setupKeyCache();
+    }
+
+    /**
+     * mark this reader as for obsoletion. this does not actually obsolete the reader until commit() is called,
+     * but on checkpoint() the reader will be removed from the live set
+     */
+    public void obsolete(SSTableReader reader)
+    {
+        logger.debug("Staging for obsolescence {}", reader);
+        // check this is: a reader guarded by the transaction, an instance we have already worked with
+        // and that we haven't already obsoleted it, nor do we have other changes staged for it
+        assert identities.contains(reader.instanceId) : "only reader instances that have previously been provided may be obsoleted: " + reader;
+        assert originals.contains(reader) : "only readers in the 'original' set may be obsoleted: " + reader + " vs " + originals;
+        assert !(logged.obsolete.contains(reader) || staged.obsolete.contains(reader)) : "may not obsolete a reader that has already been obsoleted: " + reader;
+        assert !staged.update.contains(reader) : "may not obsolete a reader that has a staged update (must checkpoint first): " + reader;
+        assert current(reader) == reader : "may only obsolete the latest version of the reader: " + reader;
+        staged.obsolete.add(reader);
+    }
+
+    /**
+     * obsolete every file in the original transaction
+     */
+    public void obsoleteOriginals()
+    {
+        logger.debug("Staging for obsolescence {}", originals);
+        // if we're obsoleting, we should have no staged updates for the original files
+        assert Iterables.isEmpty(filterIn(staged.update, originals)) : staged.update;
+
+        // stage obsoletes for any currently visible versions of any original readers
+        Iterables.addAll(staged.obsolete, filterIn(current(), originals));
+    }
+
+    /**
+     * return the readers we're replacing in checkpoint(), i.e. the currently visible version of those in staged
+     */
+    private Set<SSTableReader> toUpdate()
+    {
+        return copyOf(filterIn(current(), staged.obsolete, staged.update));
+    }
+
+    /**
+     * new readers that haven't appeared previously (either in the original set or the logged updates)
+     */
+    private Iterable<SSTableReader> fresh()
+    {
+        return filterOut(staged.update,
+                         originals, logged.update);
+    }
+
+    /**
+     * returns the currently visible readers managed by this transaction
+     */
+    public Iterable<SSTableReader> current()
+    {
+        // i.e., those that are updates that have been logged (made visible),
+        // and any original readers that have neither been obsoleted nor updated
+        return concat(logged.update, filterOut(originals, logged.update, logged.obsolete));
+    }
+
+    /**
+     * for every original reader that has been updated or obsoleted in the logged state, clone its current
+     * version with its start restored to the original reader's first key
+     */
+    private List<SSTableReader> restoreUpdatedOriginals()
+    {
+        Iterable<SSTableReader> torestore = filterIn(originals, logged.update, logged.obsolete);
+        return ImmutableList.copyOf(transform(torestore,
+                                              new Function<SSTableReader, SSTableReader>()
+                                              {
+                                                  public SSTableReader apply(SSTableReader reader)
+                                                  {
+                                                      return current(reader).cloneWithNewStart(reader.first, null);
+                                                  }
+                                              }));
+    }
+
+    /**
+     * the set of readers guarded by this transaction _in their original instance/state_
+     * call current(SSTableReader) on any reader in this set to get the latest instance
+     */
+    public Set<SSTableReader> originals()
+    {
+        return Collections.unmodifiableSet(originals);
+    }
+
+    /**
+     * indicates if the reader has been marked for obsoletion (whether staged or already logged)
+     */
+    public boolean isObsolete(SSTableReader reader)
+    {
+        return logged.obsolete.contains(reader) || staged.obsolete.contains(reader);
+    }
+
+    /**
+     * return the current version of the provided reader, whether or not it is visible or staged;
+     * i.e. returns the first version present by testing staged, logged and originals in order.
+     *
+     * @throws AssertionError if the reader is not managed by this transaction
+     */
+    public SSTableReader current(SSTableReader reader)
+    {
+        Set<SSTableReader> container;
+        if (staged.contains(reader))
+            container = staged.update.contains(reader) ? staged.update : staged.obsolete;
+        else if (logged.contains(reader))
+            container = logged.update.contains(reader) ? logged.update : logged.obsolete;
+        else if (originals.contains(reader))
+            container = originals;
+        else
+            throw new AssertionError("reader is not managed by this transaction: " + reader);
+        return select(reader, container);
+    }
+
+    /**
+     * remove the reader from the set we're modifying, immediately unmarking it as compacting.
+     * only permitted for original readers that have not been updated or obsoleted.
+     */
+    public void cancel(SSTableReader cancel)
+    {
+        logger.debug("Cancelling {} from transaction", cancel);
+        assert originals.contains(cancel) : "may only cancel a reader in the 'original' set: " + cancel + " vs " + originals;
+        assert !(staged.contains(cancel) || logged.contains(cancel)) : "may only cancel a reader that has not been updated or obsoleted in this transaction: " + cancel;
+        originals.remove(cancel);
+        marked.remove(cancel);
+        maybeFail(unmarkCompacting(singleton(cancel), null));
+    }
+
+    /**
+     * remove the readers from the set we're modifying
+     */
+    public void cancel(Iterable<SSTableReader> cancels)
+    {
+        for (SSTableReader cancel : cancels)
+            cancel(cancel);
+    }
+
+    /**
+     * remove the provided readers from this Transaction, and return a new Transaction to manage them
+     * only permitted to be called if the current Transaction has never been used.
+     * the readers stay marked as compacting; responsibility for unmarking them moves to the new Transaction.
+     */
+    public LifecycleTransaction split(Collection<SSTableReader> readers)
+    {
+        logger.debug("Splitting {} into new transaction", readers);
+        checkUnused();
+        for (SSTableReader reader : readers)
+            assert identities.contains(reader.instanceId) : "may only split the same reader instance the transaction was opened with: " + reader;
+
+        for (SSTableReader reader : readers)
+        {
+            identities.remove(reader.instanceId);
+            originals.remove(reader);
+            marked.remove(reader);
+        }
+        return new LifecycleTransaction(tracker, operationType, readers);
+    }
+
+    /**
+     * check this transaction has never been used
+     */
+    private void checkUnused()
+    {
+        assert logged.isEmpty();
+        assert staged.isEmpty();
+        assert identities.size() == originals.size();
+        assert originals.size() == marked.size();
+    }
+
+    private Throwable unmarkCompacting(Set<SSTableReader> unmark, Throwable accumulate)
+    {
+        accumulate = tracker.apply(updateCompacting(unmark, emptySet()), accumulate);
+        // when the CFS is invalidated, it will call unreferenceSSTables(). However, unreferenceSSTables only deals
+        // with sstables that aren't currently being compacted. If there are ongoing compactions that finish or are
+        // interrupted after the CFS is invalidated, those sstables need to be unreferenced as well, so we do that here.
+        accumulate = tracker.dropSSTablesIfInvalid(accumulate);
+        return accumulate;
+    }
+
+    // convenience method for callers that know only one sstable is involved in the transaction
+    public SSTableReader onlyOne()
+    {
+        assert originals.size() == 1;
+        return getFirst(originals, null);
+    }
+
+    // a class representing the current state of the reader within this transaction, encoding the actions both logged
+    // and pending, and the reader instances that are visible now, and will be after the next checkpoint (with null
+    // indicating either obsolescence, or that the reader does not occur in the transaction; which is defined
+    // by the corresponding Action)
+    @VisibleForTesting
+    public static class ReaderState
+    {
+        public static enum Action
+        {
+            UPDATED, OBSOLETED, NONE;
+            public static Action get(boolean updated, boolean obsoleted)
+            {
+                assert !(updated && obsoleted);
+                return updated ? UPDATED : obsoleted ? OBSOLETED : NONE;
+            }
+        }
+
+        final Action staged;
+        final Action logged;
+        final SSTableReader nextVisible;
+        final SSTableReader currentlyVisible;
+        final boolean original;
+
+        public ReaderState(Action logged, Action staged, SSTableReader currentlyVisible, SSTableReader nextVisible, boolean original)
+        {
+            this.staged = staged;
+            this.logged = logged;
+            this.currentlyVisible = currentlyVisible;
+            this.nextVisible = nextVisible;
+            this.original = original;
+        }
+
+        @Override
+        public boolean equals(Object that)
+        {
+            return that instanceof ReaderState && equals((ReaderState) that);
+        }
+
+        // reader instances are compared by identity, not by equals()
+        public boolean equals(ReaderState that)
+        {
+            return that != null
+                   && this.staged == that.staged && this.logged == that.logged && this.original == that.original
+                   && this.currentlyVisible == that.currentlyVisible && this.nextVisible == that.nextVisible;
+        }
+
+        // must agree with equals(): readers are hashed by identity because they are compared by identity
+        @Override
+        public int hashCode()
+        {
+            return Objects.hash(staged, logged, original,
+                                System.identityHashCode(currentlyVisible),
+                                System.identityHashCode(nextVisible));
+        }
+
+        @Override
+        public String toString()
+        {
+            return String.format("[logged=%s staged=%s original=%s]", logged, staged, original);
+        }
+
+        // null if the reader is obsolete, otherwise the first matching instance found in selectFrom (in order)
+        public static SSTableReader visible(SSTableReader reader, Predicate<SSTableReader> obsolete, Collection<SSTableReader> ... selectFrom)
+        {
+            return obsolete.apply(reader) ? null : selectFirst(reader, selectFrom);
+        }
+    }
+
+    @VisibleForTesting
+    public ReaderState state(SSTableReader reader)
+    {
+        SSTableReader currentlyVisible = ReaderState.visible(reader, in(logged.obsolete), logged.update, originals);
+        SSTableReader nextVisible = ReaderState.visible(reader, orIn(staged.obsolete, logged.obsolete), staged.update, logged.update, originals);
+        return new ReaderState(ReaderState.Action.get(logged.update.contains(reader), logged.obsolete.contains(reader)),
+                               ReaderState.Action.get(staged.update.contains(reader), staged.obsolete.contains(reader)),
+                               currentlyVisible, nextVisible, originals.contains(reader)
+        );
+    }
+
+    @Override
+    public String toString()
+    {
+        return originals.toString();
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/lifecycle/SSTableIntervalTree.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/SSTableIntervalTree.java b/src/java/org/apache/cassandra/db/lifecycle/SSTableIntervalTree.java
new file mode 100644
index 0000000..ff2abcb
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/lifecycle/SSTableIntervalTree.java
@@ -0,0 +1,40 @@
+package org.apache.cassandra.db.lifecycle;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import com.google.common.collect.Iterables;
+
+import org.apache.cassandra.db.RowPosition;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.utils.Interval;
+import org.apache.cassandra.utils.IntervalTree;
+
+public class SSTableIntervalTree extends IntervalTree<RowPosition, SSTableReader, Interval<RowPosition, SSTableReader>>
+{
+ private static final SSTableIntervalTree EMPTY = new SSTableIntervalTree(null);
+
+ SSTableIntervalTree(Collection<Interval<RowPosition, SSTableReader>> intervals)
+ {
+ super(intervals);
+ }
+
+ public static SSTableIntervalTree empty()
+ {
+ return EMPTY;
+ }
+
+ public static SSTableIntervalTree build(Iterable<SSTableReader> sstables)
+ {
+ return new SSTableIntervalTree(buildIntervals(sstables));
+ }
+
+ public static List<Interval<RowPosition, SSTableReader>> buildIntervals(Iterable<SSTableReader> sstables)
+ {
+ List<Interval<RowPosition, SSTableReader>> intervals = new ArrayList<>(Iterables.size(sstables));
+ for (SSTableReader sstable : sstables)
+ intervals.add(Interval.<RowPosition, SSTableReader>create(sstable.first, sstable.last, sstable));
+ return intervals;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
new file mode 100644
index 0000000..50f567f
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
@@ -0,0 +1,468 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.lifecycle;
+
+import java.io.File;
+import java.util.*;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.collect.*;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.Memtable;
+import org.apache.cassandra.db.commitlog.ReplayPosition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.metrics.StorageMetrics;
+import org.apache.cassandra.notifications.*;
+import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+
+import static com.google.common.base.Predicates.and;
+import static com.google.common.collect.ImmutableSet.copyOf;
+import static com.google.common.collect.Iterables.filter;
+import static java.util.Collections.singleton;
+import static org.apache.cassandra.db.lifecycle.Helpers.*;
+import static org.apache.cassandra.db.lifecycle.View.permitCompacting;
+import static org.apache.cassandra.db.lifecycle.View.updateCompacting;
+import static org.apache.cassandra.db.lifecycle.View.updateLiveSet;
+import static org.apache.cassandra.utils.Throwables.maybeFail;
+import static org.apache.cassandra.utils.Throwables.merge;
+import static org.apache.cassandra.utils.concurrent.Refs.release;
+import static org.apache.cassandra.utils.concurrent.Refs.selfRefs;
+
+public class Tracker
+{
+ private static final Logger logger = LoggerFactory.getLogger(Tracker.class);
+
+ // consumers of this tracker's notifications; a CopyOnWriteArrayList, so iteration sees a snapshot
+ // unaffected by concurrent add/remove
+ public final Collection<INotificationConsumer> subscribers = new CopyOnWriteArrayList<>();
+ // the owning store; may be null (e.g. the dummy tracker used for offline transactions), in which case
+ // size tracking is skipped and reset() creates no live memtable
+ public final ColumnFamilyStore cfstore;
+ // the current View of memtables and sstables; swapped atomically by compare-and-set in apply()
+ final AtomicReference<View> view;
+ // NOTE(review): not read in this chunk; presumably indicates whether sstables are loaded from disk — confirm
+ public final boolean loadsstables;
+
+    /**
+     * create a tracker for the given store (which may be null), starting from a freshly reset view
+     */
+    public Tracker(ColumnFamilyStore cfstore, boolean loadsstables)
+    {
+        // every field must be assigned before reset(), which reads cfstore and writes into view
+        this.loadsstables = loadsstables;
+        this.cfstore = cfstore;
+        this.view = new AtomicReference<View>();
+        reset();
+    }
+
+    /**
+     * single-sstable convenience overload of {@link #tryModify(Iterable, OperationType)}
+     */
+    public LifecycleTransaction tryModify(SSTableReader sstable, OperationType operationType)
+    {
+        Set<SSTableReader> only = Collections.singleton(sstable);
+        return tryModify(only, operationType);
+    }
+
+ /**
+ * @return a Transaction over the provided sstables if we are able to mark the given @param sstables as compacted, before anyone else
+ */
+ public LifecycleTransaction tryModify(Iterable<SSTableReader> sstables, OperationType operationType)
+ {
+ if (Iterables.isEmpty(sstables))
+ return new LifecycleTransaction(this, operationType, sstables);
+ if (null == apply(permitCompacting(sstables), updateCompacting(emptySet(), sstables)))
+ return null;
+ return new LifecycleTransaction(this, operationType, sstables);
+ }
+
+
+ // METHODS FOR ATOMICALLY MODIFYING THE VIEW
+
+    /**
+     * unconditionally and atomically replace the view with function(view)
+     *
+     * @return the (before, after) pair of views; never null, since no permit check can fail
+     */
+    Pair<View, View> apply(Function<View, View> function)
+    {
+        Predicate<View> always = Predicates.alwaysTrue();
+        return apply(always, function);
+    }
+
+    /**
+     * as {@link #apply(Function)}, but any Throwable raised is merged into accumulate instead of propagating
+     */
+    Throwable apply(Function<View, View> function, Throwable accumulate)
+    {
+        try
+        {
+            apply(function);
+            return accumulate;
+        }
+        catch (Throwable t)
+        {
+            return merge(accumulate, t);
+        }
+    }
+
+    /**
+     * atomically replace the view with function(view), provided permit accepts the view being replaced.
+     * on a lost compare-and-set race both permit and function are re-evaluated against the newer view,
+     * so either may be invoked more than once.
+     *
+     * @return the (before, after) views if the update was applied, or null if permit rejected the view
+     */
+    Pair<View, View> apply(Predicate<View> permit, Function<View, View> function)
+    {
+        for (;;)
+        {
+            View before = view.get();
+            if (!permit.apply(before))
+                return null;
+
+            View after = function.apply(before);
+            if (view.compareAndSet(before, after))
+                return Pair.create(before, after);
+            // another thread swapped the view in the meantime; re-read it and retry
+        }
+    }
+
+    /**
+     * adjust the load and disk-space metrics for sstables entering (newSSTables) and leaving (oldSSTables)
+     * the live set. does nothing when there is no cfstore. a failure reading an sstable's on-disk size is
+     * merged into the returned Throwable and that sstable is skipped, rather than aborting the update.
+     */
+    Throwable updateSizeTracking(Iterable<SSTableReader> oldSSTables, Iterable<SSTableReader> newSSTables, Throwable accumulate)
+    {
+        if (cfstore == null)
+            return accumulate;
+
+        Throwable failures = accumulate;
+
+        long addedBytes = 0;
+        for (SSTableReader incoming : newSSTables)
+        {
+            if (logger.isDebugEnabled())
+                logger.debug("adding {} to list of files tracked for {}.{}", incoming.descriptor, cfstore.keyspace.getName(), cfstore.name);
+            try
+            {
+                addedBytes += incoming.bytesOnDisk();
+            }
+            catch (Throwable t)
+            {
+                failures = merge(failures, t);
+            }
+        }
+
+        long removedBytes = 0;
+        for (SSTableReader outgoing : oldSSTables)
+        {
+            if (logger.isDebugEnabled())
+                logger.debug("removing {} from list of files tracked for {}.{}", outgoing.descriptor, cfstore.keyspace.getName(), cfstore.name);
+            try
+            {
+                removedBytes += outgoing.bytesOnDisk();
+            }
+            catch (Throwable t)
+            {
+                failures = merge(failures, t);
+            }
+        }
+
+        long liveDelta = addedBytes - removedBytes;
+        StorageMetrics.load.inc(liveDelta);
+        cfstore.metric.liveDiskSpaceUsed.inc(liveDelta);
+        // the total is only reduced once an sstable is actually deleted, so only additions are counted here
+        cfstore.metric.totalDiskSpaceUsed.inc(addedBytes);
+        return failures;
+    }
+
+ // SETUP / CLEANUP
+
+    /**
+     * add sstables to the live set and size tracking, without notifying subscribers or taking backups.
+     * any failure is thrown.
+     */
+    public void addInitialSSTables(Iterable<SSTableReader> sstables)
+    {
+        Throwable failure = setupDeleteNotification(sstables, this, null);
+        maybeFail(failure);
+        apply(updateLiveSet(emptySet(), sstables));
+        failure = updateSizeTracking(emptySet(), sstables, null);
+        maybeFail(failure);
+    }
+
+    /**
+     * add sstables to the live set, then, one sstable at a time, attempt an incremental backup of it
+     * and notify subscribers of its addition
+     */
+    public void addSSTables(Iterable<SSTableReader> sstables)
+    {
+        addInitialSSTables(sstables);
+        for (SSTableReader added : sstables)
+        {
+            maybeIncrementallyBackup(added);
+            notifyAdded(added);
+        }
+    }
+
+    /**
+     * (Re)initializes the tracker by installing a brand new view: one new live memtable (none when there is
+     * no cfstore), no flushing memtables, and empty sstable map, compacting set and interval tree.
+     */
+    @VisibleForTesting
+    public void reset()
+    {
+        List<Memtable> live;
+        if (cfstore == null)
+            live = Collections.<Memtable>emptyList();
+        else
+            live = ImmutableList.of(new Memtable(cfstore));
+
+        view.set(new View(live,
+                          ImmutableList.<Memtable>of(),
+                          Collections.<SSTableReader, SSTableReader>emptyMap(),
+                          Collections.<SSTableReader>emptySet(),
+                          SSTableIntervalTree.empty()));
+    }
+
+ public Throwable dropSSTablesIfInvalid(Throwable accumulate)
+ {
+ if (cfstore != null && !cfstore.isValid())
+ accumulate = dropSSTables(accumulate);
+ return accumulate;
+ }
+
+    /**
+     * drop every sstable not currently compacting, throwing any failure encountered
+     */
+    public void dropSSTables()
+    {
+        Throwable failure = dropSSTables(null);
+        maybeFail(failure);
+    }
+
+    /**
+     * drop every sstable not currently compacting, merging failures into accumulate instead of throwing
+     */
+    public Throwable dropSSTables(Throwable accumulate)
+    {
+        Predicate<SSTableReader> everything = Predicates.alwaysTrue();
+        return dropSSTables(everything, OperationType.UNKNOWN, accumulate);
+    }
+
+    /**
+     * atomically removes from the live set every sstable that matches {@code remove} and is not busy compacting;
+     * if anything was removed, subscribers are notified, size tracking is updated, and the removed sstables are
+     * marked obsolete and our references to them released.
+     *
+     * @param remove        selects the sstables to drop; sstables currently compacting are never dropped
+     * @param operationType the operation type reported to subscribers
+     * @param accumulate    any prior failure; failures encountered here are merged into it
+     * @return accumulate, merged with any failures encountered
+     */
+    public Throwable dropSSTables(final Predicate<SSTableReader> remove, OperationType operationType, Throwable accumulate)
+    {
+        Pair<View, View> result = apply(new Function<View, View>()
+        {
+            public View apply(View view)
+            {
+                Set<SSTableReader> toremove = copyOf(filter(view.sstables, and(remove, notIn(view.compacting))));
+                return updateLiveSet(toremove, emptySet()).apply(view);
+            }
+        });
+
+        // materialize the difference once: Sets.difference is a lazy view that every traversal below
+        // would otherwise recompute from scratch
+        Set<SSTableReader> removed = copyOf(Sets.difference(result.left.sstables, result.right.sstables));
+        assert Iterables.all(removed, remove);
+
+        if (!removed.isEmpty())
+        {
+            // notifySSTablesChanged -> LeveledManifest.promote doesn't like a no-op "promotion"
+            accumulate = notifySSTablesChanged(removed, Collections.<SSTableReader>emptySet(), operationType, accumulate);
+            accumulate = updateSizeTracking(removed, emptySet(), accumulate);
+            accumulate = markObsolete(removed, accumulate);
+            accumulate = release(selfRefs(removed), accumulate);
+        }
+        return accumulate;
+    }
+
+    /**
+     * Drops from the Tracker's view every sstable (other than those currently compacting) whose
+     * descriptor lives in the given directory, throwing any failure encountered.
+     * @param directory the unreadable directory, possibly with SSTables in it, but not necessarily.
+     */
+    public void removeUnreadableSSTables(final File directory)
+    {
+        Predicate<SSTableReader> inDirectory = new Predicate<SSTableReader>()
+        {
+            public boolean apply(SSTableReader reader)
+            {
+                return reader.descriptor.directory.equals(directory);
+            }
+        };
+        Throwable failure = dropSSTables(inDirectory, OperationType.UNKNOWN, null);
+        maybeFail(failure);
+    }
+
+
+
+ // FLUSHING
+
+    /**
+     * get the Memtable that the ordered writeOp should be directed to
+     *
+     * @throws AssertionError if no live memtable accepts the operation, which the reasoning below rules out
+     */
+    public Memtable getMemtableFor(OpOrder.Group opGroup, ReplayPosition replayPosition)
+    {
+        // since any new memtables appended to the list after we fetch it will be for operations started
+        // after us, we can safely assume that we will always find the memtable that 'accepts' us;
+        // if the barrier for any memtable is set whilst we are reading the list, it must accept us.
+
+        // there may be multiple memtables in the list that would 'accept' us, however we only ever choose
+        // the oldest such memtable, as accepts() only prevents us falling behind (i.e. ensures we don't
+        // assign operations to a memtable that was retired/queued before we started)
+
+        // read the view once, so that the failure message reports exactly the list that was searched
+        View current = view.get();
+        for (Memtable memtable : current.liveMemtables)
+        {
+            if (memtable.accepts(opGroup, replayPosition))
+                return memtable;
+        }
+        throw new AssertionError(current.liveMemtables.toString());
+    }
+
+ /**
+ * Switch the current memtable. This atomically appends a new memtable to the end of the list of active memtables,
+ * returning the previously last memtable. It leaves the previous Memtable in the list of live memtables until
+ * markFlushing(memtable) is called, which moves it to the flushing memtables. These two methods must be
+ * synchronized/paired, i.e. m = switchMemtable must be followed by markFlushing(m); they cannot be interleaved.
+ *
+ * @param truncating if true, subscribers are sent a MemtableRenewedNotification for the new memtable
+ * @return the previously active memtable
+ */
+ public Memtable switchMemtable(boolean truncating)
+ {
+ Memtable newMemtable = new Memtable(cfstore);
+ Pair<View, View> result = apply(View.switchMemtable(newMemtable));
+ if (truncating)
+ notifyRenewed(newMemtable);
+
+ // result.left is the View before the switch, so its current memtable is the one being replaced
+ return result.left.getCurrentMemtable();
+ }
+
+ /**
+ * Moves the given memtable out of the live memtables and into the flushing memtables of the View.
+ */
+ public void markFlushing(Memtable memtable)
+ {
+ Function<View, View> moveToFlushing = View.markFlushing(memtable);
+ apply(moveToFlushing);
+ }
+
+ /**
+ * Completes a flush: removes memtable from the flushing memtables and, when sstable is non-null, adds it to the
+ * live sstable set. For a non-null sstable this also sets up its delete notification and key cache, performs the
+ * optional incremental backup, updates size tracking and notifies subscribers; if the owning store has been
+ * invalidated, all non-compacting sstables are then dropped. Failures from these follow-up steps are accumulated
+ * and thrown together at the end, so no failure hides another.
+ * @param memtable the flushed memtable
+ * @param sstable the sstable produced by the flush, or null if nothing needed to be retained
+ */
+ public void replaceFlushed(Memtable memtable, SSTableReader sstable)
+ {
+ if (sstable == null)
+ {
+ // sstable may be null if we flushed batchlog and nothing needed to be retained
+ // if it's null, we don't care what state the cfstore is in, we just replace it and continue
+ apply(View.replaceFlushed(memtable, null));
+ return;
+ }
+
+ sstable.setupDeleteNotification(this);
+ sstable.setupKeyCache();
+ // back up before adding the sstable to the live View (which makes it eligible for compaction)
+ maybeIncrementallyBackup(sstable);
+
+ apply(View.replaceFlushed(memtable, sstable));
+
+ Throwable fail;
+ fail = updateSizeTracking(emptySet(), singleton(sstable), null);
+ // TODO: if we're invalidated, should we notifyadded AND removed, or just skip both?
+ fail = notifyAdded(sstable, fail);
+ // accumulate instead of throwing immediately, so earlier failures are not discarded if dropping also fails
+ fail = dropSSTablesIfInvalid(fail);
+
+ maybeFail(fail);
+ }
+
+
+
+ // MISCELLANEOUS public utility calls
+
+ /** @return the live sstables of the current View */
+ public Set<SSTableReader> getSSTables()
+ {
+ View current = view.get();
+ return current.sstables;
+ }
+
+ /** @return the sstables of the current View that are marked compacting */
+ public Set<SSTableReader> getCompacting()
+ {
+ View current = view.get();
+ return current.compacting;
+ }
+
+ /** @return a live set view of the current View's sstables that are not marked compacting */
+ public Set<SSTableReader> getUncompacting()
+ {
+ View current = view.get();
+ return current.nonCompactingSStables();
+ }
+
+ /** @return the candidates not marked compacting in the current View, as a lazily filtered Iterable */
+ public Iterable<SSTableReader> getUncompacting(Iterable<SSTableReader> candidates)
+ {
+ View current = view.get();
+ return current.getUncompacting(candidates);
+ }
+
+ /**
+ * When incremental backups are enabled, creates links to the sstable's files in its backups directory;
+ * otherwise does nothing.
+ */
+ public void maybeIncrementallyBackup(final SSTableReader sstable)
+ {
+ if (DatabaseDescriptor.isIncrementalBackupsEnabled())
+ {
+ File backupsDir = Directories.getBackupsDirectory(sstable.descriptor);
+ sstable.createLinks(FileUtils.getCanonicalPath(backupsDir));
+ }
+ }
+
+ /**
+ * Decrements the owning store's totalDiskSpaceUsed metric by {@code size}; a no-op when there is no store.
+ */
+ public void spaceReclaimed(long size)
+ {
+ if (cfstore == null)
+ return;
+ cfstore.metric.totalDiskSpaceUsed.dec(size);
+ }
+
+
+
+ // NOTIFICATION
+
+ /**
+ * Delivers the notification to every subscriber, continuing past failures so that one misbehaving
+ * subscriber cannot prevent the others from being notified.
+ * @return accumulate, merged with every Throwable raised by a subscriber
+ */
+ private Throwable notifySubscribers(INotification notification, Throwable accumulate)
+ {
+ for (INotificationConsumer subscriber : subscribers)
+ {
+ try
+ {
+ subscriber.handleNotification(notification, this);
+ }
+ catch (Throwable t)
+ {
+ accumulate = merge(accumulate, t);
+ }
+ }
+ return accumulate;
+ }
+
+ /**
+ * Sends an SSTableListChangedNotification to every subscriber.
+ * @return accumulate, merged with any subscriber failures
+ */
+ Throwable notifySSTablesChanged(Collection<SSTableReader> removed, Collection<SSTableReader> added, OperationType compactionType, Throwable accumulate)
+ {
+ INotification notification = new SSTableListChangedNotification(added, removed, compactionType);
+ return notifySubscribers(notification, accumulate);
+ }
+
+ /**
+ * Sends an SSTableAddedNotification to every subscriber.
+ * @return accumulate, merged with any subscriber failures
+ */
+ Throwable notifyAdded(SSTableReader added, Throwable accumulate)
+ {
+ return notifySubscribers(new SSTableAddedNotification(added), accumulate);
+ }
+
+ /** Sends an SSTableAddedNotification to every subscriber, then throws any failures. */
+ public void notifyAdded(SSTableReader added)
+ {
+ maybeFail(notifyAdded(added, null));
+ }
+
+ /** Sends an SSTableRepairStatusChanged notification to every subscriber, then throws any failures. */
+ public void notifySSTableRepairedStatusChanged(Collection<SSTableReader> repairStatusesChanged)
+ {
+ maybeFail(notifySubscribers(new SSTableRepairStatusChanged(repairStatusesChanged), null));
+ }
+
+ /** Sends an SSTableDeletingNotification to every subscriber, then throws any failures. */
+ public void notifyDeleting(SSTableReader deleting)
+ {
+ maybeFail(notifySubscribers(new SSTableDeletingNotification(deleting), null));
+ }
+
+ /** Sends a MemtableRenewedNotification to every subscriber, then throws any failures. */
+ public void notifyRenewed(Memtable renewed)
+ {
+ maybeFail(notifySubscribers(new MemtableRenewedNotification(renewed), null));
+ }
+
+ /** Sends a TruncationNotification to every subscriber, then throws any failures. */
+ public void notifyTruncated(long truncatedAt)
+ {
+ maybeFail(notifySubscribers(new TruncationNotification(truncatedAt), null));
+ }
+
+ /** Registers a consumer to receive this tracker's notifications. */
+ public void subscribe(INotificationConsumer consumer)
+ {
+ this.subscribers.add(consumer);
+ }
+
+ /** Stops delivering this tracker's notifications to the given consumer. */
+ public void unsubscribe(INotificationConsumer consumer)
+ {
+ this.subscribers.remove(consumer);
+ }
+
+ /** @return the shared immutable empty set, typed so call sites need no explicit type witness */
+ private static Set<SSTableReader> emptySet()
+ {
+ return Collections.<SSTableReader>emptySet();
+ }
+
+ /** @return the current (immutable) View held by this tracker */
+ public View getView()
+ {
+ View current = view.get();
+ return current;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/lifecycle/View.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/View.java b/src/java/org/apache/cassandra/db/lifecycle/View.java
new file mode 100644
index 0000000..0d1100b
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/lifecycle/View.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.lifecycle;
+
+import java.util.*;
+
+import com.google.common.base.Function;
+import com.google.common.base.Functions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.*;
+
+import org.apache.cassandra.db.Memtable;
+import org.apache.cassandra.db.RowPosition;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.utils.Interval;
+
+import static com.google.common.base.Predicates.equalTo;
+import static com.google.common.base.Predicates.not;
+import static com.google.common.collect.ImmutableList.copyOf;
+import static com.google.common.collect.ImmutableList.of;
+import static com.google.common.collect.Iterables.all;
+import static com.google.common.collect.Iterables.concat;
+import static com.google.common.collect.Iterables.filter;
+import static java.util.Collections.singleton;
+import static org.apache.cassandra.db.lifecycle.Helpers.emptySet;
+import static org.apache.cassandra.db.lifecycle.Helpers.replace;
+
+/**
+ * An immutable structure holding the current memtable, the memtables pending
+ * flush, the sstables for a column family, and the sstables that are active
+ * in compaction (a subset of the sstables).
+ *
+ * Modifications to instances are all performed via a Function produced by the static methods in this class.
+ * These are composed as necessary and provided to the Tracker.apply() methods, which atomically reject or
+ * accept and apply the changes to the View.
+ * None of these Functions mutates the View it is given: each returns a new instance that shares
+ * whatever state it did not change with its input.
+ *
+ */
+public class View
+{
+ /**
+ * ordinarily a list of size 1, but when preparing to flush will contain both the memtable we will flush
+ * and the new replacement memtable, until all outstanding write operations on the old table complete.
+ * The last item in the list is always the "current" memtable.
+ */
+ public final List<Memtable> liveMemtables;
+ /**
+ * contains all memtables that are no longer referenced for writing and are queued for / in the process of being
+ * flushed. In chronologically ascending order.
+ */
+ public final List<Memtable> flushingMemtables;
+ // the sstables currently marked compacting; updateCompacting asserts that newly marked readers are live
+ public final Set<SSTableReader> compacting;
+ // the live sstables: the key set of sstablesMap (assigned in the constructor)
+ public final Set<SSTableReader> sstables;
+ // we use a Map here so that we can easily perform identity checks as well as equality checks.
+ // When marking compacting, we now indicate if we expect the sstables to be present (by default we do),
+ // and we then check that not only are they all present in the live set, but that the exact instance present is
+ // the one we made our decision to compact against.
+ public final Map<SSTableReader, SSTableReader> sstablesMap;
+
+ // interval tree over the live sstables, searched by sstablesInBounds
+ public final SSTableIntervalTree intervalTree;
+
+ // package-private; every argument must be non-null
+ View(List<Memtable> liveMemtables, List<Memtable> flushingMemtables, Map<SSTableReader, SSTableReader> sstables, Set<SSTableReader> compacting, SSTableIntervalTree intervalTree)
+ {
+ assert liveMemtables != null;
+ assert flushingMemtables != null;
+ assert sstables != null;
+ assert compacting != null;
+ assert intervalTree != null;
+
+ this.liveMemtables = liveMemtables;
+ this.flushingMemtables = flushingMemtables;
+
+ this.sstablesMap = sstables;
+ // keySet() is a view over the map, so sstables and sstablesMap can never disagree
+ this.sstables = sstablesMap.keySet();
+ this.compacting = compacting;
+ this.intervalTree = intervalTree;
+ }
+
+ // the "current" memtable is, by construction, the last entry of liveMemtables
+ public Memtable getCurrentMemtable()
+ {
+ return liveMemtables.get(liveMemtables.size() - 1);
+ }
+
+ /**
+ * @return all flushing memtables followed by all live memtables (so the current memtable comes last).
+ */
+ public Iterable<Memtable> getAllMemtables()
+ {
+ return concat(flushingMemtables, liveMemtables);
+ }
+
+ /** @return an unmodifiable live view of the sstables that are not marked compacting */
+ public Sets.SetView<SSTableReader> nonCompactingSStables()
+ {
+ return Sets.difference(sstables, compacting);
+ }
+
+ /** @return a lazily filtered view of the candidates that are not marked compacting in this View */
+ public Iterable<SSTableReader> getUncompacting(Iterable<SSTableReader> candidates)
+ {
+ return filter(candidates, new Predicate<SSTableReader>()
+ {
+ public boolean apply(SSTableReader sstable)
+ {
+ return !compacting.contains(sstable);
+ }
+ });
+ }
+
+ @Override
+ public String toString()
+ {
+ // pending_count counts every memtable other than the current one
+ return String.format("View(pending_count=%d, sstables=%s, compacting=%s)", liveMemtables.size() + flushingMemtables.size() - 1, sstables, compacting);
+ }
+
+ /**
+ * Searches intervalTree for the sstables matching the interval from rowBounds.left to rowBounds.right
+ * (match semantics are those of SSTableIntervalTree.search). A minimum right bound is replaced by the tree's
+ * max, presumably because a minimum right position denotes an unbounded range; confirm against AbstractBounds.
+ * @return the matching sstables, or an empty list when there are no sstables
+ */
+ public List<SSTableReader> sstablesInBounds(AbstractBounds<RowPosition> rowBounds)
+ {
+ if (intervalTree.isEmpty())
+ return Collections.emptyList();
+ RowPosition stopInTree = rowBounds.right.isMinimum() ? intervalTree.max() : rowBounds.right;
+ return intervalTree.search(Interval.<RowPosition, SSTableReader>create(rowBounds.left, stopInTree));
+ }
+
+ // METHODS TO CONSTRUCT FUNCTIONS FOR MODIFYING A VIEW:
+
+ // return a function to un/mark the provided readers compacting in a view
+ // (the identity function if both are empty); every reader in mark must be present in the view's live set,
+ // presumably checked by identity via Helpers.idIn
+ static Function<View, View> updateCompacting(final Set<SSTableReader> unmark, final Iterable<SSTableReader> mark)
+ {
+ if (unmark.isEmpty() && Iterables.isEmpty(mark))
+ return Functions.identity();
+ return new Function<View, View>()
+ {
+ public View apply(View view)
+ {
+ assert all(mark, Helpers.idIn(view.sstablesMap));
+ return new View(view.liveMemtables, view.flushingMemtables, view.sstablesMap,
+ replace(view.compacting, unmark, mark),
+ view.intervalTree);
+ }
+ };
+ }
+
+ // construct a predicate to reject views that do not permit us to mark these readers compacting;
+ // i.e. one of them is either already compacting, has been compacted, or has been replaced
+ // ("replaced": the live set maps it to a different instance, or no longer contains it at all)
+ static Predicate<View> permitCompacting(final Iterable<SSTableReader> readers)
+ {
+ return new Predicate<View>()
+ {
+ public boolean apply(View view)
+ {
+ for (SSTableReader reader : readers)
+ if (view.compacting.contains(reader) || view.sstablesMap.get(reader) != reader || reader.isMarkedCompacted())
+ return false;
+ return true;
+ }
+ };
+ }
+
+ // construct a function to change the live sstable set of a View: remove the given readers, add the others,
+ // and rebuild the interval tree over the result (the identity function if there is nothing to change)
+ static Function<View, View> updateLiveSet(final Set<SSTableReader> remove, final Iterable<SSTableReader> add)
+ {
+ if (remove.isEmpty() && Iterables.isEmpty(add))
+ return Functions.identity();
+ return new Function<View, View>()
+ {
+ public View apply(View view)
+ {
+ Map<SSTableReader, SSTableReader> sstableMap = replace(view.sstablesMap, remove, add);
+ return new View(view.liveMemtables, view.flushingMemtables, sstableMap, view.compacting,
+ SSTableIntervalTree.build(sstableMap.keySet()));
+ }
+ };
+ }
+
+ // called prior to initiating flush: add newMemtable to liveMemtables, making it the latest memtable
+ static Function<View, View> switchMemtable(final Memtable newMemtable)
+ {
+ return new Function<View, View>()
+ {
+ public View apply(View view)
+ {
+ List<Memtable> newLive = ImmutableList.<Memtable>builder().addAll(view.liveMemtables).add(newMemtable).build();
+ assert newLive.size() == view.liveMemtables.size() + 1;
+ return new View(newLive, view.flushingMemtables, view.sstablesMap, view.compacting, view.intervalTree);
+ }
+ };
+ }
+
+ // called before flush: move toFlush from liveMemtables to flushingMemtables
+ // toFlush is inserted after every flushing memtable that compares less than it, so a flushingMemtables
+ // list that is already in ascending order stays in ascending order
+ static Function<View, View> markFlushing(final Memtable toFlush)
+ {
+ return new Function<View, View>()
+ {
+ public View apply(View view)
+ {
+ List<Memtable> live = view.liveMemtables, flushing = view.flushingMemtables;
+ List<Memtable> newLive = copyOf(filter(live, not(equalTo(toFlush))));
+ List<Memtable> newFlushing = copyOf(concat(filter(flushing, lessThan(toFlush)),
+ of(toFlush),
+ filter(flushing, not(lessThan(toFlush)))));
+ assert newLive.size() == live.size() - 1;
+ assert newFlushing.size() == flushing.size() + 1;
+ return new View(newLive, newFlushing, view.sstablesMap, view.compacting, view.intervalTree);
+ }
+ };
+ }
+
+ // called after flush: removes memtable from flushingMemtables, and inserts flushed into the live sstable set
+ // (flushed may be null, in which case the live sstable set and interval tree are left unchanged)
+ static Function<View, View> replaceFlushed(final Memtable memtable, final SSTableReader flushed)
+ {
+ return new Function<View, View>()
+ {
+ public View apply(View view)
+ {
+ List<Memtable> flushingMemtables = copyOf(filter(view.flushingMemtables, not(equalTo(memtable))));
+ assert flushingMemtables.size() == view.flushingMemtables.size() - 1;
+
+ if (flushed == null)
+ return new View(view.liveMemtables, flushingMemtables, view.sstablesMap,
+ view.compacting, view.intervalTree);
+
+ Map<SSTableReader, SSTableReader> sstableMap = replace(view.sstablesMap, emptySet(), singleton(flushed));
+ return new View(view.liveMemtables, flushingMemtables, sstableMap, view.compacting,
+ SSTableIntervalTree.build(sstableMap.keySet()));
+ }
+ };
+ }
+
+ // predicate matching elements that compare strictly less than the given bound
+ private static <T extends Comparable<T>> Predicate<T> lessThan(final T lessThan)
+ {
+ return new Predicate<T>()
+ {
+ public boolean apply(T t)
+ {
+ return t.compareTo(lessThan) < 0;
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
index a6c7a8b..c994a3d 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
@@ -403,7 +403,7 @@ public class CompressionMetadata
count = chunkIndex;
}
- protected Throwable doCleanup(Throwable failed)
+ protected Throwable doPreCleanup(Throwable failed)
{
return offsets.close(failed);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
index 81e487c..9bfbc99 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
@@ -19,24 +19,15 @@ package org.apache.cassandra.io.sstable;
import java.io.IOException;
import java.lang.management.ManagementFactory;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Sets;
+import com.google.common.collect.*;
+
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,8 +35,10 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.DataTracker;
+import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.db.lifecycle.View;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.WrappedRunnable;
@@ -209,22 +202,25 @@ public class IndexSummaryManager implements IndexSummaryManagerMBean
* Returns a Pair of all compacting and non-compacting sstables. Non-compacting sstables will be marked as
* compacting.
*/
- private Pair<List<SSTableReader>, Multimap<DataTracker, SSTableReader>> getCompactingAndNonCompactingSSTables()
+ private Pair<List<SSTableReader>, Map<UUID, LifecycleTransaction>> getCompactingAndNonCompactingSSTables()
{
List<SSTableReader> allCompacting = new ArrayList<>();
- Multimap<DataTracker, SSTableReader> allNonCompacting = HashMultimap.create();
+ Map<UUID, LifecycleTransaction> allNonCompacting = new HashMap<>();
for (Keyspace ks : Keyspace.all())
{
for (ColumnFamilyStore cfStore: ks.getColumnFamilyStores())
{
Set<SSTableReader> nonCompacting, allSSTables;
+ LifecycleTransaction txn = null;
do
{
- allSSTables = cfStore.getDataTracker().getSSTables();
- nonCompacting = Sets.newHashSet(cfStore.getDataTracker().getUncompactingSSTables(allSSTables));
+ View view = cfStore.getTracker().getView();
+ allSSTables = view.sstables;
+ nonCompacting = ImmutableSet.copyOf(view.getUncompacting(allSSTables));
}
- while (!(nonCompacting.isEmpty() || cfStore.getDataTracker().markCompacting(nonCompacting)));
- allNonCompacting.putAll(cfStore.getDataTracker(), nonCompacting);
+ while (null == (txn = cfStore.getTracker().tryModify(nonCompacting, OperationType.UNKNOWN)));
+
+ allNonCompacting.put(cfStore.metadata.cfId, txn);
allCompacting.addAll(Sets.difference(allSSTables, nonCompacting));
}
}
@@ -233,50 +229,57 @@ public class IndexSummaryManager implements IndexSummaryManagerMBean
public void redistributeSummaries() throws IOException
{
- Pair<List<SSTableReader>, Multimap<DataTracker, SSTableReader>> compactingAndNonCompacting = getCompactingAndNonCompactingSSTables();
+ Pair<List<SSTableReader>, Map<UUID, LifecycleTransaction>> compactingAndNonCompacting = getCompactingAndNonCompactingSSTables();
try
{
- redistributeSummaries(compactingAndNonCompacting.left, Lists.newArrayList(compactingAndNonCompacting.right.values()), this.memoryPoolBytes);
+ redistributeSummaries(compactingAndNonCompacting.left, compactingAndNonCompacting.right, this.memoryPoolBytes);
}
finally
{
- for(DataTracker tracker : compactingAndNonCompacting.right.keySet())
- tracker.unmarkCompacting(compactingAndNonCompacting.right.get(tracker));
+ for (LifecycleTransaction modifier : compactingAndNonCompacting.right.values())
+ modifier.close();
}
}
/**
* Attempts to fairly distribute a fixed pool of memory for index summaries across a set of SSTables based on
* their recent read rates.
- * @param nonCompacting a list of sstables to share the memory pool across
+ * @param transactions containing the sstables we are to redistribute the memory pool across
* @param memoryPoolBytes a size (in bytes) that the total index summary space usage should stay close to or
* under, if possible
* @return a list of new SSTableReader instances
*/
@VisibleForTesting
- public static List<SSTableReader> redistributeSummaries(List<SSTableReader> compacting, List<SSTableReader> nonCompacting, long memoryPoolBytes) throws IOException
+ public static List<SSTableReader> redistributeSummaries(List<SSTableReader> compacting, Map<UUID, LifecycleTransaction> transactions, long memoryPoolBytes) throws IOException
{
- long total = 0;
- for (SSTableReader sstable : Iterables.concat(compacting, nonCompacting))
- total += sstable.getIndexSummaryOffHeapSize();
-
List<SSTableReader> oldFormatSSTables = new ArrayList<>();
- for (SSTableReader sstable : nonCompacting)
+ List<SSTableReader> redistribute = new ArrayList<>();
+ for (LifecycleTransaction txn : transactions.values())
{
- // We can't change the sampling level of sstables with the old format, because the serialization format
- // doesn't include the sampling level. Leave this one as it is. (See CASSANDRA-8993 for details.)
- logger.trace("SSTable {} cannot be re-sampled due to old sstable format", sstable);
- if (!sstable.descriptor.version.hasSamplingLevel())
- oldFormatSSTables.add(sstable);
+ for (SSTableReader sstable : ImmutableList.copyOf(txn.originals()))
+ {
+ // We can't change the sampling level of sstables with the old format, because the serialization format
+ // doesn't include the sampling level. Leave this one as it is. (See CASSANDRA-8993 for details.)
+ logger.trace("SSTable {} cannot be re-sampled due to old sstable format", sstable);
+ if (!sstable.descriptor.version.hasSamplingLevel())
+ {
+ oldFormatSSTables.add(sstable);
+ txn.cancel(sstable);
+ }
+ }
+ redistribute.addAll(txn.originals());
}
- nonCompacting.removeAll(oldFormatSSTables);
+
+ long total = 0;
+ for (SSTableReader sstable : Iterables.concat(compacting, redistribute))
+ total += sstable.getIndexSummaryOffHeapSize();
logger.debug("Beginning redistribution of index summaries for {} sstables with memory pool size {} MB; current spaced used is {} MB",
- nonCompacting.size(), memoryPoolBytes / 1024L / 1024L, total / 1024.0 / 1024.0);
+ redistribute.size(), memoryPoolBytes / 1024L / 1024L, total / 1024.0 / 1024.0);
- final Map<SSTableReader, Double> readRates = new HashMap<>(nonCompacting.size());
+ final Map<SSTableReader, Double> readRates = new HashMap<>(redistribute.size());
double totalReadsPerSec = 0.0;
- for (SSTableReader sstable : nonCompacting)
+ for (SSTableReader sstable : redistribute)
{
if (sstable.getReadMeter() != null)
{
@@ -288,7 +291,7 @@ public class IndexSummaryManager implements IndexSummaryManagerMBean
logger.trace("Total reads/sec across all sstables in index summary resize process: {}", totalReadsPerSec);
// copy and sort by read rates (ascending)
- List<SSTableReader> sstablesByHotness = new ArrayList<>(nonCompacting);
+ List<SSTableReader> sstablesByHotness = new ArrayList<>(redistribute);
Collections.sort(sstablesByHotness, new ReadRateComparator(readRates));
long remainingBytes = memoryPoolBytes;
@@ -297,7 +300,10 @@ public class IndexSummaryManager implements IndexSummaryManagerMBean
logger.trace("Index summaries for compacting SSTables are using {} MB of space",
(memoryPoolBytes - remainingBytes) / 1024.0 / 1024.0);
- List<SSTableReader> newSSTables = adjustSamplingLevels(sstablesByHotness, totalReadsPerSec, remainingBytes);
+ List<SSTableReader> newSSTables = adjustSamplingLevels(sstablesByHotness, transactions, totalReadsPerSec, remainingBytes);
+
+ for (LifecycleTransaction txn : transactions.values())
+ txn.finish();
total = 0;
for (SSTableReader sstable : Iterables.concat(compacting, oldFormatSSTables, newSSTables))
@@ -308,7 +314,7 @@ public class IndexSummaryManager implements IndexSummaryManagerMBean
return newSSTables;
}
- private static List<SSTableReader> adjustSamplingLevels(List<SSTableReader> sstables,
+ private static List<SSTableReader> adjustSamplingLevels(List<SSTableReader> sstables, Map<UUID, LifecycleTransaction> transactions,
double totalReadsPerSec, long memoryPoolCapacity) throws IOException
{
@@ -410,26 +416,16 @@ public class IndexSummaryManager implements IndexSummaryManagerMBean
toDownsample.addAll(forceResample);
toDownsample.addAll(toUpsample);
toDownsample.addAll(forceUpsample);
- Multimap<DataTracker, SSTableReader> replacedByTracker = HashMultimap.create();
- Multimap<DataTracker, SSTableReader> replacementsByTracker = HashMultimap.create();
for (ResampleEntry entry : toDownsample)
{
SSTableReader sstable = entry.sstable;
logger.debug("Re-sampling index summary for {} from {}/{} to {}/{} of the original number of entries",
sstable, sstable.getIndexSummarySamplingLevel(), Downsampling.BASE_SAMPLING_LEVEL,
entry.newSamplingLevel, Downsampling.BASE_SAMPLING_LEVEL);
- ColumnFamilyStore cfs = Keyspace.open(sstable.getKeyspaceName()).getColumnFamilyStore(sstable.getColumnFamilyName());
+ ColumnFamilyStore cfs = Keyspace.open(sstable.metadata.ksName).getColumnFamilyStore(sstable.metadata.cfId);
SSTableReader replacement = sstable.cloneWithNewSummarySamplingLevel(cfs, entry.newSamplingLevel);
- DataTracker tracker = cfs.getDataTracker();
-
- replacedByTracker.put(tracker, sstable);
- replacementsByTracker.put(tracker, replacement);
- }
-
- for (DataTracker tracker : replacedByTracker.keySet())
- {
- tracker.replaceWithNewInstances(replacedByTracker.get(tracker), replacementsByTracker.get(tracker));
- newSSTables.addAll(replacementsByTracker.get(tracker));
+ newSSTables.add(replacement);
+ transactions.get(sstable.metadata.cfId).update(replacement, true);
}
return newSSTables;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java b/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java
index db54557..cc837ba 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java
@@ -23,15 +23,16 @@ import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeUnit;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.ScheduledExecutors;
-import org.apache.cassandra.db.DataTracker;
-import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.lifecycle.Tracker;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.concurrent.Blocker;
public class SSTableDeletingTask implements Runnable
{
@@ -42,11 +43,12 @@ public class SSTableDeletingTask implements Runnable
// Additionally, we need to make sure to delete the data file first, so on restart the others
// will be recognized as GCable.
private static final Set<SSTableDeletingTask> failedTasks = new CopyOnWriteArraySet<>();
+ private static final Blocker blocker = new Blocker();
private final SSTableReader referent;
private final Descriptor desc;
private final Set<Component> components;
- private DataTracker tracker;
+ private Tracker tracker;
/**
* realDescriptor is the actual descriptor for the sstable, the descriptor inside
@@ -70,13 +72,18 @@ public class SSTableDeletingTask implements Runnable
}
}
- public void setTracker(DataTracker tracker)
+ public void setTracker(Tracker tracker)
{
// the tracker is used only to notify listeners of deletion of the sstable;
// since deletion of a non-final file is not really deletion of the sstable,
// we don't want to notify the listeners in this event
- if (desc.type == Descriptor.Type.FINAL)
- this.tracker = tracker;
+ assert desc.type == Descriptor.Type.FINAL;
+ this.tracker = tracker;
+ }
+
+ public Tracker getTracker()
+ {
+ return tracker;
}
public void schedule()
@@ -86,6 +93,7 @@ public class SSTableDeletingTask implements Runnable
public void run()
{
+ blocker.ask();
long size = referent.bytesOnDisk();
if (tracker != null)
@@ -119,6 +127,7 @@ public class SSTableDeletingTask implements Runnable
}
/** for tests */
+ @VisibleForTesting
public static void waitForDeletions()
{
Runnable runnable = new Runnable()
@@ -130,5 +139,11 @@ public class SSTableDeletingTask implements Runnable
FBUtilities.waitOnFuture(ScheduledExecutors.nonPeriodicTasks.schedule(runnable, 0, TimeUnit.MILLISECONDS));
}
+
+ /**
+ * Test hook: sets the static blocker that every deletion task consults (via blocker.ask()) at the start of run().
+ * Presumably {@code true} holds deletions back and {@code false} releases them; confirm against Blocker.
+ */
+ @VisibleForTesting
+ public static void pauseDeletions(boolean stop)
+ {
+ SSTableDeletingTask.blocker.block(stop);
+ }
}