You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2015/06/30 12:47:46 UTC
[22/51] [partial] cassandra git commit: Storage engine refactor,
a.k.a CASSANDRA-8099
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
new file mode 100644
index 0000000..ca1e424
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
@@ -0,0 +1,764 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.partitions;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import com.google.common.collect.Iterables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.index.SecondaryIndexManager;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Sorting;
+
+/**
+ * Stores updates made on a partition.
+ * <p>
+ * A PartitionUpdate object requires that all writes are performed before we
+ * try to read the updates (attempts to write to the PartitionUpdate after a
+ * read method has been called will result in an exception being thrown).
+ * In other words, a PartitionUpdate is mutable while it is being written and becomes
+ * immutable as soon as it is read.
+ * <p>
+ * Row updates are added to this update through the {@link #writer} method which
+ * returns a {@link Row.Writer}. Multiple rows can be added to this writer as required and
+ * those rows do not have to be added in (clustering) order, and the same row can be added
+ * multiple times. Further, for a given row, the writer actually supports intermingling
+ * the writing of cells for different complex columns (note that this is usually not supported
+ * by {@code Row.Writer} implementations, but is supported here because
+ * {@code ModificationStatement} requires that (because we could have multiple {@link Operation}
+ * on the same column in a given statement)).
+ */
+public class PartitionUpdate extends AbstractPartitionData implements Sorting.Sortable
+{
+ protected static final Logger logger = LoggerFactory.getLogger(PartitionUpdate.class);
+
+ // Records whether the partition update has been sorted (it is the rows contained in the partition
+ // that are sorted since we don't require rows to be added in order). Sorting happens when the
+ // update is read, and writing is rejected as soon as the update is sorted (it's actually possible
+ // to manually allow new updates by using allowNewUpdates(), and we could make that more implicit, but
+ // as only triggers really require it, we keep it simple for now).
+ private boolean isSorted;
+
+ public static final PartitionUpdateSerializer serializer = new PartitionUpdateSerializer();
+
+ private final Writer writer;
+
+ // Used by compare for the sake of implementing the Sorting.Sortable interface (which is in turn used
+ // to sort the rows of this update).
+ private final InternalReusableClustering p1 = new InternalReusableClustering();
+ private final InternalReusableClustering p2 = new InternalReusableClustering();
+
+    /**
+     * Creates an update backed by the provided row data block.
+     * <p>
+     * The arguments are forwarded to the parent constructor.
+     */
+    private PartitionUpdate(CFMetaData metadata, DecoratedKey key, DeletionInfo delInfo, RowDataBlock data, PartitionColumns columns, int initialRowCapacity)
+    {
+        super(metadata, key, delInfo, columns, data, initialRowCapacity);
+        // Created once here; writer() hands out this same instance on every call.
+        writer = createWriter();
+    }
+
+    /**
+     * Creates an update with the provided deletion info and a freshly allocated
+     * row data block for the regular columns of {@code columns}.
+     */
+    public PartitionUpdate(CFMetaData metadata, DecoratedKey key, DeletionInfo delInfo, PartitionColumns columns, int initialRowCapacity)
+    {
+        this(metadata, key, delInfo,
+             new RowDataBlock(columns.regulars, initialRowCapacity, true, metadata.isCounter()),
+             columns, initialRowCapacity);
+    }
+
+    /**
+     * Creates an update whose deletion info starts out as {@code DeletionInfo.live()}.
+     */
+    public PartitionUpdate(CFMetaData metadata, DecoratedKey key, PartitionColumns columns, int initialRowCapacity)
+    {
+        this(metadata, key, DeletionInfo.live(), columns, initialRowCapacity);
+    }
+
+ protected Writer createWriter()
+ {
+ return new RegularWriter();
+ }
+
+ protected StaticWriter createStaticWriter()
+ {
+ return new StaticWriter();
+ }
+
+ /**
+ * Deserialize a partition update from a provided byte buffer.
+ *
+ * @param bytes the byte buffer that contains the serialized update.
+ * @param version the version with which the update is serialized.
+ * @param key the partition key for the update. This is only used if {@code version < 3.0}
+ * and can be {@code null} otherwise.
+ *
+ * @return the deserialized update or {@code null} if {@code bytes == null}.
+ */
+    public static PartitionUpdate fromBytes(ByteBuffer bytes, int version, DecoratedKey key)
+    {
+        if (bytes == null)
+            return null;
+
+        // The key is only handed to the serializer for the pre-3.0 format; it must be null otherwise.
+        DecoratedKey legacyKey = version < MessagingService.VERSION_30 ? key : null;
+        try
+        {
+            DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(bytes));
+            return serializer.deserialize(in, version, SerializationHelper.Flag.LOCAL, legacyKey);
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+ /**
+ * Serialize a partition update as a byte buffer.
+ *
+ * @param update the partition update to serialize.
+ * @param version the version to serialize the update into.
+ *
+ * @return a newly allocated byte buffer containing the serialized update.
+ */
+    public static ByteBuffer toBytes(PartitionUpdate update, int version)
+    {
+        try (DataOutputBuffer out = new DataOutputBuffer())
+        {
+            // Serialize with the version requested by the caller. Using the node's current
+            // messaging version here would ignore the "version" argument and could produce
+            // bytes in a different format than fromBytes(bytes, version, key) expects.
+            // Note: the serializer throws UnsupportedOperationException for versions
+            // older than MessagingService.VERSION_30.
+            serializer.serialize(update, out, version);
+            return ByteBuffer.wrap(out.getData(), 0, out.getLength());
+        }
+        catch (IOException e)
+        {
+            // Rethrown unchecked with the cause attached so callers need no throws clause.
+            throw new RuntimeException(e);
+        }
+    }
+
+ /**
+ * Creates an empty immutable partition update.
+ *
+ * @param metadata the metadata for the created update.
+ * @param key the partition key for the created update.
+ *
+ * @return the newly created empty (and immutable) update.
+ */
+    public static PartitionUpdate emptyUpdate(CFMetaData metadata, DecoratedKey key)
+    {
+        // Anonymous subclass with no columns and zero row capacity in which every
+        // mutating entry point is disabled.
+        PartitionUpdate empty = new PartitionUpdate(metadata, key, PartitionColumns.NONE, 0)
+        {
+            public Row.Writer writer()
+            {
+                throw new UnsupportedOperationException();
+            }
+
+            public Row.Writer staticWriter()
+            {
+                throw new UnsupportedOperationException();
+            }
+
+            public void addRangeTombstone(RangeTombstone range)
+            {
+                throw new UnsupportedOperationException();
+            }
+
+            public void addPartitionDeletion(DeletionTime deletionTime)
+            {
+                throw new UnsupportedOperationException();
+            }
+        };
+        return empty;
+    }
+
+ /**
+ * Creates a partition update that entirely deletes a given partition.
+ *
+ * @param metadata the metadata for the created update.
+ * @param key the partition key for the partition that the created update should delete.
+ * @param timestamp the timestamp for the deletion.
+ * @param nowInSec the current time in seconds to use as local deletion time for the partition deletion.
+ *
+ * @return the newly created partition deletion update.
+ */
+    public static PartitionUpdate fullPartitionDelete(CFMetaData metadata, DecoratedKey key, long timestamp, int nowInSec)
+    {
+        // The update carries only a partition-level deletion: no columns and no row capacity.
+        DeletionInfo partitionDeletion = new DeletionInfo(timestamp, nowInSec);
+        RowDataBlock noRows = new RowDataBlock(Columns.NONE, 0, true, metadata.isCounter());
+        return new PartitionUpdate(metadata, key, partitionDeletion, noRows, PartitionColumns.NONE, 0);
+    }
+
+ /**
+ * Merges the provided updates, yielding a new update that incorporates all those updates.
+ *
+ * @param updates the collection of updates to merge. This shouldn't be empty.
+ *
+ * @return a partition update that include (merge) all the updates from {@code updates}.
+ */
+ public static PartitionUpdate merge(Collection<PartitionUpdate> updates)
+ {
+ assert !updates.isEmpty();
+ // A single update needs no merging: that same instance is returned (not a copy).
+ if (updates.size() == 1)
+ return Iterables.getOnlyElement(updates);
+
+ // First pass: add up the row counts, gather the columns of every update into one builder
+ // and (when assertions are enabled) check that all updates share the same partition key
+ // and the same table id.
+ int totalSize = 0;
+ PartitionColumns.Builder builder = PartitionColumns.builder();
+ DecoratedKey key = null;
+ CFMetaData metadata = null;
+ for (PartitionUpdate update : updates)
+ {
+ totalSize += update.rows;
+ builder.addAll(update.columns());
+
+ if (key == null)
+ key = update.partitionKey();
+ else
+ assert key.equals(update.partitionKey());
+
+ if (metadata == null)
+ metadata = update.metadata();
+ else
+ assert metadata.cfId.equals(update.metadata().cfId);
+ }
+
+ // Used when merging row to decide of liveness
+ int nowInSec = FBUtilities.nowInSeconds();
+ // The summed row count is used as the initial row capacity of the merged update.
+ PartitionUpdate newUpdate = new PartitionUpdate(metadata, key, builder.build(), totalSize);
+ // Second pass: fold the deletion info, the static row and the rows of each update into newUpdate.
+ for (PartitionUpdate update : updates)
+ {
+ newUpdate.deletionInfo.add(update.deletionInfo);
+ if (!update.staticRow().isEmpty())
+ {
+ // The first non-empty static row is taken directly; later ones are merged into it
+ // through a static writer.
+ if (newUpdate.staticRow().isEmpty())
+ newUpdate.staticRow = update.staticRow().takeAlias();
+ else
+ Rows.merge(newUpdate.staticRow(), update.staticRow(), newUpdate.columns().statics, newUpdate.staticWriter(), nowInSec, SecondaryIndexManager.nullUpdater);
+ }
+ // Iterating an update goes through iterator(), which sorts that source update first.
+ // Rows are copied through the writer field directly rather than through writer().
+ for (Row row : update)
+ row.copyTo(newUpdate.writer);
+ }
+ return newUpdate;
+ }
+
+ /**
+ * The number of "operations" contained in the update.
+ * <p>
+ * This is used by {@code Memtable} to approximate how much work this update does. In practice, this
+ * count how many rows are updated and how many ranges are deleted by the partition update.
+ *
+ * @return the number of "operations" performed by the update.
+ */
+ public int operationCount()
+ {
+ return rowCount()
+ + deletionInfo.rangeCount()
+ + (deletionInfo.getPartitionDeletion().isLive() ? 0 : 1);
+ }
+
+ /**
+ * The size of the data contained in this update.
+ *
+ * @return the size of the data contained in this update.
+ */
+    public int dataSize()
+    {
+        // Sum, over every row, of the data size of its clustering and of each of its cells.
+        // Iterating this update goes through iterator(), so the rows are sorted first.
+        // (An unused local holding the comparator size was removed; it took no part in the result.)
+        int size = 0;
+        for (Row row : this)
+        {
+            size += row.clustering().dataSize();
+            for (Cell cell : row)
+                size += cell.dataSize();
+        }
+        return size;
+    }
+
+ /**
+ * If a partition update has been read (and is thus unmodifiable), a call to this method
+ * makes the update modifiable again.
+ * <p>
+ * Please note that calling this method won't result in optimal behavior in the sense that
+ * even if very little is added to the update after this call, the whole update will be sorted
+ * again on read. This should thus be used sparingly (and if it turns that we end up using
+ * this often, we should consider optimizing the behavior).
+ */
+ public synchronized void allowNewUpdates()
+ {
+ // This is synchronized to make extra sure things work properly even if this is
+ // called concurrently with sort() (which should be avoided in the first place, but
+ // better safe than sorry).
+ isSorted = false;
+ }
+
+ /**
+ * Returns an iterator that iterates over the rows of this update in clustering order.
+ * <p>
+ * Note that this might trigger a sorting of the update, and as such the update will not
+ * be modifiable anymore after this call.
+ *
+ * @return an iterator over the rows of this update.
+ */
+    @Override
+    public Iterator<Row> iterator()
+    {
+        // Rows must be sorted (and duplicates merged) before the parent iterator is created.
+        maybeSort();
+        Iterator<Row> sortedRows = super.iterator();
+        return sortedRows;
+    }
+
+    @Override
+    protected SliceableUnfilteredRowIterator sliceableUnfilteredIterator(ColumnFilter columns, boolean reversed)
+    {
+        // Same contract as iterator(): sort first, then delegate to the parent implementation.
+        maybeSort();
+        SliceableUnfilteredRowIterator sorted = super.sliceableUnfilteredIterator(columns, reversed);
+        return sorted;
+    }
+
+ /**
+ * Validates the data contained in this update.
+ *
+ * @throws MarshalException if some of the data contained in this update is corrupted.
+ */
+    public void validate()
+    {
+        for (Row row : this)
+        {
+            // Check the clustering against the table comparator, then every cell of the row.
+            metadata().comparator.validate(row.clustering());
+            for (Cell cell : row)
+            {
+                cell.validate();
+            }
+        }
+    }
+
+ /**
+ * The maximum timestamp used in this update.
+ *
+ * @return the maximum timestamp used in this update.
+ */
+    public long maxTimestamp()
+    {
+        // Plain accessor: the value is not computed here.
+        return this.maxTimestamp;
+    }
+
+ /**
+ * For an update on a counter table, returns a list containing a {@code CounterMark} for
+ * every counter contained in the update.
+ *
+ * @return a list with counter marks for every counter in this update.
+ */
+    public List<CounterMark> collectCounterMarks()
+    {
+        assert metadata().isCounter();
+
+        // One reusable clustering holder is shared by all the marks created below.
+        InternalReusableClustering sharedClustering = new InternalReusableClustering();
+        List<CounterMark> marks = new ArrayList<>();
+        int rowIndex = 0;
+        for (Row row : this)
+        {
+            for (Cell cell : row)
+            {
+                if (cell.isCounterCell())
+                    marks.add(new CounterMark(sharedClustering, rowIndex, cell.column(), cell.path()));
+            }
+            rowIndex++;
+        }
+        return marks;
+    }
+
+ /**
+ * Returns a row writer for the static row of this partition update.
+ *
+ * @return a row writer for the static row of this partition update. A partition
+ * update contains only one static row so only one row should be written through
+ * this writer (but if multiple rows are added, the latest written one wins).
+ */
+    public Row.Writer staticWriter()
+    {
+        // A new static writer is created on every call.
+        Row.Writer forStaticRow = createStaticWriter();
+        return forStaticRow;
+    }
+
+ /**
+ * Returns a row writer to add (non-static) rows to this partition update.
+ *
+ * @return a row writer to add (non-static) rows to this partition update.
+ * Multiple rows can be successively added this way and the rows added do not have
+ * to be in clustering order. Further, the same row can be added multiple times.
+ *
+ */
+    public Row.Writer writer()
+    {
+        // Writing is only allowed while the update has not been sorted (i.e. not yet read).
+        if (!isSorted)
+            return writer;
+
+        throw new IllegalStateException("An update should not written again once it has been read");
+    }
+
+ /**
+ * Returns a range tombstone marker writer to add range tombstones to this
+ * partition update.
+ * <p>
+ * Note that if more convenient, range tombstones can also be added using
+ * {@link #addRangeTombstone}.
+ *
+ * @param isReverseOrder whether the range tombstone marker will be provided to the returned writer
+ * in clustering order or in reverse clustering order.
+ * @return a range tombstone marker writer to add range tombstones to this update.
+ */
+    public RangeTombstoneMarker.Writer markerWriter(boolean isReverseOrder)
+    {
+        // A new collector is created on every call, configured for the announced marker order.
+        RangeTombstoneMarker.Writer collector = new RangeTombstoneCollector(isReverseOrder);
+        return collector;
+    }
+
+ /**
+ * The number of rows contained in this update.
+ *
+ * @return the number of rows contained in this update.
+ */
+    public int size()
+    {
+        // Returns the raw row counter; no sorting is triggered here, so rows added several
+        // times are still counted separately until sort() has merged them.
+        return this.rows;
+    }
+
+    /**
+     * Sorts this update unless it is already flagged as sorted.
+     */
+    private void maybeSort()
+    {
+        // Unsynchronized fast path; sort() re-checks the flag while holding the lock.
+        if (!isSorted)
+            sort();
+    }
+
+ /**
+ * Sorts the rows of this update, merges rows that compare equal into a single row and
+ * flags the update as sorted.
+ * <p>
+ * The method is synchronized and re-checks {@code isSorted} on entry because
+ * {@code maybeSort()} tests the flag without holding the lock.
+ */
+ private synchronized void sort()
+ {
+ if (isSorted)
+ return;
+
+ // Zero or one row: there is nothing to order or merge.
+ if (rows <= 1)
+ {
+ isSorted = true;
+ return;
+ }
+
+ // Sort the rows - will still potentially contain duplicate (non-reconciled) rows
+ Sorting.sort(this);
+
+ // Now find duplicates and merge them together
+ int previous = 0; // The last element that was set
+ int nowInSec = FBUtilities.nowInSeconds();
+ for (int current = 1; current < rows; current++)
+ {
+ // There is really only 2 possible comparison: < 0 or == 0 since we've sorted already
+ int cmp = compare(previous, current);
+ if (cmp == 0)
+ {
+ // current and previous are the same row. Merge current into previous
+ // (and so previous + 1 will be "free").
+ // previous is not advanced, so a run of equal rows all folds into the same slot.
+ data.merge(current, previous, nowInSec);
+ }
+ else
+ {
+ // data[current] != [previous], so move current just after previous if needs be
+ ++previous;
+ if (previous != current)
+ data.move(current, previous);
+ }
+ }
+
+ // previous is on the last value to keep
+ rows = previous + 1;
+
+ isSorted = true;
+ }
+
+ /**
+ * This method is not meant to be used externally: it is only public so that this
+ * update conforms to the {@link Sorting.Sortable} interface.
+ */
+    public int compare(int i, int j)
+    {
+        // p1 and p2 are reusable holders re-pointed at rows i and j for this comparison.
+        return metadata.comparator.compare(
+            p1.setTo(i),
+            p2.setTo(j));
+    }
+
+    /**
+     * Writer for the static row of the enclosing update: each completed row either becomes
+     * the static row or is merged into the one already present.
+     */
+    protected class StaticWriter extends StaticRow.Builder
+    {
+        protected StaticWriter()
+        {
+            super(columns.statics, false, metadata().isCounter());
+        }
+
+        @Override
+        public void endOfRow()
+        {
+            super.endOfRow();
+
+            // No static row recorded yet: what was just written becomes the static row.
+            if (staticRow == null)
+            {
+                staticRow = build();
+                return;
+            }
+
+            // Otherwise merge the existing static row with the newly written one into a fresh builder.
+            StaticRow.Builder merged = StaticRow.builder(columns.statics, true, metadata().isCounter());
+            Rows.merge(staticRow, build(), columns.statics, merged, FBUtilities.nowInSeconds());
+            staticRow = merged.build();
+        }
+    }
+
+    /**
+     * Writer for the regular (non-static) rows of the enclosing update. On top of the parent
+     * writer it tolerates cells of complex columns being written intermingled or out of
+     * cell-path order by transparently starting a new row when that happens.
+     */
+    protected class RegularWriter extends Writer
+    {
+        // For complex column, the writer assumptions is that for a given row, cells of different
+        // complex columns are not intermingled (they also should be in cellPath order). We however
+        // don't yet guarantee that this will be the case for updates (both UpdateStatement and
+        // RowUpdateBuilder can potentially break that assumption; we could change those classes but
+        // that's non trivial, at least for UpdateStatement).
+        // To deal with that problem, we record which complex columns have been updated (for the current
+        // row) and if we detect a violation of our assumption, we switch the row we're writing
+        // into (which is ok because everything will be sorted and merged in maybeSort()).
+        // Parameterized with the diamond operator (the raw "new HashSet()" produced an unchecked warning).
+        private final Set<ColumnDefinition> updatedComplex = new HashSet<>();
+        private ColumnDefinition lastUpdatedComplex;
+        private CellPath lastUpdatedComplexPath;
+
+        public RegularWriter()
+        {
+            super(false);
+        }
+
+        @Override
+        public void writeCell(ColumnDefinition column, boolean isCounter, ByteBuffer value, LivenessInfo info, CellPath path)
+        {
+            if (column.isComplex())
+            {
+                if (updatedComplex.contains(column)
+                    && (!column.equals(lastUpdatedComplex) || (column.cellPathComparator().compare(path, lastUpdatedComplexPath)) <= 0))
+                {
+                    // We've updated that complex already, but we've either updated another complex or it's not in order: as this
+                    // break the writer assumption, switch rows.
+                    endOfRow();
+
+                    // Copy the clustering values from the previous row
+                    // NOTE(review): relies on endOfRow() having advanced "row", so that row - 1 is the
+                    // row that was just finished - confirm against the parent Writer.
+                    int clusteringSize = metadata.clusteringColumns().size();
+                    int base = (row - 1) * clusteringSize;
+                    for (int i = 0; i < clusteringSize; i++)
+                        writer.writeClusteringValue(clusterings[base + i]);
+
+                    updatedComplex.clear();
+                }
+
+                // Remember the last complex column/path written so the next cell can be checked against it.
+                lastUpdatedComplex = column;
+                lastUpdatedComplexPath = path;
+                updatedComplex.add(column);
+            }
+            super.writeCell(column, isCounter, value, info, path);
+        }
+
+        @Override
+        public void endOfRow()
+        {
+            super.endOfRow();
+            // The complex-column tracking is per row.
+            clear();
+        }
+
+        @Override
+        public Writer reset()
+        {
+            super.reset();
+            clear();
+            return this;
+        }
+
+        // Forgets which complex columns were written for the current row.
+        private void clear()
+        {
+            updatedComplex.clear();
+            lastUpdatedComplex = null;
+            lastUpdatedComplexPath = null;
+        }
+    }
+
+ public static class PartitionUpdateSerializer
+ {
+ /**
+ * Writes {@code update} to {@code out} in the format of the given messaging version.
+ * <p>
+ * Versions older than {@code MessagingService.VERSION_30} are not implemented yet and
+ * result in an {@code UnsupportedOperationException}; the commented-out code below is
+ * kept as reference for that TODO.
+ */
+ public void serialize(PartitionUpdate update, DataOutputPlus out, int version) throws IOException
+ {
+ if (version < MessagingService.VERSION_30)
+ {
+ // TODO
+ throw new UnsupportedOperationException();
+
+ // if (cf == null)
+ // {
+ // out.writeBoolean(false);
+ // return;
+ // }
+
+ // out.writeBoolean(true);
+ // serializeCfId(cf.id(), out, version);
+ // cf.getComparator().deletionInfoSerializer().serialize(cf.deletionInfo(), out, version);
+ // ColumnSerializer columnSerializer = cf.getComparator().columnSerializer();
+ // int count = cf.getColumnCount();
+ // out.writeInt(count);
+ // int written = 0;
+ // for (Cell cell : cf)
+ // {
+ // columnSerializer.serialize(cell, out);
+ // written++;
+ // }
+ // assert count == written: "Table had " + count + " columns, but " + written + " written";
+ }
+
+ // The update is serialized as an unfiltered row iterator; its current row counter is passed along.
+ // NOTE(review): update.rows is read when the call is evaluated - confirm it is already the
+ // post-sort (deduplicated) count at that point.
+ try (UnfilteredRowIterator iter = update.sliceableUnfilteredIterator())
+ {
+ assert !iter.isReverseOrder();
+ UnfilteredRowIteratorSerializer.serializer.serialize(iter, out, version, update.rows);
+ }
+ }
+
+ /**
+ * Reads a partition update from {@code in}.
+ * <p>
+ * For versions older than {@code MessagingService.VERSION_30} the legacy cell-based format
+ * is read and {@code key} must be non-null; for newer versions {@code key} must be
+ * {@code null} and the key from the deserialized header is used.
+ */
+ public PartitionUpdate deserialize(DataInput in, int version, SerializationHelper.Flag flag, DecoratedKey key) throws IOException
+ {
+ if (version < MessagingService.VERSION_30)
+ {
+ assert key != null;
+
+ // This is only used in mutation, and mutation have never allowed "null" column families
+ boolean present = in.readBoolean();
+ assert present;
+
+ // Legacy layout: table metadata, deletion info, cell count, then the cells themselves,
+ // which are converted to an unfiltered row iterator and from there to an update.
+ CFMetaData metadata = CFMetaData.serializer.deserialize(in, version);
+ LegacyLayout.LegacyDeletionInfo info = LegacyLayout.LegacyDeletionInfo.serializer.deserialize(metadata, in, version);
+ int size = in.readInt();
+ Iterator<LegacyLayout.LegacyCell> cells = LegacyLayout.deserializeCells(metadata, in, flag, size);
+ SerializationHelper helper = new SerializationHelper(version, flag);
+ try (UnfilteredRowIterator iterator = LegacyLayout.onWireCellstoUnfilteredRowIterator(metadata, key, info, cells, false, helper))
+ {
+ return UnfilteredRowIterators.toUpdate(iterator);
+ }
+ }
+
+ assert key == null; // key is only there for the old format
+
+ UnfilteredRowIteratorSerializer.Header h = UnfilteredRowIteratorSerializer.serializer.deserializeHeader(in, version, flag);
+ if (h.isEmpty)
+ return emptyUpdate(h.metadata, h.key);
+
+ assert !h.isReversed;
+ assert h.rowEstimate >= 0;
+ // The row estimate from the header sizes both the row data block and the update itself.
+ PartitionUpdate upd = new PartitionUpdate(h.metadata,
+ h.key,
+ new DeletionInfo(h.partitionDeletion),
+ new RowDataBlock(h.sHeader.columns().regulars, h.rowEstimate, false, h.metadata.isCounter()),
+ h.sHeader.columns(),
+ h.rowEstimate);
+
+ upd.staticRow = h.staticRow;
+
+ // Rows and range tombstone markers are streamed into the update's own writers.
+ RangeTombstoneMarker.Writer markerWriter = upd.markerWriter(false);
+ UnfilteredRowIteratorSerializer.serializer.deserialize(in, new SerializationHelper(version, flag), h.sHeader, upd.writer(), markerWriter);
+
+ // Mark sorted after we're read it all since that's what we use in the writer() method to detect bad uses
+ upd.isSorted = true;
+
+ return upd;
+ }
+
+ /**
+ * Computes the serialized size of {@code update} for the given messaging version.
+ * <p>
+ * Versions older than {@code MessagingService.VERSION_30} are not implemented yet and
+ * result in an {@code UnsupportedOperationException}; the commented-out code below is
+ * kept as reference for that TODO.
+ */
+ public long serializedSize(PartitionUpdate update, int version, TypeSizes sizes)
+ {
+ if (version < MessagingService.VERSION_30)
+ {
+ // TODO
+ throw new UnsupportedOperationException("Version is " + version);
+ //if (cf == null)
+ //{
+ // return typeSizes.sizeof(false);
+ //}
+ //else
+ //{
+ // return typeSizes.sizeof(true) /* nullness bool */
+ // + cfIdSerializedSize(cf.id(), typeSizes, version) /* id */
+ // + contentSerializedSize(cf, typeSizes, version);
+ //}
+ }
+
+ // Mirrors serialize(): same iterator, same row counter argument.
+ try (UnfilteredRowIterator iter = update.sliceableUnfilteredIterator())
+ {
+ return UnfilteredRowIteratorSerializer.serializer.serializedSize(iter, version, update.rows, sizes);
+ }
+ }
+ }
+
+ /**
+ * A counter mark is basically a pointer to a counter update inside this partition update. That pointer allows
+ * us to update the counter value based on the pre-existing value read during the read-before-write that counters
+ * do. See {@link CounterMutation} to understand how this is used.
+ */
+    public class CounterMark
+    {
+        // Reusable holder, re-pointed at this mark's row each time clustering() is called.
+        private final InternalReusableClustering clustering;
+        // Index of the marked row inside the enclosing update.
+        private final int row;
+        private final ColumnDefinition column;
+        private final CellPath path;
+
+        private CounterMark(InternalReusableClustering clustering, int row, ColumnDefinition column, CellPath path)
+        {
+            this.row = row;
+            this.column = column;
+            this.path = path;
+            this.clustering = clustering;
+        }
+
+        /** @return the clustering of the marked row. */
+        public Clustering clustering()
+        {
+            return this.clustering.setTo(this.row);
+        }
+
+        /** @return the column of the marked counter cell. */
+        public ColumnDefinition column()
+        {
+            return this.column;
+        }
+
+        /** @return the cell path of the marked counter cell. */
+        public CellPath path()
+        {
+            return this.path;
+        }
+
+        /** @return the value currently stored in the enclosing update for the marked cell. */
+        public ByteBuffer value()
+        {
+            return data.getValue(this.row, this.column, this.path);
+        }
+
+        /** Overwrites the value stored in the enclosing update for the marked cell. */
+        public void setValue(ByteBuffer value)
+        {
+            data.setValue(this.row, this.column, this.path, value);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/partitions/SingletonUnfilteredPartitionIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/SingletonUnfilteredPartitionIterator.java b/src/java/org/apache/cassandra/db/partitions/SingletonUnfilteredPartitionIterator.java
new file mode 100644
index 0000000..e2fec05
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/partitions/SingletonUnfilteredPartitionIterator.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.partitions;
+
+import java.util.NoSuchElementException;
+
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+
+/**
+ * A partition iterator that yields exactly one {@code UnfilteredRowIterator}, once.
+ */
+public class SingletonUnfilteredPartitionIterator implements UnfilteredPartitionIterator
+{
+    private final UnfilteredRowIterator iter;
+    private final boolean isForThrift;
+    // Becomes true on the first call to next(); nothing more is returned afterwards.
+    private boolean returned;
+
+    public SingletonUnfilteredPartitionIterator(UnfilteredRowIterator iter, boolean isForThrift)
+    {
+        this.isForThrift = isForThrift;
+        this.iter = iter;
+    }
+
+    @Override
+    public boolean isForThrift()
+    {
+        return this.isForThrift;
+    }
+
+    @Override
+    public boolean hasNext()
+    {
+        return !this.returned;
+    }
+
+    @Override
+    public UnfilteredRowIterator next()
+    {
+        if (!returned)
+        {
+            returned = true;
+            return iter;
+        }
+        throw new NoSuchElementException();
+    }
+
+    @Override
+    public void remove()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    /** Closes the wrapped row iterator (whether or not it has been returned by next()). */
+    @Override
+    public void close()
+    {
+        this.iter.close();
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/partitions/TombstonePurgingPartitionIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/TombstonePurgingPartitionIterator.java b/src/java/org/apache/cassandra/db/partitions/TombstonePurgingPartitionIterator.java
new file mode 100644
index 0000000..10022eb
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/partitions/TombstonePurgingPartitionIterator.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.partitions;
+
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.*;
+
+public abstract class TombstonePurgingPartitionIterator extends FilteringPartitionIterator
+{
+ private final int gcBefore;
+
+ public TombstonePurgingPartitionIterator(UnfilteredPartitionIterator iterator, int gcBefore)
+ {
+ super(iterator);
+ this.gcBefore = gcBefore;
+ }
+
+ protected abstract long getMaxPurgeableTimestamp();
+
+ protected FilteringRow makeRowFilter()
+ {
+ return new FilteringRow()
+ {
+ @Override
+ protected boolean include(LivenessInfo info)
+ {
+ return !info.hasLocalDeletionTime() || !info.isPurgeable(getMaxPurgeableTimestamp(), gcBefore);
+ }
+
+ @Override
+ protected boolean include(DeletionTime dt)
+ {
+ return includeDelTime(dt);
+ }
+
+ @Override
+ protected boolean include(ColumnDefinition c, DeletionTime dt)
+ {
+ return includeDelTime(dt);
+ }
+ };
+ }
+
+ private boolean includeDelTime(DeletionTime dt)
+ {
+ return dt.isLive() || !dt.isPurgeable(getMaxPurgeableTimestamp(), gcBefore);
+ }
+
+ @Override
+ protected boolean includePartitionDeletion(DeletionTime dt)
+ {
+ return includeDelTime(dt);
+ }
+
+ @Override
+ protected boolean includeRangeTombstoneMarker(RangeTombstoneMarker marker)
+ {
+ if (marker.isBoundary())
+ {
+ // We can only skip the whole marker if both deletion time are purgeable.
+ // If only one of them is, filterTombstoneMarker will deal with it.
+ RangeTombstoneBoundaryMarker boundary = (RangeTombstoneBoundaryMarker)marker;
+ return includeDelTime(boundary.endDeletionTime()) || includeDelTime(boundary.startDeletionTime());
+ }
+ else
+ {
+ return includeDelTime(((RangeTombstoneBoundMarker)marker).deletionTime());
+ }
+ }
+
+ @Override
+ protected RangeTombstoneMarker filterRangeTombstoneMarker(RangeTombstoneMarker marker, boolean reversed)
+ {
+ if (!marker.isBoundary())
+ return marker;
+
+ // Note that we know this is called after includeRangeTombstoneMarker. So if one of the deletion time is
+ // purgeable, we know the other one isn't.
+ RangeTombstoneBoundaryMarker boundary = (RangeTombstoneBoundaryMarker)marker;
+ if (!(includeDelTime(boundary.closeDeletionTime(reversed))))
+ return boundary.createCorrespondingCloseBound(reversed);
+ else if (!(includeDelTime(boundary.openDeletionTime(reversed))))
+ return boundary.createCorrespondingOpenBound(reversed);
+ return boundary;
+ }
+
+};
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterator.java b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterator.java
new file mode 100644
index 0000000..2447da8
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterator.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.partitions;
+
+import java.util.Iterator;
+
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+
+/**
+ * An iterator over a number of unfiltered partitions (i.e. partitions containing deletion informations).
+ *
+ * The object returned by a call to next() is only guaranteed to be
+ * valid until the next call to hasNext() or next(). If a consumer wants to keep a
+ * reference on the returned objects for longer than the iteration, it must
+ * make a copy of it explicitly.
+ */
+public interface UnfilteredPartitionIterator extends Iterator<UnfilteredRowIterator>, AutoCloseable
+{
+    /**
+     * Whether this partition iterator is for a thrift query.
+     * <p>
+     * If this is true, the partition iterator may return some empty UnfilteredRowIterator and those
+     * should be preserved, as thrift includes partitions that "exist" (have some cells, even
+     * if these are actually deleted) but have nothing matching the query.
+     *
+     * @return whether the iterator is for a thrift query.
+     */
+    boolean isForThrift();
+
+    /**
+     * Closes this iterator. Redeclared without a {@code throws} clause, so callers do not have
+     * to handle the checked exception declared by {@code AutoCloseable.close()}.
+     */
+    void close();
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
new file mode 100644
index 0000000..f66ec11
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
@@ -0,0 +1,503 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.partitions;
+
+import java.io.DataInput;
+import java.io.IOError;
+import java.io.IOException;
+import java.security.MessageDigest;
+import java.util.*;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.MergeIterator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Static methods to work with partition iterators.
+ */
+public abstract class UnfilteredPartitionIterators
+{
+ private static final Logger logger = LoggerFactory.getLogger(UnfilteredPartitionIterators.class);
+
+ private static final Serializer serializer = new Serializer();
+
+ // Orders row iterators by the partition key they iterate over; this is the comparator
+ // handed to MergeIterator.get() by merge() and mergeLazily().
+ private static final Comparator<UnfilteredRowIterator> partitionComparator = new Comparator<UnfilteredRowIterator>()
+ {
+ public int compare(UnfilteredRowIterator p1, UnfilteredRowIterator p2)
+ {
+ return p1.partitionKey().compareTo(p2.partitionKey());
+ }
+ };
+
+ /**
+ * A partition iterator that contains no partition: it is never for thrift, {@code hasNext()}
+ * always returns {@code false} and {@code next()} always throws {@link NoSuchElementException}.
+ */
+ public static final UnfilteredPartitionIterator EMPTY = new AbstractUnfilteredPartitionIterator()
+ {
+ public boolean isForThrift()
+ {
+ return false;
+ }
+
+ public boolean hasNext()
+ {
+ return false;
+ }
+
+ public UnfilteredRowIterator next()
+ {
+ throw new NoSuchElementException();
+ }
+ };
+
+ private UnfilteredPartitionIterators() {}
+
+ /**
+ * Callbacks used by {@link #merge} while merging partitions coming from several iterators.
+ */
+ public interface MergeListener
+ {
+ /**
+ * Called by {@link #merge} once for every merged partition to obtain the row-level listener
+ * that is then passed to {@code UnfilteredRowIterators.merge}.
+ *
+ * @param partitionKey the key of the partition being merged.
+ * @param versions the per-source iterators for that partition, indexed like the merged
+ * iterators; at the time of the call the entry of a source that does not have the partition
+ * is {@code null}.
+ * @return the listener to use for merging the rows of the partition.
+ */
+ public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions);
+
+ // NOTE(review): nothing in this class calls close() on the listener; confirm which
+ // caller is responsible for invoking it.
+ public void close();
+ }
+
+ /**
+ * Returns the single partition of the provided iterator as a row iterator.
+ * <p>
+ * If {@code iter} has no partition, an empty row iterator built from {@code command} is
+ * returned instead. Closing the returned iterator also closes {@code iter}.
+ *
+ * @param iter the partition iterator, expected to hold at most one partition.
+ * @param command the single partition command the iterator is the result of.
+ * @return a row iterator over the only partition of {@code iter} (or an empty one).
+ */
+ @SuppressWarnings("resource") // The created resources are returned right away
+ public static UnfilteredRowIterator getOnlyElement(final UnfilteredPartitionIterator iter, SinglePartitionReadCommand<?> command)
+ {
+ // A query without results yields an empty partition iterator, but callers still
+ // expect a row iterator, so fall back to an empty one in that case.
+ final UnfilteredRowIterator onlyPartition;
+ if (iter.hasNext())
+ {
+ onlyPartition = iter.next();
+ }
+ else
+ {
+ onlyPartition = UnfilteredRowIterators.emptyIterator(command.metadata(),
+ command.partitionKey(),
+ command.clusteringIndexFilter().isReversed());
+ }
+
+ // Wrap the result so that its close method actually closes the whole
+ // UnfilteredPartitionIterator too.
+ return new WrappingUnfilteredRowIterator(onlyPartition)
+ {
+ public void close()
+ {
+ try
+ {
+ super.close();
+ }
+ finally
+ {
+ // Only asserted at this point because the Serializer's iterator complains when
+ // hasNext() is called before the previously returned iterator is fully consumed.
+ assert !iter.hasNext();
+
+ iter.close();
+ }
+ }
+ };
+ }
+
+ /**
+ * Merges the provided iterators (see {@link #merge}) and filters the merged result
+ * (see {@link #filter}) using the same {@code nowInSec}.
+ */
+ public static PartitionIterator mergeAndFilter(List<UnfilteredPartitionIterator> iterators, int nowInSec, MergeListener listener)
+ {
+ // TODO: we could have a somewhat faster version if we were to merge the UnfilteredRowIterators directly as RowIterators
+ @SuppressWarnings("resource") // closed through the filtering iterator returned below
+ UnfilteredPartitionIterator merged = merge(iterators, nowInSec, listener);
+ return filter(merged, nowInSec);
+ }
+
+ /**
+ * Turns an unfiltered partition iterator into a filtered one by applying
+ * {@code UnfilteredRowIterators.filter} with {@code nowInSec} to every partition.
+ * <p>
+ * Unless the source iterator is for thrift, partitions that are empty once filtered are
+ * closed and skipped. Closing the returned iterator closes {@code iterator} as well as any
+ * partition already prepared by {@code hasNext()} but not yet handed out by {@code next()}.
+ *
+ * @param iterator the iterator to filter.
+ * @param nowInSec the current time in seconds, passed through to the row-level filter.
+ * @return the filtered partition iterator.
+ */
+ public static PartitionIterator filter(final UnfilteredPartitionIterator iterator, final int nowInSec)
+ {
+ return new PartitionIterator()
+ {
+ // The partition prepared by hasNext() and not yet returned by next(), or null if none.
+ private RowIterator next;
+
+ public boolean hasNext()
+ {
+ while (next == null && iterator.hasNext())
+ {
+ @SuppressWarnings("resource") // closed either directly if empty, or, if assigned to next, by either
+ // the caller of next() or close()
+ UnfilteredRowIterator rowIterator = iterator.next();
+ next = UnfilteredRowIterators.filter(rowIterator, nowInSec);
+ // Empty partitions are only preserved for thrift; otherwise drop it and look at the following one.
+ if (!iterator.isForThrift() && next.isEmpty())
+ {
+ rowIterator.close();
+ next = null;
+ }
+ }
+ return next != null;
+ }
+
+ public RowIterator next()
+ {
+ if (next == null && !hasNext())
+ throw new NoSuchElementException();
+
+ // Hand the prepared partition over to the caller, who becomes responsible for closing it.
+ RowIterator toReturn = next;
+ next = null;
+ return toReturn;
+ }
+
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public void close()
+ {
+ try
+ {
+ iterator.close();
+ }
+ finally
+ {
+ // A partition prepared by hasNext() but never returned still has to be closed.
+ if (next != null)
+ next.close();
+ }
+ }
+ };
+ }
+
+ /**
+ * Merges the provided partition iterators into a single one, merging the partitions that
+ * compare equal by partition key with {@code UnfilteredRowIterators.merge}.
+ * <p>
+ * The result reports the {@code isForThrift()} value of the first iterator of the list.
+ *
+ * @param iterators the iterators to merge; must not be empty (asserted).
+ * @param nowInSec the current time in seconds, passed through to the row-level merge.
+ * @param listener the listener providing the row-level merge listener of every merged
+ * partition; must not be {@code null} (asserted).
+ * @return the merged partition iterator; closing it closes the underlying merge iterator.
+ */
+ public static UnfilteredPartitionIterator merge(final List<? extends UnfilteredPartitionIterator> iterators, final int nowInSec, final MergeListener listener)
+ {
+ assert listener != null;
+ assert !iterators.isEmpty();
+
+ final boolean isForThrift = iterators.get(0).isForThrift();
+
+ final MergeIterator<UnfilteredRowIterator, UnfilteredRowIterator> merged = MergeIterator.get(iterators, partitionComparator, new MergeIterator.Reducer<UnfilteredRowIterator, UnfilteredRowIterator>()
+ {
+ // One slot per source iterator; (re)filled with nulls by onKeyChange(), which is therefore
+ // assumed to run before the first reduce() of every key -- TODO confirm against MergeIterator.
+ private final List<UnfilteredRowIterator> toMerge = new ArrayList<>(iterators.size());
+
+ // Taken from the last version seen for the current key; used to build the empty
+ // iterators that stand in for the sources that don't have the partition.
+ private CFMetaData metadata;
+ private DecoratedKey partitionKey;
+ private boolean isReverseOrder;
+
+ public void reduce(int idx, UnfilteredRowIterator current)
+ {
+ metadata = current.metadata();
+ partitionKey = current.partitionKey();
+ isReverseOrder = current.isReverseOrder();
+
+ // Note that because the MergeListener cares about it, we want to preserve the index of the iterator.
+ // Non-present iterator will thus be set to empty in getReduced.
+ toMerge.set(idx, current);
+ }
+
+ protected UnfilteredRowIterator getReduced()
+ {
+ // The listener is handed the versions while absent sources are still null.
+ UnfilteredRowIterators.MergeListener rowListener = listener.getRowMergeListener(partitionKey, toMerge);
+
+ // Replace nulls by empty iterators
+ for (int i = 0; i < toMerge.size(); i++)
+ if (toMerge.get(i) == null)
+ toMerge.set(i, UnfilteredRowIterators.emptyIterator(metadata, partitionKey, isReverseOrder));
+
+ return UnfilteredRowIterators.merge(toMerge, nowInSec, rowListener);
+ }
+
+ protected void onKeyChange()
+ {
+ // Reset to exactly one null slot per source iterator.
+ toMerge.clear();
+ for (int i = 0; i < iterators.size(); i++)
+ toMerge.add(null);
+ }
+ });
+
+ return new AbstractUnfilteredPartitionIterator()
+ {
+ public boolean isForThrift()
+ {
+ return isForThrift;
+ }
+
+ public boolean hasNext()
+ {
+ return merged.hasNext();
+ }
+
+ public UnfilteredRowIterator next()
+ {
+ return merged.next();
+ }
+
+ @Override
+ public void close()
+ {
+ merged.close();
+ }
+ };
+ }
+
+ /**
+ * Convert all expired cells to equivalent tombstones.
+ * <p>
+ * See {@link UnfilteredRowIterators#convertExpiredCellsToTombstones} for details.
+ *
+ * @param iterator the iterator in which to convert expired cells.
+ * @param nowInSec the current time to use to decide if a cell is expired.
+ * @return an iterator that returns the same data as {@code iterator} but with all expired cells converted
+ * to equivalent tombstones.
+ */
+ public static UnfilteredPartitionIterator convertExpiredCellsToTombstones(UnfilteredPartitionIterator iterator, final int nowInSec)
+ {
+ return new WrappingUnfilteredPartitionIterator(iterator)
+ {
+ @Override
+ protected UnfilteredRowIterator computeNext(UnfilteredRowIterator iter)
+ {
+ // Apply the row-level conversion to every partition of the wrapped iterator.
+ return UnfilteredRowIterators.convertExpiredCellsToTombstones(iter, nowInSec);
+ }
+ };
+ }
+
+ /**
+ * Merges the provided partition iterators like {@link #merge}, but without a listener and
+ * deferring the row-level merge of each partition until the returned row iterator is
+ * initialized.
+ * <p>
+ * When a single iterator is provided, it is returned as is. Otherwise the result reports
+ * the {@code isForThrift()} value of the first iterator of the list.
+ *
+ * @param iterators the iterators to merge; must not be empty (asserted).
+ * @param nowInSec the current time in seconds, passed through to the row-level merge.
+ * @return the merged partition iterator.
+ */
+ public static UnfilteredPartitionIterator mergeLazily(final List<? extends UnfilteredPartitionIterator> iterators, final int nowInSec)
+ {
+ assert !iterators.isEmpty();
+
+ if (iterators.size() == 1)
+ return iterators.get(0);
+
+ final boolean isForThrift = iterators.get(0).isForThrift();
+
+ final MergeIterator<UnfilteredRowIterator, UnfilteredRowIterator> merged = MergeIterator.get(iterators, partitionComparator, new MergeIterator.Reducer<UnfilteredRowIterator, UnfilteredRowIterator>()
+ {
+ // The versions of the current partition, in the order they were passed to reduce().
+ private final List<UnfilteredRowIterator> toMerge = new ArrayList<>(iterators.size());
+
+ // NOTE(review): presumably makes MergeIterator go through reduce()/getReduced() even when
+ // a single source has the key, so the result is always wrapped lazily -- confirm in MergeIterator.
+ @Override
+ public boolean trivialReduceIsTrivial()
+ {
+ return false;
+ }
+
+ public void reduce(int idx, UnfilteredRowIterator current)
+ {
+ toMerge.add(current);
+ }
+
+ protected UnfilteredRowIterator getReduced()
+ {
+ // NOTE(review): the lazy iterator reads the shared 'toMerge' list when it is initialized,
+ // and that list is cleared by onKeyChange(); this looks safe only as long as a partition
+ // is initialized before the merge moves to the next key -- confirm callers respect that.
+ return new LazilyInitializedUnfilteredRowIterator(toMerge.get(0).partitionKey())
+ {
+ protected UnfilteredRowIterator initializeIterator()
+ {
+ return UnfilteredRowIterators.merge(toMerge, nowInSec);
+ }
+ };
+ }
+
+ protected void onKeyChange()
+ {
+ toMerge.clear();
+ }
+ });
+
+ return new AbstractUnfilteredPartitionIterator()
+ {
+ public boolean isForThrift()
+ {
+ return isForThrift;
+ }
+
+ public boolean hasNext()
+ {
+ return merged.hasNext();
+ }
+
+ public UnfilteredRowIterator next()
+ {
+ return merged.next();
+ }
+
+ @Override
+ public void close()
+ {
+ merged.close();
+ }
+ };
+ }
+
+ /**
+ * Filters out of the provided iterator the data belonging to dropped columns.
+ * <p>
+ * A cell (or a complex column deletion) is kept if its column has no entry in
+ * {@code droppedColumns}, or if its timestamp is strictly greater than the recorded drop time.
+ *
+ * @param iterator the iterator to filter.
+ * @param droppedColumns the dropped columns, keyed by column name.
+ * @return the filtering iterator.
+ */
+ public static UnfilteredPartitionIterator removeDroppedColumns(UnfilteredPartitionIterator iterator, final Map<ColumnIdentifier, CFMetaData.DroppedColumn> droppedColumns)
+ {
+ return new FilteringPartitionIterator(iterator)
+ {
+ @Override
+ protected FilteringRow makeRowFilter()
+ {
+ return new FilteringRow()
+ {
+ @Override
+ protected boolean include(Cell cell)
+ {
+ return include(cell.column(), cell.livenessInfo().timestamp());
+ }
+
+ @Override
+ protected boolean include(ColumnDefinition c, DeletionTime dt)
+ {
+ return include(c, dt.markedForDeleteAt());
+ }
+
+ // Shared rule: keep the data unless the column was dropped at or after its timestamp.
+ private boolean include(ColumnDefinition column, long timestamp)
+ {
+ CFMetaData.DroppedColumn dropped = droppedColumns.get(column.name);
+ return dropped == null || timestamp > dropped.droppedTime;
+ }
+ };
+ }
+
+ @Override
+ protected boolean shouldFilter(UnfilteredRowIterator iterator)
+ {
+ // TODO: We could have row iterators return the smallest timestamp they might return
+ // (which we can get from sstable stats), and ignore any dropping if that smallest
+ // timestamp is bigger than the biggest droppedColumns timestamp.
+
+ // If none of the dropped columns is part of the columns that the iterator actually returns, there is nothing to do.
+ for (ColumnDefinition c : iterator.columns())
+ if (droppedColumns.containsKey(c.name))
+ return true;
+
+ return false;
+ }
+ };
+ }
+
+ /**
+ * Updates the provided digest with the content of every partition of the iterator.
+ * <p>
+ * Each partition is closed once digested, and {@code iterator} itself is closed when this
+ * method returns, whether normally or exceptionally.
+ *
+ * @param iterator the partitions to digest; consumed and closed by this method.
+ * @param digest the digest to update.
+ */
+ public static void digest(UnfilteredPartitionIterator iterator, MessageDigest digest)
+ {
+ try (UnfilteredPartitionIterator partitions = iterator)
+ {
+ while (partitions.hasNext())
+ {
+ try (UnfilteredRowIterator rows = partitions.next())
+ {
+ UnfilteredRowIterators.digest(rows, digest);
+ }
+ }
+ }
+ }
+
+ /**
+ * @return the shared {@link Serializer} instance of this class.
+ */
+ public static Serializer serializerForIntraNode()
+ {
+ return serializer;
+ }
+
+ /**
+ * Wraps the provided iterator so it logs the returned rows/RT for debugging purposes.
+ * <p>
+ * Note that this is only meant for debugging as this can produce a very large amount of
+ * logging at INFO.
+ *
+ * @param iterator the iterator to wrap.
+ * @param id an identifier passed through to {@code UnfilteredRowIterators.loggingIterator}.
+ * @param fullDetails passed through to {@code UnfilteredRowIterators.loggingIterator}.
+ * @return the wrapping iterator.
+ */
+ public static UnfilteredPartitionIterator loggingIterator(UnfilteredPartitionIterator iterator, final String id, final boolean fullDetails)
+ {
+ return new WrappingUnfilteredPartitionIterator(iterator)
+ {
+ public UnfilteredRowIterator next()
+ {
+ // Wrap every returned partition in its row-level logging counterpart.
+ return UnfilteredRowIterators.loggingIterator(super.next(), id, fullDetails);
+ }
+ };
+ }
+
+ /**
+ * Serialize each UnfilteredSerializer one after the other, with an initial byte that indicates whether
+ * we're done or not.
+ */
+ public static class Serializer
+ {
+ public void serialize(UnfilteredPartitionIterator iter, DataOutputPlus out, int version) throws IOException
+ {
+ if (version < MessagingService.VERSION_30)
+ throw new UnsupportedOperationException();
+
+ out.writeBoolean(iter.isForThrift());
+ while (iter.hasNext())
+ {
+ out.writeBoolean(true);
+ try (UnfilteredRowIterator partition = iter.next())
+ {
+ UnfilteredRowIteratorSerializer.serializer.serialize(partition, out, version);
+ }
+ }
+ out.writeBoolean(false);
+ }
+
+ public UnfilteredPartitionIterator deserialize(final DataInput in, final int version, final SerializationHelper.Flag flag) throws IOException
+ {
+ if (version < MessagingService.VERSION_30)
+ throw new UnsupportedOperationException();
+
+ final boolean isForThrift = in.readBoolean();
+
+ return new AbstractUnfilteredPartitionIterator()
+ {
+ private UnfilteredRowIterator next;
+ private boolean hasNext;
+ private boolean nextReturned = true;
+
+ public boolean isForThrift()
+ {
+ return isForThrift;
+ }
+
+ public boolean hasNext()
+ {
+ if (!nextReturned)
+ return hasNext;
+
+ // We can't answer this until the previously returned iterator has been fully consumed,
+ // so complain if that's not the case.
+ if (next != null && next.hasNext())
+ throw new IllegalStateException("Cannot call hasNext() until the previous iterator has been fully consumed");
+
+ try
+ {
+ hasNext = in.readBoolean();
+ nextReturned = false;
+ return hasNext;
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
+ }
+ }
+
+ public UnfilteredRowIterator next()
+ {
+ if (nextReturned && !hasNext())
+ throw new NoSuchElementException();
+
+ try
+ {
+ nextReturned = true;
+ next = UnfilteredRowIteratorSerializer.serializer.deserialize(in, version, flag);
+ return next;
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
+ }
+ }
+
+ @Override
+ public void close()
+ {
+ if (next != null)
+ next.close();
+ }
+ };
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/partitions/WrappingPartitionIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/WrappingPartitionIterator.java b/src/java/org/apache/cassandra/db/partitions/WrappingPartitionIterator.java
new file mode 100644
index 0000000..4d4be70
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/partitions/WrappingPartitionIterator.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.partitions;
+
+import org.apache.cassandra.db.rows.RowIterator;
+
+/**
+ * A base class for partition iterators that wrap another one: every method simply
+ * delegates to the wrapped iterator, so subclasses only override what they need to change.
+ */
+public abstract class WrappingPartitionIterator implements PartitionIterator
+{
+ // The iterator all calls are delegated to.
+ protected final PartitionIterator wrapped;
+
+ protected WrappingPartitionIterator(PartitionIterator wrapped)
+ {
+ this.wrapped = wrapped;
+ }
+
+ public boolean hasNext()
+ {
+ return wrapped.hasNext();
+ }
+
+ public RowIterator next()
+ {
+ return wrapped.next();
+ }
+
+ public void remove()
+ {
+ wrapped.remove();
+ }
+
+ public void close()
+ {
+ wrapped.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/partitions/WrappingUnfilteredPartitionIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/WrappingUnfilteredPartitionIterator.java b/src/java/org/apache/cassandra/db/partitions/WrappingUnfilteredPartitionIterator.java
new file mode 100644
index 0000000..4f35075
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/partitions/WrappingUnfilteredPartitionIterator.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.partitions;
+
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.rows.UnfilteredRowIterators;
+
+/**
+ * A utility class for writing partition iterators that filter/modify other
+ * partition iterators.
+ *
+ * This work a little bit like Guava's AbstractIterator in that you only need
+ * to implement the computeNext() method, though that method takes as argument
+ * the UnfilteredRowIterator to filter from the wrapped partition iterator.
+ */
+public abstract class WrappingUnfilteredPartitionIterator extends AbstractUnfilteredPartitionIterator
+{
+ protected final UnfilteredPartitionIterator wrapped;
+
+ private UnfilteredRowIterator next;
+
+ protected WrappingUnfilteredPartitionIterator(UnfilteredPartitionIterator wrapped)
+ {
+ this.wrapped = wrapped;
+ }
+
+ public boolean isForThrift()
+ {
+ return wrapped.isForThrift();
+ }
+
+ public boolean hasNext()
+ {
+ prepareNext();
+ return next != null;
+ }
+
+ public UnfilteredRowIterator next()
+ {
+ prepareNext();
+ assert next != null;
+
+ UnfilteredRowIterator toReturn = next;
+ next = null;
+ return toReturn;
+ }
+
+ private void prepareNext()
+ {
+ while (next == null && wrapped.hasNext())
+ {
+ @SuppressWarnings("resource") // Closed on exception, right away if empty or ignored by computeNext, or if assigned to 'next',
+ // either by the caller to next(), or in close().
+ UnfilteredRowIterator wrappedNext = wrapped.next();
+ try
+ {
+ UnfilteredRowIterator maybeNext = computeNext(wrappedNext);
+
+ // As the wrappd iterator shouldn't return an empty iterator, if computeNext
+ // gave us back it's input we save the isEmpty check.
+ if (maybeNext != null && (isForThrift() || maybeNext == wrappedNext || !maybeNext.isEmpty()))
+ {
+ next = maybeNext;
+ return;
+ }
+ else
+ {
+ wrappedNext.close();
+ }
+ }
+ catch (RuntimeException | Error e)
+ {
+ wrappedNext.close();
+ throw e;
+ }
+ }
+ }
+
+ /**
+ * Given the next UnfilteredRowIterator from the wrapped partition iterator, return
+ * the (potentially modified) UnfilteredRowIterator to return. Please note that the
+ * result will be skipped if it's either {@code null} of if it's empty.
+ *
+ * The default implementation return it's input unchanged to make it easier
+ * to write wrapping partition iterators that only change the close method.
+ */
+ protected UnfilteredRowIterator computeNext(UnfilteredRowIterator iter)
+ {
+ return iter;
+ }
+
+ @Override
+ public void close()
+ {
+ try
+ {
+ wrapped.close();
+ }
+ finally
+ {
+ if (next != null)
+ next.close();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/AbstractCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/AbstractCell.java b/src/java/org/apache/cassandra/db/rows/AbstractCell.java
new file mode 100644
index 0000000..c003d6f
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/AbstractCell.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.security.MessageDigest;
+import java.util.Objects;
+
+import org.apache.cassandra.db.context.CounterContext;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CollectionType;
+import org.apache.cassandra.serializers.MarshalException;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * Skeleton implementation of {@code Cell}, expressing the derived properties of a cell
+ * (liveness, digest, validation, size, equality, string form) in terms of its accessors.
+ *
+ * Cell implementations should extend this class unless they have a very good reason
+ * not to.
+ */
+public abstract class AbstractCell implements Cell
+{
+ public boolean isLive(int nowInSec)
+ {
+ return livenessInfo().isLive(nowInSec);
+ }
+
+ public boolean isTombstone()
+ {
+ // A tombstone has a local deletion time but, contrary to an expiring cell, no TTL.
+ if (!livenessInfo().hasLocalDeletionTime())
+ return false;
+
+ return !livenessInfo().hasTTL();
+ }
+
+ public boolean isExpiring()
+ {
+ return livenessInfo().hasTTL();
+ }
+
+ public void writeTo(Row.Writer writer)
+ {
+ writer.writeCell(column(), isCounterCell(), value(), livenessInfo(), path());
+ }
+
+ public void digest(MessageDigest digest)
+ {
+ // Digest a duplicate so that the position of the cell's own value buffer is left untouched.
+ digest.update(value().duplicate());
+ livenessInfo().digest(digest);
+ FBUtilities.updateWithBoolean(digest, isCounterCell());
+
+ CellPath cellPath = path();
+ if (cellPath != null)
+ cellPath.digest(digest);
+ }
+
+ public void validate()
+ {
+ column().validateCellValue(value());
+
+ livenessInfo().validate();
+
+ // A tombstone is not supposed to carry any value.
+ if (isTombstone() && value().hasRemaining())
+ throw new MarshalException("A tombstone should not have a value");
+
+ CellPath cellPath = path();
+ if (cellPath != null)
+ column().validateCellPath(cellPath);
+ }
+
+ public int dataSize()
+ {
+ int total = value().remaining() + livenessInfo().dataSize();
+
+ // The path only counts for cells that have one.
+ CellPath cellPath = path();
+ return cellPath == null ? total : total + cellPath.dataSize();
+ }
+
+ @Override
+ public boolean equals(Object other)
+ {
+ if (!(other instanceof Cell))
+ return false;
+
+ Cell that = (Cell) other;
+ if (!this.column().equals(that.column()))
+ return false;
+ if (this.isCounterCell() != that.isCounterCell())
+ return false;
+ if (!Objects.equals(this.value(), that.value()))
+ return false;
+ if (!Objects.equals(this.livenessInfo(), that.livenessInfo()))
+ return false;
+
+ return Objects.equals(this.path(), that.path());
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(column(), isCounterCell(), value(), livenessInfo(), path());
+ }
+
+ @Override
+ public String toString()
+ {
+ // Counter cells print their total rather than the raw value.
+ if (isCounterCell())
+ return String.format("[%s=%d ts=%d]", column().name, CounterContext.instance().total(value()), livenessInfo().timestamp());
+
+ AbstractType<?> type = column().type;
+ boolean isMultiCellCollection = type instanceof CollectionType && type.isMultiCell();
+ if (!isMultiCellCollection)
+ return String.format("[%s=%s info=%s]", column().name, type.getString(value()), livenessInfo());
+
+ // For multi-cell collections, also print the first element of the cell path.
+ CollectionType collectionType = (CollectionType) type;
+ return String.format("[%s[%s]=%s info=%s]",
+ column().name,
+ collectionType.nameComparator().getString(path().get(0)),
+ collectionType.valueComparator().getString(value()),
+ livenessInfo());
+ }
+
+ public Cell takeAlias()
+ {
+ // Cell is always used as an Aliasable object, but as the code currently
+ // never needs to alias a cell outside of its valid scope, this isn't
+ // needed yet.
+ throw new UnsupportedOperationException();
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/AbstractRangeTombstoneMarker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/AbstractRangeTombstoneMarker.java b/src/java/org/apache/cassandra/db/rows/AbstractRangeTombstoneMarker.java
new file mode 100644
index 0000000..d8256fc
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/AbstractRangeTombstoneMarker.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+
+/**
+ * Skeleton implementation of {@code RangeTombstoneMarker} that holds the bound of the
+ * marker and implements the methods that only depend on it.
+ */
+public abstract class AbstractRangeTombstoneMarker implements RangeTombstoneMarker
+{
+ protected final RangeTombstone.Bound bound;
+
+ protected AbstractRangeTombstoneMarker(RangeTombstone.Bound bound)
+ {
+ this.bound = bound;
+ }
+
+ public RangeTombstone.Bound clustering()
+ {
+ return bound;
+ }
+
+ public Unfiltered.Kind kind()
+ {
+ return Unfiltered.Kind.RANGE_TOMBSTONE_MARKER;
+ }
+
+ /**
+ * Validates every non-null value of the marker's clustering against the corresponding
+ * subtype of the table comparator.
+ */
+ public void validateData(CFMetaData metadata)
+ {
+ Slice.Bound clusteringBound = clustering();
+ for (int component = 0; component < clusteringBound.size(); component++)
+ {
+ ByteBuffer componentValue = clusteringBound.get(component);
+ if (componentValue == null)
+ continue;
+
+ metadata.comparator.subtype(component).validate(componentValue);
+ }
+ }
+
+ public String toString(CFMetaData metadata, boolean fullDetails)
+ {
+ // The level of details makes no difference for a marker.
+ return toString(metadata);
+ }
+
+ /**
+ * Writes the values of the bound to the writer, in order, followed by the kind of the bound.
+ */
+ protected void copyBoundTo(RangeTombstoneMarker.Writer writer)
+ {
+ for (int component = 0; component < bound.size(); component++)
+ {
+ writer.writeClusteringValue(bound.get(component));
+ }
+ writer.writeBoundKind(bound.kind());
+ }
+
+ public Unfiltered takeAlias()
+ {
+ return this;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/AbstractReusableRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/AbstractReusableRow.java b/src/java/org/apache/cassandra/db/rows/AbstractReusableRow.java
new file mode 100644
index 0000000..03aeb88
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/AbstractReusableRow.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.util.Iterator;
+
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.utils.SearchIterator;
+
+/**
+ * A row whose cells are read from a {@code RowDataBlock} at the index given by {@code row()}.
+ * <p>
+ * The cell, complex-cell iterator, deletion cursor and row iterator helpers are created
+ * lazily, once per row object, and re-targeted with {@code setTo} on every access.
+ * NOTE(review): this presumably means that the objects returned by the accessors below are
+ * reused across calls and only valid until the next access -- confirm against the
+ * {@code setTo} implementations.
+ */
+public abstract class AbstractReusableRow extends AbstractRow
+{
+ // Helpers, created on first use by the private accessors of the same names below.
+ private CellData.ReusableCell simpleCell;
+ private ComplexRowDataBlock.ReusableIterator complexCells;
+ private DeletionTimeArray.Cursor complexDeletionCursor;
+ private RowDataBlock.ReusableIterator iterator;
+
+ public AbstractReusableRow()
+ {
+ }
+
+ // The index of this row within data().
+ protected abstract int row();
+ // The data block this row reads from.
+ protected abstract RowDataBlock data();
+
+ private CellData.ReusableCell simpleCell()
+ {
+ if (simpleCell == null)
+ simpleCell = SimpleRowDataBlock.reusableCell();
+ return simpleCell;
+ }
+
+ private ComplexRowDataBlock.ReusableIterator complexCells()
+ {
+ if (complexCells == null)
+ complexCells = ComplexRowDataBlock.reusableComplexCells();
+ return complexCells;
+ }
+
+ private DeletionTimeArray.Cursor complexDeletionCursor()
+ {
+ if (complexDeletionCursor == null)
+ complexDeletionCursor = ComplexRowDataBlock.complexDeletionCursor();
+ return complexDeletionCursor;
+ }
+
+ private RowDataBlock.ReusableIterator reusableIterator()
+ {
+ if (iterator == null)
+ iterator = RowDataBlock.reusableIterator();
+ return iterator;
+ }
+
+ public Columns columns()
+ {
+ return data().columns();
+ }
+
+ /**
+ * Returns the cell of the provided simple column, or {@code null} if the data block has
+ * no simple data or the column is not one of the simple columns of this row.
+ */
+ public Cell getCell(ColumnDefinition c)
+ {
+ assert !c.isComplex();
+ if (data().simpleData == null)
+ return null;
+
+ int idx = columns().simpleIdx(c, 0);
+ if (idx < 0)
+ return null;
+
+ // Position of the cell in the simple data: this row's offset plus the index of the column.
+ return simpleCell().setTo(data().simpleData.data, c, (row() * columns().simpleColumnCount()) + idx);
+ }
+
+ /**
+ * Returns the cell of the provided complex column for the provided path, or {@code null}
+ * if the data block has no complex data or no cell is found for that column and path.
+ */
+ public Cell getCell(ColumnDefinition c, CellPath path)
+ {
+ assert c.isComplex();
+
+ ComplexRowDataBlock data = data().complexData;
+ if (data == null)
+ return null;
+
+ int idx = data.cellIdx(row(), c, path);
+ if (idx < 0)
+ return null;
+
+ // Note that the same reusable cell as for simple columns is used here.
+ return simpleCell().setTo(data.cellData(row()), c, idx);
+ }
+
+ public Iterator<Cell> getCells(ColumnDefinition c)
+ {
+ assert c.isComplex();
+ return complexCells().setTo(data().complexData, row(), c);
+ }
+
+ public boolean hasComplexDeletion()
+ {
+ return data().hasComplexDeletion(row());
+ }
+
+ /**
+ * Returns the deletion of the provided complex column, or {@code DeletionTime.LIVE} if the
+ * data block has no complex data or no deletion index is found for that column.
+ */
+ public DeletionTime getDeletion(ColumnDefinition c)
+ {
+ assert c.isComplex();
+ if (data().complexData == null)
+ return DeletionTime.LIVE;
+
+ int idx = data().complexData.complexDeletionIdx(row(), c);
+ return idx < 0
+ ? DeletionTime.LIVE
+ : complexDeletionCursor().setTo(data().complexData.complexDelTimes, idx);
+ }
+
+ public Iterator<Cell> iterator()
+ {
+ return reusableIterator().setTo(data(), row());
+ }
+
+ public SearchIterator<ColumnDefinition, ColumnData> searchIterator()
+ {
+ return new SearchIterator<ColumnDefinition, ColumnData>()
+ {
+ // Where to resume the search for simple columns: one past the last simple column found.
+ private int simpleIdx = 0;
+
+ public boolean hasNext()
+ {
+ // TODO: we can do better, but we expect users to not rely on this anyway
+ return true;
+ }
+
+ public ColumnData next(ColumnDefinition column)
+ {
+ if (column.isComplex())
+ {
+ // TODO: this is sub-optimal
+
+ Iterator<Cell> cells = getCells(column);
+ return cells == null ? null : new ColumnData(column, null, cells, getDeletion(column));
+ }
+ else
+ {
+ int idx = columns().simpleIdx(column, simpleIdx);
+ if (idx < 0)
+ return null;
+
+ Cell cell = simpleCell().setTo(data().simpleData.data, column, (row() * columns().simpleColumnCount()) + idx);
+ simpleIdx = idx + 1;
+ return cell == null ? null : new ColumnData(column, cell, null, null);
+ }
+ }
+ };
+ }
+
+ /**
+ * Returns a row that captures the aliases of the current clustering, primary key liveness
+ * info and deletion, together with the current data block and row index. The data block
+ * is referenced by the returned row, not copied.
+ */
+ public Row takeAlias()
+ {
+ final Clustering clustering = clustering().takeAlias();
+ final LivenessInfo info = primaryKeyLivenessInfo().takeAlias();
+ final DeletionTime deletion = deletion().takeAlias();
+
+ final RowDataBlock data = data();
+ final int row = row();
+
+ return new AbstractReusableRow()
+ {
+ protected RowDataBlock data()
+ {
+ return data;
+ }
+
+ protected int row()
+ {
+ return row;
+ }
+
+ public Clustering clustering()
+ {
+ return clustering;
+ }
+
+ public LivenessInfo primaryKeyLivenessInfo()
+ {
+ return info;
+ }
+
+ public DeletionTime deletion()
+ {
+ return deletion;
+ }
+
+ // Already an alias: no need to create another one.
+ @Override
+ public Row takeAlias()
+ {
+ return this;
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/AbstractRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/AbstractRow.java b/src/java/org/apache/cassandra/db/rows/AbstractRow.java
new file mode 100644
index 0000000..a99bc78
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/AbstractRow.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+import java.util.Iterator;
+import java.util.Objects;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.marshal.CollectionType;
+import org.apache.cassandra.serializers.MarshalException;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * Base abstract class for {@code Row} implementations.
+ *
+ * Unless you have a very good reason not to, every row implementation
+ * should probably extend this class.
+ */
+public abstract class AbstractRow implements Row
+{
+ public Unfiltered.Kind kind()
+ {
+ return Unfiltered.Kind.ROW;
+ }
+
+ public boolean hasLiveData(int nowInSec)
+ {
+ if (primaryKeyLivenessInfo().isLive(nowInSec))
+ return true;
+
+ for (Cell cell : this)
+ if (cell.isLive(nowInSec))
+ return true;
+
+ return false;
+ }
+
+ public boolean isEmpty()
+ {
+ return !primaryKeyLivenessInfo().hasTimestamp()
+ && deletion().isLive()
+ && !iterator().hasNext()
+ && !hasComplexDeletion();
+ }
+
+ public boolean isStatic()
+ {
+ return clustering() == Clustering.STATIC_CLUSTERING;
+ }
+
+ public void digest(MessageDigest digest)
+ {
+ FBUtilities.updateWithByte(digest, kind().ordinal());
+ clustering().digest(digest);
+
+ deletion().digest(digest);
+ primaryKeyLivenessInfo().digest(digest);
+
+ Iterator<ColumnDefinition> iter = columns().complexColumns();
+ while (iter.hasNext())
+ getDeletion(iter.next()).digest(digest);
+
+ for (Cell cell : this)
+ cell.digest(digest);
+ }
+
+ /**
+ * Copy this row to the provided writer.
+ *
+ * @param writer the row writer to write this row to.
+ */
+ public void copyTo(Row.Writer writer)
+ {
+ Rows.writeClustering(clustering(), writer);
+ writer.writePartitionKeyLivenessInfo(primaryKeyLivenessInfo());
+ writer.writeRowDeletion(deletion());
+
+ for (Cell cell : this)
+ cell.writeTo(writer);
+
+ for (int i = 0; i < columns().complexColumnCount(); i++)
+ {
+ ColumnDefinition c = columns().getComplex(i);
+ DeletionTime dt = getDeletion(c);
+ if (!dt.isLive())
+ writer.writeComplexDeletion(c, dt);
+ }
+ writer.endOfRow();
+ }
+
+ public void validateData(CFMetaData metadata)
+ {
+ Clustering clustering = clustering();
+ for (int i = 0; i < clustering.size(); i++)
+ {
+ ByteBuffer value = clustering.get(i);
+ if (value != null)
+ metadata.comparator.subtype(i).validate(value);
+ }
+
+ primaryKeyLivenessInfo().validate();
+ if (deletion().localDeletionTime() < 0)
+ throw new MarshalException("A local deletion time should not be negative");
+
+ for (Cell cell : this)
+ cell.validate();
+ }
+
+ public String toString(CFMetaData metadata)
+ {
+ return toString(metadata, false);
+ }
+
+ public String toString(CFMetaData metadata, boolean fullDetails)
+ {
+ StringBuilder sb = new StringBuilder();
+ sb.append("Row");
+ if (fullDetails)
+ {
+ sb.append("[info=").append(primaryKeyLivenessInfo());
+ if (!deletion().isLive())
+ sb.append(" del=").append(deletion());
+ sb.append(" ]");
+ }
+ sb.append(": ").append(clustering().toString(metadata)).append(" | ");
+ boolean isFirst = true;
+ ColumnDefinition prevColumn = null;
+ for (Cell cell : this)
+ {
+ if (isFirst) isFirst = false; else sb.append(", ");
+ if (fullDetails)
+ {
+ if (cell.column().isComplex() && !cell.column().equals(prevColumn))
+ {
+ DeletionTime complexDel = getDeletion(cell.column());
+ if (!complexDel.isLive())
+ sb.append("del(").append(cell.column().name).append(")=").append(complexDel).append(", ");
+ }
+ sb.append(cell);
+ prevColumn = cell.column();
+ }
+ else
+ {
+ sb.append(cell.column().name);
+ if (cell.column().type instanceof CollectionType)
+ {
+ CollectionType ct = (CollectionType)cell.column().type;
+ sb.append("[").append(ct.nameComparator().getString(cell.path().get(0))).append("]");
+ sb.append("=").append(ct.valueComparator().getString(cell.value()));
+ }
+ else
+ {
+ sb.append("=").append(cell.column().type.getString(cell.value()));
+ }
+ }
+ }
+ return sb.toString();
+ }
+
+ @Override
+ public boolean equals(Object other)
+ {
+ if(!(other instanceof Row))
+ return false;
+
+ Row that = (Row)other;
+ if (!this.clustering().equals(that.clustering())
+ || !this.columns().equals(that.columns())
+ || !this.primaryKeyLivenessInfo().equals(that.primaryKeyLivenessInfo())
+ || !this.deletion().equals(that.deletion()))
+ return false;
+
+ Iterator<Cell> thisCells = this.iterator();
+ Iterator<Cell> thatCells = that.iterator();
+ while (thisCells.hasNext())
+ {
+ if (!thatCells.hasNext() || !thisCells.next().equals(thatCells.next()))
+ return false;
+ }
+ return !thatCells.hasNext();
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int hash = Objects.hash(clustering(), columns(), primaryKeyLivenessInfo(), deletion());
+ for (Cell cell : this)
+ hash += 31 * cell.hashCode();
+ return hash;
+ }
+}