You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2015/06/30 12:48:01 UTC
[37/51] [partial] cassandra git commit: Storage engine refactor,
a.k.a CASSANDRA-8099
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/RangeSliceCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeSliceCommand.java b/src/java/org/apache/cassandra/db/RangeSliceCommand.java
deleted file mode 100644
index 664eeee..0000000
--- a/src/java/org/apache/cassandra/db/RangeSliceCommand.java
+++ /dev/null
@@ -1,246 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.io.DataInput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import com.google.common.base.Objects;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.filter.ExtendedFilter;
-import org.apache.cassandra.db.filter.IDiskAtomFilter;
-import org.apache.cassandra.dht.AbstractBounds;
-import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.net.MessageOut;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.service.pager.Pageable;
-
-public class RangeSliceCommand extends AbstractRangeCommand implements Pageable
-{
- public static final RangeSliceCommandSerializer serializer = new RangeSliceCommandSerializer();
-
- public final int maxResults;
- public final boolean countCQL3Rows;
- public final boolean isPaging;
-
- public RangeSliceCommand(String keyspace,
- String columnFamily,
- long timestamp,
- IDiskAtomFilter predicate,
- AbstractBounds<RowPosition> range,
- int maxResults)
- {
- this(keyspace, columnFamily, timestamp, predicate, range, null, maxResults, false, false);
- }
-
- public RangeSliceCommand(String keyspace,
- String columnFamily,
- long timestamp,
- IDiskAtomFilter predicate,
- AbstractBounds<RowPosition> range,
- List<IndexExpression> row_filter,
- int maxResults)
- {
- this(keyspace, columnFamily, timestamp, predicate, range, row_filter, maxResults, false, false);
- }
-
- public RangeSliceCommand(String keyspace,
- String columnFamily,
- long timestamp,
- IDiskAtomFilter predicate,
- AbstractBounds<RowPosition> range,
- List<IndexExpression> rowFilter,
- int maxResults,
- boolean countCQL3Rows,
- boolean isPaging)
- {
- super(keyspace, columnFamily, timestamp, range, predicate, rowFilter);
- this.maxResults = maxResults;
- this.countCQL3Rows = countCQL3Rows;
- this.isPaging = isPaging;
- }
-
- public MessageOut<RangeSliceCommand> createMessage()
- {
- return new MessageOut<>(MessagingService.Verb.RANGE_SLICE, this, serializer);
- }
-
- public AbstractRangeCommand forSubRange(AbstractBounds<RowPosition> subRange)
- {
- return new RangeSliceCommand(keyspace,
- columnFamily,
- timestamp,
- predicate.cloneShallow(),
- subRange,
- rowFilter,
- maxResults,
- countCQL3Rows,
- isPaging);
- }
-
- public AbstractRangeCommand withUpdatedLimit(int newLimit)
- {
- return new RangeSliceCommand(keyspace,
- columnFamily,
- timestamp,
- predicate.cloneShallow(),
- keyRange,
- rowFilter,
- newLimit,
- countCQL3Rows,
- isPaging);
- }
-
- public int limit()
- {
- return maxResults;
- }
-
- public boolean countCQL3Rows()
- {
- return countCQL3Rows;
- }
-
- public List<Row> executeLocally()
- {
- ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(columnFamily);
-
- ExtendedFilter exFilter = cfs.makeExtendedFilter(keyRange, predicate, rowFilter, maxResults, countCQL3Rows, isPaging, timestamp);
- if (cfs.indexManager.hasIndexFor(rowFilter))
- return cfs.search(exFilter);
- else
- return cfs.getRangeSlice(exFilter);
- }
-
- @Override
- public String toString()
- {
- return Objects.toStringHelper(this)
- .add("keyspace", keyspace)
- .add("columnFamily", columnFamily)
- .add("predicate", predicate)
- .add("keyRange", keyRange)
- .add("rowFilter", rowFilter)
- .add("maxResults", maxResults)
- .add("counterCQL3Rows", countCQL3Rows)
- .add("timestamp", timestamp)
- .toString();
- }
-}
-
-class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceCommand>
-{
- public void serialize(RangeSliceCommand sliceCommand, DataOutputPlus out, int version) throws IOException
- {
- out.writeUTF(sliceCommand.keyspace);
- out.writeUTF(sliceCommand.columnFamily);
- out.writeLong(sliceCommand.timestamp);
-
- CFMetaData metadata = Schema.instance.getCFMetaData(sliceCommand.keyspace, sliceCommand.columnFamily);
-
- metadata.comparator.diskAtomFilterSerializer().serialize(sliceCommand.predicate, out, version);
-
- if (sliceCommand.rowFilter == null)
- {
- out.writeInt(0);
- }
- else
- {
- out.writeInt(sliceCommand.rowFilter.size());
- for (IndexExpression expr : sliceCommand.rowFilter)
- {
- expr.writeTo(out);
- }
- }
- MessagingService.validatePartitioner(sliceCommand.keyRange);
- AbstractBounds.rowPositionSerializer.serialize(sliceCommand.keyRange, out, version);
- out.writeInt(sliceCommand.maxResults);
- out.writeBoolean(sliceCommand.countCQL3Rows);
- out.writeBoolean(sliceCommand.isPaging);
- }
-
- public RangeSliceCommand deserialize(DataInput in, int version) throws IOException
- {
- String keyspace = in.readUTF();
- String columnFamily = in.readUTF();
- long timestamp = in.readLong();
-
- CFMetaData metadata = Schema.instance.getCFMetaData(keyspace, columnFamily);
- if (metadata == null)
- {
- String message = String.format("Got range slice command for nonexistent table %s.%s. If the table was just " +
- "created, this is likely due to the schema not being fully propagated. Please wait for schema " +
- "agreement on table creation." , keyspace, columnFamily);
- throw new UnknownColumnFamilyException(message, null);
- }
-
- IDiskAtomFilter predicate = metadata.comparator.diskAtomFilterSerializer().deserialize(in, version);
-
- List<IndexExpression> rowFilter;
- int filterCount = in.readInt();
- rowFilter = new ArrayList<>(filterCount);
- for (int i = 0; i < filterCount; i++)
- {
- rowFilter.add(IndexExpression.readFrom(in));
- }
- AbstractBounds<RowPosition> range = AbstractBounds.rowPositionSerializer.deserialize(in, MessagingService.globalPartitioner(), version);
-
- int maxResults = in.readInt();
- boolean countCQL3Rows = in.readBoolean();
- boolean isPaging = in.readBoolean();
- return new RangeSliceCommand(keyspace, columnFamily, timestamp, predicate, range, rowFilter, maxResults, countCQL3Rows, isPaging);
- }
-
- public long serializedSize(RangeSliceCommand rsc, int version)
- {
- long size = TypeSizes.NATIVE.sizeof(rsc.keyspace);
- size += TypeSizes.NATIVE.sizeof(rsc.columnFamily);
- size += TypeSizes.NATIVE.sizeof(rsc.timestamp);
-
- CFMetaData metadata = Schema.instance.getCFMetaData(rsc.keyspace, rsc.columnFamily);
-
- IDiskAtomFilter filter = rsc.predicate;
-
- size += metadata.comparator.diskAtomFilterSerializer().serializedSize(filter, version);
-
- if (rsc.rowFilter == null)
- {
- size += TypeSizes.NATIVE.sizeof(0);
- }
- else
- {
- size += TypeSizes.NATIVE.sizeof(rsc.rowFilter.size());
- for (IndexExpression expr : rsc.rowFilter)
- {
- size += TypeSizes.NATIVE.sizeofWithShortLength(expr.column);
- size += TypeSizes.NATIVE.sizeof(expr.operator.ordinal());
- size += TypeSizes.NATIVE.sizeofWithShortLength(expr.value);
- }
- }
- size += AbstractBounds.rowPositionSerializer.serializedSize(rsc.keyRange, version);
- size += TypeSizes.NATIVE.sizeof(rsc.maxResults);
- size += TypeSizes.NATIVE.sizeof(rsc.countCQL3Rows);
- size += TypeSizes.NATIVE.sizeof(rsc.isPaging);
- return size;
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/RangeSliceReply.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeSliceReply.java b/src/java/org/apache/cassandra/db/RangeSliceReply.java
deleted file mode 100644
index ed1f523..0000000
--- a/src/java/org/apache/cassandra/db/RangeSliceReply.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.lang3.StringUtils;
-
-import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.io.util.FastByteArrayInputStream;
-import org.apache.cassandra.net.MessageOut;
-import org.apache.cassandra.net.MessagingService;
-
-public class RangeSliceReply
-{
- public static final RangeSliceReplySerializer serializer = new RangeSliceReplySerializer();
-
- public final List<Row> rows;
-
- public RangeSliceReply(List<Row> rows)
- {
- this.rows = rows;
- }
-
- public MessageOut<RangeSliceReply> createMessage()
- {
- return new MessageOut<RangeSliceReply>(MessagingService.Verb.REQUEST_RESPONSE, this, serializer);
- }
-
- @Override
- public String toString()
- {
- return "RangeSliceReply{" +
- "rows=" + StringUtils.join(rows, ",") +
- '}';
- }
-
- public static RangeSliceReply read(byte[] body, int version) throws IOException
- {
- try (DataInputStream dis = new DataInputStream(new FastByteArrayInputStream(body)))
- {
- return serializer.deserialize(dis, version);
- }
- }
-
- private static class RangeSliceReplySerializer implements IVersionedSerializer<RangeSliceReply>
- {
- public void serialize(RangeSliceReply rsr, DataOutputPlus out, int version) throws IOException
- {
- out.writeInt(rsr.rows.size());
- for (Row row : rsr.rows)
- Row.serializer.serialize(row, out, version);
- }
-
- public RangeSliceReply deserialize(DataInput in, int version) throws IOException
- {
- int rowCount = in.readInt();
- List<Row> rows = new ArrayList<Row>(rowCount);
- for (int i = 0; i < rowCount; i++)
- rows.add(Row.serializer.deserialize(in, version));
- return new RangeSliceReply(rows);
- }
-
- public long serializedSize(RangeSliceReply rsr, int version)
- {
- int size = TypeSizes.NATIVE.sizeof(rsr.rows.size());
- for (Row row : rsr.rows)
- size += Row.serializer.serializedSize(row, version);
- return size;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/RangeTombstone.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeTombstone.java b/src/java/org/apache/cassandra/db/RangeTombstone.java
index da483fc..3373afa 100644
--- a/src/java/org/apache/cassandra/db/RangeTombstone.java
+++ b/src/java/org/apache/cassandra/db/RangeTombstone.java
@@ -19,12 +19,12 @@ package org.apache.cassandra.db;
import java.io.DataInput;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.util.*;
import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.composites.CType;
-import org.apache.cassandra.db.composites.Composite;
+import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.io.ISSTableSerializer;
import org.apache.cassandra.io.sstable.format.Version;
import org.apache.cassandra.io.util.DataOutputBuffer;
@@ -32,350 +32,142 @@ import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.serializers.MarshalException;
import org.apache.cassandra.utils.Interval;
-public class RangeTombstone extends Interval<Composite, DeletionTime> implements OnDiskAtom
+/**
+ * A range tombstone is a tombstone that covers a slice/range of rows.
+ * <p>
+ * Note that in most of the storage engine, a range tombstone is actually represented by its separated
+ * opening and closing bound, see {@link RangeTombstoneMarker}. So in practice, this is only used when
+ * full partitions are materialized in memory in a {@code Partition} object, and more precisely through
+ * the use of a {@code RangeTombstoneList} in a {@code DeletionInfo} object.
+ */
+public class RangeTombstone
{
- public RangeTombstone(Composite start, Composite stop, long markedForDeleteAt, int localDeletionTime)
- {
- this(start, stop, new DeletionTime(markedForDeleteAt, localDeletionTime));
- }
-
- public RangeTombstone(Composite start, Composite stop, DeletionTime delTime)
- {
- super(start, stop, delTime);
- }
-
- public Composite name()
- {
- return min;
- }
+ private final Slice slice;
+ private final DeletionTime deletion;
- public int getLocalDeletionTime()
+ public RangeTombstone(Slice slice, DeletionTime deletion)
{
- return data.localDeletionTime;
+ this.slice = slice;
+ this.deletion = deletion.takeAlias();
}
- public long timestamp()
+ /**
+ * The slice of rows that is deleted by this range tombstone.
+ *
+ * @return the slice of rows that is deleted by this range tombstone.
+ */
+ public Slice deletedSlice()
{
- return data.markedForDeleteAt;
+ return slice;
}
- public void validateFields(CFMetaData metadata) throws MarshalException
+ /**
+ * The deletion time for this (range) tombstone.
+ *
+ * @return the deletion time for this range tombstone.
+ */
+ public DeletionTime deletionTime()
{
- metadata.comparator.validate(min);
- metadata.comparator.validate(max);
+ return deletion;
}
- public void updateDigest(MessageDigest digest)
+ @Override
+ public boolean equals(Object other)
{
- digest.update(min.toByteBuffer().duplicate());
- digest.update(max.toByteBuffer().duplicate());
-
- try (DataOutputBuffer buffer = new DataOutputBuffer())
- {
- buffer.writeLong(data.markedForDeleteAt);
- digest.update(buffer.getData(), 0, buffer.getLength());
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- }
-
- /**
- * This tombstone supersedes another one if it is more recent and cover a
- * bigger range than rt.
- */
- public boolean supersedes(RangeTombstone rt, Comparator<Composite> comparator)
- {
- if (rt.data.markedForDeleteAt > data.markedForDeleteAt)
+ if(!(other instanceof RangeTombstone))
return false;
- return comparator.compare(min, rt.min) <= 0 && comparator.compare(max, rt.max) >= 0;
+ RangeTombstone that = (RangeTombstone)other;
+ return this.deletedSlice().equals(that.deletedSlice())
+ && this.deletionTime().equals(that.deletionTime());
}
- public boolean includes(Comparator<Composite> comparator, Composite name)
+ @Override
+ public int hashCode()
{
- return comparator.compare(name, min) >= 0 && comparator.compare(name, max) <= 0;
+ return Objects.hash(deletedSlice(), deletionTime());
}
/**
- * Tracks opened RangeTombstones when iterating over a partition.
+ * The bound of a range tombstone.
* <p>
- * This tracker must be provided all the atoms of a given partition in
- * order (to the {@code update} method). Given this, it keeps enough
- * information to be able to decide if one of an atom is deleted (shadowed)
- * by a previously open RT. One the tracker can prove a given range
- * tombstone cannot be useful anymore (that is, as soon as we've seen an
- * atom that is after the end of that RT), it discards this RT. In other
- * words, the maximum memory used by this object should be proportional to
- * the maximum number of RT that can be simultaneously open (and this
- * should fairly low in practice).
+ * This is the same as for a slice, but it also includes "boundaries" between ranges. A boundary simply condenses
+ * a closing and an opening "bound" into a single object. There are 2 main reasons for these "shortcut" boundaries:
+ * 1) When merging multiple iterators having range tombstones (that are represented by their start and end markers),
+ * we need to know, when a range is closed on an iterator, whether it is reopened right away. Otherwise, we cannot
+ * easily produce the markers on the merged iterators without risking breaking the sorting guarantees of an
+ * iterator. See this comment for more details: https://goo.gl/yyB5mR.
+ * 2) This saves some storage space.
*/
- public static class Tracker
+ public static class Bound extends Slice.Bound
{
- private final Comparator<Composite> comparator;
-
- // A list the currently open RTs. We keep the list sorted in order of growing end bounds as for a
- // new atom, this allows to efficiently find the RTs that are now useless (if any). Also note that because
- // atom are passed to the tracker in order, any RT that is tracked can be assumed as opened, i.e. we
- // never have to test the RTs start since it's always assumed to be less than what we have.
- // Also note that this will store expired RTs (#7810). Those will be of type ExpiredRangeTombstone and
- // will be ignored by writeOpenedMarker.
- private final List<RangeTombstone> openedTombstones = new LinkedList<RangeTombstone>();
-
- // Total number of atoms written by writeOpenedMarker().
- private int atomCount;
+ public static final Serializer serializer = new Serializer();
- /**
- * Creates a new tracker given the table comparator.
- *
- * @param comparator the comparator for the table this will track atoms
- * for. The tracker assumes that atoms will be later provided to the
- * tracker in {@code comparator} order.
- */
- public Tracker(Comparator<Composite> comparator)
+ public Bound(Kind kind, ByteBuffer[] values)
{
- this.comparator = comparator;
+ super(kind, values);
}
- /**
- * Computes the RangeTombstone that are needed at the beginning of an index
- * block starting with {@code firstColumn}.
- *
- * @return the total serialized size of said tombstones and write them to
- * {@code out} it if isn't null.
- */
- public long writeOpenedMarker(OnDiskAtom firstColumn, DataOutputPlus out, OnDiskAtom.Serializer atomSerializer) throws IOException
+ public static RangeTombstone.Bound inclusiveOpen(boolean reversed, ByteBuffer[] boundValues)
{
- long size = 0;
- if (openedTombstones.isEmpty())
- return size;
-
- /*
- * Compute the markers that needs to be written at the beginning of
- * this block. We need to write one if it is the more recent
- * (opened) tombstone for at least some part of its range.
- */
- List<RangeTombstone> toWrite = new LinkedList<RangeTombstone>();
- outer:
- for (RangeTombstone tombstone : openedTombstones)
- {
- // If the first column is outside the range, skip it (in case update() hasn't been called yet)
- if (comparator.compare(firstColumn.name(), tombstone.max) > 0)
- continue;
-
- if (tombstone instanceof ExpiredRangeTombstone)
- continue;
-
- RangeTombstone updated = new RangeTombstone(firstColumn.name(), tombstone.max, tombstone.data);
-
- Iterator<RangeTombstone> iter = toWrite.iterator();
- while (iter.hasNext())
- {
- RangeTombstone other = iter.next();
- if (other.supersedes(updated, comparator))
- break outer;
- if (updated.supersedes(other, comparator))
- iter.remove();
- }
- toWrite.add(tombstone);
- }
-
- for (RangeTombstone tombstone : toWrite)
- {
- size += atomSerializer.serializedSizeForSSTable(tombstone);
- atomCount++;
- if (out != null)
- atomSerializer.serializeForSSTable(tombstone, out);
- }
- return size;
- }
-
- /**
- * The total number of atoms written by calls to the method {@link #writeOpenedMarker}.
- */
- public int writtenAtom()
- {
- return atomCount;
- }
-
- /**
- * Update this tracker given an {@code atom}.
- * <p>
- * This method first test if some range tombstone can be discarded due
- * to the knowledge of that new atom. Then, if it's a range tombstone,
- * it adds it to the tracker.
- * <p>
- * Note that this method should be called on *every* atom of a partition for
- * the tracker to work as efficiently as possible (#9486).
- */
- public void update(OnDiskAtom atom, boolean isExpired)
- {
- // Get rid of now useless RTs
- ListIterator<RangeTombstone> iterator = openedTombstones.listIterator();
- while (iterator.hasNext())
- {
- // If this tombstone stops before the new atom, it is now useless since it cannot cover this or any future
- // atoms. Otherwise, if a RT ends after the new atom, then we know that's true of any following atom too
- // since maxOrderingSet is sorted by end bounds
- RangeTombstone t = iterator.next();
- if (comparator.compare(atom.name(), t.max) > 0)
- {
- iterator.remove();
- }
- else
- {
- // If the atom is a RT, we'll add it next and for that we want to start by looking at the atom we just
- // returned, so rewind the iterator.
- iterator.previous();
- break;
- }
- }
-
- // If it's a RT, adds it.
- if (atom instanceof RangeTombstone)
- {
- RangeTombstone toAdd = (RangeTombstone)atom;
- if (isExpired)
- toAdd = new ExpiredRangeTombstone(toAdd);
-
- // We want to maintain openedTombstones in end bounds order so we find where to insert the new element
- // and add it. While doing so, we also check if that new tombstone fully shadow or is fully shadowed
- // by an existing tombstone so we avoid tracking more tombstone than necessary (and we know this will
- // at least happend for start-of-index-block repeated range tombstones).
- while (iterator.hasNext())
- {
- RangeTombstone existing = iterator.next();
- int cmp = comparator.compare(toAdd.max, existing.max);
- if (cmp > 0)
- {
- // the new one covers more than the existing one. If the new one happens to also supersedes
- // the existing one, remove the existing one. In any case, we're not done yet.
- if (toAdd.data.supersedes(existing.data))
- iterator.remove();
- }
- else
- {
- // the new one is included in the existing one. If the new one supersedes the existing one,
- // then we add the new one (and if the new one ends like the existing one, we can actually remove
- // the existing one), otherwise we can actually ignore it. In any case, we're done.
- if (toAdd.data.supersedes(existing.data))
- {
- if (cmp == 0)
- iterator.set(toAdd);
- else
- insertBefore(toAdd, iterator);
- }
- return;
- }
- }
- // If we reach here, either we had no tombstones and the new one ends after all existing ones.
- iterator.add(toAdd);
- }
+ return new Bound(reversed ? Kind.INCL_END_BOUND : Kind.INCL_START_BOUND, boundValues);
}
- /**
- * Adds the provided {@code tombstone} _before_ the last element returned by {@code iterator.next()}.
- * <p>
- * This method assumes that {@code iterator.next()} has been called prior to this method call, i.e. that
- * {@code iterator.hasPrevious() == true}.
- */
- private static void insertBefore(RangeTombstone tombstone, ListIterator<RangeTombstone> iterator)
+ public static RangeTombstone.Bound exclusiveOpen(boolean reversed, ByteBuffer[] boundValues)
{
- assert iterator.hasPrevious();
- iterator.previous();
- iterator.add(tombstone);
- iterator.next();
+ return new Bound(reversed ? Kind.EXCL_END_BOUND : Kind.EXCL_START_BOUND, boundValues);
}
- /**
- * Tests if the provided column is deleted by one of the tombstone
- * tracked by this tracker.
- * <p>
- * This method should be called on columns in the same order than for the update()
- * method. Note that this method does not update the tracker so the update() method
- * should still be called on {@code column} (it doesn't matter if update is called
- * before or after this call).
- */
- public boolean isDeleted(Cell cell)
+ public static RangeTombstone.Bound inclusiveClose(boolean reversed, ByteBuffer[] boundValues)
{
- // We know every tombstone kept are "open", start before the column. So the
- // column is deleted if any of the tracked tombstone ends after the column
- // (this will be the case of every RT if update() has been called before this
- // method, but we might have a few RT to skip otherwise) and the RT deletion is
- // actually more recent than the column timestamp.
- for (RangeTombstone tombstone : openedTombstones)
- {
- if (comparator.compare(cell.name(), tombstone.max) <= 0
- && tombstone.timestamp() >= cell.timestamp())
- return true;
- }
- return false;
+ return new Bound(reversed ? Kind.INCL_START_BOUND : Kind.INCL_END_BOUND, boundValues);
}
- /**
- * The tracker needs to track expired range tombstone but keep tracks that they are
- * expired, so this is what this class is used for.
- */
- private static class ExpiredRangeTombstone extends RangeTombstone
+ public static RangeTombstone.Bound exclusiveClose(boolean reversed, ByteBuffer[] boundValues)
{
- private ExpiredRangeTombstone(RangeTombstone tombstone)
- {
- super(tombstone.min, tombstone.max, tombstone.data);
- }
+ return new Bound(reversed ? Kind.EXCL_START_BOUND : Kind.EXCL_END_BOUND, boundValues);
}
- }
-
- public static class Serializer implements ISSTableSerializer<RangeTombstone>
- {
- private final CType type;
- public Serializer(CType type)
+ public static RangeTombstone.Bound inclusiveCloseExclusiveOpen(boolean reversed, ByteBuffer[] boundValues)
{
- this.type = type;
+ return new Bound(reversed ? Kind.EXCL_END_INCL_START_BOUNDARY : Kind.INCL_END_EXCL_START_BOUNDARY, boundValues);
}
- public void serializeForSSTable(RangeTombstone t, DataOutputPlus out) throws IOException
+ public static RangeTombstone.Bound exclusiveCloseInclusiveOpen(boolean reversed, ByteBuffer[] boundValues)
{
- type.serializer().serialize(t.min, out);
- out.writeByte(ColumnSerializer.RANGE_TOMBSTONE_MASK);
- type.serializer().serialize(t.max, out);
- DeletionTime.serializer.serialize(t.data, out);
+ return new Bound(reversed ? Kind.INCL_END_EXCL_START_BOUNDARY : Kind.EXCL_END_INCL_START_BOUNDARY, boundValues);
}
- public RangeTombstone deserializeFromSSTable(DataInput in, Version version) throws IOException
+ @Override
+ public Bound withNewKind(Kind kind)
{
- Composite min = type.serializer().deserialize(in);
-
- int b = in.readUnsignedByte();
- assert (b & ColumnSerializer.RANGE_TOMBSTONE_MASK) != 0;
- return deserializeBody(in, min, version);
+ return new Bound(kind, values);
}
- public RangeTombstone deserializeBody(DataInput in, Composite min, Version version) throws IOException
+ public static class Serializer
{
- Composite max = type.serializer().deserialize(in);
- DeletionTime dt = DeletionTime.serializer.deserialize(in);
- // If the max equals the min.end(), we can avoid keeping an extra ByteBuffer in memory by using
- // min.end() instead of max
- Composite minEnd = min.end();
- max = minEnd.equals(max) ? minEnd : max;
- return new RangeTombstone(min, max, dt);
- }
+ public void serialize(RangeTombstone.Bound bound, DataOutputPlus out, int version, List<AbstractType<?>> types) throws IOException
+ {
+ out.writeByte(bound.kind().ordinal());
+ out.writeShort(bound.size());
+ ClusteringPrefix.serializer.serializeValuesWithoutSize(bound, out, version, types);
+ }
- public void skipBody(DataInput in, Version version) throws IOException
- {
- type.serializer().skip(in);
- DeletionTime.serializer.skip(in);
- }
+ public long serializedSize(RangeTombstone.Bound bound, int version, List<AbstractType<?>> types, TypeSizes sizes)
+ {
+ return 1 // kind ordinal
+ + sizes.sizeof((short)bound.size())
+ + ClusteringPrefix.serializer.valuesWithoutSizeSerializedSize(bound, version, types, sizes);
+ }
- public long serializedSizeForSSTable(RangeTombstone t)
- {
- TypeSizes typeSizes = TypeSizes.NATIVE;
- return type.serializer().serializedSize(t.min, typeSizes)
- + 1 // serialization flag
- + type.serializer().serializedSize(t.max, typeSizes)
- + DeletionTime.serializer.serializedSize(t.data, typeSizes);
+ public Kind deserialize(DataInput in, int version, List<AbstractType<?>> types, Writer writer) throws IOException
+ {
+ Kind kind = Kind.values()[in.readByte()];
+ writer.writeBoundKind(kind);
+ int size = in.readUnsignedShort();
+ ClusteringPrefix.serializer.deserializeValuesWithoutSize(in, size, version, types, writer);
+ return kind;
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/RangeTombstoneList.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeTombstoneList.java b/src/java/org/apache/cassandra/db/RangeTombstoneList.java
index 37f1ef4..0c27bc4 100644
--- a/src/java/org/apache/cassandra/db/RangeTombstoneList.java
+++ b/src/java/org/apache/cassandra/db/RangeTombstoneList.java
@@ -20,9 +20,7 @@ package org.apache.cassandra.db;
import java.io.DataInput;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.security.MessageDigest;
import java.util.Arrays;
-import java.util.Comparator;
import java.util.Iterator;
import com.google.common.collect.AbstractIterator;
@@ -31,12 +29,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.cache.IMeasurableMemory;
-import org.apache.cassandra.db.composites.CType;
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.db.composites.Composite;
+import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.utils.ObjectSizes;
import org.apache.cassandra.utils.memory.AbstractAllocator;
@@ -62,19 +57,19 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable
private static long EMPTY_SIZE = ObjectSizes.measure(new RangeTombstoneList(null, 0));
- private final Comparator<Composite> comparator;
+ private final ClusteringComparator comparator;
// Note: we don't want to use a List for the markedAts and delTimes to avoid boxing. We could
// use a List for starts and ends, but having arrays everywhere is almost simpler.
- private Composite[] starts;
- private Composite[] ends;
+ private Slice.Bound[] starts;
+ private Slice.Bound[] ends;
private long[] markedAts;
private int[] delTimes;
private long boundaryHeapSize;
private int size;
- private RangeTombstoneList(Comparator<Composite> comparator, Composite[] starts, Composite[] ends, long[] markedAts, int[] delTimes, long boundaryHeapSize, int size)
+ private RangeTombstoneList(ClusteringComparator comparator, Slice.Bound[] starts, Slice.Bound[] ends, long[] markedAts, int[] delTimes, long boundaryHeapSize, int size)
{
assert starts.length == ends.length && starts.length == markedAts.length && starts.length == delTimes.length;
this.comparator = comparator;
@@ -86,9 +81,9 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable
this.boundaryHeapSize = boundaryHeapSize;
}
- public RangeTombstoneList(Comparator<Composite> comparator, int capacity)
+ public RangeTombstoneList(ClusteringComparator comparator, int capacity)
{
- this(comparator, new Composite[capacity], new Composite[capacity], new long[capacity], new int[capacity], 0, 0);
+ this(comparator, new Slice.Bound[capacity], new Slice.Bound[capacity], new long[capacity], new int[capacity], 0, 0);
}
public boolean isEmpty()
@@ -101,7 +96,7 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable
return size;
}
- public Comparator<Composite> comparator()
+ public ClusteringComparator comparator()
{
return comparator;
}
@@ -119,27 +114,36 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable
public RangeTombstoneList copy(AbstractAllocator allocator)
{
RangeTombstoneList copy = new RangeTombstoneList(comparator,
- new Composite[size],
- new Composite[size],
- Arrays.copyOf(markedAts, size),
- Arrays.copyOf(delTimes, size),
- boundaryHeapSize, size);
+ new Slice.Bound[size],
+ new Slice.Bound[size],
+ Arrays.copyOf(markedAts, size),
+ Arrays.copyOf(delTimes, size),
+ boundaryHeapSize, size);
for (int i = 0; i < size; i++)
{
- assert !(starts[i] instanceof AbstractNativeCell || ends[i] instanceof AbstractNativeCell); //this should never happen
-
- copy.starts[i] = starts[i].copy(null, allocator);
- copy.ends[i] = ends[i].copy(null, allocator);
+ copy.starts[i] = clone(starts[i], allocator);
+ copy.ends[i] = clone(ends[i], allocator);
}
return copy;
}
+ private static Slice.Bound clone(Slice.Bound bound, AbstractAllocator allocator)
+ {
+ ByteBuffer[] values = new ByteBuffer[bound.size()];
+ for (int i = 0; i < values.length; i++)
+ values[i] = allocator.clone(bound.get(i));
+ return new Slice.Bound(bound.kind(), values);
+ }
+
public void add(RangeTombstone tombstone)
{
- add(tombstone.min, tombstone.max, tombstone.data.markedForDeleteAt, tombstone.data.localDeletionTime);
+ add(tombstone.deletedSlice().start(),
+ tombstone.deletedSlice().end(),
+ tombstone.deletionTime().markedForDeleteAt(),
+ tombstone.deletionTime().localDeletionTime());
}
/**
@@ -148,7 +152,7 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable
* This method will be faster if the new tombstone sort after all the currently existing ones (this is a common use case),
* but it doesn't assume it.
*/
- public void add(Composite start, Composite end, long markedAt, int delTime)
+ public void add(Slice.Bound start, Slice.Bound end, long markedAt, int delTime)
{
if (isEmpty())
{
@@ -215,7 +219,7 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable
int j = 0;
while (i < size && j < tombstones.size)
{
- if (comparator.compare(tombstones.starts[j], ends[i]) <= 0)
+ if (comparator.compare(tombstones.starts[j], ends[i]) < 0)
{
insertFrom(i, tombstones.starts[j], tombstones.ends[j], tombstones.markedAts[j], tombstones.delTimes[j]);
j++;
@@ -235,34 +239,26 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable
* Returns whether the given name/timestamp pair is deleted by one of the tombstone
* of this RangeTombstoneList.
*/
- public boolean isDeleted(Cell cell)
+ public boolean isDeleted(Clustering clustering, Cell cell)
{
- int idx = searchInternal(cell.name(), 0);
+ int idx = searchInternal(clustering, 0, size);
// No matter what the counter cell's timestamp is, a tombstone always takes precedence. See CASSANDRA-7346.
- return idx >= 0 && (cell instanceof CounterCell || markedAts[idx] >= cell.timestamp());
- }
-
- /**
- * Returns a new {@link InOrderTester}.
- */
- InOrderTester inOrderTester()
- {
- return new InOrderTester();
+ return idx >= 0 && (cell.isCounterCell() || markedAts[idx] >= cell.livenessInfo().timestamp());
}
/**
* Returns the DeletionTime for the tombstone overlapping {@code name} (there can't be more than one),
* or null if {@code name} is not covered by any tombstone.
*/
- public DeletionTime searchDeletionTime(Composite name)
+ public DeletionTime searchDeletionTime(Clustering name)
{
- int idx = searchInternal(name, 0);
- return idx < 0 ? null : new DeletionTime(markedAts[idx], delTimes[idx]);
+ int idx = searchInternal(name, 0, size);
+ return idx < 0 ? null : new SimpleDeletionTime(markedAts[idx], delTimes[idx]);
}
- public RangeTombstone search(Composite name)
+ public RangeTombstone search(Clustering name)
{
- int idx = searchInternal(name, 0);
+ int idx = searchInternal(name, 0, size);
return idx < 0 ? null : rangeTombstone(idx);
}
@@ -270,20 +266,15 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable
* Return is the index of the range covering name if name is covered. If the return idx is negative,
* no range cover name and -idx-1 is the index of the first range whose start is greater than name.
*/
- private int searchInternal(Composite name, int startIdx)
+ private int searchInternal(ClusteringPrefix name, int startIdx, int endIdx)
{
if (isEmpty())
return -1;
- int pos = Arrays.binarySearch(starts, startIdx, size, name, comparator);
+ int pos = Arrays.binarySearch(starts, startIdx, endIdx, name, comparator);
if (pos >= 0)
{
- // We're exactly on an interval start. The one subtility is that we need to check if
- // the previous is not equal to us and doesn't have a higher marked at
- if (pos > 0 && comparator.compare(name, ends[pos-1]) == 0 && markedAts[pos-1] > markedAts[pos])
- return pos-1;
- else
- return pos;
+ return pos;
}
else
{
@@ -308,93 +299,94 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable
return dataSize;
}
- public long minMarkedAt()
+ public void updateAllTimestamp(long timestamp)
{
- long min = Long.MAX_VALUE;
for (int i = 0; i < size; i++)
- min = Math.min(min, markedAts[i]);
- return min;
+ markedAts[i] = timestamp;
}
- public long maxMarkedAt()
+ private RangeTombstone rangeTombstone(int idx)
{
- long max = Long.MIN_VALUE;
- for (int i = 0; i < size; i++)
- max = Math.max(max, markedAts[i]);
- return max;
+ return new RangeTombstone(Slice.make(starts[idx], ends[idx]), new SimpleDeletionTime(markedAts[idx], delTimes[idx]));
}
- public void updateAllTimestamp(long timestamp)
+ private RangeTombstone rangeTombstoneWithNewStart(int idx, Slice.Bound newStart)
{
- for (int i = 0; i < size; i++)
- markedAts[i] = timestamp;
+ return new RangeTombstone(Slice.make(newStart, ends[idx]), new SimpleDeletionTime(markedAts[idx], delTimes[idx]));
}
- /**
- * Removes all range tombstones whose local deletion time is older than gcBefore.
- */
- public void purge(int gcBefore)
+ private RangeTombstone rangeTombstoneWithNewEnd(int idx, Slice.Bound newEnd)
{
- int j = 0;
- for (int i = 0; i < size; i++)
- {
- if (delTimes[i] >= gcBefore)
- setInternal(j++, starts[i], ends[i], markedAts[i], delTimes[i]);
- }
- size = j;
+ return new RangeTombstone(Slice.make(starts[idx], newEnd), new SimpleDeletionTime(markedAts[idx], delTimes[idx]));
}
- /**
- * Returns whether {@code purge(gcBefore)} would remove something or not.
- */
- public boolean hasPurgeableTombstones(int gcBefore)
+ private RangeTombstone rangeTombstoneWithNewBounds(int idx, Slice.Bound newStart, Slice.Bound newEnd)
{
- for (int i = 0; i < size; i++)
- {
- if (delTimes[i] < gcBefore)
- return true;
- }
- return false;
+ return new RangeTombstone(Slice.make(newStart, newEnd), new SimpleDeletionTime(markedAts[idx], delTimes[idx]));
}
- private RangeTombstone rangeTombstone(int idx)
+ public Iterator<RangeTombstone> iterator()
{
- return new RangeTombstone(starts[idx], ends[idx], markedAts[idx], delTimes[idx]);
+ return iterator(false);
}
- public Iterator<RangeTombstone> iterator()
+ public Iterator<RangeTombstone> iterator(boolean reversed)
{
- return new AbstractIterator<RangeTombstone>()
- {
- private int idx;
+ return reversed
+ ? new AbstractIterator<RangeTombstone>()
+ {
+ private int idx = size - 1;
- protected RangeTombstone computeNext()
- {
- if (idx >= size)
- return endOfData();
+ protected RangeTombstone computeNext()
+ {
+ if (idx < 0)
+ return endOfData();
- return rangeTombstone(idx++);
- }
- };
+ return rangeTombstone(idx--);
+ }
+ }
+ : new AbstractIterator<RangeTombstone>()
+ {
+ private int idx;
+
+ protected RangeTombstone computeNext()
+ {
+ if (idx >= size)
+ return endOfData();
+
+ return rangeTombstone(idx++);
+ }
+ };
}
- public Iterator<RangeTombstone> iterator(Composite from, Composite till)
+ public Iterator<RangeTombstone> iterator(final Slice slice, boolean reversed)
{
- int startIdx = from.isEmpty() ? 0 : searchInternal(from, 0);
+ return reversed ? reverseIterator(slice) : forwardIterator(slice);
+ }
+
+ private Iterator<RangeTombstone> forwardIterator(final Slice slice)
+ {
+ int startIdx = slice.start() == Slice.Bound.BOTTOM ? 0 : searchInternal(slice.start(), 0, size);
final int start = startIdx < 0 ? -startIdx-1 : startIdx;
if (start >= size)
return Iterators.<RangeTombstone>emptyIterator();
- int finishIdx = till.isEmpty() ? size : searchInternal(till, start);
- // if stopIdx is the first range after 'till' we care only until the previous range
+ int finishIdx = slice.end() == Slice.Bound.TOP ? size - 1 : searchInternal(slice.end(), start, size);
+ // if stopIdx is the first range after 'slice.end()' we care only until the previous range
final int finish = finishIdx < 0 ? -finishIdx-2 : finishIdx;
- // Note: the following is true because we know 'from' is before 'till' in sorted order.
if (start > finish)
return Iterators.<RangeTombstone>emptyIterator();
- else if (start == finish)
- return Iterators.<RangeTombstone>singletonIterator(rangeTombstone(start));
+
+ if (start == finish)
+ {
+ // We want to make sure the range are stricly included within the queried slice as this
+ // make it easier to combine things when iterating over successive slices.
+ Slice.Bound s = comparator.compare(starts[start], slice.start()) < 0 ? slice.start() : starts[start];
+ Slice.Bound e = comparator.compare(slice.end(), ends[start]) < 0 ? slice.end() : ends[start];
+ return Iterators.<RangeTombstone>singletonIterator(rangeTombstoneWithNewBounds(start, s, e));
+ }
return new AbstractIterator<RangeTombstone>()
{
@@ -405,76 +397,63 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable
if (idx >= size || idx > finish)
return endOfData();
+ // We want to make sure the range are stricly included within the queried slice as this
+ // make it easier to combine things when iterating over successive slices. This means that
+ // for the first and last range we might have to "cut" the range returned.
+ if (idx == start && comparator.compare(starts[idx], slice.start()) < 0)
+ return rangeTombstoneWithNewStart(idx++, slice.start());
+ if (idx == finish && comparator.compare(slice.end(), ends[idx]) < 0)
+ return rangeTombstoneWithNewEnd(idx++, slice.end());
return rangeTombstone(idx++);
}
};
}
- /**
- * Evaluates a diff between superset (known to be all merged tombstones) and this list for read repair
- *
- * @return null if there is no difference
- */
- public RangeTombstoneList diff(RangeTombstoneList superset)
+ private Iterator<RangeTombstone> reverseIterator(final Slice slice)
{
- if (isEmpty())
- return superset;
+ int startIdx = slice.end() == Slice.Bound.TOP ? size - 1 : searchInternal(slice.end(), 0, size);
+ // if slice.end() is not covered by any range, -startIdx-1 is the first range after it, so we start from the previous one
+ final int start = startIdx < 0 ? -startIdx-2 : startIdx;
- RangeTombstoneList diff = null;
-
- int j = 0; // index to iterate through our own list
- for (int i = 0; i < superset.size; i++)
- {
- // we can assume that this list is a subset of the superset list
- while (j < size && comparator.compare(starts[j], superset.starts[i]) < 0)
- j++;
+ if (start < 0) // slice.end() sorts before every range (or the list is empty): nothing to return
+ return Iterators.<RangeTombstone>emptyIterator();
- if (j >= size)
- {
- // we're at the end of our own list, add the remainder of the superset to the diff
- if (i < superset.size)
- {
- if (diff == null)
- diff = new RangeTombstoneList(comparator, superset.size - i);
+ int finishIdx = slice.start() == Slice.Bound.BOTTOM ? 0 : searchInternal(slice.start(), 0, start + 1); // 'start + 1' so range 'start' itself is searched
+ // if slice.start() is not covered by any range, -finishIdx-1 is the first range after it, which is the last one we return
+ final int finish = finishIdx < 0 ? -finishIdx-1 : finishIdx;
- for(int k = i; k < superset.size; k++)
- diff.add(superset.starts[k], superset.ends[k], superset.markedAts[k], superset.delTimes[k]);
- }
- return diff;
- }
+ if (start < finish)
+ return Iterators.<RangeTombstone>emptyIterator();
- // we don't care about local deletion time here, because it doesn't matter for read repair
- if (!starts[j].equals(superset.starts[i])
- || !ends[j].equals(superset.ends[i])
- || markedAts[j] != superset.markedAts[i])
- {
- if (diff == null)
- diff = new RangeTombstoneList(comparator, Math.min(8, superset.size - i));
- diff.add(superset.starts[i], superset.ends[i], superset.markedAts[i], superset.delTimes[i]);
- }
+ if (start == finish)
+ {
+ // We want to make sure the ranges are strictly included within the queried slice as this
+ // makes it easier to combine things when iterating over successive slices.
+ Slice.Bound s = comparator.compare(starts[start], slice.start()) < 0 ? slice.start() : starts[start];
+ Slice.Bound e = comparator.compare(slice.end(), ends[start]) < 0 ? slice.end() : ends[start];
+ return Iterators.<RangeTombstone>singletonIterator(rangeTombstoneWithNewBounds(start, s, e));
}
- return diff;
- }
-
- /**
- * Calculates digest for triggering read repair on mismatch
- */
- public void updateDigest(MessageDigest digest)
- {
- ByteBuffer longBuffer = ByteBuffer.allocate(8);
- for (int i = 0; i < size; i++)
+ return new AbstractIterator<RangeTombstone>()
{
- for (int j = 0; j < starts[i].size(); j++)
- digest.update(starts[i].get(j).duplicate());
- for (int j = 0; j < ends[i].size(); j++)
- digest.update(ends[i].get(j).duplicate());
+ private int idx = start;
- longBuffer.putLong(0, markedAts[i]);
- digest.update(longBuffer.array(), 0, 8);
- }
- }
+ protected RangeTombstone computeNext()
+ {
+ if (idx < 0 || idx < finish)
+ return endOfData();
+ // We want to make sure the ranges are strictly included within the queried slice as this
+ // makes it easier to combine things when iterating over successive slices. This means that
+ // for the first and last range we might have to "cut" the range returned.
+ if (idx == start && comparator.compare(slice.end(), ends[idx]) < 0)
+ return rangeTombstoneWithNewEnd(idx--, slice.end());
+ if (idx == finish && comparator.compare(starts[idx], slice.start()) < 0)
+ return rangeTombstoneWithNewStart(idx--, slice.start());
+ return rangeTombstone(idx--); // reverse iteration: always move towards index 0
+ }
+ };
+ }
@Override
public boolean equals(Object o)
@@ -525,51 +504,24 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable
/*
* Inserts a new element starting at index i. This method assumes that:
- * ends[i-1] <= start <= ends[i]
+ * ends[i-1] < start < ends[i]
+ * (note that we cannot have start == end since both will at least have a different bound "kind")
*
* A RangeTombstoneList is a list of range [s_0, e_0]...[s_n, e_n] such that:
* - s_i <= e_i
- * - e_i <= s_i+1
- * - if s_i == e_i and e_i == s_i+1 then s_i+1 < e_i+1
- * Basically, range are non overlapping except for their bound and in order. And while
- * we allow ranges with the same value for the start and end, we don't allow repeating
- * such range (so we can't have [0, 0][0, 0] even though it would respect the first 2
- * conditions).
- *
+ * - e_i < s_i+1
+ * Basically, ranges are non-overlapping and in order.
*/
- private void insertFrom(int i, Composite start, Composite end, long markedAt, int delTime)
+ private void insertFrom(int i, Slice.Bound start, Slice.Bound end, long markedAt, int delTime)
{
while (i < size)
{
- assert i == 0 || comparator.compare(ends[i-1], start) <= 0;
+ assert start.isStart() && end.isEnd();
+ assert i == 0 || comparator.compare(ends[i-1], start) < 0;
+ assert comparator.compare(start, ends[i]) < 0;
- int c = comparator.compare(start, ends[i]);
- assert c <= 0;
- if (c == 0)
- {
- // If start == ends[i], then we can insert from the next one (basically the new element
- // really start at the next element), except for the case where starts[i] == ends[i].
- // In this latter case, if we were to move to next element, we could end up with ...[x, x][x, x]...
- if (comparator.compare(starts[i], ends[i]) == 0)
- {
- // The current element cover a single value which is equal to the start of the inserted
- // element. If the inserted element overwrites the current one, just remove the current
- // (it's included in what we insert) and proceed with the insert.
- if (markedAt > markedAts[i])
- {
- removeInternal(i);
- continue;
- }
-
- // Otherwise (the current singleton interval override the new one), we want to leave the
- // current element and move to the next, unless start == end since that means the new element
- // is in fact fully covered by the current one (so we're done)
- if (comparator.compare(start, end) == 0)
- return;
- }
- i++;
- continue;
- }
+ if (Slice.isEmpty(comparator, start, end))
+ return;
// Do we overwrite the current element?
if (markedAt > markedAts[i])
@@ -579,26 +531,24 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable
// First deal with what might come before the newly added one.
if (comparator.compare(starts[i], start) < 0)
{
- addInternal(i, starts[i], start, markedAts[i], delTimes[i]);
- i++;
- // We don't need to do the following line, but in spirit that's what we want to do
- // setInternal(i, start, ends[i], markedAts, delTime])
+ Slice.Bound newEnd = start.invert();
+ if (!Slice.isEmpty(comparator, starts[i], newEnd))
+ {
+ addInternal(i, starts[i], start.invert(), markedAts[i], delTimes[i]);
+ i++;
+ setInternal(i, start, ends[i], markedAts[i], delTimes[i]);
+ }
}
// now, start <= starts[i]
- // Does the new element stops before/at the current one,
+ // Does the new element stops before the current one,
int endCmp = comparator.compare(end, starts[i]);
- if (endCmp <= 0)
+ if (endCmp < 0)
{
- // Here start <= starts[i] and end <= starts[i]
- // This means the current element is before the current one. However, one special
- // case is if end == starts[i] and starts[i] == ends[i]. In that case,
- // the new element entirely overwrite the current one and we can just overwrite
- if (endCmp == 0 && comparator.compare(starts[i], ends[i]) == 0)
- setInternal(i, start, end, markedAt, delTime);
- else
- addInternal(i, start, end, markedAt, delTime);
+ // Here start <= starts[i] and end < starts[i]
+ // This means the current element is before the current one.
+ addInternal(i, start, end, markedAt, delTime);
return;
}
@@ -617,20 +567,29 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable
return;
}
- setInternal(i, start, ends[i], markedAt, delTime);
- if (cmp == 0)
+ // Otherwise, the new element overwite until the min(end, next start)
+ if (comparator.compare(end, starts[i+1]) < 0)
+ {
+ setInternal(i, start, end, markedAt, delTime);
+ // We have fully handled the new element so we're done
return;
+ }
- start = ends[i];
+ setInternal(i, start, starts[i+1].invert(), markedAt, delTime);
+ start = starts[i+1];
i++;
}
else
{
- // We don't ovewrite fully. Insert the new interval, and then update the now next
+ // We don't overwrite fully. Insert the new interval, and then update the now next
// one to reflect the not overwritten parts. We're then done.
addInternal(i, start, end, markedAt, delTime);
i++;
- setInternal(i, end, ends[i], markedAts[i], delTimes[i]);
+ Slice.Bound newStart = end.invert();
+ if (!Slice.isEmpty(comparator, newStart, ends[i]))
+ {
+ setInternal(i, newStart, ends[i], markedAts[i], delTimes[i]);
+ }
return;
}
}
@@ -644,13 +603,17 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable
// If we stop before the start of the current element, just insert the new
// interval and we're done; otherwise insert until the beginning of the
// current element
- if (comparator.compare(end, starts[i]) <= 0)
+ if (comparator.compare(end, starts[i]) < 0)
{
addInternal(i, start, end, markedAt, delTime);
return;
}
- addInternal(i, start, starts[i], markedAt, delTime);
- i++;
+ Slice.Bound newEnd = starts[i].invert();
+ if (!Slice.isEmpty(comparator, start, newEnd))
+ {
+ addInternal(i, start, newEnd, markedAt, delTime);
+ i++;
+ }
}
// After that, we're overwritten on the current element but might have
@@ -660,7 +623,7 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable
if (comparator.compare(end, ends[i]) <= 0)
return;
- start = ends[i];
+ start = ends[i].invert();
i++;
}
}
@@ -677,7 +640,7 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable
/*
* Adds the new tombstone at index i, growing and/or moving elements to make room for it.
*/
- private void addInternal(int i, Composite start, Composite end, long markedAt, int delTime)
+ private void addInternal(int i, Slice.Bound start, Slice.Bound end, long markedAt, int delTime)
{
assert i >= 0;
@@ -730,12 +693,12 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable
delTimes = grow(delTimes, size, newLength, i);
}
- private static Composite[] grow(Composite[] a, int size, int newLength, int i)
+ private static Slice.Bound[] grow(Slice.Bound[] a, int size, int newLength, int i)
{
if (i < 0 || i >= size)
return Arrays.copyOf(a, newLength);
- Composite[] newA = new Composite[newLength];
+ Slice.Bound[] newA = new Slice.Bound[newLength];
System.arraycopy(a, 0, newA, 0, i);
System.arraycopy(a, i, newA, i+1, size - i);
return newA;
@@ -780,7 +743,7 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable
starts[i] = null;
}
- private void setInternal(int i, Composite start, Composite end, long markedAt, int delTime)
+ private void setInternal(int i, Slice.Bound start, Slice.Bound end, long markedAt, int delTime)
{
if (starts[i] != null)
boundaryHeapSize -= starts[i].unsharedHeapSize() + ends[i].unsharedHeapSize();
@@ -802,82 +765,91 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable
+ ObjectSizes.sizeOfArray(delTimes);
}
+ // TODO: This should be moved someplace else as it shouldn't be used directly: some ranges might become
+ // complex deletion times. We'll also only need this for backward compatibility, this isn't used otherwise.
public static class Serializer implements IVersionedSerializer<RangeTombstoneList>
{
- private final CType type;
+ private final LegacyLayout layout;
- public Serializer(CType type)
+ public Serializer(LegacyLayout layout)
{
- this.type = type;
+ this.layout = layout;
}
public void serialize(RangeTombstoneList tombstones, DataOutputPlus out, int version) throws IOException
{
- if (tombstones == null)
- {
- out.writeInt(0);
- return;
- }
-
- out.writeInt(tombstones.size);
- for (int i = 0; i < tombstones.size; i++)
- {
- type.serializer().serialize(tombstones.starts[i], out);
- type.serializer().serialize(tombstones.ends[i], out);
- out.writeInt(tombstones.delTimes[i]);
- out.writeLong(tombstones.markedAts[i]);
- }
+ // TODO: not yet ported to Slice.Bound/LegacyLayout; always throws. The pre-refactor logic is kept commented out below as a sketch.
+ throw new UnsupportedOperationException();
+ //if (tombstones == null)
+ //{
+ // out.writeInt(0);
+ // return;
+ //}
+
+ //out.writeInt(tombstones.size);
+ //for (int i = 0; i < tombstones.size; i++)
+ //{
+ // layout.serializer().serialize(tombstones.starts[i], out);
+ // layout.serializer().serialize(tombstones.ends[i], out);
+ // out.writeInt(tombstones.delTimes[i]);
+ // out.writeLong(tombstones.markedAts[i]);
+ //}
}
public RangeTombstoneList deserialize(DataInput in, int version) throws IOException
{
- int size = in.readInt();
- if (size == 0)
- return null;
-
- RangeTombstoneList tombstones = new RangeTombstoneList(type, size);
-
- for (int i = 0; i < size; i++)
- {
- Composite start = type.serializer().deserialize(in);
- Composite end = type.serializer().deserialize(in);
- int delTime = in.readInt();
- long markedAt = in.readLong();
-
- if (version >= MessagingService.VERSION_20)
- {
- tombstones.setInternal(i, start, end, markedAt, delTime);
- }
- else
- {
- /*
- * The old implementation used to have range sorted by left value, but with potentially
- * overlapping range. So we need to use the "slow" path.
- */
- tombstones.add(start, end, markedAt, delTime);
- }
- }
-
- // The "slow" path take care of updating the size, but not the fast one
- if (version >= MessagingService.VERSION_20)
- tombstones.size = size;
- return tombstones;
+ // TODO: not yet ported; always throws. NOTE(review): the sketch below passes 'layout' to a constructor taking a ClusteringComparator and uses MessagingService, whose import this patch drops.
+ throw new UnsupportedOperationException();
+
+ //int size = in.readInt();
+ //if (size == 0)
+ // return null;
+
+ //RangeTombstoneList tombstones = new RangeTombstoneList(layout, size);
+
+ //for (int i = 0; i < size; i++)
+ //{
+ // Slice.Bound start = layout.serializer().deserialize(in);
+ // Slice.Bound end = layout.serializer().deserialize(in);
+ // int delTime = in.readInt();
+ // long markedAt = in.readLong();
+
+ // if (version >= MessagingService.VERSION_20)
+ // {
+ // tombstones.setInternal(i, start, end, markedAt, delTime);
+ // }
+ // else
+ // {
+ // /*
+ // * The old implementation used to have range sorted by left value, but with potentially
+ // * overlapping range. So we need to use the "slow" path.
+ // */
+ // tombstones.add(start, end, markedAt, delTime);
+ // }
+ //}
+
+ //// The "slow" path takes care of updating the size, but not the fast one
+ //if (version >= MessagingService.VERSION_20)
+ // tombstones.size = size;
+ //return tombstones;
}
public long serializedSize(RangeTombstoneList tombstones, TypeSizes typeSizes, int version)
{
- if (tombstones == null)
- return typeSizes.sizeof(0);
-
- long size = typeSizes.sizeof(tombstones.size);
- for (int i = 0; i < tombstones.size; i++)
- {
- size += type.serializer().serializedSize(tombstones.starts[i], typeSizes);
- size += type.serializer().serializedSize(tombstones.ends[i], typeSizes);
- size += typeSizes.sizeof(tombstones.delTimes[i]);
- size += typeSizes.sizeof(tombstones.markedAts[i]);
- }
- return size;
+ // TODO: not yet ported; always throws. The sketch below mirrors serialize() and must stay in sync with it.
+ throw new UnsupportedOperationException();
+ //if (tombstones == null)
+ // return typeSizes.sizeof(0);
+
+ //long size = typeSizes.sizeof(tombstones.size);
+ //for (int i = 0; i < tombstones.size; i++)
+ //{
+ // size += layout.serializer().serializedSize(tombstones.starts[i], typeSizes);
+ // size += layout.serializer().serializedSize(tombstones.ends[i], typeSizes);
+ // size += typeSizes.sizeof(tombstones.delTimes[i]);
+ // size += typeSizes.sizeof(tombstones.markedAts[i]);
+ //}
+ //return size;
}
public long serializedSize(RangeTombstoneList tombstones, int version)
@@ -885,56 +857,4 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable
return serializedSize(tombstones, TypeSizes.NATIVE, version);
}
}
-
- /**
- * This object allow testing whether a given column (name/timestamp) is deleted
- * or not by this RangeTombstoneList, assuming that the column given to this
- * object are passed in (comparator) sorted order.
- *
- * This is more efficient that calling RangeTombstoneList.isDeleted() repeatedly
- * in that case since we're able to take the sorted nature of the RangeTombstoneList
- * into account.
- */
- public class InOrderTester
- {
- private int idx;
-
- public boolean isDeleted(Cell cell)
- {
- CellName name = cell.name();
- long timestamp = cell.timestamp();
-
- while (idx < size)
- {
- int cmp = comparator.compare(name, starts[idx]);
-
- if (cmp < 0)
- {
- return false;
- }
- else if (cmp == 0)
- {
- // No matter what the counter cell's timestamp is, a tombstone always takes precedence. See CASSANDRA-7346.
- if (cell instanceof CounterCell)
- return true;
-
- // As for searchInternal, we need to check the previous end
- if (idx > 0 && comparator.compare(name, ends[idx-1]) == 0 && markedAts[idx-1] > markedAts[idx])
- return markedAts[idx-1] >= timestamp;
- else
- return markedAts[idx] >= timestamp;
- }
- else
- {
- if (comparator.compare(name, ends[idx]) <= 0)
- return markedAts[idx] >= timestamp || cell instanceof CounterCell;
- else
- idx++;
- }
- }
-
- return false;
- }
- }
-
}