Posted to commits@cassandra.apache.org by sl...@apache.org on 2015/06/30 12:47:42 UTC
[18/51] [partial] cassandra git commit: Storage engine refactor, a.k.a CASSANDRA-8099
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
new file mode 100644
index 0000000..2c71cf3
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
@@ -0,0 +1,770 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.security.MessageDigest;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.google.common.collect.AbstractIterator;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.io.sstable.CorruptSSTableException;
+import org.apache.cassandra.serializers.MarshalException;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.IMergeIterator;
+import org.apache.cassandra.utils.MergeIterator;
+import org.apache.cassandra.utils.memory.AbstractAllocator;
+
+/**
+ * Static methods to work with atom iterators.
+ */
+public abstract class UnfilteredRowIterators
+{
+ private static final Logger logger = LoggerFactory.getLogger(UnfilteredRowIterators.class);
+
+ private UnfilteredRowIterators() {}
+
+ public interface MergeListener
+ {
+ public void onMergePartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions);
+
+ public void onMergingRows(Clustering clustering, LivenessInfo mergedInfo, DeletionTime mergedDeletion, Row[] versions);
+ public void onMergedComplexDeletion(ColumnDefinition c, DeletionTime mergedComplexDeletion, DeletionTime[] versions);
+ public void onMergedCells(Cell mergedCell, Cell[] versions);
+ public void onRowDone();
+
+ public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions);
+
+ public void close();
+ }
+
+ /**
+ * Returns an iterator that only returns rows with live content.
+ *
+ * This is mainly used in the CQL layer when we know we don't care about deletion
+ * infos (and since an UnfilteredRowIterator cannot shadow its own data, we know everything
+ * returned isn't shadowed by a tombstone).
+ */
+ public static RowIterator filter(UnfilteredRowIterator iter, int nowInSec)
+ {
+ return new FilteringIterator(iter, nowInSec);
+ }
+
+ /**
+ * Returns an iterator that is the result of merging other iterators.
+ */
+ public static UnfilteredRowIterator merge(List<UnfilteredRowIterator> iterators, int nowInSec)
+ {
+ assert !iterators.isEmpty();
+ if (iterators.size() == 1)
+ return iterators.get(0);
+
+ return UnfilteredRowMergeIterator.create(iterators, nowInSec, null);
+ }
+
+ /**
+ * Returns an iterator that is the result of merging other iterators, using a
+ * specific MergeListener.
+ *
+ * Note that this method assumes that there are at least 2 iterators to merge.
+ */
+ public static UnfilteredRowIterator merge(List<UnfilteredRowIterator> iterators, int nowInSec, MergeListener mergeListener)
+ {
+ assert mergeListener != null;
+ return UnfilteredRowMergeIterator.create(iterators, nowInSec, mergeListener);
+ }
+
+ /**
+ * Returns an empty atom iterator for a given partition.
+ */
+ public static UnfilteredRowIterator emptyIterator(final CFMetaData cfm, final DecoratedKey partitionKey, final boolean isReverseOrder)
+ {
+ return new UnfilteredRowIterator()
+ {
+ public CFMetaData metadata()
+ {
+ return cfm;
+ }
+
+ public boolean isReverseOrder()
+ {
+ return isReverseOrder;
+ }
+
+ public PartitionColumns columns()
+ {
+ return PartitionColumns.NONE;
+ }
+
+ public DecoratedKey partitionKey()
+ {
+ return partitionKey;
+ }
+
+ public DeletionTime partitionLevelDeletion()
+ {
+ return DeletionTime.LIVE;
+ }
+
+ public Row staticRow()
+ {
+ return Rows.EMPTY_STATIC_ROW;
+ }
+
+ public RowStats stats()
+ {
+ return RowStats.NO_STATS;
+ }
+
+ public boolean hasNext()
+ {
+ return false;
+ }
+
+ public Unfiltered next()
+ {
+ throw new NoSuchElementException();
+ }
+
+ public void remove()
+ {
+ }
+
+ public void close()
+ {
+ }
+ };
+ }
+
+ public static void digest(UnfilteredRowIterator iterator, MessageDigest digest)
+ {
+ // TODO: we're not computing the digest the same way as old nodes do. This
+ // means we'll have digest mismatches during upgrade. We should pass the messaging version of
+ // the node this is for (which might mean computing the digest last, and won't work
+ // for schema (where we announce the version through gossip to everyone))
+ digest.update(iterator.partitionKey().getKey().duplicate());
+ iterator.partitionLevelDeletion().digest(digest);
+ iterator.columns().digest(digest);
+ FBUtilities.updateWithBoolean(digest, iterator.isReverseOrder());
+ iterator.staticRow().digest(digest);
+
+ while (iterator.hasNext())
+ {
+ Unfiltered unfiltered = iterator.next();
+ if (unfiltered.kind() == Unfiltered.Kind.ROW)
+ ((Row) unfiltered).digest(digest);
+ else
+ ((RangeTombstoneMarker) unfiltered).digest(digest);
+ }
+ }
+
+ /**
+ * Returns an iterator that concatenates two atom iterators.
+ * This method assumes that both iterators are from the same partition and that the atoms from
+ * {@code iter2} come after the ones of {@code iter1} (that is, that concatenating the iterators
+ * makes sense).
+ */
+ public static UnfilteredRowIterator concat(final UnfilteredRowIterator iter1, final UnfilteredRowIterator iter2)
+ {
+ assert iter1.metadata().cfId.equals(iter2.metadata().cfId)
+ && iter1.partitionKey().equals(iter2.partitionKey())
+ && iter1.partitionLevelDeletion().equals(iter2.partitionLevelDeletion())
+ && iter1.isReverseOrder() == iter2.isReverseOrder()
+ && iter1.columns().equals(iter2.columns())
+ && iter1.staticRow().equals(iter2.staticRow());
+
+ return new AbstractUnfilteredRowIterator(iter1.metadata(),
+ iter1.partitionKey(),
+ iter1.partitionLevelDeletion(),
+ iter1.columns(),
+ iter1.staticRow(),
+ iter1.isReverseOrder(),
+ iter1.stats())
+ {
+ protected Unfiltered computeNext()
+ {
+ if (iter1.hasNext())
+ return iter1.next();
+
+ return iter2.hasNext() ? iter2.next() : endOfData();
+ }
+
+ @Override
+ public void close()
+ {
+ try
+ {
+ iter1.close();
+ }
+ finally
+ {
+ iter2.close();
+ }
+ }
+ };
+ }
+
+ public static UnfilteredRowIterator cloningIterator(UnfilteredRowIterator iterator, final AbstractAllocator allocator)
+ {
+ return new WrappingUnfilteredRowIterator(iterator)
+ {
+ private final CloningRow cloningRow = new CloningRow();
+ private final RangeTombstoneMarker.Builder markerBuilder = new RangeTombstoneMarker.Builder(iterator.metadata().comparator.size());
+
+ public Unfiltered next()
+ {
+ Unfiltered next = super.next();
+ return next.kind() == Unfiltered.Kind.ROW
+ ? cloningRow.setTo((Row)next)
+ : clone((RangeTombstoneMarker)next);
+ }
+
+ private RangeTombstoneMarker clone(RangeTombstoneMarker marker)
+ {
+ markerBuilder.reset();
+
+ RangeTombstone.Bound bound = marker.clustering();
+ for (int i = 0; i < bound.size(); i++)
+ markerBuilder.writeClusteringValue(allocator.clone(bound.get(i)));
+ markerBuilder.writeBoundKind(bound.kind());
+ if (marker.isBoundary())
+ {
+ RangeTombstoneBoundaryMarker bm = (RangeTombstoneBoundaryMarker)marker;
+ markerBuilder.writeBoundaryDeletion(bm.endDeletionTime(), bm.startDeletionTime());
+ }
+ else
+ {
+ markerBuilder.writeBoundDeletion(((RangeTombstoneBoundMarker)marker).deletionTime());
+ }
+ markerBuilder.endOfMarker();
+ return markerBuilder.build();
+ }
+
+ class CloningRow extends WrappingRow
+ {
+ private final CloningClustering cloningClustering = new CloningClustering();
+ private final CloningCell cloningCell = new CloningCell();
+
+ protected Cell filterCell(Cell cell)
+ {
+ return cloningCell.setTo(cell);
+ }
+
+ @Override
+ public Clustering clustering()
+ {
+ return cloningClustering.setTo(super.clustering());
+ }
+ }
+
+ class CloningClustering extends Clustering
+ {
+ private Clustering wrapped;
+
+ public Clustering setTo(Clustering wrapped)
+ {
+ this.wrapped = wrapped;
+ return this;
+ }
+
+ public int size()
+ {
+ return wrapped.size();
+ }
+
+ public ByteBuffer get(int i)
+ {
+ ByteBuffer value = wrapped.get(i);
+ return value == null ? null : allocator.clone(value);
+ }
+
+ public ByteBuffer[] getRawValues()
+ {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ class CloningCell extends AbstractCell
+ {
+ private Cell wrapped;
+
+ public Cell setTo(Cell wrapped)
+ {
+ this.wrapped = wrapped;
+ return this;
+ }
+
+ public ColumnDefinition column()
+ {
+ return wrapped.column();
+ }
+
+ public boolean isCounterCell()
+ {
+ return wrapped.isCounterCell();
+ }
+
+ public ByteBuffer value()
+ {
+ return allocator.clone(wrapped.value());
+ }
+
+ public LivenessInfo livenessInfo()
+ {
+ return wrapped.livenessInfo();
+ }
+
+ public CellPath path()
+ {
+ CellPath path = wrapped.path();
+ if (path == null)
+ return null;
+
+ assert path.size() == 1;
+ return CellPath.create(allocator.clone(path.get(0)));
+ }
+ }
+ };
+ }
+
+ /**
+ * Turns the given iterator into an update.
+ *
+ * Warning: this method does not close the provided iterator; it is up to
+ * the caller to close it.
+ */
+ public static PartitionUpdate toUpdate(UnfilteredRowIterator iterator)
+ {
+ PartitionUpdate update = new PartitionUpdate(iterator.metadata(), iterator.partitionKey(), iterator.columns(), 1);
+
+ update.addPartitionDeletion(iterator.partitionLevelDeletion());
+
+ if (iterator.staticRow() != Rows.EMPTY_STATIC_ROW)
+ iterator.staticRow().copyTo(update.staticWriter());
+
+ while (iterator.hasNext())
+ {
+ Unfiltered unfiltered = iterator.next();
+ if (unfiltered.kind() == Unfiltered.Kind.ROW)
+ ((Row) unfiltered).copyTo(update.writer());
+ else
+ ((RangeTombstoneMarker) unfiltered).copyTo(update.markerWriter(iterator.isReverseOrder()));
+ }
+
+ return update;
+ }
+
+ /**
+ * Validates the data of the provided iterator, that is, checks that the values
+ * it contains are valid for the type they represent, and more generally that the
+ * information stored is sensible.
+ *
+ * This is mainly used by the scrubber to detect problems in sstables.
+ *
+ * @param iterator the partition to check.
+ * @param filename the name of the file the data is coming from.
+ * @return an iterator that returns the same data as {@code iterator} but that
+ * checks said data and throws a {@code CorruptSSTableException} if it detects
+ * invalid data.
+ */
+ public static UnfilteredRowIterator withValidation(UnfilteredRowIterator iterator, final String filename)
+ {
+ return new WrappingUnfilteredRowIterator(iterator)
+ {
+ public Unfiltered next()
+ {
+ Unfiltered next = super.next();
+ try
+ {
+ next.validateData(metadata());
+ return next;
+ }
+ catch (MarshalException me)
+ {
+ throw new CorruptSSTableException(me, filename);
+ }
+ }
+ };
+ }
+
+ /**
+ * Convert all expired cells to equivalent tombstones.
+ * <p>
+ * Once a cell expires, it acts exactly like a tombstone until it is purged. In particular, that
+ * means we don't care about the value of an expired cell, and it is thus equivalent but more efficient to
+ * replace the expired cell with an equivalent tombstone (which has no value).
+ *
+ * @param iterator the iterator in which to convert expired cells.
+ * @param nowInSec the current time to use to decide if a cell is expired.
+ * @return an iterator that returns the same data as {@code iterator} but with all expired cells converted
+ * to equivalent tombstones.
+ */
+ public static UnfilteredRowIterator convertExpiredCellsToTombstones(UnfilteredRowIterator iterator, final int nowInSec)
+ {
+ return new FilteringRowIterator(iterator)
+ {
+ protected FilteringRow makeRowFilter()
+ {
+ return new FilteringRow()
+ {
+ @Override
+ protected Cell filterCell(Cell cell)
+ {
+ Cell filtered = super.filterCell(cell);
+ if (filtered == null)
+ return null;
+
+ LivenessInfo info = filtered.livenessInfo();
+ if (info.hasTTL() && !filtered.isLive(nowInSec))
+ {
+ // The column is now expired, so we can safely return a simple tombstone. Note that as long as the expiring
+ // column and the tombstone put together live longer than GC grace seconds, we'll fulfil our responsibility
+ // to repair. See discussion at
+ // http://cassandra-user-incubator-apache-org.3065146.n2.nabble.com/repair-compaction-and-tombstone-rows-td7583481.html
+ return Cells.create(filtered.column(),
+ filtered.isCounterCell(),
+ ByteBufferUtil.EMPTY_BYTE_BUFFER,
+ SimpleLivenessInfo.forDeletion(info.timestamp(), info.localDeletionTime() - info.ttl()),
+ filtered.path());
+ }
+ else
+ {
+ return filtered;
+ }
+ }
+ };
+ }
+ };
+ }
+
+ /**
+ * Wraps the provided iterator so it logs the returned atoms for debugging purposes.
+ * <p>
+ * Note that this is only meant for debugging as it can produce a very large amount of
+ * logging at the INFO level.
+ */
+ public static UnfilteredRowIterator loggingIterator(UnfilteredRowIterator iterator, final String id, final boolean fullDetails)
+ {
+ CFMetaData metadata = iterator.metadata();
+ logger.info("[{}] Logging iterator on {}.{}, partition key={}, reversed={}, deletion={}",
+ id,
+ metadata.ksName,
+ metadata.cfName,
+ metadata.getKeyValidator().getString(iterator.partitionKey().getKey()),
+ iterator.isReverseOrder(),
+ iterator.partitionLevelDeletion().markedForDeleteAt());
+
+ return new WrappingUnfilteredRowIterator(iterator)
+ {
+ @Override
+ public Row staticRow()
+ {
+ Row row = super.staticRow();
+ if (!row.isEmpty())
+ logger.info("[{}] {}", id, row.toString(metadata(), fullDetails));
+ return row;
+ }
+
+ @Override
+ public Unfiltered next()
+ {
+ Unfiltered next = super.next();
+ if (next.kind() == Unfiltered.Kind.ROW)
+ logger.info("[{}] {}", id, ((Row)next).toString(metadata(), fullDetails));
+ else
+ logger.info("[{}] {}", id, ((RangeTombstoneMarker)next).toString(metadata()));
+ return next;
+ }
+ };
+ }
+
+ /**
+ * A wrapper over MergeIterator to implement the UnfilteredRowIterator interface.
+ */
+ private static class UnfilteredRowMergeIterator extends AbstractUnfilteredRowIterator
+ {
+ private final IMergeIterator<Unfiltered, Unfiltered> mergeIterator;
+ private final MergeListener listener;
+
+ private UnfilteredRowMergeIterator(CFMetaData metadata,
+ List<UnfilteredRowIterator> iterators,
+ PartitionColumns columns,
+ DeletionTime partitionDeletion,
+ int nowInSec,
+ boolean reversed,
+ MergeListener listener)
+ {
+ super(metadata,
+ iterators.get(0).partitionKey(),
+ partitionDeletion,
+ columns,
+ mergeStaticRows(metadata, iterators, columns.statics, nowInSec, listener, partitionDeletion),
+ reversed,
+ mergeStats(iterators));
+
+ this.listener = listener;
+ this.mergeIterator = MergeIterator.get(iterators,
+ reversed ? metadata.comparator.reversed() : metadata.comparator,
+ new MergeReducer(metadata, iterators.size(), reversed, nowInSec));
+ }
+
+ private static UnfilteredRowMergeIterator create(List<UnfilteredRowIterator> iterators, int nowInSec, MergeListener listener)
+ {
+ try
+ {
+ checkForInvalidInput(iterators);
+ return new UnfilteredRowMergeIterator(iterators.get(0).metadata(),
+ iterators,
+ collectColumns(iterators),
+ collectPartitionLevelDeletion(iterators, listener),
+ nowInSec,
+ iterators.get(0).isReverseOrder(),
+ listener);
+ }
+ catch (RuntimeException | Error e)
+ {
+ try
+ {
+ FBUtilities.closeAll(iterators);
+ }
+ catch (Exception suppressed)
+ {
+ e.addSuppressed(suppressed);
+ }
+ throw e;
+ }
+ }
+
+ @SuppressWarnings("resource") // We're not really creating any resource here
+ private static void checkForInvalidInput(List<UnfilteredRowIterator> iterators)
+ {
+ if (iterators.isEmpty())
+ return;
+
+ UnfilteredRowIterator first = iterators.get(0);
+ for (int i = 1; i < iterators.size(); i++)
+ {
+ UnfilteredRowIterator iter = iterators.get(i);
+ assert first.metadata().cfId.equals(iter.metadata().cfId);
+ assert first.partitionKey().equals(iter.partitionKey());
+ assert first.isReverseOrder() == iter.isReverseOrder();
+ }
+ }
+
+ @SuppressWarnings("resource") // We're not really creating any resource here
+ private static DeletionTime collectPartitionLevelDeletion(List<UnfilteredRowIterator> iterators, MergeListener listener)
+ {
+ DeletionTime[] versions = listener == null ? null : new DeletionTime[iterators.size()];
+
+ DeletionTime delTime = DeletionTime.LIVE;
+ for (int i = 0; i < iterators.size(); i++)
+ {
+ UnfilteredRowIterator iter = iterators.get(i);
+ DeletionTime iterDeletion = iter.partitionLevelDeletion();
+ if (listener != null)
+ versions[i] = iterDeletion;
+ if (!delTime.supersedes(iterDeletion))
+ delTime = iterDeletion;
+ }
+ if (listener != null && !delTime.isLive())
+ listener.onMergePartitionLevelDeletion(delTime, versions);
+ return delTime;
+ }
+
+ private static Row mergeStaticRows(CFMetaData metadata,
+ List<UnfilteredRowIterator> iterators,
+ Columns columns,
+ int nowInSec,
+ MergeListener listener,
+ DeletionTime partitionDeletion)
+ {
+ if (columns.isEmpty())
+ return Rows.EMPTY_STATIC_ROW;
+
+ Row.Merger merger = Row.Merger.createStatic(metadata, iterators.size(), nowInSec, columns, listener);
+ for (int i = 0; i < iterators.size(); i++)
+ merger.add(i, iterators.get(i).staticRow());
+
+ // Note that in theory we should call 'takeAlias' on the result, but we know that we
+ // won't reuse the merger, so it's ok not to.
+ Row merged = merger.merge(partitionDeletion);
+ return merged == null ? Rows.EMPTY_STATIC_ROW : merged;
+ }
+
+ private static PartitionColumns collectColumns(List<UnfilteredRowIterator> iterators)
+ {
+ PartitionColumns first = iterators.get(0).columns();
+ Columns statics = first.statics;
+ Columns regulars = first.regulars;
+ for (int i = 1; i < iterators.size(); i++)
+ {
+ PartitionColumns cols = iterators.get(i).columns();
+ statics = statics.mergeTo(cols.statics);
+ regulars = regulars.mergeTo(cols.regulars);
+ }
+ return statics == first.statics && regulars == first.regulars
+ ? first
+ : new PartitionColumns(statics, regulars);
+ }
+
+ private static RowStats mergeStats(List<UnfilteredRowIterator> iterators)
+ {
+ RowStats stats = RowStats.NO_STATS;
+ for (UnfilteredRowIterator iter : iterators)
+ stats = stats.mergeWith(iter.stats());
+ return stats;
+ }
+
+ protected Unfiltered computeNext()
+ {
+ while (mergeIterator.hasNext())
+ {
+ Unfiltered merged = mergeIterator.next();
+ if (merged != null)
+ return merged;
+ }
+ return endOfData();
+ }
+
+ public void close()
+ {
+ // This will close the input iterators
+ FileUtils.closeQuietly(mergeIterator);
+
+ if (listener != null)
+ listener.close();
+ }
+
+ /**
+ * Specific reducer for merge operations that rewrites the same reusable
+ * row every time. This also skips cells shadowed by range tombstones when writing.
+ */
+ private class MergeReducer extends MergeIterator.Reducer<Unfiltered, Unfiltered>
+ {
+ private Unfiltered.Kind nextKind;
+
+ private final Row.Merger rowMerger;
+ private final RangeTombstoneMarker.Merger markerMerger;
+
+ private MergeReducer(CFMetaData metadata, int size, boolean reversed, int nowInSec)
+ {
+ this.rowMerger = Row.Merger.createRegular(metadata, size, nowInSec, columns().regulars, listener);
+ this.markerMerger = new RangeTombstoneMarker.Merger(metadata, size, partitionLevelDeletion(), reversed, listener);
+ }
+
+ @Override
+ public boolean trivialReduceIsTrivial()
+ {
+ return listener == null;
+ }
+
+ public void reduce(int idx, Unfiltered current)
+ {
+ nextKind = current.kind();
+ if (nextKind == Unfiltered.Kind.ROW)
+ rowMerger.add(idx, (Row)current);
+ else
+ markerMerger.add(idx, (RangeTombstoneMarker)current);
+ }
+
+ protected Unfiltered getReduced()
+ {
+ return nextKind == Unfiltered.Kind.ROW
+ ? rowMerger.merge(markerMerger.activeDeletion())
+ : markerMerger.merge();
+ }
+
+ protected void onKeyChange()
+ {
+ if (nextKind == Unfiltered.Kind.ROW)
+ rowMerger.clear();
+ else
+ markerMerger.clear();
+ }
+ }
+ }
+
+ private static class FilteringIterator extends AbstractIterator<Row> implements RowIterator
+ {
+ private final UnfilteredRowIterator iter;
+ private final int nowInSec;
+ private final TombstoneFilteringRow filter;
+
+ public FilteringIterator(UnfilteredRowIterator iter, int nowInSec)
+ {
+ this.iter = iter;
+ this.nowInSec = nowInSec;
+ this.filter = new TombstoneFilteringRow(nowInSec);
+ }
+
+ public CFMetaData metadata()
+ {
+ return iter.metadata();
+ }
+
+ public boolean isReverseOrder()
+ {
+ return iter.isReverseOrder();
+ }
+
+ public PartitionColumns columns()
+ {
+ return iter.columns();
+ }
+
+ public DecoratedKey partitionKey()
+ {
+ return iter.partitionKey();
+ }
+
+ public Row staticRow()
+ {
+ Row row = iter.staticRow();
+ return row.isEmpty() ? row : new TombstoneFilteringRow(nowInSec).setTo(row);
+ }
+
+ protected Row computeNext()
+ {
+ while (iter.hasNext())
+ {
+ Unfiltered next = iter.next();
+ if (next.kind() != Unfiltered.Kind.ROW)
+ continue;
+
+ Row row = filter.setTo((Row)next);
+ if (!row.isEmpty())
+ return row;
+ }
+ return endOfData();
+ }
+
+ public void close()
+ {
+ iter.close();
+ }
+ }
+}
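As a minimal usage sketch of the helpers above (illustrative, not part of this commit): merge several versions of a partition, then filter the result down to live rows the way the CQL layer does. The input iterators are placeholders, and this assumes FBUtilities.nowInSeconds() is available and that RowIterator is AutoCloseable.

    static void mergeAndConsume(UnfilteredRowIterator v1, UnfilteredRowIterator v2)
    {
        int nowInSec = FBUtilities.nowInSeconds();
        // merge() combines the two versions of the partition; filter() then
        // strips deletion info so only live rows remain.
        try (RowIterator live = UnfilteredRowIterators.filter(
                 UnfilteredRowIterators.merge(Arrays.asList(v1, v2), nowInSec),
                 nowInSec))
        {
            while (live.hasNext())
                live.next(); // real code would process each returned Row
        }
    }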
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
new file mode 100644
index 0000000..a5a0c75
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
@@ -0,0 +1,706 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.SearchIterator;
+
+/**
+ * Serialize/deserialize a single Unfiltered for the intra-node protocol.
+ *
+ * The encoded format for an unfiltered is <flags>(<row>|<marker>) where:
+ *
+ * <flags> is a byte whose bits are flags. The rightmost 1st bit is only
+ * set to indicate the end of the partition. The 2nd bit indicates
+ * whether the remainder is a range tombstone marker (otherwise it's a row).
+ * If it's a row then the 3rd bit indicates if it's static, the 4th bit
+ * indicates the presence of a row timestamp, the 5th the presence of a row
+ * ttl, the 6th the presence of row deletion and the 7th indicates the
+ * presence of complex deletion times.
+ * <row> is <clustering>[<timestamp>][<ttl>][<deletion>]<sc1>...<sci><cc1>...<ccj> where
+ * <clustering> is the row clustering as serialized by
+ * {@code Clustering.serializer}. Note that static rows are an exception and
+ * don't have this. <timestamp>, <ttl> and <deletion> are the row timestamp, ttl and deletion
+ * whose presence is determined by the flags. <sci> are the simple columns of the row and <ccj> the
+ * complex ones. There are actually 2 slightly different possible layouts for those
+ * cells: a dense one and a sparse one. Which one is used depends on the serialization
+ * header, and more precisely on {@link SerializationHeader#useSparseColumnLayout()}:
+ * 1) in the dense layout, there will be as many <sci> and <ccj> as there are columns
+ * in the serialization header. Each simple column <sci> will simply be a <cell>
+ * (which might have no value, see below), while each <ccj> will be
+ * [<delTime>]<cell1>...<celln><emptyCell> where <delTime> is the deletion for
+ * this complex column (if the flags indicate it is present), <celln> are the <cell>s
+ * for this complex column and <emptyCell> is a last cell that will have no value
+ * to indicate the end of this column.
+ * 2) in the sparse layout, there won't be "empty" cells, i.e. only the columns that
+ * actually have a cell are represented. For that, each <sci> and <ccj> starts
+ * with a 2 byte index that points to the column in the header it belongs to. After
+ * that, each <sci> and <ccj> is the same as for the dense layout. But contrary
+ * to the dense layout we won't know how many elements are serialized, so a 2 byte
+ * marker with a value of -1 will indicate the end of the row.
+ * <marker> is <bound><deletion> where <bound> is the marker bound as serialized
+ * by {@code Slice.Bound.serializer} and <deletion> is the marker deletion
+ * time.
+ *
+ * <cell> A cell starts with a 1 byte <flags>. The rightmost 1st bit indicates
+ * whether there is actually a value for this cell. If this flag is unset,
+ * nothing more follows for the cell. The 2nd and 3rd bits indicate if
+ * it's a deleted or expiring cell. The 4th flag indicates if the value
+ * is empty or not. The 5th and 6th indicate if the timestamp and ttl/
+ * localDeletionTime for the cell are the same as the row's (if that
+ * is the case, those are not repeated for the cell). Then follows the <value>
+ * (unless it's marked empty in the flags) and a delta-encoded long <timestamp>
+ * (unless the flags tell to use the row-level one).
+ * Then, if it's a deleted or expiring cell, a delta-encoded int <localDelTime>,
+ * and if it's expiring a delta-encoded int <ttl> (unless it's an expiring cell
+ * and the ttl and localDeletionTime are indicated by the flags to be the same
+ * as the row's, in which case neither appears).
+ */
+public class UnfilteredSerializer
+{
+ private static final Logger logger = LoggerFactory.getLogger(UnfilteredSerializer.class);
+
+ public static final UnfilteredSerializer serializer = new UnfilteredSerializer();
+
+ // Unfiltered flags
+ private final static int END_OF_PARTITION = 0x01;
+ private final static int IS_MARKER = 0x02;
+ // For rows
+ private final static int IS_STATIC = 0x04;
+ private final static int HAS_TIMESTAMP = 0x08;
+ private final static int HAS_TTL = 0x10;
+ private final static int HAS_DELETION = 0x20;
+ private final static int HAS_COMPLEX_DELETION = 0x40;
+
+ // Cell flags
+ private final static int PRESENCE_MASK = 0x01;
+ private final static int DELETION_MASK = 0x02;
+ private final static int EXPIRATION_MASK = 0x04;
+ private final static int EMPTY_VALUE_MASK = 0x08;
+ private final static int USE_ROW_TIMESTAMP = 0x10;
+ private final static int USE_ROW_TTL = 0x20;
+
+ public void serialize(Unfiltered unfiltered, SerializationHeader header, DataOutputPlus out, int version)
+ throws IOException
+ {
+ if (unfiltered.kind() == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER)
+ {
+ serialize((RangeTombstoneMarker) unfiltered, header, out, version);
+ }
+ else
+ {
+ serialize((Row) unfiltered, header, out, version);
+ }
+ }
+
+ public void serialize(Row row, SerializationHeader header, DataOutputPlus out, int version)
+ throws IOException
+ {
+ int flags = 0;
+ boolean isStatic = row.isStatic();
+
+ LivenessInfo pkLiveness = row.primaryKeyLivenessInfo();
+ DeletionTime deletion = row.deletion();
+ boolean hasComplexDeletion = row.hasComplexDeletion();
+
+ if (isStatic)
+ flags |= IS_STATIC;
+ if (pkLiveness.hasTimestamp())
+ flags |= HAS_TIMESTAMP;
+ if (pkLiveness.hasTTL())
+ flags |= HAS_TTL;
+ if (!deletion.isLive())
+ flags |= HAS_DELETION;
+ if (hasComplexDeletion)
+ flags |= HAS_COMPLEX_DELETION;
+
+ out.writeByte((byte)flags);
+ if (!isStatic)
+ Clustering.serializer.serialize(row.clustering(), out, version, header.clusteringTypes());
+
+ if ((flags & HAS_TIMESTAMP) != 0)
+ out.writeLong(header.encodeTimestamp(pkLiveness.timestamp()));
+ if ((flags & HAS_TTL) != 0)
+ {
+ out.writeInt(header.encodeTTL(pkLiveness.ttl()));
+ out.writeInt(header.encodeDeletionTime(pkLiveness.localDeletionTime()));
+ }
+ if ((flags & HAS_DELETION) != 0)
+ UnfilteredRowIteratorSerializer.writeDelTime(deletion, header, out);
+
+ Columns columns = header.columns(isStatic);
+ int simpleCount = columns.simpleColumnCount();
+ boolean useSparse = header.useSparseColumnLayout(isStatic);
+ SearchIterator<ColumnDefinition, ColumnData> cells = row.searchIterator();
+
+ for (int i = 0; i < simpleCount; i++)
+ writeSimpleColumn(i, cells.next(columns.getSimple(i)), header, out, pkLiveness, useSparse);
+
+ for (int i = simpleCount; i < columns.columnCount(); i++)
+ writeComplexColumn(i, cells.next(columns.getComplex(i - simpleCount)), hasComplexDeletion, header, out, pkLiveness, useSparse);
+
+ if (useSparse)
+ out.writeShort(-1);
+ }
+
+ private void writeSimpleColumn(int idx, ColumnData data, SerializationHeader header, DataOutputPlus out, LivenessInfo rowLiveness, boolean useSparse)
+ throws IOException
+ {
+ if (useSparse)
+ {
+ if (data == null)
+ return;
+
+ out.writeShort(idx);
+ }
+
+ writeCell(data == null ? null : data.cell(), header, out, rowLiveness);
+ }
+
+ private void writeComplexColumn(int idx, ColumnData data, boolean hasComplexDeletion, SerializationHeader header, DataOutputPlus out, LivenessInfo rowLiveness, boolean useSparse)
+ throws IOException
+ {
+ Iterator<Cell> cells = data == null ? null : data.cells();
+ DeletionTime deletion = data == null ? DeletionTime.LIVE : data.complexDeletion();
+
+ if (useSparse)
+ {
+ assert hasComplexDeletion || deletion.isLive();
+ if (cells == null && deletion.isLive())
+ return;
+
+ out.writeShort(idx);
+ }
+
+ if (hasComplexDeletion)
+ UnfilteredRowIteratorSerializer.writeDelTime(deletion, header, out);
+
+ if (cells != null)
+ while (cells.hasNext())
+ writeCell(cells.next(), header, out, rowLiveness);
+
+ writeCell(null, header, out, rowLiveness);
+ }
+
+ public void serialize(RangeTombstoneMarker marker, SerializationHeader header, DataOutputPlus out, int version)
+ throws IOException
+ {
+ out.writeByte((byte)IS_MARKER);
+ RangeTombstone.Bound.serializer.serialize(marker.clustering(), out, version, header.clusteringTypes());
+
+ if (marker.isBoundary())
+ {
+ RangeTombstoneBoundaryMarker bm = (RangeTombstoneBoundaryMarker)marker;
+ UnfilteredRowIteratorSerializer.writeDelTime(bm.endDeletionTime(), header, out);
+ UnfilteredRowIteratorSerializer.writeDelTime(bm.startDeletionTime(), header, out);
+ }
+ else
+ {
+ UnfilteredRowIteratorSerializer.writeDelTime(((RangeTombstoneBoundMarker)marker).deletionTime(), header, out);
+ }
+ }
+
+ public long serializedSize(Unfiltered unfiltered, SerializationHeader header, int version, TypeSizes sizes)
+ {
+ return unfiltered.kind() == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER
+ ? serializedSize((RangeTombstoneMarker) unfiltered, header, version, sizes)
+ : serializedSize((Row) unfiltered, header, version, sizes);
+ }
+
+ public long serializedSize(Row row, SerializationHeader header, int version, TypeSizes sizes)
+ {
+ long size = 1; // flags
+
+ boolean isStatic = row.isStatic();
+ LivenessInfo pkLiveness = row.primaryKeyLivenessInfo();
+ DeletionTime deletion = row.deletion();
+ boolean hasComplexDeletion = row.hasComplexDeletion();
+
+ if (!isStatic)
+ size += Clustering.serializer.serializedSize(row.clustering(), version, header.clusteringTypes(), sizes);
+
+ if (pkLiveness.hasTimestamp())
+ size += sizes.sizeof(header.encodeTimestamp(pkLiveness.timestamp()));
+ if (pkLiveness.hasTTL())
+ {
+ size += sizes.sizeof(header.encodeTTL(pkLiveness.ttl()));
+ size += sizes.sizeof(header.encodeDeletionTime(pkLiveness.localDeletionTime()));
+ }
+ if (!deletion.isLive())
+ size += UnfilteredRowIteratorSerializer.delTimeSerializedSize(deletion, header, sizes);
+
+ Columns columns = header.columns(isStatic);
+ int simpleCount = columns.simpleColumnCount();
+ boolean useSparse = header.useSparseColumnLayout(isStatic);
+ SearchIterator<ColumnDefinition, ColumnData> cells = row.searchIterator();
+
+ for (int i = 0; i < simpleCount; i++)
+ size += sizeOfSimpleColumn(i, cells.next(columns.getSimple(i)), header, sizes, pkLiveness, useSparse);
+
+ for (int i = simpleCount; i < columns.columnCount(); i++)
+ size += sizeOfComplexColumn(i, cells.next(columns.getComplex(i - simpleCount)), hasComplexDeletion, header, sizes, pkLiveness, useSparse);
+
+ if (useSparse)
+ size += sizes.sizeof((short)-1);
+
+ return size;
+ }
+
+ private long sizeOfSimpleColumn(int idx, ColumnData data, SerializationHeader header, TypeSizes sizes, LivenessInfo rowLiveness, boolean useSparse)
+ {
+ long size = 0;
+ if (useSparse)
+ {
+ if (data == null)
+ return size;
+
+ size += sizes.sizeof((short)idx);
+ }
+ return size + sizeOfCell(data == null ? null : data.cell(), header, sizes, rowLiveness);
+ }
+
+ private long sizeOfComplexColumn(int idx, ColumnData data, boolean hasComplexDeletion, SerializationHeader header, TypeSizes sizes, LivenessInfo rowLiveness, boolean useSparse)
+ {
+ long size = 0;
+ Iterator<Cell> cells = data == null ? null : data.cells();
+ DeletionTime deletion = data == null ? DeletionTime.LIVE : data.complexDeletion();
+ if (useSparse)
+ {
+ assert hasComplexDeletion || deletion.isLive();
+ if (cells == null && deletion.isLive())
+ return size;
+
+ size += sizes.sizeof((short)idx);
+ }
+
+ if (hasComplexDeletion)
+ size += UnfilteredRowIteratorSerializer.delTimeSerializedSize(deletion, header, sizes);
+
+ if (cells != null)
+ while (cells.hasNext())
+ size += sizeOfCell(cells.next(), header, sizes, rowLiveness);
+
+ return size + sizeOfCell(null, header, sizes, rowLiveness);
+ }
+
+ public long serializedSize(RangeTombstoneMarker marker, SerializationHeader header, int version, TypeSizes sizes)
+ {
+ long size = 1 // flags
+ + RangeTombstone.Bound.serializer.serializedSize(marker.clustering(), version, header.clusteringTypes(), sizes);
+
+ if (marker.isBoundary())
+ {
+ RangeTombstoneBoundaryMarker bm = (RangeTombstoneBoundaryMarker)marker;
+ size += UnfilteredRowIteratorSerializer.delTimeSerializedSize(bm.endDeletionTime(), header, sizes);
+ size += UnfilteredRowIteratorSerializer.delTimeSerializedSize(bm.startDeletionTime(), header, sizes);
+ }
+ else
+ {
+ size += UnfilteredRowIteratorSerializer.delTimeSerializedSize(((RangeTombstoneBoundMarker)marker).deletionTime(), header, sizes);
+ }
+ return size;
+ }
+
+ public void writeEndOfPartition(DataOutputPlus out) throws IOException
+ {
+ out.writeByte((byte)1);
+ }
+
+ public long serializedSizeEndOfPartition(TypeSizes sizes)
+ {
+ return 1;
+ }
+
+ public Unfiltered.Kind deserialize(DataInput in,
+ SerializationHeader header,
+ SerializationHelper helper,
+ Row.Writer rowWriter,
+ RangeTombstoneMarker.Writer markerWriter)
+ throws IOException
+ {
+ int flags = in.readUnsignedByte();
+ if (isEndOfPartition(flags))
+ return null;
+
+ if (kind(flags) == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER)
+ {
+ RangeTombstone.Bound.Kind kind = RangeTombstone.Bound.serializer.deserialize(in, helper.version, header.clusteringTypes(), markerWriter);
+ deserializeMarkerBody(in, header, kind.isBoundary(), markerWriter);
+ return Unfiltered.Kind.RANGE_TOMBSTONE_MARKER;
+ }
+ else
+ {
+ assert !isStatic(flags); // deserializeStaticRow should be used for that.
+ Clustering.serializer.deserialize(in, helper.version, header.clusteringTypes(), rowWriter);
+ deserializeRowBody(in, header, helper, flags, rowWriter);
+ return Unfiltered.Kind.ROW;
+ }
+ }
+
+ public Row deserializeStaticRow(DataInput in, SerializationHeader header, SerializationHelper helper)
+ throws IOException
+ {
+ int flags = in.readUnsignedByte();
+ assert !isEndOfPartition(flags) && kind(flags) == Unfiltered.Kind.ROW && isStatic(flags);
+ StaticRow.Builder builder = StaticRow.builder(header.columns().statics, true, header.columns().statics.hasCounters());
+ deserializeRowBody(in, header, helper, flags, builder);
+ return builder.build();
+ }
+
+ public void skipStaticRow(DataInput in, SerializationHeader header, SerializationHelper helper) throws IOException
+ {
+ int flags = in.readUnsignedByte();
+ assert !isEndOfPartition(flags) && kind(flags) == Unfiltered.Kind.ROW && isStatic(flags) : "Flags is " + flags;
+ skipRowBody(in, header, helper, flags);
+ }
+
+ public void deserializeMarkerBody(DataInput in,
+ SerializationHeader header,
+ boolean isBoundary,
+ RangeTombstoneMarker.Writer writer)
+ throws IOException
+ {
+ if (isBoundary)
+ writer.writeBoundaryDeletion(UnfilteredRowIteratorSerializer.readDelTime(in, header), UnfilteredRowIteratorSerializer.readDelTime(in, header));
+ else
+ writer.writeBoundDeletion(UnfilteredRowIteratorSerializer.readDelTime(in, header));
+ writer.endOfMarker();
+ }
+
+ public void skipMarkerBody(DataInput in, SerializationHeader header, boolean isBoundary) throws IOException
+ {
+ if (isBoundary)
+ {
+ UnfilteredRowIteratorSerializer.skipDelTime(in, header);
+ UnfilteredRowIteratorSerializer.skipDelTime(in, header);
+ }
+ else
+ {
+ UnfilteredRowIteratorSerializer.skipDelTime(in, header);
+ }
+ }
+
+ public void deserializeRowBody(DataInput in,
+ SerializationHeader header,
+ SerializationHelper helper,
+ int flags,
+ Row.Writer writer)
+ throws IOException
+ {
+ boolean isStatic = isStatic(flags);
+ boolean hasTimestamp = (flags & HAS_TIMESTAMP) != 0;
+ boolean hasTTL = (flags & HAS_TTL) != 0;
+ boolean hasDeletion = (flags & HAS_DELETION) != 0;
+ boolean hasComplexDeletion = (flags & HAS_COMPLEX_DELETION) != 0;
+
+ long timestamp = hasTimestamp ? header.decodeTimestamp(in.readLong()) : LivenessInfo.NO_TIMESTAMP;
+ int ttl = hasTTL ? header.decodeTTL(in.readInt()) : LivenessInfo.NO_TTL;
+ int localDeletionTime = hasTTL ? header.decodeDeletionTime(in.readInt()) : LivenessInfo.NO_DELETION_TIME;
+ DeletionTime deletion = hasDeletion ? UnfilteredRowIteratorSerializer.readDelTime(in, header) : DeletionTime.LIVE;
+
+ helper.writePartitionKeyLivenessInfo(writer, timestamp, ttl, localDeletionTime);
+ writer.writeRowDeletion(deletion);
+
+ Columns columns = header.columns(isStatic);
+ if (header.useSparseColumnLayout(isStatic))
+ {
+ int count = columns.columnCount();
+ int simpleCount = columns.simpleColumnCount();
+ int i;
+ while ((i = in.readShort()) >= 0)
+ {
+ if (i >= count)
+ throw new IOException(String.format("Impossible column index %d, the header has only %d columns defined", i, count));
+
+ if (i < simpleCount)
+ readSimpleColumn(columns.getSimple(i), in, header, helper, writer);
+ else
+ readComplexColumn(columns.getComplex(i - simpleCount), in, header, helper, hasComplexDeletion, writer);
+ }
+ }
+ else
+ {
+ for (int i = 0; i < columns.simpleColumnCount(); i++)
+ readSimpleColumn(columns.getSimple(i), in, header, helper, writer);
+
+ for (int i = 0; i < columns.complexColumnCount(); i++)
+ readComplexColumn(columns.getComplex(i), in, header, helper, hasComplexDeletion, writer);
+ }
+
+ writer.endOfRow();
+ }
+
+ private void readSimpleColumn(ColumnDefinition column, DataInput in, SerializationHeader header, SerializationHelper helper, Row.Writer writer)
+ throws IOException
+ {
+ if (helper.includes(column))
+ readCell(column, in, header, helper, writer);
+ else
+ skipCell(column, in, header);
+ }
+
+ private void readComplexColumn(ColumnDefinition column, DataInput in, SerializationHeader header, SerializationHelper helper, boolean hasComplexDeletion, Row.Writer writer)
+ throws IOException
+ {
+ if (helper.includes(column))
+ {
+ helper.startOfComplexColumn(column);
+
+ if (hasComplexDeletion)
+ writer.writeComplexDeletion(column, UnfilteredRowIteratorSerializer.readDelTime(in, header));
+
+ while (readCell(column, in, header, helper, writer));
+
+ helper.endOfComplexColumn(column);
+ }
+ else
+ {
+ skipComplexColumn(column, in, header, helper, hasComplexDeletion);
+ }
+ }
+
+ public void skipRowBody(DataInput in, SerializationHeader header, SerializationHelper helper, int flags) throws IOException
+ {
+ boolean isStatic = isStatic(flags);
+ boolean hasTimestamp = (flags & HAS_TIMESTAMP) != 0;
+ boolean hasTTL = (flags & HAS_TTL) != 0;
+ boolean hasDeletion = (flags & HAS_DELETION) != 0;
+ boolean hasComplexDeletion = (flags & HAS_COMPLEX_DELETION) != 0;
+
+ // Note that we don't want to use FileUtils.skipBytesFully for anything that may not have
+ // the size we think due to VINT encoding
+ if (hasTimestamp)
+ in.readLong();
+ if (hasTTL)
+ {
+ // ttl and localDeletionTime
+ in.readInt();
+ in.readInt();
+ }
+ if (hasDeletion)
+ UnfilteredRowIteratorSerializer.skipDelTime(in, header);
+
+ Columns columns = header.columns(isStatic);
+ if (header.useSparseColumnLayout(isStatic))
+ {
+ int count = columns.columnCount();
+ int simpleCount = columns.simpleColumnCount();
+ int i;
+ while ((i = in.readShort()) >= 0)
+ {
+ if (i >= count)
+ throw new IOException(String.format("Impossible column index %d, the header has only %d columns defined", i, count));
+
+ if (i < simpleCount)
+ skipCell(columns.getSimple(i), in, header);
+ else
+ skipComplexColumn(columns.getComplex(i - simpleCount), in, header, helper, hasComplexDeletion);
+ }
+ }
+ else
+ {
+ for (int i = 0; i < columns.simpleColumnCount(); i++)
+ skipCell(columns.getSimple(i), in, header);
+
+ for (int i = 0; i < columns.complexColumnCount(); i++)
+ skipComplexColumn(columns.getComplex(i), in, header, helper, hasComplexDeletion);
+ }
+ }
+
+ private void skipComplexColumn(ColumnDefinition column, DataInput in, SerializationHeader header, SerializationHelper helper, boolean hasComplexDeletion)
+ throws IOException
+ {
+ if (hasComplexDeletion)
+ UnfilteredRowIteratorSerializer.skipDelTime(in, header);
+
+ while (skipCell(column, in, header));
+ }
+
+ public static boolean isEndOfPartition(int flags)
+ {
+ return (flags & END_OF_PARTITION) != 0;
+ }
+
+ public static Unfiltered.Kind kind(int flags)
+ {
+ return (flags & IS_MARKER) != 0 ? Unfiltered.Kind.RANGE_TOMBSTONE_MARKER : Unfiltered.Kind.ROW;
+ }
+
+ public static boolean isStatic(int flags)
+ {
+ return (flags & IS_MARKER) == 0 && (flags & IS_STATIC) != 0;
+ }
+
+ private void writeCell(Cell cell, SerializationHeader header, DataOutputPlus out, LivenessInfo rowLiveness)
+ throws IOException
+ {
+ if (cell == null)
+ {
+ out.writeByte((byte)0);
+ return;
+ }
+
+ boolean hasValue = cell.value().hasRemaining();
+ boolean isDeleted = cell.isTombstone();
+ boolean isExpiring = cell.isExpiring();
+ boolean useRowTimestamp = rowLiveness.hasTimestamp() && cell.livenessInfo().timestamp() == rowLiveness.timestamp();
+ boolean useRowTTL = isExpiring && rowLiveness.hasTTL() && cell.livenessInfo().ttl() == rowLiveness.ttl() && cell.livenessInfo().localDeletionTime() == rowLiveness.localDeletionTime();
+ int flags = PRESENCE_MASK;
+ if (!hasValue)
+ flags |= EMPTY_VALUE_MASK;
+
+ if (isDeleted)
+ flags |= DELETION_MASK;
+ else if (isExpiring)
+ flags |= EXPIRATION_MASK;
+
+ if (useRowTimestamp)
+ flags |= USE_ROW_TIMESTAMP;
+ if (useRowTTL)
+ flags |= USE_ROW_TTL;
+
+ out.writeByte((byte)flags);
+
+ if (hasValue)
+ header.getType(cell.column()).writeValue(cell.value(), out);
+
+ if (!useRowTimestamp)
+ out.writeLong(header.encodeTimestamp(cell.livenessInfo().timestamp()));
+
+ if ((isDeleted || isExpiring) && !useRowTTL)
+ out.writeInt(header.encodeDeletionTime(cell.livenessInfo().localDeletionTime()));
+ if (isExpiring && !useRowTTL)
+ out.writeInt(header.encodeTTL(cell.livenessInfo().ttl()));
+
+ if (cell.column().isComplex())
+ cell.column().cellPathSerializer().serialize(cell.path(), out);
+ }
+
+ private long sizeOfCell(Cell cell, SerializationHeader header, TypeSizes sizes, LivenessInfo rowLiveness)
+ {
+ long size = 1; // flags
+
+ if (cell == null)
+ return size;
+
+ boolean hasValue = cell.value().hasRemaining();
+ boolean isDeleted = cell.isTombstone();
+ boolean isExpiring = cell.isExpiring();
+ boolean useRowTimestamp = rowLiveness.hasTimestamp() && cell.livenessInfo().timestamp() == rowLiveness.timestamp();
+ boolean useRowTTL = isExpiring && rowLiveness.hasTTL() && cell.livenessInfo().ttl() == rowLiveness.ttl() && cell.livenessInfo().localDeletionTime() == rowLiveness.localDeletionTime();
+
+ if (hasValue)
+ size += header.getType(cell.column()).writtenLength(cell.value(), sizes);
+
+ if (!useRowTimestamp)
+ size += sizes.sizeof(header.encodeTimestamp(cell.livenessInfo().timestamp()));
+
+ if ((isDeleted || isExpiring) && !useRowTTL)
+ size += sizes.sizeof(header.encodeDeletionTime(cell.livenessInfo().localDeletionTime()));
+ if (isExpiring && !useRowTTL)
+ size += sizes.sizeof(header.encodeTTL(cell.livenessInfo().ttl()));
+
+ if (cell.column().isComplex())
+ size += cell.column().cellPathSerializer().serializedSize(cell.path(), sizes);
+
+ return size;
+ }
+
+ private boolean readCell(ColumnDefinition column, DataInput in, SerializationHeader header, SerializationHelper helper, Row.Writer writer)
+ throws IOException
+ {
+ int flags = in.readUnsignedByte();
+ if ((flags & PRESENCE_MASK) == 0)
+ return false;
+
+ boolean hasValue = (flags & EMPTY_VALUE_MASK) == 0;
+ boolean isDeleted = (flags & DELETION_MASK) != 0;
+ boolean isExpiring = (flags & EXPIRATION_MASK) != 0;
+ boolean useRowTimestamp = (flags & USE_ROW_TIMESTAMP) != 0;
+ boolean useRowTTL = (flags & USE_ROW_TTL) != 0;
+
+ ByteBuffer value = ByteBufferUtil.EMPTY_BYTE_BUFFER;
+ if (hasValue)
+ {
+ if (helper.canSkipValue(column))
+ header.getType(column).skipValue(in);
+ else
+ value = header.getType(column).readValue(in);
+ }
+
+ long timestamp = useRowTimestamp ? helper.getRowTimestamp() : header.decodeTimestamp(in.readLong());
+
+ int localDelTime = useRowTTL
+ ? helper.getRowLocalDeletionTime()
+ : (isDeleted || isExpiring ? header.decodeDeletionTime(in.readInt()) : LivenessInfo.NO_DELETION_TIME);
+
+ int ttl = useRowTTL
+ ? helper.getRowTTL()
+ : (isExpiring ? header.decodeTTL(in.readInt()) : LivenessInfo.NO_TTL);
+
+ CellPath path = column.isComplex()
+ ? column.cellPathSerializer().deserialize(in)
+ : null;
+
+ helper.writeCell(writer, column, false, value, timestamp, localDelTime, ttl, path);
+
+ return true;
+ }
+
+ private boolean skipCell(ColumnDefinition column, DataInput in, SerializationHeader header)
+ throws IOException
+ {
+ int flags = in.readUnsignedByte();
+ if ((flags & PRESENCE_MASK) == 0)
+ return false;
+
+ boolean hasValue = (flags & EMPTY_VALUE_MASK) == 0;
+ boolean isDeleted = (flags & DELETION_MASK) != 0;
+ boolean isExpiring = (flags & EXPIRATION_MASK) != 0;
+ boolean useRowTimestamp = (flags & USE_ROW_TIMESTAMP) != 0;
+ boolean useRowTTL = (flags & USE_ROW_TTL) != 0;
+
+ if (hasValue)
+ header.getType(column).skipValue(in);
+
+ if (!useRowTimestamp)
+ in.readLong();
+
+ if (!useRowTTL && (isDeleted || isExpiring))
+ in.readInt();
+
+ if (!useRowTTL && isExpiring)
+ in.readInt();
+
+ if (column.isComplex())
+ column.cellPathSerializer().skip(in);
+
+ return true;
+ }
+}
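To make the cell <flags> layout documented above concrete, here is a worked decoding of one hypothetical flags value using the masks defined in this class (illustration only, not part of the commit):

    int flags = 0x15; // = PRESENCE_MASK | EXPIRATION_MASK | USE_ROW_TIMESTAMP
    boolean present      = (flags & 0x01) != 0; // PRESENCE_MASK      -> true
    boolean deleted      = (flags & 0x02) != 0; // DELETION_MASK      -> false
    boolean expiring     = (flags & 0x04) != 0; // EXPIRATION_MASK    -> true
    boolean emptyValue   = (flags & 0x08) != 0; // EMPTY_VALUE_MASK   -> false
    boolean rowTimestamp = (flags & 0x10) != 0; // USE_ROW_TIMESTAMP  -> true
    boolean rowTTL       = (flags & 0x20) != 0; // USE_ROW_TTL        -> false
    // So 0x15 describes a present, expiring cell whose timestamp equals the
    // row's, while its localDeletionTime and ttl are serialized explicitly.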
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/WrappingRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/WrappingRow.java b/src/java/org/apache/cassandra/db/rows/WrappingRow.java
new file mode 100644
index 0000000..5a0cc78
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/WrappingRow.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import com.google.common.collect.UnmodifiableIterator;
+
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.utils.SearchIterator;
+
+public abstract class WrappingRow extends AbstractRow
+{
+ protected Row wrapped;
+
+ private final ReusableIterator cellIterator = new ReusableIterator();
+ private final ReusableSearchIterator cellSearchIterator = new ReusableSearchIterator();
+
+ /**
+ * Apply some filtering/transformation on cells. This function
+ * can return {@code null} in which case the cell will be ignored.
+ */
+ protected abstract Cell filterCell(Cell cell);
+
+ protected DeletionTime filterDeletionTime(DeletionTime deletionTime)
+ {
+ return deletionTime;
+ }
+
+ protected ColumnData filterColumnData(ColumnData data)
+ {
+ if (data.column().isComplex())
+ {
+ Iterator<Cell> cells = cellIterator.setTo(data.cells());
+ DeletionTime dt = filterDeletionTime(data.complexDeletion());
+ return cells == null && dt.isLive()
+ ? null
+ : new ColumnData(data.column(), null, cells == null ? Collections.emptyIterator(): cells, dt);
+ }
+ else
+ {
+ Cell filtered = filterCell(data.cell());
+ return filtered == null ? null : new ColumnData(data.column(), filtered, null, null);
+ }
+ }
+
+ public WrappingRow setTo(Row row)
+ {
+ this.wrapped = row;
+ return this;
+ }
+
+ public Unfiltered.Kind kind()
+ {
+ return Unfiltered.Kind.ROW;
+ }
+
+ public Clustering clustering()
+ {
+ return wrapped.clustering();
+ }
+
+ public Columns columns()
+ {
+ return wrapped.columns();
+ }
+
+ public LivenessInfo primaryKeyLivenessInfo()
+ {
+ return wrapped.primaryKeyLivenessInfo();
+ }
+
+ public DeletionTime deletion()
+ {
+ return wrapped.deletion();
+ }
+
+ public boolean hasComplexDeletion()
+ {
+ // Note that because cells can be filtered out/transformed through
+ // filterCell(), we can't rely on wrapped.hasComplexDeletion().
+ for (int i = 0; i < columns().complexColumnCount(); i++)
+ if (!getDeletion(columns().getComplex(i)).isLive())
+ return true;
+ return false;
+ }
+
+ public Cell getCell(ColumnDefinition c)
+ {
+ Cell cell = wrapped.getCell(c);
+ return cell == null ? null : filterCell(cell);
+ }
+
+ public Cell getCell(ColumnDefinition c, CellPath path)
+ {
+ Cell cell = wrapped.getCell(c, path);
+ return cell == null ? null : filterCell(cell);
+ }
+
+ public Iterator<Cell> getCells(ColumnDefinition c)
+ {
+ Iterator<Cell> cells = wrapped.getCells(c);
+ if (cells == null)
+ return null;
+
+ cellIterator.setTo(cells);
+ return cellIterator.hasNext() ? cellIterator : null;
+ }
+
+ public DeletionTime getDeletion(ColumnDefinition c)
+ {
+ return filterDeletionTime(wrapped.getDeletion(c));
+ }
+
+ public Iterator<Cell> iterator()
+ {
+ return cellIterator.setTo(wrapped.iterator());
+ }
+
+ public SearchIterator<ColumnDefinition, ColumnData> searchIterator()
+ {
+ return cellSearchIterator.setTo(wrapped.searchIterator());
+ }
+
+ public Row takeAlias()
+ {
+ boolean isCounter = columns().hasCounters();
+ if (isStatic())
+ {
+ StaticRow.Builder builder = StaticRow.builder(columns(), true, isCounter);
+ copyTo(builder);
+ return builder.build();
+ }
+ else
+ {
+ ReusableRow copy = new ReusableRow(clustering().size(), columns(), true, isCounter);
+ copyTo(copy.writer());
+ return copy;
+ }
+ }
+
+ private class ReusableIterator extends UnmodifiableIterator<Cell>
+ {
+ private Iterator<Cell> iter;
+ private Cell next;
+
+ public ReusableIterator setTo(Iterator<Cell> iter)
+ {
+ this.iter = iter;
+ this.next = null;
+ return this;
+ }
+
+ public boolean hasNext()
+ {
+ while (next == null && iter.hasNext())
+ next = filterCell(iter.next());
+ return next != null;
+ }
+
+ public Cell next()
+ {
+ if (next == null && !hasNext())
+ throw new NoSuchElementException();
+
+ Cell result = next;
+ next = null;
+ return result;
+ }
+ };
+
+ private class ReusableSearchIterator implements SearchIterator<ColumnDefinition, ColumnData>
+ {
+ private SearchIterator<ColumnDefinition, ColumnData> iter;
+
+ public ReusableSearchIterator setTo(SearchIterator<ColumnDefinition, ColumnData> iter)
+ {
+ this.iter = iter;
+ return this;
+ }
+
+ public boolean hasNext()
+ {
+ return iter.hasNext();
+ }
+
+ public ColumnData next(ColumnDefinition column)
+ {
+ ColumnData data = iter.next(column);
+ if (data == null)
+ return null;
+
+ return filterColumnData(data);
+ }
+ };
+}
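To illustrate the intended override pattern (a sketch, assuming filterCell() is the only abstract method a subclass must supply, as the class above suggests): a wrapper that drops every cell of one hypothetical column and lets everything else through.

    class DroppingRow extends WrappingRow
    {
        private final ColumnDefinition dropped; // the column to remove

        DroppingRow(ColumnDefinition dropped)
        {
            this.dropped = dropped;
        }

        protected Cell filterCell(Cell cell)
        {
            // returning null tells WrappingRow to ignore the cell entirely
            return cell.column().equals(dropped) ? null : cell;
        }
    }

    // Usage: Row filtered = new DroppingRow(column).setTo(row);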
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/WrappingRowIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/WrappingRowIterator.java b/src/java/org/apache/cassandra/db/rows/WrappingRowIterator.java
new file mode 100644
index 0000000..8847a47
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/WrappingRowIterator.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import com.google.common.collect.UnmodifiableIterator;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+
+/**
+ * Abstract class to make writing row iterators that wrap another iterator
+ * easier. By default, the wrapping iterator simply delegates every call to
+ * the wrapped iterator, so concrete implementations will override some of the
+ * methods.
+ */
+public abstract class WrappingRowIterator extends UnmodifiableIterator<Row> implements RowIterator
+{
+ protected final RowIterator wrapped;
+
+ protected WrappingRowIterator(RowIterator wrapped)
+ {
+ this.wrapped = wrapped;
+ }
+
+ public CFMetaData metadata()
+ {
+ return wrapped.metadata();
+ }
+
+ public boolean isReverseOrder()
+ {
+ return wrapped.isReverseOrder();
+ }
+
+ public PartitionColumns columns()
+ {
+ return wrapped.columns();
+ }
+
+ public DecoratedKey partitionKey()
+ {
+ return wrapped.partitionKey();
+ }
+
+ public Row staticRow()
+ {
+ return wrapped.staticRow();
+ }
+
+ public boolean hasNext()
+ {
+ return wrapped.hasNext();
+ }
+
+ public Row next()
+ {
+ return wrapped.next();
+ }
+
+ public void close()
+ {
+ wrapped.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/WrappingUnfilteredRowIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/WrappingUnfilteredRowIterator.java b/src/java/org/apache/cassandra/db/rows/WrappingUnfilteredRowIterator.java
new file mode 100644
index 0000000..680e502
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/WrappingUnfilteredRowIterator.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import com.google.common.collect.UnmodifiableIterator;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+
+/**
+ * Abstract class to make writing unfiltered row iterators that wrap another
+ * iterator easier. By default, the wrapping iterator simply delegates every
+ * call to the wrapped iterator, so concrete implementations only override
+ * the methods they need to change.
+ */
+public abstract class WrappingUnfilteredRowIterator extends UnmodifiableIterator<Unfiltered> implements UnfilteredRowIterator
+{
+ protected final UnfilteredRowIterator wrapped;
+
+ protected WrappingUnfilteredRowIterator(UnfilteredRowIterator wrapped)
+ {
+ this.wrapped = wrapped;
+ }
+
+ public CFMetaData metadata()
+ {
+ return wrapped.metadata();
+ }
+
+ public PartitionColumns columns()
+ {
+ return wrapped.columns();
+ }
+
+ public boolean isReverseOrder()
+ {
+ return wrapped.isReverseOrder();
+ }
+
+ public DecoratedKey partitionKey()
+ {
+ return wrapped.partitionKey();
+ }
+
+ public DeletionTime partitionLevelDeletion()
+ {
+ return wrapped.partitionLevelDeletion();
+ }
+
+ public Row staticRow()
+ {
+ return wrapped.staticRow();
+ }
+
+ public boolean hasNext()
+ {
+ return wrapped.hasNext();
+ }
+
+ public Unfiltered next()
+ {
+ return wrapped.next();
+ }
+
+ public RowStats stats()
+ {
+ return wrapped.stats();
+ }
+
+ public void close()
+ {
+ wrapped.close();
+ }
+}
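
The unfiltered variant works the same way, except that next() yields Unfiltered, which in this patch covers both rows and range tombstone markers. A hypothetical subclass (ours, not part of the patch) that tallies the two kinds, assuming Row is one kind of Unfiltered and markers are the only other:

import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.rows.Unfiltered;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.db.rows.WrappingUnfilteredRowIterator;

public class TalliedUnfilteredRowIterator extends WrappingUnfilteredRowIterator
{
    private long rows;
    private long markers;

    public TalliedUnfilteredRowIterator(UnfilteredRowIterator wrapped)
    {
        super(wrapped);
    }

    @Override
    public Unfiltered next()
    {
        Unfiltered unfiltered = wrapped.next();
        if (unfiltered instanceof Row)
            rows++;
        else
            markers++;           // range tombstone markers
        return unfiltered;
    }

    public long rows()    { return rows; }
    public long markers() { return markers; }
}
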
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/dht/AbstractBounds.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/AbstractBounds.java b/src/java/org/apache/cassandra/dht/AbstractBounds.java
index b5ffc22..e295c68 100644
--- a/src/java/org/apache/cassandra/dht/AbstractBounds.java
+++ b/src/java/org/apache/cassandra/dht/AbstractBounds.java
@@ -23,7 +23,7 @@ import java.io.Serializable;
import java.util.List;
import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.RowPosition;
+import org.apache.cassandra.db.PartitionPosition;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.io.util.DataOutputPlus;
@@ -34,8 +34,8 @@ public abstract class AbstractBounds<T extends RingPosition<T>> implements Seria
private static final long serialVersionUID = 1L;
public static final IPartitionerDependentSerializer<AbstractBounds<Token>> tokenSerializer =
new AbstractBoundsSerializer<Token>(Token.serializer);
- public static final IPartitionerDependentSerializer<AbstractBounds<RowPosition>> rowPositionSerializer =
- new AbstractBoundsSerializer<RowPosition>(RowPosition.serializer);
+ public static final IPartitionerDependentSerializer<AbstractBounds<PartitionPosition>> rowPositionSerializer =
+ new AbstractBoundsSerializer<PartitionPosition>(PartitionPosition.serializer);
private enum Type
{
@@ -112,6 +112,9 @@ public abstract class AbstractBounds<T extends RingPosition<T>> implements Seria
protected abstract String getOpeningString();
protected abstract String getClosingString();
+ public abstract boolean isStartInclusive();
+ public abstract boolean isEndInclusive();
+
public abstract AbstractBounds<T> withNewRight(T newRight);
public static class AbstractBoundsSerializer<T extends RingPosition<T>> implements IPartitionerDependentSerializer<AbstractBounds<T>>
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/dht/Bounds.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/Bounds.java b/src/java/org/apache/cassandra/dht/Bounds.java
index 4a5a701..b569349 100644
--- a/src/java/org/apache/cassandra/dht/Bounds.java
+++ b/src/java/org/apache/cassandra/dht/Bounds.java
@@ -20,7 +20,7 @@ package org.apache.cassandra.dht;
import java.util.Collections;
import java.util.List;
-import org.apache.cassandra.db.RowPosition;
+import org.apache.cassandra.db.PartitionPosition;
import org.apache.cassandra.utils.Pair;
/**
@@ -102,12 +102,22 @@ public class Bounds<T extends RingPosition<T>> extends AbstractBounds<T>
return "]";
}
+ public boolean isStartInclusive()
+ {
+ return true;
+ }
+
+ public boolean isEndInclusive()
+ {
+ return true;
+ }
+
/**
* Compute a bounds of keys corresponding to a given bounds of token.
*/
- public static Bounds<RowPosition> makeRowBounds(Token left, Token right)
+ public static Bounds<PartitionPosition> makeRowBounds(Token left, Token right)
{
- return new Bounds<RowPosition>(left.minKeyBound(), right.maxKeyBound());
+ return new Bounds<PartitionPosition>(left.minKeyBound(), right.maxKeyBound());
}
public AbstractBounds<T> withNewRight(T newRight)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/dht/ExcludingBounds.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/ExcludingBounds.java b/src/java/org/apache/cassandra/dht/ExcludingBounds.java
index 0d37e5c..86af68d 100644
--- a/src/java/org/apache/cassandra/dht/ExcludingBounds.java
+++ b/src/java/org/apache/cassandra/dht/ExcludingBounds.java
@@ -90,6 +90,16 @@ public class ExcludingBounds<T extends RingPosition<T>> extends AbstractBounds<T
return ")";
}
+ public boolean isStartInclusive()
+ {
+ return false;
+ }
+
+ public boolean isEndInclusive()
+ {
+ return false;
+ }
+
public AbstractBounds<T> withNewRight(T newRight)
{
return new ExcludingBounds<T>(left, newRight);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/dht/IncludingExcludingBounds.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/IncludingExcludingBounds.java b/src/java/org/apache/cassandra/dht/IncludingExcludingBounds.java
index e9e8e8e..446d0af 100644
--- a/src/java/org/apache/cassandra/dht/IncludingExcludingBounds.java
+++ b/src/java/org/apache/cassandra/dht/IncludingExcludingBounds.java
@@ -89,6 +89,16 @@ public class IncludingExcludingBounds<T extends RingPosition<T>> extends Abstrac
return ")";
}
+ public boolean isStartInclusive()
+ {
+ return true;
+ }
+
+ public boolean isEndInclusive()
+ {
+ return false;
+ }
+
public AbstractBounds<T> withNewRight(T newRight)
{
return new IncludingExcludingBounds<T>(left, newRight);
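
Taken together, the isStartInclusive()/isEndInclusive() hunks in AbstractBounds and its subclasses pin down a fixed inclusivity per concrete bounds type, matching the opening/closing strings each class already prints:

    class                       start      end        interval
    Bounds                      inclusive  inclusive  [left, right]
    Range                       exclusive  inclusive  (left, right]
    ExcludingBounds             exclusive  exclusive  (left, right)
    IncludingExcludingBounds    inclusive  exclusive  [left, right)

A hypothetical helper (ours, not part of the patch) showing what the new accessors buy: callers can format or reason about any bounds generically instead of switching on the concrete class. It assumes the public left/right fields AbstractBounds already exposes:

static <T extends RingPosition<T>> String render(AbstractBounds<T> bounds)
{
    return (bounds.isStartInclusive() ? "[" : "(")
         + bounds.left + ", " + bounds.right
         + (bounds.isEndInclusive() ? "]" : ")");
}
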
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/dht/Range.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/Range.java b/src/java/org/apache/cassandra/dht/Range.java
index cbf093c..f99716b 100644
--- a/src/java/org/apache/cassandra/dht/Range.java
+++ b/src/java/org/apache/cassandra/dht/Range.java
@@ -21,7 +21,8 @@ import java.io.Serializable;
import java.util.*;
import org.apache.commons.lang3.ObjectUtils;
-import org.apache.cassandra.db.RowPosition;
+
+import org.apache.cassandra.db.PartitionPosition;
import org.apache.cassandra.utils.Pair;
/**
@@ -372,6 +373,16 @@ public class Range<T extends RingPosition<T>> extends AbstractBounds<T> implemen
return "]";
}
+ public boolean isStartInclusive()
+ {
+ return false;
+ }
+
+ public boolean isEndInclusive()
+ {
+ return true;
+ }
+
public List<String> asList()
{
ArrayList<String> ret = new ArrayList<String>(2);
@@ -465,12 +476,12 @@ public class Range<T extends RingPosition<T>> extends AbstractBounds<T> implemen
/**
* Compute a range of keys corresponding to a given range of token.
*/
- public static Range<RowPosition> makeRowRange(Token left, Token right)
+ public static Range<PartitionPosition> makeRowRange(Token left, Token right)
{
- return new Range<RowPosition>(left.maxKeyBound(), right.maxKeyBound());
+ return new Range<PartitionPosition>(left.maxKeyBound(), right.maxKeyBound());
}
- public static Range<RowPosition> makeRowRange(Range<Token> tokenBounds)
+ public static Range<PartitionPosition> makeRowRange(Range<Token> tokenBounds)
{
return makeRowRange(tokenBounds.left, tokenBounds.right);
}
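
Note why makeRowRange uses maxKeyBound() on both sides: a token Range is start-exclusive and end-inclusive, so the key range must begin after the last key owned by the left token and end with the last key owned by the right token. A hypothetical usage sketch (ours, not part of the patch):

import org.apache.cassandra.db.PartitionPosition;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;

public class RowRangeExample
{
    // The token range (left, right] maps to the key range
    // (left.maxKeyBound(), right.maxKeyBound()]: every key whose token lies
    // in (left, right], and no key whose token equals left.
    static Range<PartitionPosition> keysFor(Range<Token> tokens)
    {
        return Range.makeRowRange(tokens);
    }
}
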
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/dht/Token.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/Token.java b/src/java/org/apache/cassandra/dht/Token.java
index 0cc8a2d..c87b46b 100644
--- a/src/java/org/apache/cassandra/dht/Token.java
+++ b/src/java/org/apache/cassandra/dht/Token.java
@@ -22,7 +22,7 @@ import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
-import org.apache.cassandra.db.RowPosition;
+import org.apache.cassandra.db.PartitionPosition;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.util.DataOutputPlus;
@@ -142,7 +142,7 @@ public abstract class Token implements RingPosition<Token>, Serializable
return (R)maxKeyBound();
}
- public static class KeyBound implements RowPosition
+ public static class KeyBound implements PartitionPosition
{
private final Token token;
public final boolean isMinimumBound;
@@ -158,7 +158,7 @@ public abstract class Token implements RingPosition<Token>, Serializable
return token;
}
- public int compareTo(RowPosition pos)
+ public int compareTo(PartitionPosition pos)
{
if (this == pos)
return 0;
@@ -188,9 +188,9 @@ public abstract class Token implements RingPosition<Token>, Serializable
return getToken().isMinimum();
}
- public RowPosition.Kind kind()
+ public PartitionPosition.Kind kind()
{
- return isMinimumBound ? RowPosition.Kind.MIN_BOUND : RowPosition.Kind.MAX_BOUND;
+ return isMinimumBound ? PartitionPosition.Kind.MIN_BOUND : PartitionPosition.Kind.MAX_BOUND;
}
@Override