You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2015/07/22 18:05:41 UTC
[12/15] cassandra git commit: Simplify some 8099's implementations
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/Slice.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Slice.java b/src/java/org/apache/cassandra/db/Slice.java
index 05c2977..2ffb91e 100644
--- a/src/java/org/apache/cassandra/db/Slice.java
+++ b/src/java/org/apache/cassandra/db/Slice.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.db;
-import java.io.DataInput;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.MessageDigest;
@@ -25,6 +24,7 @@ import java.util.*;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.ObjectSizes;
@@ -68,8 +68,8 @@ public class Slice
private Slice(Bound start, Bound end)
{
assert start.isStart() && end.isEnd();
- this.start = start.takeAlias();
- this.end = end.takeAlias();
+ this.start = start;
+ this.end = end;
}
public static Slice make(Bound start, Bound end)
@@ -331,7 +331,7 @@ public class Slice
+ Bound.serializer.serializedSize(slice.end, version, types);
}
- public Slice deserialize(DataInput in, int version, List<AbstractType<?>> types) throws IOException
+ public Slice deserialize(DataInputPlus in, int version, List<AbstractType<?>> types) throws IOException
{
Bound start = Bound.serializer.deserialize(in, version, types);
Bound end = Bound.serializer.deserialize(in, version, types);
@@ -346,21 +346,19 @@ public class Slice
*/
public static class Bound extends AbstractClusteringPrefix
{
- private static final long EMPTY_SIZE = ObjectSizes.measure(new Bound(Kind.INCL_START_BOUND, new ByteBuffer[0]));
public static final Serializer serializer = new Serializer();
- /** The smallest start bound, i.e. the one that starts before any row. */
- public static final Bound BOTTOM = inclusiveStartOf();
- /** The biggest end bound, i.e. the one that ends after any row. */
- public static final Bound TOP = inclusiveEndOf();
-
- protected final Kind kind;
- protected final ByteBuffer[] values;
+ /**
+ * The smallest and biggest bound. Note that as range tombstone bounds are a special case of slice bounds,
+ * we want BOTTOM and TOP to be the very same objects as RangeTombstone.Bound.BOTTOM/TOP; we alias them here because it's cleaner when dealing
+ * with slices to refer to Slice.Bound.BOTTOM and Slice.Bound.TOP.
+ */
+ public static final Bound BOTTOM = RangeTombstone.Bound.BOTTOM;
+ public static final Bound TOP = RangeTombstone.Bound.TOP;
protected Bound(Kind kind, ByteBuffer[] values)
{
- this.kind = kind;
- this.values = values;
+ super(kind, values);
}
public static Bound create(Kind kind, ByteBuffer[] values)
@@ -396,22 +394,6 @@ public class Slice
return create(Kind.EXCL_END_BOUND, values);
}
- public static Bound exclusiveStartOf(ClusteringPrefix prefix)
- {
- ByteBuffer[] values = new ByteBuffer[prefix.size()];
- for (int i = 0; i < prefix.size(); i++)
- values[i] = prefix.get(i);
- return exclusiveStartOf(values);
- }
-
- public static Bound inclusiveEndOf(ClusteringPrefix prefix)
- {
- ByteBuffer[] values = new ByteBuffer[prefix.size()];
- for (int i = 0; i < prefix.size(); i++)
- values[i] = prefix.get(i);
- return inclusiveEndOf(values);
- }
-
public static Bound create(ClusteringComparator comparator, boolean isStart, boolean isInclusive, Object... values)
{
CBuilder builder = CBuilder.create(comparator);
@@ -426,21 +408,6 @@ public class Slice
return builder.buildBound(isStart, isInclusive);
}
- public Kind kind()
- {
- return kind;
- }
-
- public int size()
- {
- return values.length;
- }
-
- public ByteBuffer get(int i)
- {
- return values[i];
- }
-
public Bound withNewKind(Kind kind)
{
assert !kind.isBoundary();
@@ -480,24 +447,6 @@ public class Slice
return withNewKind(kind().invert());
}
- public ByteBuffer[] getRawValues()
- {
- return values;
- }
-
- public void digest(MessageDigest digest)
- {
- for (int i = 0; i < size(); i++)
- digest.update(get(i).duplicate());
- FBUtilities.updateWithByte(digest, kind().ordinal());
- }
-
- public void writeTo(Slice.Bound.Writer writer)
- {
- super.writeTo(writer);
- writer.writeBoundKind(kind());
- }
-
// For use by intersects, it's called with the sstable bound opposite to the slice bound
// (so if the slice bound is a start, it's call with the max sstable bound)
private int compareTo(ClusteringComparator comparator, List<ByteBuffer> sstableBound)
@@ -544,66 +493,10 @@ public class Slice
return sb.append(")").toString();
}
- // Overriding to get a more precise type
- @Override
- public Bound takeAlias()
- {
- return this;
- }
-
- @Override
- public long unsharedHeapSize()
- {
- return EMPTY_SIZE + ObjectSizes.sizeOnHeapOf(values);
- }
-
- public long unsharedHeapSizeExcludingData()
- {
- return EMPTY_SIZE + ObjectSizes.sizeOnHeapExcludingData(values);
- }
-
- public static Builder builder(int size)
- {
- return new Builder(size);
- }
-
- public interface Writer extends ClusteringPrefix.Writer
- {
- public void writeBoundKind(Kind kind);
- }
-
- public static class Builder implements Writer
- {
- private final ByteBuffer[] values;
- private Kind kind;
- private int idx;
-
- private Builder(int size)
- {
- this.values = new ByteBuffer[size];
- }
-
- public void writeClusteringValue(ByteBuffer value)
- {
- values[idx++] = value;
- }
-
- public void writeBoundKind(Kind kind)
- {
- this.kind = kind;
- }
-
- public Slice.Bound build()
- {
- assert idx == values.length;
- return Slice.Bound.create(kind, values);
- }
- }
-
/**
* Serializer for slice bounds.
* <p>
- * Contrarily to {@code Clustering}, a slice bound can only be a true prefix of the full clustering, so we actually record
+ * Unlike {@code Clustering}, a slice bound can be a true prefix of the full clustering, so we actually record
* its size.
*/
public static class Serializer
@@ -622,31 +515,21 @@ public class Slice
+ ClusteringPrefix.serializer.valuesWithoutSizeSerializedSize(bound, version, types);
}
- public Slice.Bound deserialize(DataInput in, int version, List<AbstractType<?>> types) throws IOException
+ public Slice.Bound deserialize(DataInputPlus in, int version, List<AbstractType<?>> types) throws IOException
{
Kind kind = Kind.values()[in.readByte()];
return deserializeValues(in, kind, version, types);
}
- public Slice.Bound deserializeValues(DataInput in, Kind kind, int version, List<AbstractType<?>> types) throws IOException
+ public Slice.Bound deserializeValues(DataInputPlus in, Kind kind, int version, List<AbstractType<?>> types) throws IOException
{
int size = in.readUnsignedShort();
if (size == 0)
return kind.isStart() ? BOTTOM : TOP;
- Builder builder = builder(size);
- ClusteringPrefix.serializer.deserializeValuesWithoutSize(in, size, version, types, builder);
- builder.writeBoundKind(kind);
- return builder.build();
- }
-
- public void deserializeValues(DataInput in, Bound.Kind kind, int version, List<AbstractType<?>> types, Writer writer) throws IOException
- {
- int size = in.readUnsignedShort();
- ClusteringPrefix.serializer.deserializeValuesWithoutSize(in, size, version, types, writer);
- writer.writeBoundKind(kind);
+ ByteBuffer[] values = ClusteringPrefix.serializer.deserializeValuesWithoutSize(in, size, version, types);
+ return Slice.Bound.create(kind, values);
}
-
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/Slices.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Slices.java b/src/java/org/apache/cassandra/db/Slices.java
index a6c690b..32ca06d 100644
--- a/src/java/org/apache/cassandra/db/Slices.java
+++ b/src/java/org/apache/cassandra/db/Slices.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.db;
-import java.io.DataInput;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
@@ -28,6 +27,7 @@ import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
/**
@@ -318,7 +318,7 @@ public abstract class Slices implements Iterable<Slice>
return size;
}
- public Slices deserialize(DataInput in, int version, CFMetaData metadata) throws IOException
+ public Slices deserialize(DataInputPlus in, int version, CFMetaData metadata) throws IOException
{
int size = in.readInt();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index e8247a3..df7e7ef 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -1160,7 +1160,7 @@ public final class SystemKeyspace
// delete all previous values with a single range tombstone.
int nowInSec = FBUtilities.nowInSeconds();
- update.addRangeTombstone(Slice.make(SizeEstimates.comparator, table), new SimpleDeletionTime(timestamp - 1, nowInSec));
+ update.add(new RangeTombstone(Slice.make(SizeEstimates.comparator, table), new DeletionTime(timestamp - 1, nowInSec)));
// add a CQL row for each primary token range.
for (Map.Entry<Range<Token>, Pair<Long, Long>> entry : estimates.entrySet())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/TypeSizes.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/TypeSizes.java b/src/java/org/apache/cassandra/db/TypeSizes.java
index a9e432f..73766c8 100644
--- a/src/java/org/apache/cassandra/db/TypeSizes.java
+++ b/src/java/org/apache/cassandra/db/TypeSizes.java
@@ -68,6 +68,11 @@ public final class TypeSizes
return sizeof(value.remaining()) + value.remaining();
}
+ public static int sizeofWithVIntLength(ByteBuffer value)
+ {
+ return sizeofVInt(value.remaining()) + value.remaining();
+ }
+
public static int sizeof(boolean value)
{
return BOOL_SIZE;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java b/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
index cf7c2dd..b3709d2 100644
--- a/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
+++ b/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
@@ -111,8 +111,7 @@ public abstract class UnfilteredDeserializer
private boolean isReady;
private boolean isDone;
- private final ReusableRow row;
- private final RangeTombstoneMarker.Builder markerBuilder;
+ private final Row.Builder builder;
private CurrentDeserializer(CFMetaData metadata,
DataInputPlus in,
@@ -122,8 +121,7 @@ public abstract class UnfilteredDeserializer
super(metadata, in, helper);
this.header = header;
this.clusteringDeserializer = new ClusteringPrefix.Deserializer(metadata.comparator, in, header);
- this.row = new ReusableRow(metadata.clusteringColumns().size(), header.columns().regulars, true, metadata.isCounter());
- this.markerBuilder = new RangeTombstoneMarker.Builder(metadata.clusteringColumns().size());
+ this.builder = ArrayBackedRow.sortedBuilder(helper.fetchedRegularColumns(header));
}
public boolean hasNext() throws IOException
@@ -181,17 +179,13 @@ public abstract class UnfilteredDeserializer
isReady = false;
if (UnfilteredSerializer.kind(nextFlags) == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER)
{
- markerBuilder.reset();
- RangeTombstone.Bound.Kind kind = clusteringDeserializer.deserializeNextBound(markerBuilder);
- UnfilteredSerializer.serializer.deserializeMarkerBody(in, header, kind.isBoundary(), markerBuilder);
- return markerBuilder.build();
+ RangeTombstone.Bound bound = clusteringDeserializer.deserializeNextBound();
+ return UnfilteredSerializer.serializer.deserializeMarkerBody(in, header, bound);
}
else
{
- Row.Writer writer = row.writer();
- clusteringDeserializer.deserializeNextClustering(writer);
- UnfilteredSerializer.serializer.deserializeRowBody(in, header, helper, nextFlags, writer);
- return row;
+ builder.newRow(clusteringDeserializer.deserializeNextClustering());
+ return UnfilteredSerializer.serializer.deserializeRowBody(in, header, helper, nextFlags, builder);
}
}
@@ -205,7 +199,7 @@ public abstract class UnfilteredDeserializer
}
else
{
- UnfilteredSerializer.serializer.skipRowBody(in, header, helper, nextFlags);
+ UnfilteredSerializer.serializer.skipRowBody(in, header, nextFlags);
}
}
@@ -221,7 +215,6 @@ public abstract class UnfilteredDeserializer
private final boolean readAllAsDynamic;
private boolean skipStatic;
- private int nextFlags;
private boolean isDone;
private boolean isStart = true;
@@ -254,13 +247,7 @@ public abstract class UnfilteredDeserializer
public boolean hasNext() throws IOException
{
- if (nextAtom != null)
- return true;
-
- if (isDone)
- return false;
-
- return deserializeNextAtom();
+ return nextAtom != null || (!isDone && deserializeNextAtom());
}
private boolean deserializeNextAtom() throws IOException
@@ -392,6 +379,7 @@ public abstract class UnfilteredDeserializer
grouper.addAtom(nextAtom);
while (deserializeNextAtom() && grouper.addAtom(nextAtom))
{
+ // Nothing to do, deserializeNextAtom() changes nextAtom and it's then added to the grouper
}
// if this was the first static row, we're done with it. Otherwise, we're also done with static.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java b/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
index b406251..8625112 100644
--- a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
@@ -38,8 +38,6 @@ import org.apache.cassandra.utils.ByteBufferUtil;
abstract class AbstractSSTableIterator implements SliceableUnfilteredRowIterator
{
- private static final Logger logger = LoggerFactory.getLogger(AbstractSSTableIterator.class);
-
protected final SSTableReader sstable;
protected final DecoratedKey key;
protected final DeletionTime partitionLevelDeletion;
@@ -65,7 +63,7 @@ abstract class AbstractSSTableIterator implements SliceableUnfilteredRowIterator
this.sstable = sstable;
this.key = key;
this.columns = columnFilter;
- this.helper = new SerializationHelper(sstable.descriptor.version.correspondingMessagingVersion(), SerializationHelper.Flag.LOCAL, columnFilter);
+ this.helper = new SerializationHelper(sstable.metadata, sstable.descriptor.version.correspondingMessagingVersion(), SerializationHelper.Flag.LOCAL, columnFilter);
this.isForThrift = isForThrift;
if (indexEntry == null)
@@ -81,7 +79,7 @@ abstract class AbstractSSTableIterator implements SliceableUnfilteredRowIterator
{
// We seek to the beginning to the partition if either:
// - the partition is not indexed; we then have a single block to read anyway
- // and we need to read the partition deletion time.
+ // (and we need to read the partition deletion time).
// - we're querying static columns.
boolean needSeekAtPartitionStart = !indexEntry.isIndexed() || !columns.fetchedColumns().statics.isEmpty();
@@ -104,24 +102,24 @@ abstract class AbstractSSTableIterator implements SliceableUnfilteredRowIterator
// Note that this needs to be called after file != null and after the partitionDeletion has been set, but before readStaticRow
// (since it uses it) so we can't move that up (but we'll be able to simplify as soon as we drop support for the old file format).
- this.reader = needsReader ? createReader(indexEntry, file, needSeekAtPartitionStart, shouldCloseFile) : null;
+ this.reader = needsReader ? createReader(indexEntry, file, true, shouldCloseFile) : null;
this.staticRow = readStaticRow(sstable, file, helper, columns.fetchedColumns().statics, isForThrift, reader == null ? null : reader.deserializer);
}
else
{
this.partitionLevelDeletion = indexEntry.deletionTime();
this.staticRow = Rows.EMPTY_STATIC_ROW;
- this.reader = needsReader ? createReader(indexEntry, file, needSeekAtPartitionStart, shouldCloseFile) : null;
+ this.reader = needsReader ? createReader(indexEntry, file, false, shouldCloseFile) : null;
}
- if (reader == null && shouldCloseFile)
+ if (reader == null && file != null && shouldCloseFile)
file.close();
}
catch (IOException e)
{
sstable.markSuspect();
String filePath = file.getPath();
- if (shouldCloseFile && file != null)
+ if (shouldCloseFile)
{
try
{
@@ -164,7 +162,7 @@ abstract class AbstractSSTableIterator implements SliceableUnfilteredRowIterator
if (statics.isEmpty() || isForThrift)
return Rows.EMPTY_STATIC_ROW;
- assert sstable.metadata.isStaticCompactTable() && !isForThrift;
+ assert sstable.metadata.isStaticCompactTable();
// As said above, if it's a CQL query and the table is a "static compact", the only exposed columns are the
// static ones. So we don't have to mark the position to seek back later.
@@ -221,45 +219,13 @@ abstract class AbstractSSTableIterator implements SliceableUnfilteredRowIterator
public boolean hasNext()
{
- try
- {
- return reader != null && reader.hasNext();
- }
- catch (IOException e)
- {
- try
- {
- closeInternal();
- }
- catch (IOException suppressed)
- {
- e.addSuppressed(suppressed);
- }
- sstable.markSuspect();
- throw new CorruptSSTableException(e, reader.file.getPath());
- }
+ return reader != null && reader.hasNext();
}
public Unfiltered next()
{
- try
- {
- assert reader != null;
- return reader.next();
- }
- catch (IOException e)
- {
- try
- {
- closeInternal();
- }
- catch (IOException suppressed)
- {
- e.addSuppressed(suppressed);
- }
- sstable.markSuspect();
- throw new CorruptSSTableException(e, reader.file.getPath());
- }
+ assert reader != null;
+ return reader.next();
}
public Iterator<Unfiltered> slice(Slice slice)
@@ -269,7 +235,8 @@ abstract class AbstractSSTableIterator implements SliceableUnfilteredRowIterator
if (reader == null)
return Collections.emptyIterator();
- return reader.slice(slice);
+ reader.setForSlice(slice);
+ return reader;
}
catch (IOException e)
{
@@ -317,7 +284,7 @@ abstract class AbstractSSTableIterator implements SliceableUnfilteredRowIterator
}
}
- protected abstract class Reader
+ protected abstract class Reader implements Iterator<Unfiltered>
{
private final boolean shouldCloseFile;
public FileDataInput file;
@@ -327,12 +294,19 @@ abstract class AbstractSSTableIterator implements SliceableUnfilteredRowIterator
// Records the currently open range tombstone (if any)
protected DeletionTime openMarker = null;
- protected Reader(FileDataInput file, boolean shouldCloseFile)
+ // !isInit means we have never seeked in the file and thus should seek before reading anything
+ protected boolean isInit;
+
+ protected Reader(FileDataInput file, boolean isInit, boolean shouldCloseFile)
{
this.file = file;
+ this.isInit = isInit;
this.shouldCloseFile = shouldCloseFile;
+
if (file != null)
createDeserializer();
+ else
+ assert !isInit;
}
private void createDeserializer()
@@ -369,9 +343,62 @@ abstract class AbstractSSTableIterator implements SliceableUnfilteredRowIterator
return toReturn;
}
- public abstract boolean hasNext() throws IOException;
- public abstract Unfiltered next() throws IOException;
- public abstract Iterator<Unfiltered> slice(Slice slice) throws IOException;
+ public boolean hasNext()
+ {
+ try
+ {
+ if (!isInit)
+ {
+ init();
+ isInit = true;
+ }
+
+ return hasNextInternal();
+ }
+ catch (IOException e)
+ {
+ try
+ {
+ closeInternal();
+ }
+ catch (IOException suppressed)
+ {
+ e.addSuppressed(suppressed);
+ }
+ sstable.markSuspect();
+ throw new CorruptSSTableException(e, reader.file.getPath());
+ }
+ }
+
+ public Unfiltered next()
+ {
+ try
+ {
+ return nextInternal();
+ }
+ catch (IOException e)
+ {
+ try
+ {
+ closeInternal();
+ }
+ catch (IOException suppressed)
+ {
+ e.addSuppressed(suppressed);
+ }
+ sstable.markSuspect();
+ throw new CorruptSSTableException(e, reader.file.getPath());
+ }
+ }
+
+ // Called by hasNext() if we haven't yet been initialized. NOTE(review): next() does not check isInit, so calling next() without a prior hasNext() would skip this — confirm callers always call hasNext() first.
+ protected abstract void init() throws IOException;
+
+ // Set the reader so its hasNext/next methods return values within the provided slice
+ public abstract void setForSlice(Slice slice) throws IOException;
+
+ protected abstract boolean hasNextInternal() throws IOException;
+ protected abstract Unfiltered nextInternal() throws IOException;
public void close() throws IOException
{
@@ -380,35 +407,61 @@ abstract class AbstractSSTableIterator implements SliceableUnfilteredRowIterator
}
}
- protected abstract class IndexedReader extends Reader
+ // Used by indexed readers to track their position within the index.
+ protected static class IndexState
{
- protected final RowIndexEntry indexEntry;
- protected final List<IndexHelper.IndexInfo> indexes;
+ private final Reader reader;
+ private final ClusteringComparator comparator;
- protected int currentIndexIdx = -1;
+ private final RowIndexEntry indexEntry;
+ private final List<IndexHelper.IndexInfo> indexes;
+ private final boolean reversed;
- // Marks the beginning of the block corresponding to currentIndexIdx.
- protected FileMark mark;
+ private int currentIndexIdx = -1;
- // !isInit means we have never seeked in the file and thus shouldn't read as we could be anywhere
- protected boolean isInit;
+ // Marks the beginning of the block corresponding to currentIndexIdx.
+ private FileMark mark;
- protected IndexedReader(FileDataInput file, boolean shouldCloseFile, RowIndexEntry indexEntry, boolean isInit)
+ public IndexState(Reader reader, ClusteringComparator comparator, RowIndexEntry indexEntry, boolean reversed)
{
- super(file, shouldCloseFile);
+ this.reader = reader;
+ this.comparator = comparator;
this.indexEntry = indexEntry;
this.indexes = indexEntry.columnsIndex();
- this.isInit = isInit;
+ this.reversed = reversed;
+ this.currentIndexIdx = reversed ? indexEntry.columnsIndex().size() : -1;
+ }
+
+ public boolean isDone()
+ {
+ return reversed ? currentIndexIdx < 0 : currentIndexIdx >= indexes.size();
}
- // Should be called when we're at the beginning of blockIdx.
- protected void updateBlock(int blockIdx) throws IOException
+ // Sets the reader to the beginning of blockIdx.
+ public void setToBlock(int blockIdx) throws IOException
{
- seekToPosition(indexEntry.position + indexes.get(blockIdx).offset);
+ if (blockIdx >= 0 && blockIdx < indexes.size())
+ reader.seekToPosition(indexEntry.position + indexes.get(blockIdx).offset);
currentIndexIdx = blockIdx;
- openMarker = blockIdx > 0 ? indexes.get(blockIdx - 1).endOpenMarker : null;
- mark = file.mark();
+ reader.openMarker = blockIdx > 0 ? indexes.get(blockIdx - 1).endOpenMarker : null;
+ mark = reader.file.mark();
+ }
+
+ public int blocksCount()
+ {
+ return indexes.size();
+ }
+
+ // Checks whether we've crossed an index boundary: true once we've read at least the current block's width past the mark taken at the start of that block (always false once past the last block).
+ public boolean isPastCurrentBlock()
+ {
+ return currentIndexIdx < indexes.size() && reader.file.bytesPastMark(mark) >= currentIndex().width;
+ }
+
+ public int currentBlockIdx()
+ {
+ return currentIndexIdx;
}
public IndexHelper.IndexInfo currentIndex()
@@ -416,9 +469,16 @@ abstract class AbstractSSTableIterator implements SliceableUnfilteredRowIterator
return indexes.get(currentIndexIdx);
}
- public IndexHelper.IndexInfo previousIndex()
+ // Finds the index of the first block containing the provided bound, starting at the current index.
+ // Will be -1 if the bound is before any block, and blocksCount() if it is after every block.
+ public int findBlockIndex(Slice.Bound bound)
{
- return currentIndexIdx <= 1 ? null : indexes.get(currentIndexIdx - 1);
+ if (bound == Slice.Bound.BOTTOM)
+ return -1;
+ if (bound == Slice.Bound.TOP)
+ return blocksCount();
+
+ return IndexHelper.indexFor(bound, indexes, comparator, reversed, currentIndexIdx);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java b/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java
index 4fd5205..a58ea3e 100644
--- a/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java
@@ -18,30 +18,19 @@
package org.apache.cassandra.db.columniterator;
import java.io.IOException;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-
-import com.google.common.collect.AbstractIterator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.util.NoSuchElementException;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.io.sstable.CorruptSSTableException;
-import org.apache.cassandra.io.sstable.IndexHelper;
import org.apache.cassandra.io.util.FileDataInput;
-import org.apache.cassandra.utils.ByteBufferUtil;
/**
* A Cell Iterator over SSTable
*/
public class SSTableIterator extends AbstractSSTableIterator
{
- private static final Logger logger = LoggerFactory.getLogger(SSTableIterator.class);
-
public SSTableIterator(SSTableReader sstable, DecoratedKey key, ColumnFilter columns, boolean isForThrift)
{
this(sstable, null, key, sstable.getPosition(key, SSTableReader.Operator.EQ), columns, isForThrift);
@@ -71,222 +60,215 @@ public class SSTableIterator extends AbstractSSTableIterator
private class ForwardReader extends Reader
{
+ // The start of the current slice. Null once we know we've passed that bound (and also when the slice starts at BOTTOM).
+ protected Slice.Bound start;
+ // The end of the current slice. Will never be null.
+ protected Slice.Bound end = Slice.Bound.TOP;
+
+ protected Unfiltered next; // the next element to return: this is computed by hasNextInternal().
+
+ protected boolean sliceDone; // set to true once we know we have no more results for the slice. This is in particular
+ // used by the indexed reader when we know we can't have results based on the index.
+
private ForwardReader(FileDataInput file, boolean isAtPartitionStart, boolean shouldCloseFile)
{
- super(file, shouldCloseFile);
- assert isAtPartitionStart;
+ super(file, isAtPartitionStart, shouldCloseFile);
}
- public boolean hasNext() throws IOException
+ protected void init() throws IOException
{
- assert deserializer != null;
- return deserializer.hasNext();
+ // We should always have been initialized (at the beginning of the partition). Only indexed readers may
+ // have to initialize.
+ throw new IllegalStateException();
}
- public Unfiltered next() throws IOException
+ public void setForSlice(Slice slice) throws IOException
{
- return deserializer.readNext();
+ start = slice.start() == Slice.Bound.BOTTOM ? null : slice.start();
+ end = slice.end();
+
+ sliceDone = false;
+ next = null;
}
- public Iterator<Unfiltered> slice(final Slice slice) throws IOException
+ // Skip all data that comes before the currently set slice, tracking any range tombstone marker opened there.
+ // Returns a bound marker re-opening that tombstone at the slice start if one is open, or null otherwise.
+ private Unfiltered handlePreSliceData() throws IOException
{
- return new AbstractIterator<Unfiltered>()
+ // Note that the following comparison is not strict. The reason is that the only cases
+ // where it can be == is if the "next" is a RT start marker (either a '[' or a ')[' boundary),
+ // and if we had a strict inequality and an open RT marker before this, we would issue
+ // the open marker first, and then return the next one later, which would send in the
+ // stream both '[' (or '(') and then ')[' for the same clustering value, which is wrong.
+ // By using a non-strict inequality, we avoid that problem (if we do get ')[' for the same
+ // clustering value as the slice, we'll simply record it in 'openMarker').
+ while (deserializer.hasNext() && deserializer.compareNextTo(start) <= 0)
{
- private boolean beforeStart = true;
+ if (deserializer.nextIsRow())
+ deserializer.skipNext();
+ else
+ updateOpenMarker((RangeTombstoneMarker)deserializer.readNext());
+ }
+
+ Slice.Bound sliceStart = start;
+ start = null;
+
+ // We've reached the beginning of our queried slice. If we have an open marker
+ // we should return that first.
+ if (openMarker != null)
+ return new RangeTombstoneBoundMarker(sliceStart, openMarker);
- protected Unfiltered computeNext()
+ return null;
+ }
+
+ // Compute the next element to return, assuming we're in the middle of the slice
+ // and the next element is either in the slice, or just after it. Returns null
+ // if we're done with the slice. Range tombstone markers read here also update openMarker (the local 'next' shadows the field of the same name).
+ protected Unfiltered computeNext() throws IOException
+ {
+ if (!deserializer.hasNext() || deserializer.compareNextTo(end) > 0)
+ return null;
+
+ Unfiltered next = deserializer.readNext();
+ if (next.kind() == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER)
+ updateOpenMarker((RangeTombstoneMarker)next);
+ return next;
+ }
+
+ protected boolean hasNextInternal() throws IOException
+ {
+ if (next != null)
+ return true;
+
+ if (sliceDone)
+ return false;
+
+ assert deserializer != null;
+
+ if (start != null)
+ {
+ Unfiltered unfiltered = handlePreSliceData();
+ if (unfiltered != null)
{
- try
- {
- // While we're before the start of the slice, we can skip row but we should keep
- // track of open range tombstones
- if (beforeStart)
- {
- // Note that the following comparison is not strict. The reason is that the only cases
- // where it can be == is if the "next" is a RT start marker (either a '[' of a ')[' boundary),
- // and if we had a strict inequality and an open RT marker before this, we would issue
- // the open marker first, and then return then next later, which would yet in the
- // stream both '[' (or '(') and then ')[' for the same clustering value, which is wrong.
- // By using a non-strict inequality, we avoid that problem (if we do get ')[' for the same
- // clustering value than the slice, we'll simply record it in 'openMarker').
- while (deserializer.hasNext() && deserializer.compareNextTo(slice.start()) <= 0)
- {
- if (deserializer.nextIsRow())
- deserializer.skipNext();
- else
- updateOpenMarker((RangeTombstoneMarker)deserializer.readNext());
- }
-
- beforeStart = false;
-
- // We've reached the beginning of our queried slice. If we have an open marker
- // we should return that first.
- if (openMarker != null)
- return new RangeTombstoneBoundMarker(slice.start(), openMarker);
- }
-
- if (deserializer.hasNext() && deserializer.compareNextTo(slice.end()) <= 0)
- {
- Unfiltered next = deserializer.readNext();
- if (next.kind() == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER)
- updateOpenMarker((RangeTombstoneMarker)next);
- return next;
- }
-
- // If we have an open marker, we should close it before finishing
- if (openMarker != null)
- return new RangeTombstoneBoundMarker(slice.end(), getAndClearOpenMarker());
-
- return endOfData();
- }
- catch (IOException e)
- {
- try
- {
- close();
- }
- catch (IOException suppressed)
- {
- e.addSuppressed(suppressed);
- }
- sstable.markSuspect();
- throw new CorruptSSTableException(e, file.getPath());
- }
+ next = unfiltered;
+ return true;
}
- };
+ }
+
+ next = computeNext();
+ if (next != null)
+ return true;
+
+ // If we have an open marker, we should close it before finishing
+ if (openMarker != null)
+ {
+ next = new RangeTombstoneBoundMarker(end, getAndClearOpenMarker());
+ return true;
+ }
+
+ sliceDone = true; // not absolutely necessary but accurate and cheap
+ return false;
+ }
+
+ protected Unfiltered nextInternal() throws IOException
+ {
+ if (!hasNextInternal())
+ throw new NoSuchElementException();
+
+ Unfiltered toReturn = next;
+ next = null;
+ return toReturn;
}
}
- private class ForwardIndexedReader extends IndexedReader
+ private class ForwardIndexedReader extends ForwardReader
{
+ private final IndexState indexState;
+
+ private int lastBlockIdx; // the last index block that has data for the current query
+
private ForwardIndexedReader(RowIndexEntry indexEntry, FileDataInput file, boolean isAtPartitionStart, boolean shouldCloseFile)
{
- super(file, shouldCloseFile, indexEntry, isAtPartitionStart);
+ super(file, isAtPartitionStart, shouldCloseFile);
+ this.indexState = new IndexState(this, sstable.metadata.comparator, indexEntry, false);
+ this.lastBlockIdx = indexState.blocksCount(); // if we never call setForSlice, that's where we want to stop
}
- public boolean hasNext() throws IOException
+ @Override
+ protected void init() throws IOException
{
- // If it's called before we've created the file, create it. This then mean
- // we're reading from the beginning of the partition.
- if (!isInit)
- {
- seekToPosition(indexEntry.position);
- ByteBufferUtil.skipShortLength(file); // partition key
- DeletionTime.serializer.skip(file); // partition deletion
- if (sstable.header.hasStatic())
- UnfilteredSerializer.serializer.skipStaticRow(file, sstable.header, helper);
- isInit = true;
- }
- return deserializer.hasNext();
+ // If this is called, it means we're calling hasNext() before any call to setForSlice. Which means
+ // we're reading everything from the beginning. So just set us up at the beginning of the first block.
+ indexState.setToBlock(0);
}
- public Unfiltered next() throws IOException
+ @Override
+ public void setForSlice(Slice slice) throws IOException
{
- return deserializer.readNext();
- }
+ super.setForSlice(slice);
- public Iterator<Unfiltered> slice(final Slice slice) throws IOException
- {
- final List<IndexHelper.IndexInfo> indexes = indexEntry.columnsIndex();
+ isInit = true;
// if our previous slicing already got us the biggest row in the sstable, we're done
- if (currentIndexIdx >= indexes.size())
- return Collections.emptyIterator();
+ if (indexState.isDone())
+ {
+ sliceDone = true;
+ return;
+ }
// Find the first index block we'll need to read for the slice.
- final int startIdx = IndexHelper.indexFor(slice.start(), indexes, sstable.metadata.comparator, false, currentIndexIdx);
- if (startIdx >= indexes.size())
- return Collections.emptyIterator();
+ int startIdx = indexState.findBlockIndex(slice.start());
+ if (startIdx >= indexState.blocksCount())
+ {
+ sliceDone = true;
+ return;
+ }
// If that's the last block we were reading, we're already where we want to be. Otherwise,
// seek to that first block
- if (startIdx != currentIndexIdx)
- updateBlock(startIdx);
+ if (startIdx != indexState.currentBlockIdx())
+ indexState.setToBlock(startIdx);
// Find the last index block we'll need to read for the slice.
- final int endIdx = IndexHelper.indexFor(slice.end(), indexes, sstable.metadata.comparator, false, startIdx);
-
- final IndexHelper.IndexInfo startIndex = currentIndex();
+ lastBlockIdx = indexState.findBlockIndex(slice.end());
// The index search is based on the last name of the index blocks, so at that point we have that:
- // 1) indexes[startIdx - 1].lastName < slice.start <= indexes[startIdx].lastName
- // 2) indexes[endIdx - 1].lastName < slice.end <= indexes[endIdx].lastName
- // so if startIdx == endIdx and slice.end < indexes[startIdx].firstName, we're guaranteed that the
- // whole slice is between the previous block end and this bloc start, and thus has no corresponding
+ // 1) indexes[currentIdx - 1].lastName < slice.start <= indexes[currentIdx].lastName
+ // 2) indexes[lastBlockIdx - 1].lastName < slice.end <= indexes[lastBlockIdx].lastName
+ // so if currentIdx == lastBlockIdx and slice.end < indexes[currentIdx].firstName, we're guaranteed that the
+ // whole slice is between the previous block end and this block start, and thus has no corresponding
// data. One exception is if the previous block ends with an openMarker as it will cover our slice
// and we need to return it.
- if (startIdx == endIdx && metadata().comparator.compare(slice.end(), startIndex.firstName) < 0 && openMarker == null && sstable.descriptor.version.storeRows())
- return Collections.emptyIterator();
-
- return new AbstractIterator<Unfiltered>()
+ if (indexState.currentBlockIdx() == lastBlockIdx
+ && metadata().comparator.compare(slice.end(), indexState.currentIndex().firstName) < 0
+ && openMarker == null
+ && sstable.descriptor.version.storeRows())
{
- private boolean beforeStart = true;
- private int currentIndexIdx = startIdx;
+ sliceDone = true;
+ }
+ }
- protected Unfiltered computeNext()
- {
- try
- {
- // While we're before the start of the slice, we can skip row but we should keep
- // track of open range tombstones
- if (beforeStart)
- {
- // See ForwardReader equivalent method to see why this inequality is not strict.
- while (deserializer.hasNext() && deserializer.compareNextTo(slice.start()) <= 0)
- {
- if (deserializer.nextIsRow())
- deserializer.skipNext();
- else
- updateOpenMarker((RangeTombstoneMarker)deserializer.readNext());
- }
-
- beforeStart = false;
-
- // We've reached the beginning of our queried slice. If we have an open marker
- // we should return that first.
- if (openMarker != null)
- return new RangeTombstoneBoundMarker(slice.start(), openMarker);
- }
-
- // If we've crossed an index block boundary, update our informations
- if (currentIndexIdx < indexes.size() && file.bytesPastMark(mark) >= currentIndex().width)
- updateBlock(++currentIndexIdx);
-
- // Return the next atom unless we've reached the end, or we're beyond our slice
- // end (note that unless we're on the last block for the slice, there is no point
- // in checking the slice end).
- if (currentIndexIdx < indexes.size()
- && currentIndexIdx <= endIdx
- && deserializer.hasNext()
- && (currentIndexIdx != endIdx || deserializer.compareNextTo(slice.end()) <= 0))
- {
- Unfiltered next = deserializer.readNext();
- if (next.kind() == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER)
- updateOpenMarker((RangeTombstoneMarker)next);
- return next;
- }
-
- // If we have an open marker, we should close it before finishing
- if (openMarker != null)
- return new RangeTombstoneBoundMarker(slice.end(), getAndClearOpenMarker());
-
- return endOfData();
- }
- catch (IOException e)
- {
- try
- {
- close();
- }
- catch (IOException suppressed)
- {
- e.addSuppressed(suppressed);
- }
- sstable.markSuspect();
- throw new CorruptSSTableException(e, file.getPath());
- }
- }
- };
+ @Override
+ protected Unfiltered computeNext() throws IOException
+ {
+        // Our previous read might have made us cross an index block boundary. If so, update our information.
+ if (indexState.isPastCurrentBlock())
+ indexState.setToBlock(indexState.currentBlockIdx() + 1);
+
+ // Return the next unfiltered unless we've reached the end, or we're beyond our slice
+ // end (note that unless we're on the last block for the slice, there is no point
+ // in checking the slice end).
+ if (indexState.isDone()
+ || indexState.currentBlockIdx() > lastBlockIdx
+ || !deserializer.hasNext()
+ || (indexState.currentBlockIdx() == lastBlockIdx && deserializer.compareNextTo(end) > 0))
+ return null;
+
+
+ Unfiltered next = deserializer.readNext();
+ if (next.kind() == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER)
+ updateOpenMarker((RangeTombstoneMarker)next);
+ return next;
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java b/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
index 0e18d4a..e15d330 100644
--- a/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
@@ -20,29 +20,19 @@ package org.apache.cassandra.db.columniterator;
import java.io.IOException;
import java.util.*;
-import com.google.common.collect.AbstractIterator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.partitions.AbstractPartitionData;
+import org.apache.cassandra.db.partitions.AbstractThreadUnsafePartition;
import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.io.sstable.CorruptSSTableException;
-import org.apache.cassandra.io.sstable.IndexHelper;
import org.apache.cassandra.io.util.FileDataInput;
-import org.apache.cassandra.io.util.FileMark;
-import org.apache.cassandra.utils.ByteBufferUtil;
/**
* A Cell Iterator in reversed clustering order over SSTable
*/
public class SSTableReversedIterator extends AbstractSSTableIterator
{
- private static final Logger logger = LoggerFactory.getLogger(SSTableReversedIterator.class);
-
public SSTableReversedIterator(SSTableReader sstable, DecoratedKey key, ColumnFilter columns, boolean isForThrift)
{
this(sstable, null, key, sstable.getPosition(key, SSTableReader.Operator.EQ), columns, isForThrift);
@@ -70,319 +60,290 @@ public class SSTableReversedIterator extends AbstractSSTableIterator
return true;
}
- private ReusablePartitionData createBuffer(int blocksCount)
+ private class ReverseReader extends Reader
{
- int estimatedRowCount = 16;
- int columnCount = metadata().partitionColumns().regulars.columnCount();
- if (columnCount == 0 || metadata().clusteringColumns().size() == 0)
+ protected ReusablePartitionData buffer;
+ protected Iterator<Unfiltered> iterator;
+
+ private ReverseReader(FileDataInput file, boolean isAtPartitionStart, boolean shouldCloseFile)
{
- estimatedRowCount = 1;
+ super(file, isAtPartitionStart, shouldCloseFile);
}
- else
+
+ protected ReusablePartitionData createBuffer(int blocksCount)
{
- try
+ int estimatedRowCount = 16;
+ int columnCount = metadata().partitionColumns().regulars.columnCount();
+ if (columnCount == 0 || metadata().clusteringColumns().isEmpty())
{
- // To avoid wasted resizing we guess-estimate the number of rows we're likely to read. For that
- // we use the stats on the number of rows per partition for that sstable.
- // FIXME: so far we only keep stats on cells, so to get a rough estimate on the number of rows,
- // we divide by the number of regular columns the table has. We should fix once we collect the
- // stats on rows
- int estimatedRowsPerPartition = (int)(sstable.getEstimatedColumnCount().percentile(0.75) / columnCount);
- estimatedRowCount = Math.max(estimatedRowsPerPartition / blocksCount, 1);
+ estimatedRowCount = 1;
}
- catch (IllegalStateException e)
+ else
{
- // The EstimatedHistogram mean() method can throw this (if it overflows). While such overflow
- // shouldn't happen, it's not worth taking the risk of letting the exception bubble up.
+ try
+ {
+ // To avoid wasted resizing we guess-estimate the number of rows we're likely to read. For that
+ // we use the stats on the number of rows per partition for that sstable.
+ // FIXME: so far we only keep stats on cells, so to get a rough estimate on the number of rows,
+ // we divide by the number of regular columns the table has. We should fix once we collect the
+ // stats on rows
+ int estimatedRowsPerPartition = (int)(sstable.getEstimatedColumnCount().percentile(0.75) / columnCount);
+ estimatedRowCount = Math.max(estimatedRowsPerPartition / blocksCount, 1);
+ }
+ catch (IllegalStateException e)
+ {
+ // The EstimatedHistogram mean() method can throw this (if it overflows). While such overflow
+ // shouldn't happen, it's not worth taking the risk of letting the exception bubble up.
+ }
}
+ return new ReusablePartitionData(metadata(), partitionKey(), columns(), estimatedRowCount);
}
- return new ReusablePartitionData(metadata(), partitionKey(), DeletionTime.LIVE, columns(), estimatedRowCount);
- }
-
- private class ReverseReader extends Reader
- {
- private ReusablePartitionData partition;
- private UnfilteredRowIterator iterator;
- private ReverseReader(FileDataInput file, boolean isAtPartitionStart, boolean shouldCloseFile)
+ protected void init() throws IOException
{
- super(file, shouldCloseFile);
- assert isAtPartitionStart;
+ // We should always have been initialized (at the beginning of the partition). Only indexed readers may
+ // have to initialize.
+ throw new IllegalStateException();
}
- public boolean hasNext() throws IOException
+ public void setForSlice(Slice slice) throws IOException
{
- if (partition == null)
+ // If we have read the data, just create the iterator for the slice. Otherwise, read the data.
+ if (buffer == null)
{
- partition = createBuffer(1);
- partition.populateFrom(this, null, null, new Tester()
- {
- public boolean isDone()
- {
- return false;
- }
- });
- iterator = partition.unfilteredIterator(columns, Slices.ALL, true);
+ buffer = createBuffer(1);
+ // Note that we can reuse that buffer between slices (we could alternatively re-read from disk
+ // every time, but that feels more wasteful) so we want to include everything from the beginning.
+ // We can stop at the slice end however since any following slice will be before that.
+ loadFromDisk(null, slice.end());
}
+ setIterator(slice);
+ }
+
+ protected void setIterator(Slice slice)
+ {
+ assert buffer != null;
+ iterator = buffer.unfilteredIterator(columns, Slices.with(metadata().comparator, slice), true);
+ }
+
+ protected boolean hasNextInternal() throws IOException
+ {
+ // If we've never called setForSlice, we're reading everything
+ if (iterator == null)
+ setForSlice(Slice.ALL);
+
return iterator.hasNext();
}
- public Unfiltered next() throws IOException
+ protected Unfiltered nextInternal() throws IOException
{
if (!hasNext())
throw new NoSuchElementException();
return iterator.next();
}
- public Iterator<Unfiltered> slice(final Slice slice) throws IOException
+ protected boolean stopReadingDisk()
{
- if (partition == null)
+ return false;
+ }
+
+ // Reads the unfiltered from disk and load them into the reader buffer. It stops reading when either the partition
+ // is fully read, or when stopReadingDisk() returns true.
+ protected void loadFromDisk(Slice.Bound start, Slice.Bound end) throws IOException
+ {
+ buffer.reset();
+
+ // If the start might be in this block, skip everything that comes before it.
+ if (start != null)
{
- partition = createBuffer(1);
- partition.populateFrom(this, slice.start(), slice.end(), new Tester()
+ while (deserializer.hasNext() && deserializer.compareNextTo(start) <= 0 && !stopReadingDisk())
{
- public boolean isDone()
- {
- return false;
- }
- });
+ if (deserializer.nextIsRow())
+ deserializer.skipNext();
+ else
+ updateOpenMarker((RangeTombstoneMarker)deserializer.readNext());
+ }
}
- return partition.unfilteredIterator(columns, Slices.with(metadata().comparator, slice), true);
- }
- }
-
- private class ReverseIndexedReader extends IndexedReader
- {
- private ReusablePartitionData partition;
- private UnfilteredRowIterator iterator;
-
- private ReverseIndexedReader(RowIndexEntry indexEntry, FileDataInput file, boolean isAtPartitionStart, boolean shouldCloseFile)
- {
- super(file, shouldCloseFile, indexEntry, isAtPartitionStart);
- this.currentIndexIdx = indexEntry.columnsIndex().size();
- }
+ // If we have an open marker, it's either one from what we just skipped (if start != null), or it's from the previous index block.
+ if (openMarker != null)
+ {
+ RangeTombstone.Bound markerStart = start == null ? RangeTombstone.Bound.BOTTOM : RangeTombstone.Bound.fromSliceBound(start);
+ buffer.add(new RangeTombstoneBoundMarker(markerStart, openMarker));
+ }
- public boolean hasNext() throws IOException
- {
- // If it's called before we've created the file, create it. This then mean
- // we're reading from the end of the partition.
- if (!isInit)
+ // Now deserialize everything until we reach our requested end (if we have one)
+ while (deserializer.hasNext()
+ && (end == null || deserializer.compareNextTo(end) <= 0)
+ && !stopReadingDisk())
{
- seekToPosition(indexEntry.position);
- ByteBufferUtil.skipShortLength(file); // partition key
- DeletionTime.serializer.skip(file); // partition deletion
- if (sstable.header.hasStatic())
- UnfilteredSerializer.serializer.skipStaticRow(file, sstable.header, helper);
- isInit = true;
+ Unfiltered unfiltered = deserializer.readNext();
+ buffer.add(unfiltered);
+
+ if (unfiltered.isRangeTombstoneMarker())
+ updateOpenMarker((RangeTombstoneMarker)unfiltered);
}
- if (partition == null)
+ // If we have an open marker, we should close it before finishing
+ if (openMarker != null)
{
- partition = createBuffer(indexes.size());
- partition.populateFrom(this, null, null, new Tester()
- {
- public boolean isDone()
- {
- return false;
- }
- });
- iterator = partition.unfilteredIterator(columns, Slices.ALL, true);
+ // If we have no end and still an openMarker, this means we're indexed and the marker is closed in a following block.
+ RangeTombstone.Bound markerEnd = end == null ? RangeTombstone.Bound.TOP : RangeTombstone.Bound.fromSliceBound(end);
+ buffer.add(new RangeTombstoneBoundMarker(markerEnd, getAndClearOpenMarker()));
}
- return iterator.hasNext();
+ buffer.build();
}
+ }
+
+ private class ReverseIndexedReader extends ReverseReader
+ {
+ private final IndexState indexState;
- public Unfiltered next() throws IOException
+ // The slice we're currently iterating over
+ private Slice slice;
+ // The last index block to consider for the slice
+ private int lastBlockIdx;
+
+ private ReverseIndexedReader(RowIndexEntry indexEntry, FileDataInput file, boolean isAtPartitionStart, boolean shouldCloseFile)
{
- if (!hasNext())
- throw new NoSuchElementException();
- return iterator.next();
+ super(file, isAtPartitionStart, shouldCloseFile);
+ this.indexState = new IndexState(this, sstable.metadata.comparator, indexEntry, true);
}
- private void prepareBlock(int blockIdx, Slice.Bound start, Slice.Bound end) throws IOException
+ protected void init() throws IOException
{
- updateBlock(blockIdx);
-
- if (partition == null)
- partition = createBuffer(indexes.size());
- else
- partition.clear();
-
- final FileMark fileMark = mark;
- final long width = currentIndex().width;
-
- partition.populateFrom(this, start, end, new Tester()
- {
- public boolean isDone()
- {
- return file.bytesPastMark(fileMark) >= width;
- }
- });
+ // If this is called, it means we're calling hasNext() before any call to setForSlice. Which means
+ // we're reading everything from the end. So just set us up on the last block.
+ indexState.setToBlock(indexState.blocksCount() - 1);
}
@Override
- public Iterator<Unfiltered> slice(final Slice slice) throws IOException
+ public void setForSlice(Slice slice) throws IOException
{
- // if our previous slicing already got us the smallest row in the sstable, we're done
- if (currentIndexIdx < 0)
- return Collections.emptyIterator();
+ this.slice = slice;
+ isInit = true;
- final List<IndexHelper.IndexInfo> indexes = indexEntry.columnsIndex();
+            // if our previous slicing already got us past the beginning of the sstable, we're done
+ if (indexState.isDone())
+ {
+ iterator = Collections.emptyIterator();
+ return;
+ }
// Find the first index block we'll need to read for the slice.
- final int startIdx = IndexHelper.indexFor(slice.end(), indexes, sstable.metadata.comparator, true, currentIndexIdx);
+ int startIdx = indexState.findBlockIndex(slice.end());
if (startIdx < 0)
- return Collections.emptyIterator();
+ {
+ iterator = Collections.emptyIterator();
+ return;
+ }
- // Find the last index block we'll need to read for the slice.
- int lastIdx = IndexHelper.indexFor(slice.start(), indexes, sstable.metadata.comparator, true, startIdx);
+ boolean isCurrentBlock = startIdx == indexState.currentBlockIdx();
+ if (!isCurrentBlock)
+ indexState.setToBlock(startIdx);
- // The index search is by firstname and so lastIdx is such that
- // indexes[lastIdx].firstName < slice.start <= indexes[lastIdx + 1].firstName
- // However, if indexes[lastIdx].lastName < slice.start we can bump lastIdx.
- if (lastIdx >= 0 && metadata().comparator.compare(indexes.get(lastIdx).lastName, slice.start()) < 0)
- ++lastIdx;
+ lastBlockIdx = indexState.findBlockIndex(slice.start());
- final int endIdx = lastIdx;
+ if (!isCurrentBlock)
+ readCurrentBlock(true);
- // Because we're reversed, even if it is our current block, we should re-prepare the block since we would
- // have skipped anything not in the previous slice.
- prepareBlock(startIdx, slice.start(), slice.end());
+ setIterator(slice);
+ }
- return new AbstractIterator<Unfiltered>()
- {
- private Iterator<Unfiltered> currentBlockIterator = partition.unfilteredIterator(columns, Slices.with(metadata().comparator, slice), true);
+ @Override
+ protected boolean hasNextInternal() throws IOException
+ {
+ if (super.hasNextInternal())
+ return true;
+
+            // We have nothing more for our current block, move to the previous one.
+ int previousBlockIdx = indexState.currentBlockIdx() - 1;
+ if (previousBlockIdx < 0 || previousBlockIdx < lastBlockIdx)
+ return false;
+
+            // Going backward, the previous block cannot contain the slice end; it contains the slice start only if it is lastBlockIdx.
+ indexState.setToBlock(previousBlockIdx);
+ readCurrentBlock(false);
+ setIterator(slice);
+            // since that new block is within the bounds we've computed in setForSlice(), we know there will
+ // always be something matching the slice unless we're on the lastBlockIdx (in which case there
+ // may or may not be results, but if there isn't, we're done for the slice).
+ return iterator.hasNext();
+ }
- protected Unfiltered computeNext()
- {
- try
- {
- if (currentBlockIterator.hasNext())
- return currentBlockIterator.next();
-
- --currentIndexIdx;
- if (currentIndexIdx < 0 || currentIndexIdx < endIdx)
- return endOfData();
-
- // Note that since we know we're read blocks backward, there is no point in checking the slice end, so we pass null
- prepareBlock(currentIndexIdx, slice.start(), null);
- currentBlockIterator = partition.unfilteredIterator(columns, Slices.with(metadata().comparator, slice), true);
- return computeNext();
- }
- catch (IOException e)
- {
- try
- {
- close();
- }
- catch (IOException suppressed)
- {
- e.addSuppressed(suppressed);
- }
- sstable.markSuspect();
- throw new CorruptSSTableException(e, file.getPath());
- }
- }
- };
+ /**
+ * Reads the current block, the last one we've set.
+ *
+ * @param canIncludeSliceEnd whether the block can include the slice end.
+ */
+ private void readCurrentBlock(boolean canIncludeSliceEnd) throws IOException
+ {
+ if (buffer == null)
+ buffer = createBuffer(indexState.blocksCount());
+
+ boolean canIncludeSliceStart = indexState.currentBlockIdx() == lastBlockIdx;
+ loadFromDisk(canIncludeSliceStart ? slice.start() : null, canIncludeSliceEnd ? slice.end() : null);
}
- }
- private abstract class Tester
- {
- public abstract boolean isDone();
+ @Override
+ protected boolean stopReadingDisk()
+ {
+ return indexState.isPastCurrentBlock();
+ }
}
- private class ReusablePartitionData extends AbstractPartitionData
+ private class ReusablePartitionData extends AbstractThreadUnsafePartition
{
- private final Writer rowWriter;
- private final RangeTombstoneCollector markerWriter;
+ private MutableDeletionInfo.Builder deletionBuilder;
+ private MutableDeletionInfo deletionInfo;
private ReusablePartitionData(CFMetaData metadata,
DecoratedKey partitionKey,
- DeletionTime deletionTime,
PartitionColumns columns,
int initialRowCapacity)
{
- super(metadata, partitionKey, deletionTime, columns, initialRowCapacity, false);
+ super(metadata, partitionKey, columns, new ArrayList<>(initialRowCapacity));
+ }
- this.rowWriter = new Writer(true);
- // Note that even though the iterator handles the reverse case, this object holds the data for a single index bock, and we read index blocks in
- // forward clustering order.
- this.markerWriter = new RangeTombstoneCollector(false);
+ public DeletionInfo deletionInfo()
+ {
+ return deletionInfo;
}
- // Note that this method is here rather than in the readers because we want to use it for both readers and they
- // don't extend one another
- private void populateFrom(Reader reader, Slice.Bound start, Slice.Bound end, Tester tester) throws IOException
+ protected boolean canHaveShadowedData()
{
- // If we have a start bound, skip everything that comes before it.
- while (reader.deserializer.hasNext() && start != null && reader.deserializer.compareNextTo(start) <= 0 && !tester.isDone())
- {
- if (reader.deserializer.nextIsRow())
- reader.deserializer.skipNext();
- else
- reader.updateOpenMarker((RangeTombstoneMarker)reader.deserializer.readNext());
- }
+ return false;
+ }
- // If we have an open marker, it's either one from what we just skipped (if start != null), or it's from the previous index block.
- if (reader.openMarker != null)
- {
- // If we have no start but still an openMarker, this means we're indexed and it's coming from the previous block
- Slice.Bound markerStart = start;
- if (start == null)
- {
- ClusteringPrefix c = ((IndexedReader)reader).previousIndex().lastName;
- markerStart = Slice.Bound.exclusiveStartOf(c);
- }
- writeMarker(markerStart, reader.openMarker);
- }
+ public Row staticRow()
+ {
+ return Rows.EMPTY_STATIC_ROW; // we don't actually use that
+ }
- // Now deserialize everything until we reach our requested end (if we have one)
- while (reader.deserializer.hasNext()
- && (end == null || reader.deserializer.compareNextTo(end) <= 0)
- && !tester.isDone())
- {
- Unfiltered unfiltered = reader.deserializer.readNext();
- if (unfiltered.kind() == Unfiltered.Kind.ROW)
- {
- ((Row) unfiltered).copyTo(rowWriter);
- }
- else
- {
- RangeTombstoneMarker marker = (RangeTombstoneMarker) unfiltered;
- reader.updateOpenMarker(marker);
- marker.copyTo(markerWriter);
- }
- }
+ public RowStats stats()
+ {
+ return RowStats.NO_STATS; // we don't actually use that
+ }
- // If we have an open marker, we should close it before finishing
- if (reader.openMarker != null)
- {
- // If we no end and still an openMarker, this means we're indexed and the marker can be close using the blocks end
- Slice.Bound markerEnd = end;
- if (end == null)
- {
- ClusteringPrefix c = ((IndexedReader)reader).currentIndex().lastName;
- markerEnd = Slice.Bound.inclusiveEndOf(c);
- }
- writeMarker(markerEnd, reader.getAndClearOpenMarker());
- }
+ public void add(Unfiltered unfiltered)
+ {
+ if (unfiltered.isRow())
+ rows.add((Row)unfiltered);
+ else
+ deletionBuilder.add((RangeTombstoneMarker)unfiltered);
}
- private void writeMarker(Slice.Bound bound, DeletionTime dt)
+ public void reset()
{
- bound.writeTo(markerWriter);
- markerWriter.writeBoundDeletion(dt);
- markerWriter.endOfMarker();
+ rows.clear();
+ deletionBuilder = MutableDeletionInfo.builder(partitionLevelDeletion, metadata().comparator, false);
}
- @Override
- public void clear()
+ public void build()
{
- super.clear();
- rowWriter.reset();
- markerWriter.reset();
+ deletionInfo = deletionBuilder.build();
+ deletionBuilder = null;
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
index b3cb370..0149582 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
@@ -26,7 +26,6 @@ import org.apache.cassandra.db.index.SecondaryIndexManager;
import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.io.sstable.ISSTableScanner;
-import org.apache.cassandra.io.sstable.format.SSTableFormat;
import org.apache.cassandra.metrics.CompactionMetrics;
/**
@@ -34,7 +33,7 @@ import org.apache.cassandra.metrics.CompactionMetrics;
* <p>
* On top of the actual merging the source iterators, this class:
* <ul>
- * <li>purge gc-able tombstones if possible (see PurgingPartitionIterator below).</li>
+ * <li>purge gc-able tombstones if possible (see PurgeIterator below).</li>
* <li>update 2ndary indexes if necessary (as we don't read-before-write on index updates, index entries are
* not deleted on deletion of the base table data, which is ok because we'll fix index inconsistency
* on reads. This however mean that potentially obsolete index entries could be kept a long time for
@@ -65,12 +64,9 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
*/
private final long[] mergeCounters;
- private final UnfilteredPartitionIterator mergedIterator;
+ private final UnfilteredPartitionIterator compacted;
private final CompactionMetrics metrics;
- // The number of row/RT merged by the iterator
- private int merged;
-
public CompactionIterator(OperationType type, List<ISSTableScanner> scanners, CompactionController controller, int nowInSec, UUID compactionId)
{
this(type, scanners, controller, nowInSec, compactionId, null);
@@ -96,9 +92,9 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
if (metrics != null)
metrics.beginCompaction(this);
- this.mergedIterator = scanners.isEmpty()
- ? UnfilteredPartitionIterators.EMPTY
- : UnfilteredPartitionIterators.convertExpiredCellsToTombstones(new PurgingPartitionIterator(UnfilteredPartitionIterators.merge(scanners, nowInSec, listener()), controller), nowInSec);
+ this.compacted = scanners.isEmpty()
+ ? UnfilteredPartitionIterators.EMPTY
+ : new PurgeIterator(UnfilteredPartitionIterators.merge(scanners, nowInSec, listener()), controller);
}
public boolean isForThrift()
@@ -143,57 +139,46 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
CompactionIterator.this.updateCounterFor(merged);
- /*
- * The row level listener does 2 things:
- * - It updates 2ndary indexes for deleted/shadowed cells
- * - It updates progress regularly (every UNFILTERED_TO_UPDATE_PROGRESS)
- */
- final SecondaryIndexManager.Updater indexer = type == OperationType.COMPACTION
- ? controller.cfs.indexManager.gcUpdaterFor(partitionKey, nowInSec)
- : SecondaryIndexManager.nullUpdater;
+ if (type != OperationType.COMPACTION || !controller.cfs.indexManager.hasIndexes())
+ return null;
- return new UnfilteredRowIterators.MergeListener()
+ // If we have a 2ndary index, we must update it with deleted/shadowed cells.
+ // TODO: this should probably be done asynchronously and batched.
+ final SecondaryIndexManager.Updater indexer = controller.cfs.indexManager.gcUpdaterFor(partitionKey, nowInSec);
+ final RowDiffListener diffListener = new RowDiffListener()
{
- private Clustering clustering;
+ public void onPrimaryKeyLivenessInfo(int i, Clustering clustering, LivenessInfo merged, LivenessInfo original)
+ {
+ }
- public void onMergePartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions)
+ public void onDeletion(int i, Clustering clustering, DeletionTime merged, DeletionTime original)
{
}
- public void onMergingRows(Clustering clustering, LivenessInfo mergedInfo, DeletionTime mergedDeletion, Row[] versions)
+ public void onComplexDeletion(int i, Clustering clustering, ColumnDefinition column, DeletionTime merged, DeletionTime original)
{
- this.clustering = clustering;
}
- public void onMergedComplexDeletion(ColumnDefinition c, DeletionTime mergedCompositeDeletion, DeletionTime[] versions)
+ public void onCell(int i, Clustering clustering, Cell merged, Cell original)
{
+ if (original != null && (merged == null || !merged.isLive(nowInSec)))
+ indexer.remove(clustering, original);
}
+ };
- public void onMergedCells(Cell mergedCell, Cell[] versions)
+ return new UnfilteredRowIterators.MergeListener()
+ {
+ public void onMergedPartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions)
{
- if (indexer == SecondaryIndexManager.nullUpdater)
- return;
-
- for (int i = 0; i < versions.length; i++)
- {
- Cell version = versions[i];
- if (version != null && (mergedCell == null || !mergedCell.equals(version)))
- indexer.remove(clustering, version);
- }
}
- public void onRowDone()
+ public void onMergedRows(Row merged, Columns columns, Row[] versions)
{
- int merged = ++CompactionIterator.this.merged;
- if (merged % UNFILTERED_TO_UPDATE_PROGRESS == 0)
- updateBytesRead();
+ Rows.diff(merged, columns, versions, diffListener);
}
public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker mergedMarker, RangeTombstoneMarker[] versions)
{
- int merged = ++CompactionIterator.this.merged;
- if (merged % UNFILTERED_TO_UPDATE_PROGRESS == 0)
- updateBytesRead();
}
public void close()
@@ -218,12 +203,12 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
public boolean hasNext()
{
- return mergedIterator.hasNext();
+ return compacted.hasNext();
}
public UnfilteredRowIterator next()
{
- return mergedIterator.next();
+ return compacted.next();
}
public void remove()
@@ -235,7 +220,7 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
{
try
{
- mergedIterator.close();
+ compacted.close();
}
finally
{
@@ -249,7 +234,7 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
return this.getCompactionInfo().toString();
}
- private class PurgingPartitionIterator extends TombstonePurgingPartitionIterator
+ private class PurgeIterator extends PurgingPartitionIterator
{
private final CompactionController controller;
@@ -257,28 +242,33 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
private long maxPurgeableTimestamp;
private boolean hasCalculatedMaxPurgeableTimestamp;
- private PurgingPartitionIterator(UnfilteredPartitionIterator toPurge, CompactionController controller)
+ private long compactedUnfiltered;
+
+ private PurgeIterator(UnfilteredPartitionIterator toPurge, CompactionController controller)
{
super(toPurge, controller.gcBefore);
this.controller = controller;
}
@Override
- protected void onEmpty(DecoratedKey key)
+ protected void onEmptyPartitionPostPurge(DecoratedKey key)
{
if (type == OperationType.COMPACTION)
controller.cfs.invalidateCachedPartition(key);
}
@Override
- protected boolean shouldFilter(UnfilteredRowIterator iterator)
+ protected void onNewPartition(DecoratedKey key)
{
- currentKey = iterator.partitionKey();
+ currentKey = key;
hasCalculatedMaxPurgeableTimestamp = false;
+ }
- // TODO: we could be able to skip filtering if UnfilteredRowIterator was giving us some stats
- // (like the smallest local deletion time).
- return true;
+ @Override
+ protected void updateProgress()
+ {
+ if ((++compactedUnfiltered) % UNFILTERED_TO_UPDATE_PROGRESS == 0)
+ updateBytesRead();
}
/*
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/filter/AbstractClusteringIndexFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/AbstractClusteringIndexFilter.java b/src/java/org/apache/cassandra/db/filter/AbstractClusteringIndexFilter.java
index 29ea7fe..ed7584b 100644
--- a/src/java/org/apache/cassandra/db/filter/AbstractClusteringIndexFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/AbstractClusteringIndexFilter.java
@@ -17,13 +17,13 @@
*/
package org.apache.cassandra.db.filter;
-import java.io.DataInput;
import java.io.IOException;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.marshal.ReversedType;
+import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
public abstract class AbstractClusteringIndexFilter implements ClusteringIndexFilter
@@ -68,7 +68,7 @@ public abstract class AbstractClusteringIndexFilter implements ClusteringIndexFi
int i = 0;
for (ColumnDefinition column : metadata.clusteringColumns())
sb.append(i++ == 0 ? "" : ", ").append(column.name).append(column.type instanceof ReversedType ? " ASC" : " DESC");
- sb.append(")");
+ sb.append(')');
}
}
@@ -84,7 +84,7 @@ public abstract class AbstractClusteringIndexFilter implements ClusteringIndexFi
filter.serializeInternal(out, version);
}
- public ClusteringIndexFilter deserialize(DataInput in, int version, CFMetaData metadata) throws IOException
+ public ClusteringIndexFilter deserialize(DataInputPlus in, int version, CFMetaData metadata) throws IOException
{
Kind kind = Kind.values()[in.readUnsignedByte()];
boolean reversed = in.readBoolean();
@@ -104,6 +104,6 @@ public abstract class AbstractClusteringIndexFilter implements ClusteringIndexFi
protected static abstract class InternalDeserializer
{
- public abstract ClusteringIndexFilter deserialize(DataInput in, int version, CFMetaData metadata, boolean reversed) throws IOException;
+ public abstract ClusteringIndexFilter deserialize(DataInputPlus in, int version, CFMetaData metadata, boolean reversed) throws IOException;
}
}