You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2015/06/30 12:48:05 UTC
[41/51] [partial] cassandra git commit: Storage engine refactor,
a.k.a CASSANDRA-8099
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/Conflicts.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Conflicts.java b/src/java/org/apache/cassandra/db/Conflicts.java
new file mode 100644
index 0000000..fa0e819
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/Conflicts.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.db.context.CounterContext;
+
+public abstract class Conflicts
+{
+ private Conflicts() {}
+
+ public enum Resolution { LEFT_WINS, MERGE, RIGHT_WINS };
+
+ public static Resolution resolveRegular(long leftTimestamp,
+ boolean leftLive,
+ int leftLocalDeletionTime,
+ ByteBuffer leftValue,
+ long rightTimestamp,
+ boolean rightLive,
+ int rightLocalDeletionTime,
+ ByteBuffer rightValue)
+ {
+ if (leftTimestamp != rightTimestamp)
+ return leftTimestamp < rightTimestamp ? Resolution.RIGHT_WINS : Resolution.LEFT_WINS;
+
+ if (leftLive != rightLive)
+ return leftLive ? Resolution.RIGHT_WINS : Resolution.LEFT_WINS;
+
+ int c = leftValue.compareTo(rightValue);
+ if (c < 0)
+ return Resolution.RIGHT_WINS;
+ else if (c > 0)
+ return Resolution.LEFT_WINS;
+
+ // Prefer the longest ttl if relevant
+ return leftLocalDeletionTime < rightLocalDeletionTime ? Resolution.RIGHT_WINS : Resolution.LEFT_WINS;
+ }
+
+ public static Resolution resolveCounter(long leftTimestamp,
+ boolean leftLive,
+ ByteBuffer leftValue,
+ long rightTimestamp,
+ boolean rightLive,
+ ByteBuffer rightValue)
+ {
+ // No matter what the counter cell's timestamp is, a tombstone always takes precedence. See CASSANDRA-7346.
+ if (!leftLive)
+ // left is a tombstone: it has precedence over right if either right is not a tombstone, or left has a greater timestamp
+ return rightLive || leftTimestamp > rightTimestamp ? Resolution.LEFT_WINS : Resolution.RIGHT_WINS;
+
+ // If right is a tombstone, since left isn't one, it has precedence
+ if (!rightLive)
+ return Resolution.RIGHT_WINS;
+
+ return Resolution.MERGE;
+ }
+
+ public static ByteBuffer mergeCounterValues(ByteBuffer left, ByteBuffer right)
+ {
+ return CounterContext.instance().merge(left, right);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/CounterCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterCell.java b/src/java/org/apache/cassandra/db/CounterCell.java
deleted file mode 100644
index cda1200..0000000
--- a/src/java/org/apache/cassandra/db/CounterCell.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.context.CounterContext;
-import org.apache.cassandra.utils.concurrent.OpOrder;
-import org.apache.cassandra.utils.memory.AbstractAllocator;
-import org.apache.cassandra.utils.memory.MemtableAllocator;
-
-/**
- * A column that represents a partitioned counter.
- */
-public interface CounterCell extends Cell
-{
- static final CounterContext contextManager = CounterContext.instance();
-
- public long timestampOfLastDelete();
-
- public long total();
-
- public boolean hasLegacyShards();
-
- public Cell markLocalToBeCleared();
-
- CounterCell localCopy(CFMetaData metadata, AbstractAllocator allocator);
-
- CounterCell localCopy(CFMetaData metaData, MemtableAllocator allocator, OpOrder.Group opGroup);
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/CounterMutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterMutation.java b/src/java/org/apache/cassandra/db/CounterMutation.java
index 58717b4..f87c66c 100644
--- a/src/java/org/apache/cassandra/db/CounterMutation.java
+++ b/src/java/org/apache/cassandra/db/CounterMutation.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.db;
import java.io.DataInput;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
@@ -27,12 +26,15 @@ import java.util.concurrent.locks.Lock;
import com.google.common.base.Function;
import com.google.common.base.Objects;
import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
import com.google.common.util.concurrent.Striped;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.db.context.CounterContext;
-import org.apache.cassandra.db.filter.NamesQueryFilter;
import org.apache.cassandra.exceptions.WriteTimeoutException;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataOutputPlus;
@@ -41,6 +43,7 @@ import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.CacheService;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.*;
+import org.apache.cassandra.utils.concurrent.OpOrder;
public class CounterMutation implements IMutation
{
@@ -67,9 +70,9 @@ public class CounterMutation implements IMutation
return mutation.getColumnFamilyIds();
}
- public Collection<ColumnFamily> getColumnFamilies()
+ public Collection<PartitionUpdate> getPartitionUpdates()
{
- return mutation.getColumnFamilies();
+ return mutation.getPartitionUpdates();
}
public Mutation getMutation()
@@ -77,7 +80,7 @@ public class CounterMutation implements IMutation
return mutation;
}
- public ByteBuffer key()
+ public DecoratedKey key()
{
return mutation.key();
}
@@ -111,19 +114,14 @@ public class CounterMutation implements IMutation
Mutation result = new Mutation(getKeyspaceName(), key());
Keyspace keyspace = Keyspace.open(getKeyspaceName());
- int count = 0;
- for (ColumnFamily cf : getColumnFamilies())
- count += cf.getColumnCount();
-
- List<Lock> locks = new ArrayList<>(count);
- Tracing.trace("Acquiring {} counter locks", count);
+ List<Lock> locks = new ArrayList<>();
+ Tracing.trace("Acquiring counter locks");
try
{
grabCounterLocks(keyspace, locks);
- for (ColumnFamily cf : getColumnFamilies())
- result.add(processModifications(cf));
+ for (PartitionUpdate upd : getPartitionUpdates())
+ result.add(processModifications(upd));
result.apply();
- updateCounterCache(result, keyspace);
return result;
}
finally
@@ -160,141 +158,144 @@ public class CounterMutation implements IMutation
*/
private Iterable<Object> getCounterLockKeys()
{
- return Iterables.concat(Iterables.transform(getColumnFamilies(), new Function<ColumnFamily, Iterable<Object>>()
+ return Iterables.concat(Iterables.transform(getPartitionUpdates(), new Function<PartitionUpdate, Iterable<Object>>()
{
- public Iterable<Object> apply(final ColumnFamily cf)
+ public Iterable<Object> apply(final PartitionUpdate update)
{
- return Iterables.transform(cf, new Function<Cell, Object>()
+ return Iterables.concat(Iterables.transform(update, new Function<Row, Iterable<Object>>()
{
- public Object apply(Cell cell)
+ public Iterable<Object> apply(final Row row)
{
- return Objects.hashCode(cf.id(), key(), cell.name());
+ return Iterables.concat(Iterables.transform(row, new Function<Cell, Object>()
+ {
+ public Object apply(final Cell cell)
+ {
+ return Objects.hashCode(update.metadata().cfId, key(), row.clustering(), cell.column(), cell.path());
+ }
+ }));
}
- });
+ }));
}
}));
}
- // Replaces all the CounterUpdateCell-s with updated regular CounterCell-s
- private ColumnFamily processModifications(ColumnFamily changesCF)
+ private PartitionUpdate processModifications(PartitionUpdate changes)
{
- ColumnFamilyStore cfs = Keyspace.open(getKeyspaceName()).getColumnFamilyStore(changesCF.id());
+ ColumnFamilyStore cfs = Keyspace.open(getKeyspaceName()).getColumnFamilyStore(changes.metadata().cfId);
- ColumnFamily resultCF = changesCF.cloneMeShallow();
+ List<PartitionUpdate.CounterMark> marks = changes.collectCounterMarks();
- List<CounterUpdateCell> counterUpdateCells = new ArrayList<>(changesCF.getColumnCount());
- for (Cell cell : changesCF)
+ if (CacheService.instance.counterCache.getCapacity() != 0)
{
- if (cell instanceof CounterUpdateCell)
- counterUpdateCells.add((CounterUpdateCell)cell);
- else
- resultCF.addColumn(cell);
+ Tracing.trace("Fetching {} counter values from cache", marks.size());
+ updateWithCurrentValuesFromCache(marks, cfs);
+ if (marks.isEmpty())
+ return changes;
}
- if (counterUpdateCells.isEmpty())
- return resultCF; // only DELETEs
-
- ClockAndCount[] currentValues = getCurrentValues(counterUpdateCells, cfs);
- for (int i = 0; i < counterUpdateCells.size(); i++)
- {
- ClockAndCount currentValue = currentValues[i];
- CounterUpdateCell update = counterUpdateCells.get(i);
+ Tracing.trace("Reading {} counter values from the CF", marks.size());
+ updateWithCurrentValuesFromCFS(marks, cfs);
- long clock = currentValue.clock + 1L;
- long count = currentValue.count + update.delta();
+ // What remains are new counters
+ for (PartitionUpdate.CounterMark mark : marks)
+ updateWithCurrentValue(mark, ClockAndCount.BLANK, cfs);
- resultCF.addColumn(new BufferCounterCell(update.name(),
- CounterContext.instance().createGlobal(CounterId.getLocalId(), clock, count),
- update.timestamp()));
- }
-
- return resultCF;
+ return changes;
}
- // Attempt to load the current values(s) from cache. If that fails, read the rest from the cfs.
- private ClockAndCount[] getCurrentValues(List<CounterUpdateCell> counterUpdateCells, ColumnFamilyStore cfs)
+ private void updateWithCurrentValue(PartitionUpdate.CounterMark mark, ClockAndCount currentValue, ColumnFamilyStore cfs)
{
- ClockAndCount[] currentValues = new ClockAndCount[counterUpdateCells.size()];
- int remaining = counterUpdateCells.size();
+ long clock = currentValue.clock + 1L;
+ long count = currentValue.count + CounterContext.instance().total(mark.value());
- if (CacheService.instance.counterCache.getCapacity() != 0)
- {
- Tracing.trace("Fetching {} counter values from cache", counterUpdateCells.size());
- remaining = getCurrentValuesFromCache(counterUpdateCells, cfs, currentValues);
- if (remaining == 0)
- return currentValues;
- }
-
- Tracing.trace("Reading {} counter values from the CF", remaining);
- getCurrentValuesFromCFS(counterUpdateCells, cfs, currentValues);
+ mark.setValue(CounterContext.instance().createGlobal(CounterId.getLocalId(), clock, count));
- return currentValues;
+ // Cache the newly updated value
+ cfs.putCachedCounter(key().getKey(), mark.clustering(), mark.column(), mark.path(), ClockAndCount.create(clock, count));
}
// Returns the count of cache misses.
- private int getCurrentValuesFromCache(List<CounterUpdateCell> counterUpdateCells,
- ColumnFamilyStore cfs,
- ClockAndCount[] currentValues)
+ private void updateWithCurrentValuesFromCache(List<PartitionUpdate.CounterMark> marks, ColumnFamilyStore cfs)
{
- int cacheMisses = 0;
- for (int i = 0; i < counterUpdateCells.size(); i++)
+ Iterator<PartitionUpdate.CounterMark> iter = marks.iterator();
+ while (iter.hasNext())
{
- ClockAndCount cached = cfs.getCachedCounter(key(), counterUpdateCells.get(i).name());
+ PartitionUpdate.CounterMark mark = iter.next();
+ ClockAndCount cached = cfs.getCachedCounter(key().getKey(), mark.clustering(), mark.column(), mark.path());
if (cached != null)
- currentValues[i] = cached;
- else
- cacheMisses++;
+ {
+ updateWithCurrentValue(mark, cached, cfs);
+ iter.remove();
+ }
}
- return cacheMisses;
}
// Reads the missing current values from the CFS.
- private void getCurrentValuesFromCFS(List<CounterUpdateCell> counterUpdateCells,
- ColumnFamilyStore cfs,
- ClockAndCount[] currentValues)
+ private void updateWithCurrentValuesFromCFS(List<PartitionUpdate.CounterMark> marks, ColumnFamilyStore cfs)
{
- SortedSet<CellName> names = new TreeSet<>(cfs.metadata.comparator);
- for (int i = 0; i < currentValues.length; i++)
- if (currentValues[i] == null)
- names.add(counterUpdateCells.get(i).name());
-
- ReadCommand cmd = new SliceByNamesReadCommand(getKeyspaceName(), key(), cfs.metadata.cfName, Long.MIN_VALUE, new NamesQueryFilter(names));
- Row row = cmd.getRow(cfs.keyspace);
- ColumnFamily cf = row == null ? null : row.cf;
+ ColumnFilter.Builder builder = ColumnFilter.selectionBuilder();
+ NavigableSet<Clustering> names = new TreeSet<>(cfs.metadata.comparator);
+ for (PartitionUpdate.CounterMark mark : marks)
+ {
+ names.add(mark.clustering().takeAlias());
+ if (mark.path() == null)
+ builder.add(mark.column());
+ else
+ builder.select(mark.column(), mark.path());
+ }
- for (int i = 0; i < currentValues.length; i++)
+ int nowInSec = FBUtilities.nowInSeconds();
+ ClusteringIndexNamesFilter filter = new ClusteringIndexNamesFilter(names, false);
+ SinglePartitionReadCommand cmd = SinglePartitionReadCommand.create(cfs.metadata, nowInSec, key(), builder.build(), filter);
+ PeekingIterator<PartitionUpdate.CounterMark> markIter = Iterators.peekingIterator(marks.iterator());
+ try (OpOrder.Group op = cfs.readOrdering.start(); RowIterator partition = UnfilteredRowIterators.filter(cmd.queryMemtableAndDisk(cfs, op), nowInSec))
{
- if (currentValues[i] != null)
- continue;
+ updateForRow(markIter, partition.staticRow(), cfs);
- Cell cell = cf == null ? null : cf.getColumn(counterUpdateCells.get(i).name());
- if (cell == null || !cell.isLive()) // absent or a tombstone.
- currentValues[i] = ClockAndCount.BLANK;
- else
- currentValues[i] = CounterContext.instance().getLocalClockAndCount(cell.value());
+ while (partition.hasNext())
+ {
+ if (!markIter.hasNext())
+ return;
+
+ updateForRow(markIter, partition.next(), cfs);
+ }
}
}
- private void updateCounterCache(Mutation applied, Keyspace keyspace)
+ private int compare(Clustering c1, Clustering c2, ColumnFamilyStore cfs)
{
- if (CacheService.instance.counterCache.getCapacity() == 0)
- return;
+ if (c1 == Clustering.STATIC_CLUSTERING)
+ return c2 == Clustering.STATIC_CLUSTERING ? 0 : -1;
+ if (c2 == Clustering.STATIC_CLUSTERING)
+ return 1;
- for (ColumnFamily cf : applied.getColumnFamilies())
- {
- ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cf.id());
- for (Cell cell : cf)
- if (cell instanceof CounterCell)
- cfs.putCachedCounter(key(), cell.name(), CounterContext.instance().getLocalClockAndCount(cell.value()));
- }
+ return cfs.getComparator().compare(c1, c2);
}
- public void addAll(IMutation m)
+ private void updateForRow(PeekingIterator<PartitionUpdate.CounterMark> markIter, Row row, ColumnFamilyStore cfs)
{
- if (!(m instanceof CounterMutation))
- throw new IllegalArgumentException();
- CounterMutation cm = (CounterMutation)m;
- mutation.addAll(cm.mutation);
+ int cmp = 0;
+ // If the mark is before the row, we have no value for this mark, just consume it
+ while (markIter.hasNext() && (cmp = compare(markIter.peek().clustering(), row.clustering(), cfs)) < 0)
+ markIter.next();
+
+ if (!markIter.hasNext())
+ return;
+
+ while (cmp == 0)
+ {
+ PartitionUpdate.CounterMark mark = markIter.next();
+ Cell cell = mark.path() == null ? row.getCell(mark.column()) : row.getCell(mark.column(), mark.path());
+ if (cell != null)
+ {
+ updateWithCurrentValue(mark, CounterContext.instance().getLocalClockAndCount(cell.value()), cfs);
+ markIter.remove();
+ }
+ if (!markIter.hasNext())
+ return;
+
+ cmp = compare(markIter.peek().clustering(), row.clustering(), cfs);
+ }
}
public long getTimeout()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/CounterUpdateCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterUpdateCell.java b/src/java/org/apache/cassandra/db/CounterUpdateCell.java
deleted file mode 100644
index 58ac365..0000000
--- a/src/java/org/apache/cassandra/db/CounterUpdateCell.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-/**
- * A counter update while it hasn't been applied yet by the leader replica.
- *
- * Contains a single counter update. When applied by the leader replica, this
- * is transformed to a relevant CounterCell. This Cell is a temporary data
- * structure that should never be stored inside a memtable or an sstable.
- */
-public interface CounterUpdateCell extends Cell
-{
- public long delta();
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/DataRange.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataRange.java b/src/java/org/apache/cassandra/db/DataRange.java
index 1e6f8c8..909d6ed 100644
--- a/src/java/org/apache/cassandra/db/DataRange.java
+++ b/src/java/org/apache/cassandra/db/DataRange.java
@@ -6,7 +6,6 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
@@ -17,302 +16,391 @@
*/
package org.apache.cassandra.db;
+import java.io.DataInput;
+import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.List;
-import com.google.common.base.Objects;
import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
-import org.apache.cassandra.db.composites.Composite;
-import org.apache.cassandra.db.composites.Composites;
+import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CompositeType;
import org.apache.cassandra.dht.*;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.net.MessagingService;
/**
- * Groups key range and column filter for range queries.
- *
- * The main "trick" of this class is that the column filter can only
- * be obtained by providing the row key on which the column filter will
- * be applied (which we always know before actually querying the columns).
- *
- * This allows the paging DataRange to return a filter for most rows but a
- * potentially different ones for the starting and stopping key. Could
- * allow more fancy stuff in the future too, like column filters that
- * depend on the actual key value :)
+ * Groups both the range of partitions to query, and the clustering index filter to
+ * apply for each partition (for a (partition) range query).
+ * <p>
+ * The main "trick" is that the clustering index filter can only be obtained by
+ * providing the partition key on which the filter will be applied. This is
+ * necessary when paging range queries, as we might need a different filter
+ * for the starting key than for other keys (because the previous page we had
+ * queried may have ended in the middle of a partition).
*/
public class DataRange
{
- protected final AbstractBounds<RowPosition> keyRange;
- protected IDiskAtomFilter columnFilter;
- protected final boolean selectFullRow;
+ public static final Serializer serializer = new Serializer();
- public DataRange(AbstractBounds<RowPosition> range, IDiskAtomFilter columnFilter)
+ private final AbstractBounds<PartitionPosition> keyRange;
+ protected final ClusteringIndexFilter clusteringIndexFilter;
+
+ /**
+ * Creates a {@code DataRange} given a range of partition keys and a clustering index filter. The
+ * returned {@code DataRange} will return the same filter for all keys.
+ *
+ * @param range the range over partition keys to use.
+ * @param clusteringIndexFilter the clustering index filter to use.
+ */
+ public DataRange(AbstractBounds<PartitionPosition> range, ClusteringIndexFilter clusteringIndexFilter)
{
this.keyRange = range;
- this.columnFilter = columnFilter;
- this.selectFullRow = columnFilter instanceof SliceQueryFilter
- ? isFullRowSlice((SliceQueryFilter)columnFilter)
- : false;
+ this.clusteringIndexFilter = clusteringIndexFilter;
}
- public static boolean isFullRowSlice(SliceQueryFilter filter)
+ /**
+ * Creates a {@code DataRange} to query all data (over the whole ring).
+ *
+ * @param partitioner the partitioner in use for the table.
+ *
+ * @return the newly created {@code DataRange}.
+ */
+ public static DataRange allData(IPartitioner partitioner)
{
- return filter.slices.length == 1
- && filter.start().isEmpty()
- && filter.finish().isEmpty()
- && filter.count == Integer.MAX_VALUE;
+ return forTokenRange(new Range<Token>(partitioner.getMinimumToken(), partitioner.getMinimumToken()));
}
- public static DataRange allData(IPartitioner partitioner)
+ /**
+ * Creates a {@code DataRange} to query all rows over the provided token range.
+ *
+ * @param tokenRange the (partition key) token range to query.
+ *
+ * @return the newly created {@code DataRange}.
+ */
+ public static DataRange forTokenRange(Range<Token> tokenRange)
{
- return forTokenRange(new Range<Token>(partitioner.getMinimumToken(), partitioner.getMinimumToken()));
+ return forKeyRange(Range.makeRowRange(tokenRange));
}
- public static DataRange forTokenRange(Range<Token> keyRange)
+ /**
+ * Creates a {@code DataRange} to query all rows over the provided key range.
+ *
+ * @param keyRange the (partition key) range to query.
+ *
+ * @return the newly created {@code DataRange}.
+ */
+ public static DataRange forKeyRange(Range<PartitionPosition> keyRange)
{
- return forKeyRange(Range.makeRowRange(keyRange));
+ return new DataRange(keyRange, new ClusteringIndexSliceFilter(Slices.ALL, false));
}
- public static DataRange forKeyRange(Range<RowPosition> keyRange)
+ /**
+ * Creates a {@code DataRange} to query all partitions of the ring using the provided
+ * clustering index filter.
+ *
+ * @param partitioner the partitioner in use for the table queried.
+ * @param filter the clustering index filter to use.
+ *
+ * @return the newly created {@code DataRange}.
+ */
+ public static DataRange allData(IPartitioner partitioner, ClusteringIndexFilter filter)
{
- return new DataRange(keyRange, new IdentityQueryFilter());
+ return new DataRange(Range.makeRowRange(new Range<Token>(partitioner.getMinimumToken(), partitioner.getMinimumToken())), filter);
}
- public AbstractBounds<RowPosition> keyRange()
+ /**
+ * The range of partition key queried by this {@code DataRange}.
+ *
+ * @return the range of partition key queried by this {@code DataRange}.
+ */
+ public AbstractBounds<PartitionPosition> keyRange()
{
return keyRange;
}
- public RowPosition startKey()
+ /**
+ * The start of the partition key range queried by this {@code DataRange}.
+ *
+ * @return the start of the partition key range queried by this {@code DataRange}.
+ */
+ public PartitionPosition startKey()
{
return keyRange.left;
}
- public RowPosition stopKey()
+ /**
+ * The end of the partition key range queried by this {@code DataRange}.
+ *
+ * @return the end of the partition key range queried by this {@code DataRange}.
+ */
+ public PartitionPosition stopKey()
{
return keyRange.right;
}
/**
- * Returns true if tombstoned partitions should not be included in results or count towards the limit.
- * See CASSANDRA-8490 for more details on why this is needed (and done this way).
- * */
- public boolean ignoredTombstonedPartitions()
+ * Whether the underlying clustering index filter is a names filter or not.
+ *
+ * @return Whether the underlying clustering index filter is a names filter or not.
+ */
+ public boolean isNamesQuery()
{
- if (!(columnFilter instanceof SliceQueryFilter))
- return false;
-
- return ((SliceQueryFilter) columnFilter).compositesToGroup == SliceQueryFilter.IGNORE_TOMBSTONED_PARTITIONS;
+ return clusteringIndexFilter instanceof ClusteringIndexNamesFilter;
}
- // Whether the bounds of this DataRange actually wraps around.
+ /**
+ * Whether the range queried by this {@code DataRange} actually wraps around.
+ *
+ * @return whether the range queried by this {@code DataRange} actually wraps around.
+ */
public boolean isWrapAround()
{
- // On range can ever wrap
+ // Only range can ever wrap
return keyRange instanceof Range && ((Range<?>)keyRange).isWrapAround();
}
- public boolean contains(RowPosition pos)
+ /**
+ * Whether the provided ring position is covered by this {@code DataRange}.
+ *
+ * @return whether the provided ring position is covered by this {@code DataRange}.
+ */
+ public boolean contains(PartitionPosition pos)
{
return keyRange.contains(pos);
}
- public int getLiveCount(ColumnFamily data, long now)
+ /**
+ * Whether this {@code DataRange} queries everything (has no restriction either on the
+ * partitions queried or within the queried partitions).
+ *
+ * @return Whether this {@code DataRange} queries everything.
+ */
+ public boolean isUnrestricted()
{
- return columnFilter instanceof SliceQueryFilter
- ? ((SliceQueryFilter)columnFilter).lastCounted()
- : columnFilter.getLiveCount(data, now);
+ return startKey().isMinimum() && stopKey().isMinimum() && clusteringIndexFilter.selectsAllPartition();
}
- public boolean selectsFullRowFor(ByteBuffer rowKey)
+ /**
+ * The clustering index filter to use for the provided key.
+ * <p>
+ * This may or may not be the same filter for all keys (that is, paging ranges
+ * use a different filter for their start key).
+ *
+ * @param key the partition key for which we want the clustering index filter.
+ *
+ * @return the clustering filter to use for {@code key}.
+ */
+ public ClusteringIndexFilter clusteringIndexFilter(DecoratedKey key)
{
- return selectFullRow;
+ return clusteringIndexFilter;
}
/**
- * Returns a column filter that should be used for a particular row key. Note that in the case of paging,
- * slice starts and ends may change depending on the row key.
+ * Returns a new {@code DataRange} for use when paging {@code this} range.
+ *
+ * @param range the range of partition keys to query.
+ * @param comparator the comparator for the table queried.
+ * @param lastReturned the clustering for the last result returned by the previous page, i.e. the result we want to start our new page
+ * from. This last returned result <b>must</b> correspond to the left bound of {@code range} (in other words, {@code range.left} must be the
+ * partition key for that {@code lastReturned} result).
+ * @param inclusive whether or not we want to include the {@code lastReturned} in the newly returned page of results.
+ *
+ * @return a new {@code DataRange} suitable for paging {@code this} range given the {@code lastReturned} result of the previous page.
*/
- public IDiskAtomFilter columnFilter(ByteBuffer rowKey)
+ public DataRange forPaging(AbstractBounds<PartitionPosition> range, ClusteringComparator comparator, Clustering lastReturned, boolean inclusive)
{
- return columnFilter;
+ return new Paging(range, clusteringIndexFilter, comparator, lastReturned, inclusive);
}
/**
- * Sets a new limit on the number of (grouped) cells to fetch. This is currently only used when the query limit applies
- * to CQL3 rows.
+ * Returns a new {@code DataRange} equivalent to {@code this} one but restricted to the provided sub-range.
+ *
+ * @param range the sub-range to use for the newly returned data range. Note that this assumes that {@code range} is a proper
+ * sub-range of the initial range but doesn't validate it. You should make sure to only provide sub-ranges, however, or this
+ * might throw off the paging case (see Paging.forSubRange()).
+ *
+ * @return a new {@code DataRange} using {@code range} as partition key range and the clustering index filter from {@code this}.
*/
- public void updateColumnsLimit(int count)
+ public DataRange forSubRange(AbstractBounds<PartitionPosition> range)
{
- columnFilter.updateColumnsLimit(count);
+ return new DataRange(range, clusteringIndexFilter);
}
- public static class Paging extends DataRange
+ public String toString(CFMetaData metadata)
{
- // The slice of columns that we want to fetch for each row, ignoring page start/end issues.
- private final SliceQueryFilter sliceFilter;
+ return String.format("range=%s pfilter=%s", keyRange.getString(metadata.getKeyValidator()), clusteringIndexFilter.toString(metadata));
+ }
- private final CFMetaData cfm;
+ public String toCQLString(CFMetaData metadata)
+ {
+ if (isUnrestricted())
+ return "UNRESTRICTED";
- private final Comparator<Composite> comparator;
+ StringBuilder sb = new StringBuilder();
- // used to restrict the start of the slice for the first partition in the range
- private final Composite firstPartitionColumnStart;
+ boolean needAnd = false;
+ if (!startKey().isMinimum())
+ {
+ appendClause(startKey(), sb, metadata, true, keyRange.isStartInclusive());
+ needAnd = true;
+ }
+ if (!stopKey().isMinimum())
+ {
+ if (needAnd)
+ sb.append(" AND ");
+ appendClause(stopKey(), sb, metadata, false, keyRange.isEndInclusive());
+ needAnd = true;
+ }
- // used to restrict the end of the slice for the last partition in the range
- private final Composite lastPartitionColumnFinish;
+ String filterString = clusteringIndexFilter.toCQLString(metadata);
+ if (!filterString.isEmpty())
+ sb.append(needAnd ? " AND " : "").append(filterString);
- // tracks the last key that we updated the filter for to avoid duplicating work
- private ByteBuffer lastKeyFilterWasUpdatedFor;
+ return sb.toString();
+ }
- private Paging(AbstractBounds<RowPosition> range, SliceQueryFilter filter, Composite firstPartitionColumnStart,
- Composite lastPartitionColumnFinish, CFMetaData cfm, Comparator<Composite> comparator)
+ private void appendClause(PartitionPosition pos, StringBuilder sb, CFMetaData metadata, boolean isStart, boolean isInclusive)
+ {
+ sb.append("token(");
+ sb.append(ColumnDefinition.toCQLString(metadata.partitionKeyColumns()));
+ sb.append(") ").append(getOperator(isStart, isInclusive)).append(" ");
+ if (pos instanceof DecoratedKey)
+ {
+ sb.append("token(");
+ appendKeyString(sb, metadata.getKeyValidator(), ((DecoratedKey)pos).getKey());
+ sb.append(")");
+ }
+ else
+ {
+ sb.append(((Token.KeyBound)pos).getToken());
+ }
+ }
+
+ private static String getOperator(boolean isStart, boolean isInclusive)
+ {
+ return isStart
+ ? (isInclusive ? ">=" : ">")
+ : (isInclusive ? "<=" : "<");
+ }
+
+ // TODO: this is reused in SinglePartitionReadCommand but this should not really be here. Ideally
+ // we need a more "native" handling of composite partition keys.
+ public static void appendKeyString(StringBuilder sb, AbstractType<?> type, ByteBuffer key)
+ {
+ if (type instanceof CompositeType)
+ {
+ CompositeType ct = (CompositeType)type;
+ ByteBuffer[] values = ct.split(key);
+ for (int i = 0; i < ct.types.size(); i++)
+ sb.append(i == 0 ? "" : ", ").append(ct.types.get(i).getString(values[i]));
+ }
+ else
+ {
+ sb.append(type.getString(key));
+ }
+ }
+
+ /**
+ * Specialized {@code DataRange} used for the paging case.
+ * <p>
+ * It uses the clustering of the last result of the previous page to restrict the filter on the
+ * first queried partition (the one for that last result) so it only fetches results that follow that
+ * last result. In other words, this makes sure we resume paging where we left off.
+ */
+ private static class Paging extends DataRange
+ {
+ private final ClusteringComparator comparator;
+ private final Clustering lastReturned;
+ private final boolean inclusive;
+
+ private Paging(AbstractBounds<PartitionPosition> range,
+ ClusteringIndexFilter filter,
+ ClusteringComparator comparator,
+ Clustering lastReturned,
+ boolean inclusive)
{
super(range, filter);
// When using a paging range, we don't allow wrapped ranges, as it's unclear how to handle them properly.
- // This is ok for now since we only need this in range slice queries, and the range are "unwrapped" in that case.
+ // This is ok for now since we only need this in range queries, and the ranges are "unwrapped" in that case.
assert !(range instanceof Range) || !((Range<?>)range).isWrapAround() || range.right.isMinimum() : range;
+ assert lastReturned != null;
- this.sliceFilter = filter;
- this.cfm = cfm;
this.comparator = comparator;
- this.firstPartitionColumnStart = firstPartitionColumnStart;
- this.lastPartitionColumnFinish = lastPartitionColumnFinish;
- this.lastKeyFilterWasUpdatedFor = null;
+ this.lastReturned = lastReturned;
+ this.inclusive = inclusive;
}
- public Paging(AbstractBounds<RowPosition> range, SliceQueryFilter filter, Composite columnStart, Composite columnFinish, CFMetaData cfm)
+ @Override
+ public ClusteringIndexFilter clusteringIndexFilter(DecoratedKey key)
{
- this(range, filter, columnStart, columnFinish, cfm, filter.isReversed() ? cfm.comparator.reverseComparator() : cfm.comparator);
+ return key.equals(startKey())
+ ? clusteringIndexFilter.forPaging(comparator, lastReturned, inclusive)
+ : clusteringIndexFilter;
}
@Override
- public boolean selectsFullRowFor(ByteBuffer rowKey)
+ public DataRange forSubRange(AbstractBounds<PartitionPosition> range)
{
- // If we initial filter is not the full filter, don't bother
- if (!selectFullRow)
- return false;
-
- if (!equals(startKey(), rowKey) && !equals(stopKey(), rowKey))
- return true;
+ // This is called for subrange of the initial range. So either it's the beginning of the initial range,
+ // and we need to preserve lastReturned, or it's not, and we don't care about it anymore.
+ return range.left.equals(keyRange().left)
+ ? new Paging(range, clusteringIndexFilter, comparator, lastReturned, inclusive)
+ : new DataRange(range, clusteringIndexFilter);
+ }
- return isFullRowSlice((SliceQueryFilter)columnFilter(rowKey));
+ @Override
+ public boolean isUnrestricted()
+ {
+ return false;
}
+ }
- private boolean equals(RowPosition pos, ByteBuffer rowKey)
+ public static class Serializer
+ {
+ public void serialize(DataRange range, DataOutputPlus out, int version, CFMetaData metadata) throws IOException
{
- return pos instanceof DecoratedKey && ((DecoratedKey)pos).getKey().equals(rowKey);
+ AbstractBounds.rowPositionSerializer.serialize(range.keyRange, out, version);
+ ClusteringIndexFilter.serializer.serialize(range.clusteringIndexFilter, out, version);
+ boolean isPaging = range instanceof Paging;
+ out.writeBoolean(isPaging);
+ if (isPaging)
+ {
+ Clustering.serializer.serialize(((Paging)range).lastReturned, out, version, metadata.comparator.subtypes());
+ out.writeBoolean(((Paging)range).inclusive);
+ }
}
- @Override
- public IDiskAtomFilter columnFilter(ByteBuffer rowKey)
+ public DataRange deserialize(DataInput in, int version, CFMetaData metadata) throws IOException
{
- /*
- * We have that ugly hack that for slice queries, when we ask for
- * the live count, we reach into the query filter to get the last
- * counter number of columns to avoid recounting.
- * Maybe we should just remove that hack, but in the meantime, we
- * need to keep a reference the last returned filter.
- */
- if (equals(startKey(), rowKey) || equals(stopKey(), rowKey))
+ AbstractBounds<PartitionPosition> range = AbstractBounds.rowPositionSerializer.deserialize(in, MessagingService.globalPartitioner(), version);
+ ClusteringIndexFilter filter = ClusteringIndexFilter.serializer.deserialize(in, version, metadata);
+ if (in.readBoolean())
{
- if (!rowKey.equals(lastKeyFilterWasUpdatedFor))
- {
- this.lastKeyFilterWasUpdatedFor = rowKey;
- columnFilter = sliceFilter.withUpdatedSlices(slicesForKey(rowKey));
- }
+ ClusteringComparator comparator = metadata.comparator;
+ Clustering lastReturned = Clustering.serializer.deserialize(in, version, comparator.subtypes());
+ boolean inclusive = in.readBoolean();
+ return new Paging(range, filter, comparator, lastReturned, inclusive);
}
else
{
- columnFilter = sliceFilter;
+ return new DataRange(range, filter);
}
-
- return columnFilter;
}
- /** Returns true if the slice includes static columns, false otherwise. */
- private boolean sliceIncludesStatics(ColumnSlice slice, boolean reversed, CFMetaData cfm)
+ public long serializedSize(DataRange range, int version, CFMetaData metadata)
{
- return cfm.hasStaticColumns() &&
- slice.includes(reversed ? cfm.comparator.reverseComparator() : cfm.comparator, cfm.comparator.staticPrefix().end());
- }
+ long size = AbstractBounds.rowPositionSerializer.serializedSize(range.keyRange, version)
+ + ClusteringIndexFilter.serializer.serializedSize(range.clusteringIndexFilter, version)
+ + 1; // isPaging boolean
- private ColumnSlice[] slicesForKey(ByteBuffer key)
- {
- // Also note that firstPartitionColumnStart and lastPartitionColumnFinish, when used, only "restrict" the filter slices,
- // it doesn't expand on them. As such, we can ignore the case where they are empty and we do
- // as it screw up with the logic below (see #6592)
- Composite newStart = equals(startKey(), key) && !firstPartitionColumnStart.isEmpty() ? firstPartitionColumnStart : null;
- Composite newFinish = equals(stopKey(), key) && !lastPartitionColumnFinish.isEmpty() ? lastPartitionColumnFinish : null;
-
- // in the common case, we'll have the same number of slices
- List<ColumnSlice> newSlices = new ArrayList<>(sliceFilter.slices.length);
-
- // Check our slices to see if any fall before the page start (in which case they can be removed) or
- // if they contain the page start (in which case they should start from the page start). However, if the
- // slices would include static columns, we need to ensure they are also fetched, and so a separate
- // slice for the static columns may be required.
- // Note that if the query is reversed, we can't handle statics by simply adding a separate slice here, so
- // the reversed case is handled by SliceFromReadCommand instead. See CASSANDRA-8502 for more details.
- for (ColumnSlice slice : sliceFilter.slices)
+ if (range instanceof Paging)
{
- if (newStart != null)
- {
- if (slice.isBefore(comparator, newStart))
- {
- if (!sliceFilter.reversed && sliceIncludesStatics(slice, false, cfm))
- newSlices.add(new ColumnSlice(Composites.EMPTY, cfm.comparator.staticPrefix().end()));
-
- continue;
- }
-
- if (slice.includes(comparator, newStart))
- {
- if (!sliceFilter.reversed && sliceIncludesStatics(slice, false, cfm) && !newStart.equals(Composites.EMPTY))
- newSlices.add(new ColumnSlice(Composites.EMPTY, cfm.comparator.staticPrefix().end()));
-
- slice = new ColumnSlice(newStart, slice.finish);
- }
-
- // once we see a slice that either includes the page start or is after it, we can stop checking
- // against the page start (because the slices are ordered)
- newStart = null;
- }
-
- assert newStart == null;
- if (newFinish != null && !slice.isBefore(comparator, newFinish))
- {
- if (slice.includes(comparator, newFinish))
- newSlices.add(new ColumnSlice(slice.start, newFinish));
- // In any case, we're done
- break;
- }
- newSlices.add(slice);
+ size += Clustering.serializer.serializedSize(((Paging)range).lastReturned, version, metadata.comparator.subtypes(), TypeSizes.NATIVE);
+ size += 1; // inclusive boolean
}
-
- return newSlices.toArray(new ColumnSlice[newSlices.size()]);
- }
-
- @Override
- public void updateColumnsLimit(int count)
- {
- columnFilter.updateColumnsLimit(count);
- sliceFilter.updateColumnsLimit(count);
- }
-
- @Override
- public String toString()
- {
- return Objects.toStringHelper(this)
- .add("keyRange", keyRange)
- .add("sliceFilter", sliceFilter)
- .add("columnFilter", columnFilter)
- .add("firstPartitionColumnStart", firstPartitionColumnStart == null ? "null" : cfm.comparator.getString(firstPartitionColumnStart))
- .add("lastPartitionColumnFinish", lastPartitionColumnFinish == null ? "null" : cfm.comparator.getString(lastPartitionColumnFinish))
- .toString();
+ return size;
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/DecoratedKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DecoratedKey.java b/src/java/org/apache/cassandra/db/DecoratedKey.java
index cc62a15..92d6414 100644
--- a/src/java/org/apache/cassandra/db/DecoratedKey.java
+++ b/src/java/org/apache/cassandra/db/DecoratedKey.java
@@ -36,7 +36,7 @@ import org.apache.cassandra.utils.IFilter.FilterKey;
* if this matters, you can subclass RP to use a stronger hash, or use a non-lossy tokenization scheme (as in the
* OrderPreservingPartitioner classes).
*/
-public abstract class DecoratedKey implements RowPosition, FilterKey
+public abstract class DecoratedKey implements PartitionPosition, FilterKey
{
public static final Comparator<DecoratedKey> comparator = new Comparator<DecoratedKey>()
{
@@ -72,7 +72,7 @@ public abstract class DecoratedKey implements RowPosition, FilterKey
return ByteBufferUtil.compareUnsigned(getKey(), other.getKey()) == 0; // we compare faster than BB.equals for array backed BB
}
- public int compareTo(RowPosition pos)
+ public int compareTo(PartitionPosition pos)
{
if (this == pos)
return 0;
@@ -86,7 +86,7 @@ public abstract class DecoratedKey implements RowPosition, FilterKey
return cmp == 0 ? ByteBufferUtil.compareUnsigned(getKey(), otherKey.getKey()) : cmp;
}
- public static int compareTo(IPartitioner partitioner, ByteBuffer key, RowPosition position)
+ public static int compareTo(IPartitioner partitioner, ByteBuffer key, PartitionPosition position)
{
// delegate to Token.KeyBound if needed
if (!(position instanceof DecoratedKey))
@@ -113,9 +113,9 @@ public abstract class DecoratedKey implements RowPosition, FilterKey
return false;
}
- public RowPosition.Kind kind()
+ public PartitionPosition.Kind kind()
{
- return RowPosition.Kind.ROW_KEY;
+ return PartitionPosition.Kind.ROW_KEY;
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/DeletedCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DeletedCell.java b/src/java/org/apache/cassandra/db/DeletedCell.java
deleted file mode 100644
index 998c409..0000000
--- a/src/java/org/apache/cassandra/db/DeletedCell.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.utils.concurrent.OpOrder;
-import org.apache.cassandra.utils.memory.AbstractAllocator;
-import org.apache.cassandra.utils.memory.MemtableAllocator;
-
-public interface DeletedCell extends Cell
-{
- DeletedCell localCopy(CFMetaData metadata, AbstractAllocator allocator);
-
- DeletedCell localCopy(CFMetaData metaData, MemtableAllocator allocator, OpOrder.Group opGroup);
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/DeletionInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DeletionInfo.java b/src/java/org/apache/cassandra/db/DeletionInfo.java
index 048324a..e54d6b1 100644
--- a/src/java/org/apache/cassandra/db/DeletionInfo.java
+++ b/src/java/org/apache/cassandra/db/DeletionInfo.java
@@ -17,40 +17,32 @@
*/
package org.apache.cassandra.db;
-import java.io.DataInput;
-import java.io.IOException;
-import java.security.MessageDigest;
-import java.util.Comparator;
import java.util.Iterator;
import com.google.common.base.Objects;
import com.google.common.collect.Iterators;
import org.apache.cassandra.cache.IMeasurableMemory;
-import org.apache.cassandra.db.composites.CType;
-import org.apache.cassandra.db.composites.Composite;
-import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.db.rows.Cell;
import org.apache.cassandra.utils.ObjectSizes;
import org.apache.cassandra.utils.memory.AbstractAllocator;
/**
- * A combination of a top-level (or row) tombstone and range tombstones describing the deletions
- * within a {@link ColumnFamily} (or row).
+ * A combination of a top-level (partition) tombstone and range tombstones describing the deletions
+ * within a partition.
*/
public class DeletionInfo implements IMeasurableMemory
{
private static final long EMPTY_SIZE = ObjectSizes.measure(new DeletionInfo(0, 0));
/**
- * This represents a deletion of the entire row. We can't represent this within the RangeTombstoneList, so it's
- * kept separately. This also slightly optimizes the common case of a full row deletion.
+ * This represents a deletion of the entire partition. We can't represent this within the RangeTombstoneList, so it's
+ * kept separately. This also slightly optimizes the common case of a full partition deletion.
*/
- private DeletionTime topLevel;
+ private DeletionTime partitionDeletion;
/**
- * A list of range tombstones within the row. This is left as null if there are no range tombstones
+ * A list of range tombstones within the partition. This is left as null if there are no range tombstones
* (to save an allocation (since it's a common case).
*/
private RangeTombstoneList ranges;
@@ -65,28 +57,23 @@ public class DeletionInfo implements IMeasurableMemory
{
// Pre-1.1 node may return MIN_VALUE for non-deleted container, but the new default is MAX_VALUE
// (see CASSANDRA-3872)
- this(new DeletionTime(markedForDeleteAt, localDeletionTime == Integer.MIN_VALUE ? Integer.MAX_VALUE : localDeletionTime));
+ this(new SimpleDeletionTime(markedForDeleteAt, localDeletionTime == Integer.MIN_VALUE ? Integer.MAX_VALUE : localDeletionTime));
}
- public DeletionInfo(DeletionTime topLevel)
+ public DeletionInfo(DeletionTime partitionDeletion)
{
- this(topLevel, null);
+ this(partitionDeletion, null);
}
- public DeletionInfo(Composite start, Composite end, Comparator<Composite> comparator, long markedForDeleteAt, int localDeletionTime)
+ public DeletionInfo(ClusteringComparator comparator, Slice slice, long markedForDeleteAt, int localDeletionTime)
{
this(DeletionTime.LIVE, new RangeTombstoneList(comparator, 1));
- ranges.add(start, end, markedForDeleteAt, localDeletionTime);
+ ranges.add(slice.start(), slice.end(), markedForDeleteAt, localDeletionTime);
}
- public DeletionInfo(RangeTombstone rangeTombstone, Comparator<Composite> comparator)
+ public DeletionInfo(DeletionTime partitionDeletion, RangeTombstoneList ranges)
{
- this(rangeTombstone.min, rangeTombstone.max, comparator, rangeTombstone.data.markedForDeleteAt, rangeTombstone.data.localDeletionTime);
- }
-
- private DeletionInfo(DeletionTime topLevel, RangeTombstoneList ranges)
- {
- this.topLevel = topLevel;
+ this.partitionDeletion = partitionDeletion.takeAlias();
this.ranges = ranges;
}
@@ -100,17 +87,16 @@ public class DeletionInfo implements IMeasurableMemory
public DeletionInfo copy()
{
- return new DeletionInfo(topLevel, ranges == null ? null : ranges.copy());
+ return new DeletionInfo(partitionDeletion, ranges == null ? null : ranges.copy());
}
public DeletionInfo copy(AbstractAllocator allocator)
{
-
RangeTombstoneList rangesCopy = null;
if (ranges != null)
rangesCopy = ranges.copy(allocator);
- return new DeletionInfo(topLevel, rangesCopy);
+ return new DeletionInfo(partitionDeletion, rangesCopy);
}
/**
@@ -118,106 +104,31 @@ public class DeletionInfo implements IMeasurableMemory
*/
public boolean isLive()
{
- return topLevel.isLive() && (ranges == null || ranges.isEmpty());
+ return partitionDeletion.isLive() && (ranges == null || ranges.isEmpty());
}
/**
- * Return whether a given cell is deleted by the container having this deletion info.
+ * Return whether a given cell is deleted by this deletion info.
*
+ * @param clustering the clustering for the cell to check.
* @param cell the cell to check.
* @return true if the cell is deleted, false otherwise
*/
- public boolean isDeleted(Cell cell)
+ private boolean isDeleted(Clustering clustering, Cell cell)
{
- // We do rely on this test: if topLevel.markedForDeleteAt is MIN_VALUE, we should not
- // consider the column deleted even if timestamp=MIN_VALUE, otherwise this break QueryFilter.isRelevant
+ // If we're live, don't consider anything deleted, even if the cell ends up having Long.MIN_VALUE as its timestamp
+ // (which shouldn't happen in practice, but it would be invalid to consider it deleted if it did).
if (isLive())
return false;
- if (cell.timestamp() <= topLevel.markedForDeleteAt)
+ if (cell.livenessInfo().timestamp() <= partitionDeletion.markedForDeleteAt())
return true;
// No matter what the counter cell's timestamp is, a tombstone always takes precedence. See CASSANDRA-7346.
- if (!topLevel.isLive() && cell instanceof CounterCell)
- return true;
-
- return ranges != null && ranges.isDeleted(cell);
- }
-
- /**
- * Returns a new {@link InOrderTester} in forward order.
- */
- public InOrderTester inOrderTester()
- {
- return inOrderTester(false);
- }
-
- /**
- * Returns a new {@link InOrderTester} given the order in which
- * columns will be passed to it.
- */
- public InOrderTester inOrderTester(boolean reversed)
- {
- return new InOrderTester(reversed);
- }
-
- /**
- * Purge every tombstones that are older than {@code gcbefore}.
- *
- * @param gcBefore timestamp (in seconds) before which tombstones should be purged
- */
- public void purge(int gcBefore)
- {
- topLevel = topLevel.localDeletionTime < gcBefore ? DeletionTime.LIVE : topLevel;
-
- if (ranges != null)
- {
- ranges.purge(gcBefore);
- if (ranges.isEmpty())
- ranges = null;
- }
- }
-
- /**
- * Evaluates difference between this deletion info and superset for read repair
- *
- * @return the difference between the two, or LIVE if no difference
- */
- public DeletionInfo diff(DeletionInfo superset)
- {
- RangeTombstoneList rangeDiff = superset.ranges == null || superset.ranges.isEmpty()
- ? null
- : ranges == null ? superset.ranges : ranges.diff(superset.ranges);
-
- return topLevel.markedForDeleteAt != superset.topLevel.markedForDeleteAt || rangeDiff != null
- ? new DeletionInfo(superset.topLevel, rangeDiff)
- : DeletionInfo.live();
- }
-
-
- /**
- * Digests deletion info. Used to trigger read repair on mismatch.
- */
- public void updateDigest(MessageDigest digest)
- {
- if (topLevel.markedForDeleteAt != Long.MIN_VALUE)
- digest.update(ByteBufferUtil.bytes(topLevel.markedForDeleteAt));
-
- if (ranges != null)
- ranges.updateDigest(digest);
- }
-
- /**
- * Returns true if {@code purge} would remove the top-level tombstone or any of the range
- * tombstones, false otherwise.
- * @param gcBefore timestamp (in seconds) before which tombstones should be purged
- */
- public boolean hasPurgeableTombstones(int gcBefore)
- {
- if (topLevel.localDeletionTime < gcBefore)
+ if (!partitionDeletion.isLive() && cell.isCounterCell())
return true;
- return ranges != null && ranges.hasPurgeableTombstones(gcBefore);
+ return ranges != null && ranges.isDeleted(clustering, cell);
}
/**
@@ -227,11 +138,11 @@ public class DeletionInfo implements IMeasurableMemory
*/
public void add(DeletionTime newInfo)
{
- if (topLevel.markedForDeleteAt < newInfo.markedForDeleteAt)
- topLevel = newInfo;
+ if (newInfo.supersedes(partitionDeletion))
+ partitionDeletion = newInfo;
}
- public void add(RangeTombstone tombstone, Comparator<Composite> comparator)
+ public void add(RangeTombstone tombstone, ClusteringComparator comparator)
{
if (ranges == null)
ranges = new RangeTombstoneList(comparator, 1);
@@ -248,7 +159,7 @@ public class DeletionInfo implements IMeasurableMemory
*/
public DeletionInfo add(DeletionInfo newInfo)
{
- add(newInfo.topLevel);
+ add(newInfo.partitionDeletion);
if (ranges == null)
ranges = newInfo.ranges == null ? null : newInfo.ranges.copy();
@@ -258,53 +169,30 @@ public class DeletionInfo implements IMeasurableMemory
return this;
}
- /**
- * Returns the minimum timestamp in any of the range tombstones or the top-level tombstone.
- */
- public long minTimestamp()
- {
- return ranges == null
- ? topLevel.markedForDeleteAt
- : Math.min(topLevel.markedForDeleteAt, ranges.minMarkedAt());
- }
-
- /**
- * Returns the maximum timestamp in any of the range tombstones or the top-level tombstone.
- */
- public long maxTimestamp()
- {
- return ranges == null
- ? topLevel.markedForDeleteAt
- : Math.max(topLevel.markedForDeleteAt, ranges.maxMarkedAt());
- }
-
- /**
- * Returns the top-level (or "row") tombstone.
- */
- public DeletionTime getTopLevelDeletion()
+ public DeletionTime getPartitionDeletion()
{
- return topLevel;
+ return partitionDeletion;
}
// Use sparingly, not the most efficient thing
- public Iterator<RangeTombstone> rangeIterator()
+ public Iterator<RangeTombstone> rangeIterator(boolean reversed)
{
- return ranges == null ? Iterators.<RangeTombstone>emptyIterator() : ranges.iterator();
+ return ranges == null ? Iterators.<RangeTombstone>emptyIterator() : ranges.iterator(reversed);
}
- public Iterator<RangeTombstone> rangeIterator(Composite start, Composite finish)
+ public Iterator<RangeTombstone> rangeIterator(Slice slice, boolean reversed)
{
- return ranges == null ? Iterators.<RangeTombstone>emptyIterator() : ranges.iterator(start, finish);
+ return ranges == null ? Iterators.<RangeTombstone>emptyIterator() : ranges.iterator(slice, reversed);
}
- public RangeTombstone rangeCovering(Composite name)
+ public RangeTombstone rangeCovering(Clustering name)
{
return ranges == null ? null : ranges.search(name);
}
public int dataSize()
{
- int size = TypeSizes.NATIVE.sizeof(topLevel.markedForDeleteAt);
+ int size = TypeSizes.NATIVE.sizeof(partitionDeletion.markedForDeleteAt());
return size + (ranges == null ? 0 : ranges.dataSize());
}
@@ -323,45 +211,43 @@ public class DeletionInfo implements IMeasurableMemory
*/
public boolean mayModify(DeletionInfo delInfo)
{
- return topLevel.compareTo(delInfo.topLevel) > 0 || hasRanges();
+ return partitionDeletion.compareTo(delInfo.partitionDeletion) > 0 || hasRanges();
}
@Override
public String toString()
{
if (ranges == null || ranges.isEmpty())
- return String.format("{%s}", topLevel);
+ return String.format("{%s}", partitionDeletion);
else
- return String.format("{%s, ranges=%s}", topLevel, rangesAsString());
+ return String.format("{%s, ranges=%s}", partitionDeletion, rangesAsString());
}
private String rangesAsString()
{
assert !ranges.isEmpty();
StringBuilder sb = new StringBuilder();
- CType type = (CType)ranges.comparator();
- assert type != null;
- Iterator<RangeTombstone> iter = rangeIterator();
+ ClusteringComparator cc = ranges.comparator();
+ Iterator<RangeTombstone> iter = rangeIterator(false);
while (iter.hasNext())
{
RangeTombstone i = iter.next();
- sb.append("[");
- sb.append(type.getString(i.min)).append("-");
- sb.append(type.getString(i.max)).append(", ");
- sb.append(i.data);
- sb.append("]");
+ sb.append(i.deletedSlice().toString(cc));
+ sb.append("@");
+ sb.append(i.deletionTime());
}
return sb.toString();
}
// Updates all the timestamp of the deletion contained in this DeletionInfo to be {@code timestamp}.
- public void updateAllTimestamp(long timestamp)
+ public DeletionInfo updateAllTimestamp(long timestamp)
{
- if (topLevel.markedForDeleteAt != Long.MIN_VALUE)
- topLevel = new DeletionTime(timestamp, topLevel.localDeletionTime);
+ if (partitionDeletion.markedForDeleteAt() != Long.MIN_VALUE)
+ partitionDeletion = new SimpleDeletionTime(timestamp, partitionDeletion.localDeletionTime());
if (ranges != null)
ranges.updateAllTimestamp(timestamp);
+ return this;
}
@Override
@@ -370,100 +256,18 @@ public class DeletionInfo implements IMeasurableMemory
if(!(o instanceof DeletionInfo))
return false;
DeletionInfo that = (DeletionInfo)o;
- return topLevel.equals(that.topLevel) && Objects.equal(ranges, that.ranges);
+ return partitionDeletion.equals(that.partitionDeletion) && Objects.equal(ranges, that.ranges);
}
@Override
public final int hashCode()
{
- return Objects.hashCode(topLevel, ranges);
+ return Objects.hashCode(partitionDeletion, ranges);
}
@Override
public long unsharedHeapSize()
{
- return EMPTY_SIZE + topLevel.unsharedHeapSize() + (ranges == null ? 0 : ranges.unsharedHeapSize());
- }
-
- public static class Serializer implements IVersionedSerializer<DeletionInfo>
- {
- private final RangeTombstoneList.Serializer rtlSerializer;
-
- public Serializer(CType type)
- {
- this.rtlSerializer = new RangeTombstoneList.Serializer(type);
- }
-
- public void serialize(DeletionInfo info, DataOutputPlus out, int version) throws IOException
- {
- DeletionTime.serializer.serialize(info.topLevel, out);
- rtlSerializer.serialize(info.ranges, out, version);
- }
-
- public DeletionInfo deserialize(DataInput in, int version) throws IOException
- {
- DeletionTime topLevel = DeletionTime.serializer.deserialize(in);
- RangeTombstoneList ranges = rtlSerializer.deserialize(in, version);
- return new DeletionInfo(topLevel, ranges);
- }
-
- public long serializedSize(DeletionInfo info, TypeSizes typeSizes, int version)
- {
- long size = DeletionTime.serializer.serializedSize(info.topLevel, typeSizes);
- return size + rtlSerializer.serializedSize(info.ranges, typeSizes, version);
- }
-
- public long serializedSize(DeletionInfo info, int version)
- {
- return serializedSize(info, TypeSizes.NATIVE, version);
- }
- }
-
- /**
- * This object allow testing whether a given column (name/timestamp) is deleted
- * or not by this DeletionInfo, assuming that the columns given to this
- * object are passed in forward or reversed comparator sorted order.
- *
- * This is more efficient that calling DeletionInfo.isDeleted() repeatedly
- * in that case.
- */
- public class InOrderTester
- {
- /*
- * Note that because because range tombstone are added to this DeletionInfo while we iterate,
- * `ranges` may be null initially and we need to wait for the first range to create the tester (once
- * created the test will pick up new tombstones however). We are guaranteed that a range tombstone
- * will be added *before* we test any column that it may delete, so this is ok.
- */
- private RangeTombstoneList.InOrderTester tester;
- private final boolean reversed;
-
- private InOrderTester(boolean reversed)
- {
- this.reversed = reversed;
- }
-
- public boolean isDeleted(Cell cell)
- {
- if (cell.timestamp() <= topLevel.markedForDeleteAt)
- return true;
-
- // No matter what the counter cell's timestamp is, a tombstone always takes precedence. See CASSANDRA-7346.
- if (!topLevel.isLive() && cell instanceof CounterCell)
- return true;
-
- /*
- * We don't optimize the reversed case for now because RangeTombstoneList
- * is always in forward sorted order.
- */
- if (reversed)
- return DeletionInfo.this.isDeleted(cell);
-
- // Maybe create the tester if we hadn't yet and we now have some ranges (see above).
- if (tester == null && ranges != null)
- tester = ranges.inOrderTester();
-
- return tester != null && tester.isDeleted(cell);
- }
+ return EMPTY_SIZE + partitionDeletion.unsharedHeapSize() + (ranges == null ? 0 : ranges.unsharedHeapSize());
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/DeletionTime.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DeletionTime.java b/src/java/org/apache/cassandra/db/DeletionTime.java
index 7165417..f070778 100644
--- a/src/java/org/apache/cassandra/db/DeletionTime.java
+++ b/src/java/org/apache/cassandra/db/DeletionTime.java
@@ -19,58 +19,56 @@ package org.apache.cassandra.db;
import java.io.DataInput;
import java.io.IOException;
+import java.security.MessageDigest;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import org.apache.cassandra.cache.IMeasurableMemory;
import org.apache.cassandra.io.ISerializer;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.ObjectSizes;
-import org.codehaus.jackson.annotate.JsonIgnore;
/**
- * A top-level (row) tombstone.
+ * Information on deletion of a storage engine object.
*/
-public class DeletionTime implements Comparable<DeletionTime>, IMeasurableMemory
+public abstract class DeletionTime implements Comparable<DeletionTime>, IMeasurableMemory, Aliasable<DeletionTime>
{
- private static final long EMPTY_SIZE = ObjectSizes.measure(new DeletionTime(0, 0));
+ private static final long EMPTY_SIZE = ObjectSizes.measure(new SimpleDeletionTime(0, 0));
/**
* A special DeletionTime that signifies that there is no top-level (row) tombstone.
*/
- public static final DeletionTime LIVE = new DeletionTime(Long.MIN_VALUE, Integer.MAX_VALUE);
+ public static final DeletionTime LIVE = new SimpleDeletionTime(Long.MIN_VALUE, Integer.MAX_VALUE);
+
+ public static final Serializer serializer = new Serializer();
/**
* A timestamp (typically in microseconds since the unix epoch, although this is not enforced) after which
* data should be considered deleted. If set to Long.MIN_VALUE, this implies that the data has not been marked
* for deletion at all.
*/
- public final long markedForDeleteAt;
+ public abstract long markedForDeleteAt();
/**
* The local server timestamp, in seconds since the unix epoch, at which this tombstone was created. This is
* only used for purposes of purging the tombstone after gc_grace_seconds have elapsed.
*/
- public final int localDeletionTime;
-
- public static final Serializer serializer = new Serializer();
-
- @VisibleForTesting
- public DeletionTime(long markedForDeleteAt, int localDeletionTime)
- {
- this.markedForDeleteAt = markedForDeleteAt;
- this.localDeletionTime = localDeletionTime;
- }
+ public abstract int localDeletionTime();
/**
* Returns whether this DeletionTime is live, that is deletes no columns.
*/
- @JsonIgnore
public boolean isLive()
{
- return markedForDeleteAt == Long.MIN_VALUE && localDeletionTime == Integer.MAX_VALUE;
+ return markedForDeleteAt() == Long.MIN_VALUE && localDeletionTime() == Integer.MAX_VALUE;
+ }
+
+ public void digest(MessageDigest digest)
+ {
+ FBUtilities.updateWithLong(digest, markedForDeleteAt());
+ FBUtilities.updateWithInt(digest, localDeletionTime());
}
@Override
@@ -79,48 +77,58 @@ public class DeletionTime implements Comparable<DeletionTime>, IMeasurableMemory
if(!(o instanceof DeletionTime))
return false;
DeletionTime that = (DeletionTime)o;
- return markedForDeleteAt == that.markedForDeleteAt && localDeletionTime == that.localDeletionTime;
+ return markedForDeleteAt() == that.markedForDeleteAt() && localDeletionTime() == that.localDeletionTime();
}
@Override
public final int hashCode()
{
- return Objects.hashCode(markedForDeleteAt, localDeletionTime);
+ return Objects.hashCode(markedForDeleteAt(), localDeletionTime());
}
@Override
public String toString()
{
- return String.format("deletedAt=%d, localDeletion=%d", markedForDeleteAt, localDeletionTime);
+ return String.format("deletedAt=%d, localDeletion=%d", markedForDeleteAt(), localDeletionTime());
}
public int compareTo(DeletionTime dt)
{
- if (markedForDeleteAt < dt.markedForDeleteAt)
+ if (markedForDeleteAt() < dt.markedForDeleteAt())
return -1;
- else if (markedForDeleteAt > dt.markedForDeleteAt)
+ else if (markedForDeleteAt() > dt.markedForDeleteAt())
return 1;
- else if (localDeletionTime < dt.localDeletionTime)
+ else if (localDeletionTime() < dt.localDeletionTime())
return -1;
- else if (localDeletionTime > dt.localDeletionTime)
+ else if (localDeletionTime() > dt.localDeletionTime())
return 1; // greater localDeletionTime orders after dt, mirroring the -1 branch above
else
return 0;
}
- public boolean isGcAble(int gcBefore)
+ public boolean supersedes(DeletionTime dt)
{
- return localDeletionTime < gcBefore;
+ return markedForDeleteAt() > dt.markedForDeleteAt() || (markedForDeleteAt() == dt.markedForDeleteAt() && localDeletionTime() > dt.localDeletionTime()); // later markedForDeleteAt wins; on a tie, the later localDeletionTime wins
}
- public boolean isDeleted(OnDiskAtom atom)
+ public boolean isPurgeable(long maxPurgeableTimestamp, int gcBefore)
{
- return atom.timestamp() <= markedForDeleteAt;
+ return markedForDeleteAt() < maxPurgeableTimestamp && localDeletionTime() < gcBefore;
}
- public boolean supersedes(DeletionTime dt)
+ public boolean deletes(LivenessInfo info)
+ {
+ return deletes(info.timestamp());
+ }
+
+ public boolean deletes(long timestamp)
+ {
+ return timestamp <= markedForDeleteAt();
+ }
+
+ public int dataSize()
{
- return this.markedForDeleteAt > dt.markedForDeleteAt;
+ return 12; // presumably 8 bytes (long markedForDeleteAt) + 4 bytes (int localDeletionTime)
}
public long unsharedHeapSize()
@@ -132,8 +140,8 @@ public class DeletionTime implements Comparable<DeletionTime>, IMeasurableMemory
{
public void serialize(DeletionTime delTime, DataOutputPlus out) throws IOException
{
- out.writeInt(delTime.localDeletionTime);
- out.writeLong(delTime.markedForDeleteAt);
+ out.writeInt(delTime.localDeletionTime());
+ out.writeLong(delTime.markedForDeleteAt());
}
public DeletionTime deserialize(DataInput in) throws IOException
@@ -142,7 +150,7 @@ public class DeletionTime implements Comparable<DeletionTime>, IMeasurableMemory
long mfda = in.readLong();
return mfda == Long.MIN_VALUE && ldt == Integer.MAX_VALUE
? LIVE
- : new DeletionTime(mfda, ldt);
+ : new SimpleDeletionTime(mfda, ldt);
}
public void skip(DataInput in) throws IOException
@@ -152,8 +160,8 @@ public class DeletionTime implements Comparable<DeletionTime>, IMeasurableMemory
public long serializedSize(DeletionTime delTime, TypeSizes typeSizes)
{
- return typeSizes.sizeof(delTime.localDeletionTime)
- + typeSizes.sizeof(delTime.markedForDeleteAt);
+ return typeSizes.sizeof(delTime.localDeletionTime())
+ + typeSizes.sizeof(delTime.markedForDeleteAt());
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/DeletionTimeArray.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DeletionTimeArray.java b/src/java/org/apache/cassandra/db/DeletionTimeArray.java
new file mode 100644
index 0000000..77eb953
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/DeletionTimeArray.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.util.Arrays;
+
+import org.apache.cassandra.utils.ObjectSizes;
+
+/**
+ * Stores a sequence of deletion times as two parallel primitive arrays (a {@code long[]}
+ * of markedForDeleteAt values and an {@code int[]} of local deletion times) rather than
+ * as an array of {@link DeletionTime} objects.
+ * <p>
+ * A cleared slot holds {@code Long.MIN_VALUE} / {@code Integer.MAX_VALUE}, the same pair
+ * of values that {@link DeletionTime#isLive()} tests for.
+ */
+public class DeletionTimeArray
+{
+    private long[] markedForDeleteAts;
+    private int[] delTimes;
+
+    /**
+     * Creates an array of {@code initialCapacity} slots, all of them cleared.
+     */
+    public DeletionTimeArray(int initialCapacity)
+    {
+        this.markedForDeleteAts = new long[initialCapacity];
+        this.delTimes = new int[initialCapacity];
+        clear();
+    }
+
+    /**
+     * Resets slot {@code i} to the cleared values.
+     */
+    public void clear(int i)
+    {
+        markedForDeleteAts[i] = Long.MIN_VALUE;
+        delTimes[i] = Integer.MAX_VALUE;
+    }
+
+    /**
+     * Copies the two values of {@code dt} into slot {@code i}.
+     */
+    public void set(int i, DeletionTime dt)
+    {
+        this.markedForDeleteAts[i] = dt.markedForDeleteAt();
+        this.delTimes[i] = dt.localDeletionTime();
+    }
+
+    /**
+     * Returns the number of slots, i.e. the length of the backing arrays.
+     */
+    public int size()
+    {
+        return markedForDeleteAts.length;
+    }
+
+    /**
+     * Changes the number of slots to {@code newSize}. Slots below {@code newSize} keep
+     * their values; slots added by growing start out cleared.
+     */
+    public void resize(int newSize)
+    {
+        int prevSize = size();
+
+        markedForDeleteAts = Arrays.copyOf(markedForDeleteAts, newSize);
+        delTimes = Arrays.copyOf(delTimes, newSize);
+
+        // Arrays.copyOf pads new slots with zeros, which is not the cleared state. Fill only
+        // when growing: Arrays.fill throws if fromIndex > toIndex, so a shrink must skip it.
+        if (newSize > prevSize)
+        {
+            Arrays.fill(markedForDeleteAts, prevSize, newSize, Long.MIN_VALUE);
+            Arrays.fill(delTimes, prevSize, newSize, Integer.MAX_VALUE);
+        }
+    }
+
+    /**
+     * Returns whether slot {@code i} has a strictly greater markedForDeleteAt than {@code dt}.
+     * NOTE(review): unlike DeletionTime.supersedes, equal markedForDeleteAt values are not
+     * tie-broken on localDeletionTime here -- confirm that this is intended.
+     */
+    public boolean supersedes(int i, DeletionTime dt)
+    {
+        return markedForDeleteAts[i] > dt.markedForDeleteAt();
+    }
+
+    /**
+     * Returns whether slot {@code i} has a strictly greater markedForDeleteAt than slot {@code j}.
+     */
+    public boolean supersedes(int i, int j)
+    {
+        return markedForDeleteAts[i] > markedForDeleteAts[j];
+    }
+
+    /**
+     * Exchanges the contents of slots {@code i} and {@code j}.
+     */
+    public void swap(int i, int j)
+    {
+        long m = markedForDeleteAts[j];
+        int l = delTimes[j];
+
+        move(i, j);
+
+        markedForDeleteAts[i] = m;
+        delTimes[i] = l;
+    }
+
+    /**
+     * Copies slot {@code i} over slot {@code j}; slot {@code i} itself is left unchanged.
+     */
+    public void move(int i, int j)
+    {
+        markedForDeleteAts[j] = markedForDeleteAts[i];
+        delTimes[j] = delTimes[i];
+    }
+
+    /**
+     * Returns {@code true} when the markedForDeleteAt of slot {@code i} is greater than
+     * {@code Long.MIN_VALUE}, that is, when the slot is not in the cleared state.
+     * NOTE(review): this is the opposite sense of DeletionTime.isLive(), which is true when
+     * markedForDeleteAt equals Long.MIN_VALUE. Callers are not visible from here, so the
+     * behavior is kept as is -- check the usages before renaming or inverting it.
+     */
+    public boolean isLive(int i)
+    {
+        return markedForDeleteAts[i] > Long.MIN_VALUE;
+    }
+
+    /**
+     * Resets every slot to the cleared values.
+     */
+    public void clear()
+    {
+        Arrays.fill(markedForDeleteAts, Long.MIN_VALUE);
+        Arrays.fill(delTimes, Integer.MAX_VALUE);
+    }
+
+    /**
+     * Returns 12 bytes per slot: 8 for the {@code long} plus 4 for the {@code int}.
+     */
+    public int dataSize()
+    {
+        return 12 * markedForDeleteAts.length;
+    }
+
+    /**
+     * Returns the sum of {@code ObjectSizes.sizeOfArray} over the two backing arrays.
+     */
+    public long unsharedHeapSize()
+    {
+        return ObjectSizes.sizeOfArray(markedForDeleteAts)
+             + ObjectSizes.sizeOfArray(delTimes);
+    }
+
+    /**
+     * Overwrites every slot of this array with the matching slot of {@code other}.
+     * Both arrays are expected to have the same size (checked by an assertion).
+     */
+    public void copy(DeletionTimeArray other)
+    {
+        assert size() == other.size();
+        System.arraycopy(other.markedForDeleteAts, 0, markedForDeleteAts, 0, size());
+        System.arraycopy(other.delTimes, 0, delTimes, 0, size());
+    }
+
+    /**
+     * A {@link DeletionTime} view over one slot of a {@link DeletionTimeArray}. The view
+     * reads through to the array on every call, so it reflects later changes to the slot;
+     * {@link #takeAlias()} returns a detached copy of the current values. The accessors
+     * dereference the array directly, so {@link #setTo} must be called before first use.
+     */
+    public static class Cursor extends DeletionTime
+    {
+        private DeletionTimeArray array;
+        private int i;
+
+        /**
+         * Points this cursor at slot {@code i} of {@code array} and returns the cursor itself.
+         */
+        public Cursor setTo(DeletionTimeArray array, int i)
+        {
+            this.array = array;
+            this.i = i;
+            return this;
+        }
+
+        @Override
+        public long markedForDeleteAt()
+        {
+            return array.markedForDeleteAts[i];
+        }
+
+        @Override
+        public int localDeletionTime()
+        {
+            return array.delTimes[i];
+        }
+
+        /**
+         * Returns a new SimpleDeletionTime holding the slot's current values.
+         */
+        public DeletionTime takeAlias()
+        {
+            return new SimpleDeletionTime(markedForDeleteAt(), localDeletionTime());
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/ExpiringCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ExpiringCell.java b/src/java/org/apache/cassandra/db/ExpiringCell.java
deleted file mode 100644
index 5fc0f94..0000000
--- a/src/java/org/apache/cassandra/db/ExpiringCell.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.utils.concurrent.OpOrder;
-import org.apache.cassandra.utils.memory.AbstractAllocator;
-import org.apache.cassandra.utils.memory.MemtableAllocator;
-
-/**
- * Alternative to Cell that have an expiring time.
- * ExpiringCell is immutable (as Cell is).
- *
- * Note that ExpiringCell does not override Cell.getMarkedForDeleteAt,
- * which means that it's in the somewhat unintuitive position of being deleted (after its expiration)
- * without having a time-at-which-it-became-deleted. (Because ttl is a server-side measurement,
- * we can't mix it with the timestamp field, which is client-supplied and whose resolution we
- * can't assume anything about.)
- */
-public interface ExpiringCell extends Cell
-{
- public static final int MAX_TTL = 20 * 365 * 24 * 60 * 60; // 20 years in seconds
-
- public int getTimeToLive();
-
- ExpiringCell localCopy(CFMetaData metadata, AbstractAllocator allocator);
-
- ExpiringCell localCopy(CFMetaData metaData, MemtableAllocator allocator, OpOrder.Group opGroup);
-}