You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2015/01/14 11:55:41 UTC
[4/4] cassandra git commit: Merge branch 'cassandra-2.1' into trunk
Merge branch 'cassandra-2.1' into trunk
Conflicts:
src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
src/java/org/apache/cassandra/db/AtomDeserializer.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0b723ce7
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0b723ce7
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0b723ce7
Branch: refs/heads/trunk
Commit: 0b723ce74d97fb2c75c8b685de9996b26ddfe683
Parents: f01b319 1c9c47d
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Wed Jan 14 11:55:31 2015 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Wed Jan 14 11:55:31 2015 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../SingleColumnPrimaryKeyRestrictions.java | 13 +++---
.../apache/cassandra/db/AtomDeserializer.java | 34 +++++++++++----
.../cassandra/db/composites/CellNameType.java | 2 +-
.../sstable/format/big/IndexedSliceReader.java | 46 ++++++++++++++++++--
.../format/big/SSTableNamesIterator.java | 25 ++++++++---
.../cassandra/cql3/RangeDeletionTest.java | 35 +++++++++++++++
7 files changed, 131 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b723ce7/CHANGES.txt
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b723ce7/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnPrimaryKeyRestrictions.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/restrictions/SingleColumnPrimaryKeyRestrictions.java
index d2a3885,0000000..e109036
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnPrimaryKeyRestrictions.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnPrimaryKeyRestrictions.java
@@@ -1,308 -1,0 +1,305 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.restrictions;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.statements.Bound;
+import org.apache.cassandra.db.IndexExpression;
+import org.apache.cassandra.db.composites.CBuilder;
+import org.apache.cassandra.db.composites.CType;
+import org.apache.cassandra.db.composites.Composite;
+import org.apache.cassandra.db.composites.Composite.EOC;
+import org.apache.cassandra.db.composites.Composites;
+import org.apache.cassandra.db.composites.CompositesBuilder;
+import org.apache.cassandra.db.index.SecondaryIndexManager;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+
+import static org.apache.cassandra.cql3.statements.RequestValidations.checkFalse;
+import static org.apache.cassandra.cql3.statements.RequestValidations.checkNotNull;
+import static org.apache.cassandra.cql3.statements.RequestValidations.checkTrue;
+
+/**
+ * A set of single column restrictions on a primary key part (partition key or clustering key).
+ */
+final class SingleColumnPrimaryKeyRestrictions extends AbstractPrimaryKeyRestrictions
+{
+ /**
+ * The restrictions.
+ */
+ private final SingleColumnRestrictions restrictions;
+
+ /**
+ * <code>true</code> if the restrictions are corresponding to an EQ, <code>false</code> otherwise.
+ */
+ private boolean eq;
+
+ /**
+ * <code>true</code> if the restrictions are corresponding to an IN, <code>false</code> otherwise.
+ */
+ private boolean in;
+
+ /**
+ * <code>true</code> if the restrictions are corresponding to a Slice, <code>false</code> otherwise.
+ */
+ private boolean slice;
+
+ /**
+ * <code>true</code> if the restrictions are corresponding to a Contains, <code>false</code> otherwise.
+ */
+ private boolean contains;
+
+ public SingleColumnPrimaryKeyRestrictions(CType ctype)
+ {
+ super(ctype);
+ this.restrictions = new SingleColumnRestrictions();
+ this.eq = true;
+ }
+
+ private SingleColumnPrimaryKeyRestrictions(SingleColumnPrimaryKeyRestrictions primaryKeyRestrictions,
+ SingleColumnRestriction restriction) throws InvalidRequestException
+ {
+ super(primaryKeyRestrictions.ctype);
+ this.restrictions = primaryKeyRestrictions.restrictions.addRestriction(restriction);
+
+ if (!primaryKeyRestrictions.isEmpty())
+ {
+ ColumnDefinition lastColumn = primaryKeyRestrictions.restrictions.lastColumn();
+ ColumnDefinition newColumn = restriction.getColumnDef();
+
+ checkFalse(primaryKeyRestrictions.isSlice() && newColumn.position() > lastColumn.position(),
+ "Clustering column \"%s\" cannot be restricted (preceding column \"%s\" is restricted by a non-EQ relation)",
+ newColumn.name,
+ lastColumn.name);
+
+ if (newColumn.position() < lastColumn.position())
+ checkFalse(restriction.isSlice(),
+ "PRIMARY KEY column \"%s\" cannot be restricted (preceding column \"%s\" is restricted by a non-EQ relation)",
+ restrictions.nextColumn(newColumn).name,
+ newColumn.name);
+ }
+
+ if (restriction.isSlice() || primaryKeyRestrictions.isSlice())
+ this.slice = true;
+ else if (restriction.isContains() || primaryKeyRestrictions.isContains())
+ this.contains = true;
+ else if (restriction.isIN())
+ this.in = true;
+ else
+ this.eq = true;
+ }
+
+ @Override
+ public boolean isSlice()
+ {
+ return slice;
+ }
+
+ @Override
+ public boolean isEQ()
+ {
+ return eq;
+ }
+
+ @Override
+ public boolean isIN()
+ {
+ return in;
+ }
+
+ @Override
+ public boolean isOnToken()
+ {
+ return false;
+ }
+
+ @Override
+ public boolean isContains()
+ {
+ return contains;
+ }
+
+ @Override
+ public boolean isMultiColumn()
+ {
+ return false;
+ }
+
+ @Override
+ public boolean usesFunction(String ksName, String functionName)
+ {
+ return restrictions.usesFunction(ksName, functionName);
+ }
+
+ @Override
+ public PrimaryKeyRestrictions mergeWith(Restriction restriction) throws InvalidRequestException
+ {
+ if (restriction.isMultiColumn())
+ {
+ checkTrue(isEmpty(),
+ "Mixing single column relations and multi column relations on clustering columns is not allowed");
+ return (PrimaryKeyRestrictions) restriction;
+ }
+
+ if (restriction.isOnToken())
+ {
+ if (isEmpty())
+ return (PrimaryKeyRestrictions) restriction;
+
+ return new TokenFilter(this, (TokenRestriction) restriction);
+ }
+
+ return new SingleColumnPrimaryKeyRestrictions(this, (SingleColumnRestriction) restriction);
+ }
+
+ @Override
+ public List<Composite> valuesAsComposites(QueryOptions options) throws InvalidRequestException
+ {
+ CompositesBuilder builder = new CompositesBuilder(ctype.builder(), ctype);
+ for (ColumnDefinition def : restrictions.getColumnDefs())
+ {
+ Restriction r = restrictions.getRestriction(def);
+ assert !r.isSlice();
+
+ List<ByteBuffer> values = r.values(options);
+
+ if (values.isEmpty())
+ return Collections.emptyList();
+
+ builder.addEachElementToAll(values);
+ checkFalse(builder.containsNull(), "Invalid null value for column %s", def.name);
+ }
+
+ return builder.build();
+ }
+
+ @Override
+ public List<Composite> boundsAsComposites(Bound bound, QueryOptions options) throws InvalidRequestException
+ {
+ CBuilder builder = ctype.builder();
+ List<ColumnDefinition> defs = new ArrayList<>(restrictions.getColumnDefs());
+
+ CompositesBuilder compositeBuilder = new CompositesBuilder(builder, ctype);
+ // The end-of-component of composite doesn't depend on whether the
+ // component type is reversed or not (i.e. the ReversedType is applied
+ // to the component comparator but not to the end-of-component itself),
+ // it only depends on whether the slice is reversed
+ int keyPosition = 0;
+ for (ColumnDefinition def : defs)
+ {
+ // In a restriction, we always have Bound.START < Bound.END for the "base" comparator.
+ // So if we're doing a reverse slice, we must invert the bounds when giving them as start and end of the slice filter.
+ // But if the actual comparator itself is reversed, we must invert the bounds too.
+ Bound b = !def.isReversedType() ? bound : bound.reverse();
+ Restriction r = restrictions.getRestriction(def);
+ if (keyPosition != def.position() || r.isContains())
- {
- EOC eoc = !compositeBuilder.isEmpty() && bound.isEnd() ? EOC.END : EOC.NONE;
- return compositeBuilder.buildWithEOC(eoc);
- }
++ return compositeBuilder.buildWithEOC(bound.isEnd() ? EOC.END : EOC.START);
++
+ if (r.isSlice())
+ {
+ if (!r.hasBound(b))
+ {
+ // There wasn't any non EQ relation on that key, we select all records having the preceding component as prefix.
+ // For composites, if there was preceding component and we're computing the end, we must change the last component
+ // End-Of-Component, otherwise we would be selecting only one record.
- EOC eoc = !compositeBuilder.isEmpty() && bound.isEnd() ? EOC.END : EOC.NONE;
- return compositeBuilder.buildWithEOC(eoc);
++ return compositeBuilder.buildWithEOC(bound.isEnd() ? EOC.END : EOC.START);
+ }
+
+ ByteBuffer value = checkNotNull(r.bounds(b, options).get(0), "Invalid null clustering key part %s", r);
+ compositeBuilder.addElementToAll(value);
+ Composite.EOC eoc = eocFor(r, bound, b);
+ return compositeBuilder.buildWithEOC(eoc);
+ }
+
+ List<ByteBuffer> values = r.values(options);
+
+ if (values.isEmpty())
+ return Collections.emptyList();
+
+ compositeBuilder.addEachElementToAll(values);
+
+ checkFalse(compositeBuilder.containsNull(), "Invalid null clustering key part %s", def.name);
+ keyPosition++;
+ }
+ // Means no relation at all or everything was an equal
+ // Note: if the builder is "full", there is no need to use the end-of-component bit. For columns selection,
+ // it would be harmless to do it. However, we use this method for the partition key too. And when a query
+ // with 2ndary index is done, and with the partition provided with an EQ, we'll end up here, and in that
+ // case using the eoc would be bad, since for the random partitioner we have no guarantee that
+ // prefix.end() will sort after prefix (see #5240).
- EOC eoc = bound.isEnd() && compositeBuilder.hasRemaining() ? EOC.END : EOC.NONE;
++ EOC eoc = !compositeBuilder.hasRemaining() ? EOC.NONE : (bound.isEnd() ? EOC.END : EOC.START);
+ return compositeBuilder.buildWithEOC(eoc);
+ }
+
+ @Override
+ public List<ByteBuffer> values(QueryOptions options) throws InvalidRequestException
+ {
+ return Composites.toByteBuffers(valuesAsComposites(options));
+ }
+
+ @Override
+ public List<ByteBuffer> bounds(Bound b, QueryOptions options) throws InvalidRequestException
+ {
+ return Composites.toByteBuffers(boundsAsComposites(b, options));
+ }
+
+ private static Composite.EOC eocFor(Restriction r, Bound eocBound, Bound inclusiveBound)
+ {
+ if (eocBound.isStart())
+ return r.isInclusive(inclusiveBound) ? Composite.EOC.NONE : Composite.EOC.END;
+
+ return r.isInclusive(inclusiveBound) ? Composite.EOC.END : Composite.EOC.START;
+ }
+
+ @Override
+ public boolean hasBound(Bound b)
+ {
+ if (isEmpty())
+ return false;
+ return restrictions.lastRestriction().hasBound(b);
+ }
+
+ @Override
+ public boolean isInclusive(Bound b)
+ {
+ if (isEmpty())
+ return false;
+ return restrictions.lastRestriction().isInclusive(b);
+ }
+
+ @Override
+ public boolean hasSupportingIndex(SecondaryIndexManager indexManager)
+ {
+ return restrictions.hasSupportingIndex(indexManager);
+ }
+
+ @Override
+ public void addIndexExpressionTo(List<IndexExpression> expressions, QueryOptions options) throws InvalidRequestException
+ {
+ restrictions.addIndexExpressionTo(expressions, options);
+ }
+
+ @Override
+ public Collection<ColumnDefinition> getColumnDefs()
+ {
+ return restrictions.getColumnDefs();
+ }
- }
++}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b723ce7/src/java/org/apache/cassandra/db/AtomDeserializer.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/AtomDeserializer.java
index 0c43422,a103647..c71321c
--- a/src/java/org/apache/cassandra/db/AtomDeserializer.java
+++ b/src/java/org/apache/cassandra/db/AtomDeserializer.java
@@@ -41,9 -40,13 +41,13 @@@ public class AtomDeserialize
private final DataInput in;
private final ColumnSerializer.Flag flag;
private final int expireBefore;
- private final Descriptor.Version version;
+ private final Version version;
+ // The "flag" for the next name (which corresponds to the "masks" in ColumnSerializer) if it has been
+ // read already, Integer.MIN_VALUE otherwise.
+ private int nextFlags = Integer.MIN_VALUE;
+
- public AtomDeserializer(CellNameType type, DataInput in, ColumnSerializer.Flag flag, int expireBefore, Descriptor.Version version)
+ public AtomDeserializer(CellNameType type, DataInput in, ColumnSerializer.Flag flag, int expireBefore, Version version)
{
this.type = type;
this.nameDeserializer = type.newDeserializer(in);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b723ce7/src/java/org/apache/cassandra/db/composites/CellNameType.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b723ce7/src/java/org/apache/cassandra/io/sstable/format/big/IndexedSliceReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/big/IndexedSliceReader.java
index a69cff9,0000000..43fd046
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/IndexedSliceReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/IndexedSliceReader.java
@@@ -1,500 -1,0 +1,540 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format.big;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.List;
+
+import com.google.common.collect.AbstractIterator;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
+import org.apache.cassandra.db.composites.CellNameType;
+import org.apache.cassandra.db.composites.Composite;
+import org.apache.cassandra.db.filter.ColumnSlice;
+import org.apache.cassandra.io.sstable.CorruptSSTableException;
+import org.apache.cassandra.io.sstable.IndexHelper;
+import org.apache.cassandra.io.sstable.IndexHelper.IndexInfo;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.io.util.FileMark;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+/**
+ * This is a reader that finds the block for a starting column and returns blocks before/after it for each next call.
+ * This function assumes that the CF is sorted by name and exploits the name index.
+ */
+class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskAtomIterator
+{
+ private final ColumnFamily emptyColumnFamily;
+
+ private final SSTableReader sstable;
+ private final List<IndexHelper.IndexInfo> indexes;
+ private final FileDataInput originalInput;
+ private FileDataInput file;
+ private final boolean reversed;
+ private final ColumnSlice[] slices;
+ private final BlockFetcher fetcher;
+ private final Deque<OnDiskAtom> blockColumns = new ArrayDeque<OnDiskAtom>();
+ private final CellNameType comparator;
+
+ // Holds range tombstones in reverse queries. See addColumn()
+ private final Deque<OnDiskAtom> rangeTombstonesReversed;
+
+ /**
+ * This slice reader assumes that slices are sorted correctly, e.g. that for forward lookup slices are in
+ * lexicographic order of start elements and that for reverse lookup they are in reverse lexicographic order of
+ * finish (reverse start) elements. i.e. forward: [a,b],[d,e],[g,h] reverse: [h,g],[e,d],[b,a]. This reader also
+ * assumes that validation has been performed in terms of intervals (no overlapping intervals).
+ */
+ IndexedSliceReader(SSTableReader sstable, RowIndexEntry indexEntry, FileDataInput input, ColumnSlice[] slices, boolean reversed)
+ {
+ Tracing.trace("Seeking to partition indexed section in data file");
+ this.sstable = sstable;
+ this.originalInput = input;
+ this.reversed = reversed;
+ this.slices = slices;
+ this.comparator = sstable.metadata.comparator;
+ this.rangeTombstonesReversed = reversed ? new ArrayDeque<OnDiskAtom>() : null;
+
+ try
+ {
+ this.indexes = indexEntry.columnsIndex();
+ emptyColumnFamily = ArrayBackedSortedColumns.factory.create(sstable.metadata);
+ if (indexes.isEmpty())
+ {
+ setToRowStart(indexEntry, input);
+ emptyColumnFamily.delete(DeletionTime.serializer.deserialize(file));
+ fetcher = new SimpleBlockFetcher();
+ }
+ else
+ {
+ emptyColumnFamily.delete(indexEntry.deletionTime());
+ fetcher = new IndexedBlockFetcher(indexEntry.position);
+ }
+ }
+ catch (IOException e)
+ {
+ sstable.markSuspect();
+ throw new CorruptSSTableException(e, file.getPath());
+ }
+ }
+
+ /**
+ * Sets the seek position to the start of the row for column scanning.
+ */
+ private void setToRowStart(RowIndexEntry rowEntry, FileDataInput in) throws IOException
+ {
+ if (in == null)
+ {
+ this.file = sstable.getFileDataInput(rowEntry.position);
+ }
+ else
+ {
+ this.file = in;
+ in.seek(rowEntry.position);
+ }
+ sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(file));
+ }
+
+ public ColumnFamily getColumnFamily()
+ {
+ return emptyColumnFamily;
+ }
+
+ public DecoratedKey getKey()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ protected OnDiskAtom computeNext()
+ {
+ while (true)
+ {
+ if (reversed)
+ {
+ // Return all tombstones for the block first (see addColumn() below)
+ OnDiskAtom column = rangeTombstonesReversed.poll();
+ if (column != null)
+ return column;
+ }
+
+ OnDiskAtom column = blockColumns.poll();
+ if (column == null)
+ {
+ if (!fetcher.fetchMoreData())
+ return endOfData();
+ }
+ else
+ {
+ return column;
+ }
+ }
+ }
+
+ public void close() throws IOException
+ {
+ if (originalInput == null && file != null)
+ file.close();
+ }
+
+ protected void addColumn(OnDiskAtom col)
+ {
+ if (reversed)
+ {
+ /*
+ * We put range tombstone markers at the beginning of the range they delete. But for reversed queries,
+ * the caller still needs to know about a RangeTombstone before it sees any column that it covers.
+ * To make that simple, we keep said tombstones separate and return them all before any column for
+ * a given block.
+ */
+ if (col instanceof RangeTombstone)
+ rangeTombstonesReversed.addFirst(col);
+ else
+ blockColumns.addFirst(col);
+ }
+ else
+ {
+ blockColumns.addLast(col);
+ }
+ }
+
+ private abstract class BlockFetcher
+ {
+ protected int currentSliceIdx;
+
+ protected BlockFetcher(int sliceIdx)
+ {
+ this.currentSliceIdx = sliceIdx;
+ }
+
+ /*
+ * Return the smallest key selected by the current ColumnSlice.
+ */
+ protected Composite currentStart()
+ {
+ return reversed ? slices[currentSliceIdx].finish : slices[currentSliceIdx].start;
+ }
+
+ /*
+ * Return the biggest key selected by the current ColumnSlice.
+ */
+ protected Composite currentFinish()
+ {
+ return reversed ? slices[currentSliceIdx].start : slices[currentSliceIdx].finish;
+ }
+
+ protected abstract boolean setNextSlice();
+
+ protected abstract boolean fetchMoreData();
+
+ protected boolean isColumnBeforeSliceStart(OnDiskAtom column)
+ {
+ return isBeforeSliceStart(column.name());
+ }
+
+ protected boolean isBeforeSliceStart(Composite name)
+ {
+ Composite start = currentStart();
+ return !start.isEmpty() && comparator.compare(name, start) < 0;
+ }
+
+ protected boolean isColumnBeforeSliceFinish(OnDiskAtom column)
+ {
+ Composite finish = currentFinish();
+ return finish.isEmpty() || comparator.compare(column.name(), finish) <= 0;
+ }
+
+ protected boolean isAfterSliceFinish(Composite name)
+ {
+ Composite finish = currentFinish();
+ return !finish.isEmpty() && comparator.compare(name, finish) > 0;
+ }
+ }
+
+ private class IndexedBlockFetcher extends BlockFetcher
+ {
+ // where this row starts
+ private final long columnsStart;
+
+ // the index entry for the next block to deserialize
+ private int nextIndexIdx = -1;
+
+ // index of the last block we've read from disk;
+ private int lastDeserializedBlock = -1;
+
+ // For reversed, keep columns at the beginning of the last deserialized block that
+ // may still match a slice
+ private final Deque<OnDiskAtom> prefetched;
+
+ public IndexedBlockFetcher(long columnsStart)
+ {
+ super(-1);
+ this.columnsStart = columnsStart;
+ this.prefetched = reversed ? new ArrayDeque<OnDiskAtom>() : null;
+ setNextSlice();
+ }
+
+ protected boolean setNextSlice()
+ {
+ while (++currentSliceIdx < slices.length)
+ {
+ nextIndexIdx = IndexHelper.indexFor(slices[currentSliceIdx].start, indexes, comparator, reversed, nextIndexIdx);
+ if (nextIndexIdx < 0 || nextIndexIdx >= indexes.size())
+ // no index block for that slice
+ continue;
+
+ // Check if we can exclude this slice entirely from the index
+ IndexInfo info = indexes.get(nextIndexIdx);
+ if (reversed)
+ {
+ if (!isBeforeSliceStart(info.lastName))
+ return true;
+ }
+ else
+ {
+ if (!isAfterSliceFinish(info.firstName))
+ return true;
+ }
+ }
+ nextIndexIdx = -1;
+ return false;
+ }
+
+ protected boolean hasMoreSlice()
+ {
+ return currentSliceIdx < slices.length;
+ }
+
+ protected boolean fetchMoreData()
+ {
+ if (!hasMoreSlice())
+ return false;
+
+ // If we read blocks in reversed disk order, we may have columns from the previous block to handle.
+ // Note that prefetched keeps columns in reversed disk order.
++ // Also note that Range Tombstone handling is a bit tricky, because we may run into range tombstones
++ // that cover a slice *after* we've moved to the previous slice. To keep it simple, we simply include
++ // every RT in prefetched: it's only slightly inefficient to do so and there are only so many RTs that
++ // can be mistakenly added this way.
+ if (reversed && !prefetched.isEmpty())
+ {
++ // Whether we've found anything to return in prefetched
+ boolean gotSome = false;
+ // Avoids some comparison when we know it's not useful
+ boolean inSlice = false;
+
+ OnDiskAtom prefetchedCol;
+ while ((prefetchedCol = prefetched.peek() ) != null)
+ {
+ // col is before slice, we update the slice
+ if (isColumnBeforeSliceStart(prefetchedCol))
+ {
+ inSlice = false;
- if (!setNextSlice())
- return false;
++
++ // As explained above, we add RT unconditionally
++ if (prefetchedCol instanceof RangeTombstone)
++ {
++ blockColumns.addLast(prefetched.poll());
++ gotSome = true;
++ continue;
++ }
++
++ // Otherwise, we either move to the next slice or, if we have none (which can happen
++ // because we unwind prefetched no matter what due to RT), we skip the cell
++ if (hasMoreSlice())
++ setNextSlice();
++ else
++ prefetched.poll();
++
+ }
+ // col is within slice, all columns
+ // (we go in reverse, so as soon as we are in a slice, no need to check
+ // we're after the slice until we change slice)
+ else if (inSlice || isColumnBeforeSliceFinish(prefetchedCol))
+ {
+ blockColumns.addLast(prefetched.poll());
+ gotSome = true;
+ inSlice = true;
+ }
+ // if col is after slice, ignore
+ else
+ {
+ prefetched.poll();
+ }
+ }
+ if (gotSome)
+ return true;
+ }
+ try
+ {
+ return getNextBlock();
+ }
+ catch (IOException e)
+ {
+ throw new CorruptSSTableException(e, file.getPath());
+ }
+ }
+
+ private boolean getNextBlock() throws IOException
+ {
+ if (lastDeserializedBlock == nextIndexIdx)
+ {
+ if (reversed)
+ nextIndexIdx--;
+ else
+ nextIndexIdx++;
+ }
+ lastDeserializedBlock = nextIndexIdx;
+
+ // Are we done?
+ if (lastDeserializedBlock < 0 || lastDeserializedBlock >= indexes.size())
+ return false;
+
+ IndexInfo currentIndex = indexes.get(lastDeserializedBlock);
+
+ /* seek to the correct offset to the data, and calculate the data size */
+ long positionToSeek = columnsStart + currentIndex.offset;
+
+ // With new promoted indexes, our first seek in the data file will happen at that point.
+ if (file == null)
+ file = originalInput == null ? sstable.getFileDataInput(positionToSeek) : originalInput;
+
+ AtomDeserializer deserializer = emptyColumnFamily.metadata().getOnDiskDeserializer(file, sstable.descriptor.version);
+
+ file.seek(positionToSeek);
+ FileMark mark = file.mark();
+
+ // We remember when we are within a slice to avoid some comparisons
+ boolean inSlice = false;
+
+ // scan from index start
+ while (file.bytesPastMark(mark) < currentIndex.width || deserializer.hasUnprocessed())
+ {
+ // col is before slice
+ // (If in slice, don't bother checking that until we change slice)
+ Composite start = currentStart();
+ if (!inSlice && !start.isEmpty() && deserializer.compareNextTo(start) < 0)
+ {
++ // If it's a rangeTombstone, then we need to read it and include it unless its end
++ // stops before our slice start.
++ if (deserializer.nextIsRangeTombstone())
++ {
++ RangeTombstone rt = (RangeTombstone)deserializer.readNext();
++ if (comparator.compare(rt.max, start) >= 0)
++ addColumn(rt);
++ continue;
++ }
++
+ if (reversed)
+ {
+ // the next slice selects columns that are before the current one, so it may
+ // match this column, so keep it around.
+ prefetched.addFirst(deserializer.readNext());
+ }
+ else
+ {
+ deserializer.skipNext();
+ }
+ }
+ // col is within slice
+ else
+ {
+ Composite finish = currentFinish();
+ if (finish.isEmpty() || deserializer.compareNextTo(finish) <= 0)
+ {
+ inSlice = true;
+ addColumn(deserializer.readNext());
+ }
+ // col is after slice.
+ else
+ {
+ // When reading forward, if we hit a column that sorts after the current slice, it means we're done with this slice.
+ // For reversed, this may either mean that we're done with the current slice, or that we need to read the previous
+ // index block. However, we can be sure that we are in the first case though (the current slice is done) if the first
+ // columns of the block were not part of the current slice, i.e. if we have columns in prefetched.
+ if (reversed && prefetched.isEmpty())
+ break;
+
+ if (!setNextSlice())
+ break;
+
+ inSlice = false;
+
+ // The next index block now corresponds to the first block that may have columns for the newly set slice.
+ // So if it's different from the current block, we're done with this block. And in that case, we know
+ // that our prefetched columns won't match.
+ if (nextIndexIdx != lastDeserializedBlock)
+ {
+ if (reversed)
+ prefetched.clear();
+ break;
+ }
+
+ // Even if the next slice may have columns in this block, if we're reversed, those columns have been
+ // prefetched and we're done with that block
+ if (reversed)
+ break;
+
+ // otherwise, we will deal with that column at the next iteration
+ }
+ }
+ }
+ return true;
+ }
+ }
+
+ private class SimpleBlockFetcher extends BlockFetcher
+ {
+ public SimpleBlockFetcher() throws IOException
+ {
+ // Since we have to deserialize in order and will read all slices might as well reverse the slices and
+ // behave as if it was not reversed
+ super(reversed ? slices.length - 1 : 0);
+
+ // We remember when we are within a slice to avoid some comparisons
+ boolean inSlice = false;
+
+ AtomDeserializer deserializer = emptyColumnFamily.metadata().getOnDiskDeserializer(file, sstable.descriptor.version);
+ while (deserializer.hasNext())
+ {
+ // col is before slice
+ // (If in slice, don't bother checking that until we change slice)
+ Composite start = currentStart();
+ if (!inSlice && !start.isEmpty() && deserializer.compareNextTo(start) < 0)
+ {
- deserializer.skipNext();
++ // If it's a rangeTombstone, then we need to read it and include it unless its end
++ // stops before our slice start. Otherwise, we can skip it.
++ if (deserializer.nextIsRangeTombstone())
++ {
++ RangeTombstone rt = (RangeTombstone)deserializer.readNext();
++ if (comparator.compare(rt.max, start) >= 0)
++ addColumn(rt);
++ }
++ else
++ {
++ deserializer.skipNext();
++ }
+ continue;
+ }
+
+ // col is within slice
+ Composite finish = currentFinish();
+ if (finish.isEmpty() || deserializer.compareNextTo(finish) <= 0)
+ {
+ inSlice = true;
+ addColumn(deserializer.readNext());
+ }
+ // col is after slice. more slices?
+ else
+ {
+ inSlice = false;
+ if (!setNextSlice())
+ break;
+ }
+ }
+ }
+
+ protected boolean setNextSlice()
+ {
+ if (reversed)
+ {
+ if (currentSliceIdx <= 0)
+ return false;
+
+ currentSliceIdx--;
+ }
+ else
+ {
+ if (currentSliceIdx >= slices.length - 1)
+ return false;
+
+ currentSliceIdx++;
+ }
+ return true;
+ }
+
+ protected boolean fetchMoreData()
+ {
+ return false;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b723ce7/src/java/org/apache/cassandra/io/sstable/format/big/SSTableNamesIterator.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/big/SSTableNamesIterator.java
index 07dc59a,0000000..c51e595
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/SSTableNamesIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/SSTableNamesIterator.java
@@@ -1,250 -1,0 +1,265 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format.big;
+
+import java.io.IOException;
+import java.util.*;
+
+import com.google.common.collect.AbstractIterator;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.composites.CellNameType;
+import org.apache.cassandra.io.sstable.CorruptSSTableException;
+import org.apache.cassandra.io.sstable.IndexHelper;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.io.util.FileMark;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+/**
+ * Iterator over the on-disk atoms of a single row of an sstable, restricted to a set of cell names.
+ *
+ * The constructors read every matching atom eagerly into an in-memory list; iterating afterwards
+ * only replays that list. Besides the requested cells, range tombstones met while reading are
+ * returned as well (see the two read helpers for exactly which ones).
+ */
+class SSTableNamesIterator extends AbstractIterator<OnDiskAtom> implements OnDiskAtomIterator
+{
+ private ColumnFamily cf;
+ private final SSTableReader sstable;
+ // Input opened by this iterator itself (never one handed in by a caller); null if none was opened.
+ private FileDataInput fileToClose;
+ // Replays the atoms collected by read(); stays null when no index entry was found for the key.
+ private Iterator<OnDiskAtom> iter;
+ public final SortedSet<CellName> columns;
+ public final DecoratedKey key;
+
+ /**
+ * Looks up the index entry for {@code key} and, if there is one, reads the requested columns.
+ * When the key has no entry the iterator is simply empty and {@link #getColumnFamily()} is null.
+ * Any input opened for the read is closed before the constructor returns.
+ *
+ * @param sstable the sstable to read from
+ * @param key the row key to look up
+ * @param columns the cell names to fetch; must not be null
+ * @throws CorruptSSTableException if reading fails with an IOException (the sstable is marked suspect first)
+ */
+ public SSTableNamesIterator(SSTableReader sstable, DecoratedKey key, SortedSet<CellName> columns)
+ {
+ assert columns != null;
+ this.sstable = sstable;
+ this.columns = columns;
+ this.key = key;
+
+ RowIndexEntry indexEntry = sstable.getPosition(key, SSTableReader.Operator.EQ);
+ if (indexEntry == null)
+ return;
+
+ try
+ {
+ read(sstable, null, indexEntry);
+ }
+ catch (IOException e)
+ {
+ sstable.markSuspect();
+ throw new CorruptSSTableException(e, sstable.getFilename());
+ }
+ finally
+ {
+ if (fileToClose != null)
+ FileUtils.closeQuietly(fileToClose);
+ }
+ }
+
+ /**
+ * Reads the requested columns using an index entry the caller already resolved.
+ *
+ * NOTE(review): if {@code file} is null, read() opens its own input, which this constructor
+ * never closes - confirm callers always pass a file.
+ *
+ * @param sstable the sstable to read from
+ * @param file the input to read from
+ * @param key the row key being read
+ * @param columns the cell names to fetch; must not be null
+ * @param indexEntry the index entry of the row
+ * @throws CorruptSSTableException if reading fails with an IOException (the sstable is marked suspect first)
+ */
+ public SSTableNamesIterator(SSTableReader sstable, FileDataInput file, DecoratedKey key, SortedSet<CellName> columns, RowIndexEntry indexEntry)
+ {
+ assert columns != null;
+ this.sstable = sstable;
+ this.columns = columns;
+ this.key = key;
+
+ try
+ {
+ read(sstable, file, indexEntry);
+ }
+ catch (IOException e)
+ {
+ sstable.markSuspect();
+ throw new CorruptSSTableException(e, sstable.getFilename());
+ }
+ }
+
+ // Opens an input on the sstable at the given position and remembers it in fileToClose.
+ private FileDataInput createFileDataInput(long position)
+ {
+ fileToClose = sstable.getFileDataInput(position);
+ return fileToClose;
+ }
+
+ /**
+ * Initialises {@code cf} with the row's deletion time and collects the matching atoms into
+ * {@code iter}. For a non-indexed entry the key and deletion time are read from the data file;
+ * for an indexed entry the deletion time comes from the index entry itself.
+ *
+ * @param file the input to use, or null to have one opened when it is first needed
+ * @throws IOException if the row cannot be read or its deletion time cannot be deserialized
+ */
+ private void read(SSTableReader sstable, FileDataInput file, RowIndexEntry indexEntry)
+ throws IOException
+ {
+ List<IndexHelper.IndexInfo> indexList;
+
+ // If the entry is not indexed or the index is not promoted, read from the row start
+ if (!indexEntry.isIndexed())
+ {
+ if (file == null)
+ file = createFileDataInput(indexEntry.position);
+ else
+ file.seek(indexEntry.position);
+
+ DecoratedKey keyInDisk = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(file));
+ assert keyInDisk.equals(key) : String.format("%s != %s in %s", keyInDisk, key, file.getPath());
+ }
+
+ indexList = indexEntry.columnsIndex();
+
+ if (!indexEntry.isIndexed())
+ {
+ ColumnFamilySerializer serializer = ColumnFamily.serializer;
+ try
+ {
+ cf = ArrayBackedSortedColumns.factory.create(sstable.metadata);
+ cf.delete(DeletionTime.serializer.deserialize(file));
+ }
+ catch (Exception e)
+ {
+ throw new IOException(serializer + " failed to deserialize " + sstable.getColumnFamilyName() + " with " + sstable.metadata + " from " + file, e);
+ }
+ }
+ else
+ {
+ cf = ArrayBackedSortedColumns.factory.create(sstable.metadata);
+ cf.delete(indexEntry.deletionTime());
+ }
+
+ List<OnDiskAtom> result = new ArrayList<OnDiskAtom>();
+ if (indexList.isEmpty())
+ {
+ readSimpleColumns(file, columns, result);
+ }
+ else
+ {
+ readIndexedColumns(sstable.metadata, file, columns, indexList, indexEntry.position, result);
+ }
+
+ // create an iterator view of the columns we read
+ iter = result.iterator();
+ }
+
+ /**
+ * Scans the row's atoms in order. Cells are kept only if their name is in {@code columnNames},
+ * and the scan stops once as many cells as there are requested columns have been kept.
+ * Every atom that is not a Cell is kept unconditionally.
+ */
+ private void readSimpleColumns(FileDataInput file, SortedSet<CellName> columnNames, List<OnDiskAtom> result)
+ {
+ Iterator<OnDiskAtom> atomIterator = cf.metadata().getOnDiskIterator(file, sstable.descriptor.version);
+ int n = 0;
+ while (atomIterator.hasNext())
+ {
+ OnDiskAtom column = atomIterator.next();
+ if (column instanceof Cell)
+ {
+ if (columnNames.contains(column.name()))
+ {
+ result.add(column);
+ if (++n >= columns.size())
+ break;
+ }
+ }
+ else
+ {
+ result.add(column);
+ }
+ }
+ }
+
+ /**
+ * Reads the requested names using the column index: first selects the index blocks that can
+ * contain a requested name, then scans each selected block in order, walking the sorted
+ * names and the on-disk atoms side by side.
+ *
+ * An atom sorting before the current name is skipped, unless it is a range tombstone whose
+ * end is at or after that name, in which case it is kept. An atom equal to the name is kept.
+ * An atom sorting after the name means the name is absent, so only the name is advanced.
+ *
+ * @param basePosition position in the data file that the index block offsets are relative to
+ * @param result list the kept atoms are appended to
+ * @throws IOException if seeking or reading the data file fails
+ */
+ private void readIndexedColumns(CFMetaData metadata,
+ FileDataInput file,
+ SortedSet<CellName> columnNames,
+ List<IndexHelper.IndexInfo> indexList,
+ long basePosition,
+ List<OnDiskAtom> result)
+ throws IOException
+ {
+ /* get the various column ranges we have to read */
+ CellNameType comparator = metadata.comparator;
+ List<IndexHelper.IndexInfo> ranges = new ArrayList<IndexHelper.IndexInfo>();
+ int lastIndexIdx = -1;
+ for (CellName name : columnNames)
+ {
+ int index = IndexHelper.indexFor(name, indexList, comparator, false, lastIndexIdx);
+ if (index < 0 || index == indexList.size())
+ continue;
+ IndexHelper.IndexInfo indexInfo = indexList.get(index);
+ // Check the index block does contain the column names and that we haven't inserted this block yet.
+ if (comparator.compare(name, indexInfo.firstName) < 0 || index == lastIndexIdx)
+ continue;
+
+ ranges.add(indexInfo);
+ lastIndexIdx = index;
+ }
+
+ if (ranges.isEmpty())
+ return;
+
+ // The name cursor is shared by all blocks: it only ever moves forward.
+ Iterator<CellName> toFetch = columnNames.iterator();
+ CellName nextToFetch = toFetch.next();
+ for (IndexHelper.IndexInfo indexInfo : ranges)
+ {
+ long positionToSeek = basePosition + indexInfo.offset;
+
+ // With new promoted indexes, our first seek in the data file will happen at that point.
+ if (file == null)
+ file = createFileDataInput(positionToSeek);
+
+ AtomDeserializer deserializer = cf.metadata().getOnDiskDeserializer(file, sstable.descriptor.version);
+ file.seek(positionToSeek);
+ FileMark mark = file.mark();
+ while (file.bytesPastMark(mark) < indexInfo.width && nextToFetch != null)
+ {
+ int cmp = deserializer.compareNextTo(nextToFetch);
- if (cmp == 0)
++ if (cmp < 0)
++ {
++ // If it's a rangeTombstone, then we need to read it and include
++ // it if it includes our target. Otherwise, we can skip it.
++ if (deserializer.nextIsRangeTombstone())
++ {
++ RangeTombstone rt = (RangeTombstone)deserializer.readNext();
++ if (comparator.compare(rt.max, nextToFetch) >= 0)
++ result.add(rt);
++ }
++ else
++ {
++ deserializer.skipNext();
++ }
++ }
++ else if (cmp == 0)
+ {
+ nextToFetch = toFetch.hasNext() ? toFetch.next() : null;
+ result.add(deserializer.readNext());
- continue;
+ }
-
- deserializer.skipNext();
- if (cmp > 0)
++ else
++ {
++ // The next atom sorts after the name we are looking for, so that name is absent.
++ // Only advance the name: the atom must stay unread so that it is compared against
++ // the following name, which it may match (or cover, if it is a range tombstone).
+ nextToFetch = toFetch.hasNext() ? toFetch.next() : null;
++ }
+ }
+ }
+ }
+
+ // Returns the row key this iterator was created for.
+ public DecoratedKey getKey()
+ {
+ return key;
+ }
+
+ // Returns the column family set up by read(), or null if nothing was read.
+ public ColumnFamily getColumnFamily()
+ {
+ return cf;
+ }
+
+ // Hands out the buffered atoms one by one; ends immediately if nothing was read.
+ protected OnDiskAtom computeNext()
+ {
+ if (iter == null || !iter.hasNext())
+ return endOfData();
+ return iter.next();
+ }
+
+ // No-op: this method releases nothing.
+ public void close() throws IOException { }
+}