Posted to commits@cassandra.apache.org by sl...@apache.org on 2015/06/30 12:47:43 UTC
[19/51] [partial] cassandra git commit: Storage engine refactor, a.k.a. CASSANDRA-8099
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/RowStats.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/RowStats.java b/src/java/org/apache/cassandra/db/rows/RowStats.java
new file mode 100644
index 0000000..1bffdbe
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/RowStats.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.util.Objects;
+
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.util.DataOutputPlus;
+
+import static org.apache.cassandra.db.LivenessInfo.NO_TIMESTAMP;
+import static org.apache.cassandra.db.LivenessInfo.NO_TTL;
+import static org.apache.cassandra.db.LivenessInfo.NO_DELETION_TIME;
+
+/**
+ * General statistics on rows (and tombstones) for a given source.
+ * <p>
+ * Those stats are used to optimize the on-wire and on-disk storage of rows. More precisely,
+ * the {@code minTimestamp}, {@code minLocalDeletionTime} and {@code minTTL} stats are used to
+ * delta-encode that information for the sake of vint encoding. And {@code avgColumnSetPerRow}
+ * is used to decide if cells should be stored in a sparse or dense way (see {@link UnfilteredSerializer}).
+ * <p>
+ * Note that due to their use, those stats can afford to be somewhat inaccurate (the more inaccurate
+ * they are, the less effective the storage will be, but provided the stats are not completely wacky,
+ * this shouldn't have too huge an impact on performance) and in fact they will not always be
+ * accurate for reasons explained in {@link SerializationHeader#make}.
+ */
+public class RowStats
+{
+ // We should use this sparingly obviously
+ public static final RowStats NO_STATS = new RowStats(NO_TIMESTAMP, NO_DELETION_TIME, NO_TTL, -1);
+
+ public static final Serializer serializer = new Serializer();
+
+ public final long minTimestamp;
+ public final int minLocalDeletionTime;
+ public final int minTTL;
+
+ // Will be < 0 if the value is unknown
+ public final int avgColumnSetPerRow;
+
+ public RowStats(long minTimestamp,
+ int minLocalDeletionTime,
+ int minTTL,
+ int avgColumnSetPerRow)
+ {
+ this.minTimestamp = minTimestamp;
+ this.minLocalDeletionTime = minLocalDeletionTime;
+ this.minTTL = minTTL;
+ this.avgColumnSetPerRow = avgColumnSetPerRow;
+ }
+
+ public boolean hasMinTimestamp()
+ {
+ return minTimestamp != NO_TIMESTAMP;
+ }
+
+ public boolean hasMinLocalDeletionTime()
+ {
+ return minLocalDeletionTime != NO_DELETION_TIME;
+ }
+
+ /**
+ * Merge these stats with another one.
+ * <p>
+ * The comments on {@link SerializationHeader#make} apply here too, i.e. the result of
+ * merging will not be totally accurate, but we can live with that.
+ */
+ public RowStats mergeWith(RowStats that)
+ {
+ long minTimestamp = this.minTimestamp == NO_TIMESTAMP
+ ? that.minTimestamp
+ : (that.minTimestamp == NO_TIMESTAMP ? this.minTimestamp : Math.min(this.minTimestamp, that.minTimestamp));
+
+ int minDelTime = this.minLocalDeletionTime == NO_DELETION_TIME
+ ? that.minLocalDeletionTime
+ : (that.minLocalDeletionTime == NO_DELETION_TIME ? this.minLocalDeletionTime : Math.min(this.minLocalDeletionTime, that.minLocalDeletionTime));
+
+ int minTTL = this.minTTL == NO_TTL
+ ? that.minTTL
+ : (that.minTTL == NO_TTL ? this.minTTL : Math.min(this.minTTL, that.minTTL));
+
+ int avgColumnSetPerRow = this.avgColumnSetPerRow < 0
+ ? that.avgColumnSetPerRow
+ : (that.avgColumnSetPerRow < 0 ? this.avgColumnSetPerRow : (this.avgColumnSetPerRow + that.avgColumnSetPerRow) / 2);
+
+ return new RowStats(minTimestamp, minDelTime, minTTL, avgColumnSetPerRow);
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ RowStats rowStats = (RowStats) o;
+
+ if (avgColumnSetPerRow != rowStats.avgColumnSetPerRow) return false;
+ if (minLocalDeletionTime != rowStats.minLocalDeletionTime) return false;
+ if (minTTL != rowStats.minTTL) return false;
+ if (minTimestamp != rowStats.minTimestamp) return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(minTimestamp, minLocalDeletionTime, minTTL, avgColumnSetPerRow);
+ }
+
+ @Override
+ public String toString()
+ {
+ return String.format("RowStats(ts=%d, ldt=%d, ttl=%d, avgColPerRow=%d)", minTimestamp, minLocalDeletionTime, minTTL, avgColumnSetPerRow);
+ }
+
+ public static class Collector
+ {
+ private boolean isTimestampSet;
+ private long minTimestamp = Long.MAX_VALUE;
+
+ private boolean isDelTimeSet;
+ private int minDeletionTime = Integer.MAX_VALUE;
+
+ private boolean isTTLSet;
+ private int minTTL = Integer.MAX_VALUE;
+
+ private boolean isColumnSetPerRowSet;
+ private long totalColumnsSet;
+ private long rows;
+
+ public void updateTimestamp(long timestamp)
+ {
+ if (timestamp == NO_TIMESTAMP)
+ return;
+
+ isTimestampSet = true;
+ minTimestamp = Math.min(minTimestamp, timestamp);
+ }
+
+ public void updateLocalDeletionTime(int deletionTime)
+ {
+ if (deletionTime == NO_DELETION_TIME)
+ return;
+
+ isDelTimeSet = true;
+ minDeletionTime = Math.min(minDeletionTime, deletionTime);
+ }
+
+ public void updateDeletionTime(DeletionTime deletionTime)
+ {
+ if (deletionTime.isLive())
+ return;
+
+ updateTimestamp(deletionTime.markedForDeleteAt());
+ updateLocalDeletionTime(deletionTime.localDeletionTime());
+ }
+
+ public void updateTTL(int ttl)
+ {
+ if (ttl <= NO_TTL)
+ return;
+
+ isTTLSet = true;
+ minTTL = Math.min(minTTL, ttl);
+ }
+
+ public void updateColumnSetPerRow(int columnSetInRow)
+ {
+ updateColumnSetPerRow(columnSetInRow, 1);
+ }
+
+ public void updateColumnSetPerRow(long totalColumnsSet, long rows)
+ {
+ if (totalColumnsSet < 0 || rows < 0)
+ return;
+
+ this.isColumnSetPerRowSet = true;
+ this.totalColumnsSet += totalColumnsSet;
+ this.rows += rows;
+ }
+
+ public RowStats get()
+ {
+ return new RowStats(isTimestampSet ? minTimestamp : NO_TIMESTAMP,
+ isDelTimeSet ? minDeletionTime : NO_DELETION_TIME,
+ isTTLSet ? minTTL : NO_TTL,
+ isColumnSetPerRowSet ? (rows == 0 ? 0 : (int)(totalColumnsSet / rows)) : -1);
+ }
+ }
+
+ public static class Serializer
+ {
+ public void serialize(RowStats stats, DataOutputPlus out) throws IOException
+ {
+ out.writeLong(stats.minTimestamp);
+ out.writeInt(stats.minLocalDeletionTime);
+ out.writeInt(stats.minTTL);
+ out.writeInt(stats.avgColumnSetPerRow);
+ }
+
+ public int serializedSize(RowStats stats, TypeSizes sizes)
+ {
+ return sizes.sizeof(stats.minTimestamp)
+ + sizes.sizeof(stats.minLocalDeletionTime)
+ + sizes.sizeof(stats.minTTL)
+ + sizes.sizeof(stats.avgColumnSetPerRow);
+ }
+
+ public RowStats deserialize(DataInput in) throws IOException
+ {
+ long minTimestamp = in.readLong();
+ int minLocalDeletionTime = in.readInt();
+ int minTTL = in.readInt();
+ int avgColumnSetPerRow = in.readInt();
+ return new RowStats(minTimestamp, minLocalDeletionTime, minTTL, avgColumnSetPerRow);
+ }
+ }
+}
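
A quick sketch of the API above (illustrative only, not part of the commit; it assumes the
classes from this patch are on the classpath):

    RowStats.Collector collector = new RowStats.Collector();
    collector.updateTimestamp(1000L);
    collector.updateTTL(3600);
    collector.updateColumnSetPerRow(12, 4);    // 12 cells set across 4 rows
    RowStats first = collector.get();          // minTimestamp=1000, minTTL=3600, avgColumnSetPerRow=3

    RowStats second = new RowStats(500L, LivenessInfo.NO_DELETION_TIME, 3600, 5);
    RowStats merged = first.mergeWith(second); // minTimestamp=500, avgColumnSetPerRow=(3+5)/2=4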
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/Rows.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/Rows.java b/src/java/org/apache/cassandra/db/rows/Rows.java
new file mode 100644
index 0000000..76dcf60
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/Rows.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.util.*;
+
+import com.google.common.collect.Iterators;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.index.SecondaryIndexManager;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.SearchIterator;
+
+/**
+ * Static utilities to work on Row objects.
+ */
+public abstract class Rows
+{
+ private static final Logger logger = LoggerFactory.getLogger(Rows.class);
+
+ private Rows() {}
+
+ public static final Row EMPTY_STATIC_ROW = new AbstractRow()
+ {
+ public Columns columns()
+ {
+ return Columns.NONE;
+ }
+
+ public LivenessInfo primaryKeyLivenessInfo()
+ {
+ return LivenessInfo.NONE;
+ }
+
+ public DeletionTime deletion()
+ {
+ return DeletionTime.LIVE;
+ }
+
+ public boolean isEmpty()
+ {
+ return true;
+ }
+
+ public boolean hasComplexDeletion()
+ {
+ return false;
+ }
+
+ public Clustering clustering()
+ {
+ return Clustering.STATIC_CLUSTERING;
+ }
+
+ public Cell getCell(ColumnDefinition c)
+ {
+ return null;
+ }
+
+ public Cell getCell(ColumnDefinition c, CellPath path)
+ {
+ return null;
+ }
+
+ public Iterator<Cell> getCells(ColumnDefinition c)
+ {
+ return null;
+ }
+
+ public DeletionTime getDeletion(ColumnDefinition c)
+ {
+ return DeletionTime.LIVE;
+ }
+
+ public Iterator<Cell> iterator()
+ {
+ return Iterators.<Cell>emptyIterator();
+ }
+
+ public SearchIterator<ColumnDefinition, ColumnData> searchIterator()
+ {
+ return new SearchIterator<ColumnDefinition, ColumnData>()
+ {
+ public boolean hasNext()
+ {
+ return false;
+ }
+
+ public ColumnData next(ColumnDefinition column)
+ {
+ return null;
+ }
+ };
+ }
+
+ public Kind kind()
+ {
+ return Unfiltered.Kind.ROW;
+ }
+
+ public Row takeAlias()
+ {
+ return this;
+ }
+ };
+
+ public interface SimpleMergeListener
+ {
+ public void onAdded(Cell newCell);
+ public void onRemoved(Cell removedCell);
+ public void onUpdated(Cell existingCell, Cell updatedCell);
+ }
+
+ public static void writeClustering(Clustering clustering, Row.Writer writer)
+ {
+ for (int i = 0; i < clustering.size(); i++)
+ writer.writeClusteringValue(clustering.get(i));
+ }
+
+ public static void merge(Row row1, Row row2, Columns mergedColumns, Row.Writer writer, int nowInSec)
+ {
+ merge(row1, row2, mergedColumns, writer, nowInSec, SecondaryIndexManager.nullUpdater);
+ }
+
+ // Merge rows in memtable
+ // Return the minimum timestamp delta between existing and update
+ public static long merge(Row existing,
+ Row update,
+ Columns mergedColumns,
+ Row.Writer writer,
+ int nowInSec,
+ SecondaryIndexManager.Updater indexUpdater)
+ {
+ Clustering clustering = existing.clustering();
+ writeClustering(clustering, writer);
+
+ LivenessInfo existingInfo = existing.primaryKeyLivenessInfo();
+ LivenessInfo updateInfo = update.primaryKeyLivenessInfo();
+ LivenessInfo mergedInfo = existingInfo.mergeWith(updateInfo);
+
+ long timeDelta = Math.abs(existingInfo.timestamp() - mergedInfo.timestamp());
+
+ DeletionTime deletion = existing.deletion().supersedes(update.deletion()) ? existing.deletion() : update.deletion();
+
+ if (deletion.deletes(mergedInfo))
+ mergedInfo = LivenessInfo.NONE;
+
+ writer.writePartitionKeyLivenessInfo(mergedInfo);
+ writer.writeRowDeletion(deletion);
+
+ indexUpdater.maybeIndex(clustering, mergedInfo.timestamp(), mergedInfo.ttl(), deletion);
+
+ for (int i = 0; i < mergedColumns.simpleColumnCount(); i++)
+ {
+ ColumnDefinition c = mergedColumns.getSimple(i);
+ Cell existingCell = existing.getCell(c);
+ Cell updateCell = update.getCell(c);
+ timeDelta = Math.min(timeDelta, Cells.reconcile(clustering,
+ existingCell,
+ updateCell,
+ deletion,
+ writer,
+ nowInSec,
+ indexUpdater));
+ }
+
+ for (int i = 0; i < mergedColumns.complexColumnCount(); i++)
+ {
+ ColumnDefinition c = mergedColumns.getComplex(i);
+ DeletionTime existingDt = existing.getDeletion(c);
+ DeletionTime updateDt = update.getDeletion(c);
+ DeletionTime maxDt = existingDt.supersedes(updateDt) ? existingDt : updateDt;
+ if (maxDt.supersedes(deletion))
+ writer.writeComplexDeletion(c, maxDt);
+ else
+ maxDt = deletion;
+
+ Iterator<Cell> existingCells = existing.getCells(c);
+ Iterator<Cell> updateCells = update.getCells(c);
+ timeDelta = Math.min(timeDelta, Cells.reconcileComplex(clustering, c, existingCells, updateCells, maxDt, writer, nowInSec, indexUpdater));
+ }
+
+ writer.endOfRow();
+ return timeDelta;
+ }
+}
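
A sketch of driving the memtable merge above (existingRow, updateRow, mergedColumns and
writer are assumed to come from the surrounding memtable code; nowInSeconds() is assumed
available on FBUtilities):

    int nowInSec = FBUtilities.nowInSeconds();  // assumed helper for the current time in seconds
    long timeDelta = Rows.merge(existingRow, updateRow, mergedColumns, writer,
                                nowInSec, SecondaryIndexManager.nullUpdater);
    // timeDelta is the minimum timestamp delta observed between the existing and
    // updated versions, as documented on merge() above.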
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/SerializationHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/SerializationHelper.java b/src/java/org/apache/cassandra/db/rows/SerializationHelper.java
new file mode 100644
index 0000000..56b993c
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/SerializationHelper.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.context.CounterContext;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+public class SerializationHelper
+{
+ /**
+ * Flag affecting deserialization behavior (this only affects counters in practice).
+ * - LOCAL: for deserialization of local data (expired columns are
+ * converted to tombstones (to gain disk space)).
+ * - FROM_REMOTE: for deserialization of data received from remote hosts
+ * (expired columns are converted to tombstones and counters have
+ * their delta cleared)
+ * - PRESERVE_SIZE: used when no transformation must be performed, i.e.,
+ * when we must ensure that deserializing and reserializing the
+ * result yield the exact same bytes. Streaming uses this.
+ */
+ public static enum Flag
+ {
+ LOCAL, FROM_REMOTE, PRESERVE_SIZE;
+ }
+
+ private final Flag flag;
+ public final int version;
+
+ private final ReusableLivenessInfo livenessInfo = new ReusableLivenessInfo();
+
+ // The currently read row liveness infos (timestamp, ttl and localDeletionTime).
+ private long rowTimestamp;
+ private int rowTTL;
+ private int rowLocalDeletionTime;
+
+ private final ColumnFilter columnsToFetch;
+ private ColumnFilter.Tester tester;
+
+ public SerializationHelper(int version, Flag flag, ColumnFilter columnsToFetch)
+ {
+ this.flag = flag;
+ this.version = version;
+ this.columnsToFetch = columnsToFetch;
+ }
+
+ public SerializationHelper(int version, Flag flag)
+ {
+ this(version, flag, null);
+ }
+
+ public void writePartitionKeyLivenessInfo(Row.Writer writer, long timestamp, int ttl, int localDeletionTime)
+ {
+ livenessInfo.setTo(timestamp, ttl, localDeletionTime);
+ writer.writePartitionKeyLivenessInfo(livenessInfo);
+
+ rowTimestamp = timestamp;
+ rowTTL = ttl;
+ rowLocalDeletionTime = localDeletionTime;
+ }
+
+ public long getRowTimestamp()
+ {
+ return rowTimestamp;
+ }
+
+ public int getRowTTL()
+ {
+ return rowTTL;
+ }
+
+ public int getRowLocalDeletionTime()
+ {
+ return rowLocalDeletionTime;
+ }
+
+ public boolean includes(ColumnDefinition column)
+ {
+ return columnsToFetch == null || columnsToFetch.includes(column);
+ }
+
+ public boolean canSkipValue(ColumnDefinition column)
+ {
+ return columnsToFetch != null && columnsToFetch.canSkipValue(column);
+ }
+
+ public void startOfComplexColumn(ColumnDefinition column)
+ {
+ this.tester = columnsToFetch == null ? null : columnsToFetch.newTester(column);
+ }
+
+ public void endOfComplexColumn(ColumnDefinition column)
+ {
+ this.tester = null;
+ }
+
+ public void writeCell(Row.Writer writer,
+ ColumnDefinition column,
+ boolean isCounter,
+ ByteBuffer value,
+ long timestamp,
+ int localDelTime,
+ int ttl,
+ CellPath path)
+ {
+ livenessInfo.setTo(timestamp, ttl, localDelTime);
+
+ if (isCounter && ((flag == Flag.FROM_REMOTE || (flag == Flag.LOCAL && CounterContext.instance().shouldClearLocal(value)))))
+ value = CounterContext.instance().clearAllLocal(value);
+
+ if (!column.isComplex() || tester == null || tester.includes(path))
+ {
+ if (tester != null && tester.canSkipValue(path))
+ value = ByteBufferUtil.EMPTY_BYTE_BUFFER;
+ writer.writeCell(column, isCounter, value, livenessInfo, path);
+ }
+ }
+}
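
To make the Flag semantics concrete, a sketch of picking a flag per call site (version
stands for whatever messaging version is in scope):

    // Reading our own data: expired cells may become tombstones to reclaim space.
    SerializationHelper local  = new SerializationHelper(version, SerializationHelper.Flag.LOCAL);
    // Reads from other nodes: counter contexts additionally get their local deltas cleared.
    SerializationHelper remote = new SerializationHelper(version, SerializationHelper.Flag.FROM_REMOTE);
    // Streaming: deserialize-then-reserialize must reproduce the exact same bytes.
    SerializationHelper stream = new SerializationHelper(version, SerializationHelper.Flag.PRESERVE_SIZE);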
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/SimpleRowDataBlock.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/SimpleRowDataBlock.java b/src/java/org/apache/cassandra/db/rows/SimpleRowDataBlock.java
new file mode 100644
index 0000000..08f37fd
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/SimpleRowDataBlock.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.nio.ByteBuffer;
+
+import com.google.common.collect.UnmodifiableIterator;
+
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.utils.ObjectSizes;
+
+/**
+ * Holds cell data for the simple columns of one or more rows.
+ * <p>
+ * In practice, a {@code SimpleRowDataBlock} contains a single {@code CellData} "array" and
+ * the (simple) columns for which the {@code SimpleRowDataBlock} has data. The cell for
+ * a row i and a column c is stored in the {@code CellData} at index 'i * s + index(c)',
+ * where s is the number of simple columns.
+ * <p>
+ * This does mean that we store cells in a "dense" way: if a column doesn't have a cell for a
+ * given row, the corresponding index in the cell data array will simply have a {@code null} value.
+ * We might want to switch to a more sparse encoding in the future but we keep it simple for
+ * now (having a sparse encoding makes things a tad more complex because we need to be able to
+ * swap the cells for 2 given rows as seen in ComplexRowDataBlock).
+ */
+public class SimpleRowDataBlock
+{
+ private static final long EMPTY_SIZE = ObjectSizes.measure(new SimpleRowDataBlock(Columns.NONE, 0, false));
+
+ final Columns columns;
+ final CellData data;
+
+ public SimpleRowDataBlock(Columns columns, int rows, boolean isCounter)
+ {
+ this.columns = columns;
+ this.data = new CellData(rows * columns.simpleColumnCount(), isCounter);
+ }
+
+ public Columns columns()
+ {
+ return columns;
+ }
+
+ // Swap row i and j
+ public void swap(int i, int j)
+ {
+ int s = columns.simpleColumnCount();
+ for (int k = 0; k < s; k++)
+ data.swapCell(i * s + k, j * s + k);
+ }
+
+ // Merge row i into j
+ public void merge(int i, int j, int nowInSec)
+ {
+ int s = columns.simpleColumnCount();
+ for (int k = 0; k < s; k++)
+ data.mergeCell(i * s + k, j * s + k, nowInSec);
+ }
+
+ // Move row i into j
+ public void move(int i, int j)
+ {
+ int s = columns.simpleColumnCount();
+ for (int k = 0; k < s; k++)
+ data.moveCell(i * s + k, j * s + k);
+ }
+
+ public long unsharedHeapSizeExcludingData()
+ {
+ return EMPTY_SIZE + data.unsharedHeapSizeExcludingData();
+ }
+
+ public int dataSize()
+ {
+ return data.dataSize();
+ }
+
+ public CellWriter cellWriter(boolean inOrderCells)
+ {
+ return new CellWriter(inOrderCells);
+ }
+
+ public static CellData.ReusableCell reusableCell()
+ {
+ return new CellData.ReusableCell();
+ }
+
+ public static ReusableIterator reusableIterator()
+ {
+ return new ReusableIterator();
+ }
+
+ public void clear()
+ {
+ data.clear();
+ }
+
+ static class ReusableIterator extends UnmodifiableIterator<Cell>
+ {
+ private SimpleRowDataBlock dataBlock;
+ private final CellData.ReusableCell cell = new CellData.ReusableCell();
+
+ private int base;
+ private int column;
+
+ private ReusableIterator()
+ {
+ }
+
+ public ReusableIterator setTo(SimpleRowDataBlock dataBlock, int row)
+ {
+ this.dataBlock = dataBlock;
+ this.base = dataBlock == null ? -1 : row * dataBlock.columns.simpleColumnCount();
+ this.column = 0;
+ return this;
+ }
+
+ public boolean hasNext()
+ {
+ if (dataBlock == null)
+ return false;
+
+ int columnCount = dataBlock.columns.simpleColumnCount();
+ // iterate over columns until we find one with data
+ while (column < columnCount && !dataBlock.data.hasCell(base + column))
+ ++column;
+
+ return column < columnCount;
+ }
+
+ public Cell next()
+ {
+ cell.setTo(dataBlock.data, dataBlock.columns.getSimple(column), base + column);
+ ++column;
+ return cell;
+ }
+ }
+
+ public class CellWriter
+ {
+ private final boolean inOrderCells;
+
+ private int base;
+ private int lastColumnIdx;
+
+ public CellWriter(boolean inOrderCells)
+ {
+ this.inOrderCells = inOrderCells;
+ }
+
+ public void addCell(ColumnDefinition column, ByteBuffer value, LivenessInfo info)
+ {
+ int fromIdx = inOrderCells ? lastColumnIdx : 0;
+ lastColumnIdx = columns.simpleIdx(column, fromIdx);
+ assert lastColumnIdx >= 0 : "Cannot find column " + column.name + " in " + columns + " from " + fromIdx;
+ int idx = base + lastColumnIdx;
+ data.setCell(idx, value, info);
+ }
+
+ public void reset()
+ {
+ base = 0;
+ lastColumnIdx = 0;
+ data.clear();
+ }
+
+ public void endOfRow()
+ {
+ base += columns.simpleColumnCount();
+ lastColumnIdx = 0;
+ }
+ }
+}
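
The dense layout reduces to simple index arithmetic; a self-contained sketch (plain Java,
no Cassandra types) of the cell addressing used by swap/merge/move above:

    int s = 3;                          // columns.simpleColumnCount()
    int row = 2, columnIdx = 1;         // index(c) among the simple columns
    int cellIdx = row * s + columnIdx;  // == 7: slot for row 2, column 1 in the CellData array
    // swap(i, j) then exchanges slots i * s + k and j * s + k for every k in [0, s)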
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/SliceableUnfilteredRowIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/SliceableUnfilteredRowIterator.java b/src/java/org/apache/cassandra/db/rows/SliceableUnfilteredRowIterator.java
new file mode 100644
index 0000000..2250ee9
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/SliceableUnfilteredRowIterator.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.util.Iterator;
+
+import org.apache.cassandra.db.Slice;
+
+public interface SliceableUnfilteredRowIterator extends UnfilteredRowIterator
+{
+ /**
+ * Move forward (resp. backward if isReverseOrder() is true for the iterator) in
+ * the iterator and return an iterator over the Unfiltered selected by the provided
+ * {@code slice}.
+ * <p>
+ * Please note that successive calls to {@code slice} are allowed provided the
+ * slices are non-overlapping and are passed in clustering (resp. reverse clustering) order.
+ * However, {@code slice} is allowed to leave the iterator in an unknown state and there
+ * is no guarantee over what a call to {@code hasNext} or {@code next} will yield after
+ * a call to {@code slice}. In other words, for a given iterator, you should either use
+ * {@code slice} or {@code hasNext/next} but not both.
+ */
+ public Iterator<Unfiltered> slice(Slice slice);
+}
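
A usage sketch of the contract above (iter, sliceA, sliceB and process() are assumed;
sliceB must start strictly after sliceA in clustering order):

    Iterator<Unfiltered> first = iter.slice(sliceA);
    while (first.hasNext())
        process(first.next());        // consume fully before slicing again

    Iterator<Unfiltered> second = iter.slice(sliceB);
    // Do not interleave iter.hasNext()/iter.next() with slice() on the same iterator.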
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/StaticRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/StaticRow.java b/src/java/org/apache/cassandra/db/rows/StaticRow.java
new file mode 100644
index 0000000..2ad9fb4
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/StaticRow.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+
+import org.apache.cassandra.db.*;
+
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.utils.SearchIterator;
+
+public class StaticRow extends AbstractRow
+{
+ private final DeletionTime deletion;
+ private final RowDataBlock data;
+
+ private StaticRow(DeletionTime deletion, RowDataBlock data)
+ {
+ this.deletion = deletion.takeAlias();
+ this.data = data;
+ }
+
+ public Columns columns()
+ {
+ return data.columns();
+ }
+
+ public Cell getCell(ColumnDefinition c)
+ {
+ assert !c.isComplex();
+ if (data.simpleData == null)
+ return null;
+
+ int idx = columns().simpleIdx(c, 0);
+ if (idx < 0)
+ return null;
+
+ return SimpleRowDataBlock.reusableCell().setTo(data.simpleData.data, c, idx);
+ }
+
+ public Cell getCell(ColumnDefinition c, CellPath path)
+ {
+ assert c.isComplex();
+
+ ComplexRowDataBlock dataBlock = data.complexData;
+ if (dataBlock == null)
+ return null;
+
+ int idx = dataBlock.cellIdx(0, c, path);
+ if (idx < 0)
+ return null;
+
+ return SimpleRowDataBlock.reusableCell().setTo(dataBlock.cellData(0), c, idx);
+ }
+
+ public Iterator<Cell> getCells(ColumnDefinition c)
+ {
+ assert c.isComplex();
+ return ComplexRowDataBlock.reusableComplexCells().setTo(data.complexData, 0, c);
+ }
+
+ public boolean hasComplexDeletion()
+ {
+ return data.hasComplexDeletion(0);
+ }
+
+ public DeletionTime getDeletion(ColumnDefinition c)
+ {
+ assert c.isComplex();
+ if (data.complexData == null)
+ return DeletionTime.LIVE;
+
+ int idx = data.complexData.complexDeletionIdx(0, c);
+ return idx < 0
+ ? DeletionTime.LIVE
+ : ComplexRowDataBlock.complexDeletionCursor().setTo(data.complexData.complexDelTimes, idx);
+ }
+
+ public Iterator<Cell> iterator()
+ {
+ return RowDataBlock.reusableIterator().setTo(data, 0);
+ }
+
+ public SearchIterator<ColumnDefinition, ColumnData> searchIterator()
+ {
+ return new SearchIterator<ColumnDefinition, ColumnData>()
+ {
+ private int simpleIdx = 0;
+
+ public boolean hasNext()
+ {
+ // TODO: we can do better, but we expect users not to rely on this anyway
+ return true;
+ }
+
+ public ColumnData next(ColumnDefinition column)
+ {
+ if (column.isComplex())
+ {
+ // TODO: this is sub-optimal
+
+ Iterator<Cell> cells = getCells(column);
+ return cells == null ? null : new ColumnData(column, null, cells, getDeletion(column));
+ }
+ else
+ {
+ simpleIdx = columns().simpleIdx(column, simpleIdx);
+ assert simpleIdx >= 0;
+
+ Cell cell = SimpleRowDataBlock.reusableCell().setTo(data.simpleData.data, column, simpleIdx);
+ ++simpleIdx;
+ return cell == null ? null : new ColumnData(column, cell, null, null);
+ }
+ }
+ };
+ }
+
+ public Row takeAlias()
+ {
+ return this;
+ }
+
+ public Clustering clustering()
+ {
+ return Clustering.STATIC_CLUSTERING;
+ }
+
+ public LivenessInfo primaryKeyLivenessInfo()
+ {
+ return LivenessInfo.NONE;
+ }
+
+ public DeletionTime deletion()
+ {
+ return deletion;
+ }
+
+ public static Builder builder(Columns columns, boolean inOrderCells, boolean isCounter)
+ {
+ return new Builder(columns, inOrderCells, isCounter);
+ }
+
+ public static class Builder extends RowDataBlock.Writer
+ {
+ private final RowDataBlock data;
+ private DeletionTime deletion = DeletionTime.LIVE;
+
+ public Builder(Columns columns, boolean inOrderCells, boolean isCounter)
+ {
+ super(inOrderCells);
+ this.data = new RowDataBlock(columns, 1, false, isCounter);
+ updateWriter(data);
+ }
+
+ public void writeClusteringValue(ByteBuffer buffer)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public void writePartitionKeyLivenessInfo(LivenessInfo info)
+ {
+ // Static rows are special and don't really have an existence unless they have live cells,
+ // so we shouldn't have any partition key liveness info.
+ assert info.equals(LivenessInfo.NONE);
+ }
+
+ public void writeRowDeletion(DeletionTime deletion)
+ {
+ this.deletion = deletion;
+ }
+
+ public StaticRow build()
+ {
+ return new StaticRow(deletion, data);
+ }
+ }
+}
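
A sketch of building a static row with the Builder above (staticColumns, column, value and
info are assumed, and writeCell() is assumed to be inherited from RowDataBlock.Writer as
part of the Row.Writer contract):

    StaticRow.Builder builder = StaticRow.builder(staticColumns, true, false);
    builder.writeCell(column, false, value, info, null);  // simple column, not a counter
    builder.endOfRow();
    StaticRow staticRow = builder.build();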
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/TombstoneFilteringRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/TombstoneFilteringRow.java b/src/java/org/apache/cassandra/db/rows/TombstoneFilteringRow.java
new file mode 100644
index 0000000..a1c0ddc
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/TombstoneFilteringRow.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.*;
+
+public class TombstoneFilteringRow extends FilteringRow
+{
+ private final int nowInSec;
+
+ public TombstoneFilteringRow(int nowInSec)
+ {
+ this.nowInSec = nowInSec;
+ }
+
+ @Override
+ protected boolean include(DeletionTime dt)
+ {
+ return false;
+ }
+
+ @Override
+ protected boolean include(Cell cell)
+ {
+ return cell.isLive(nowInSec);
+ }
+
+ @Override
+ protected boolean include(ColumnDefinition c, DeletionTime dt)
+ {
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/Unfiltered.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/Unfiltered.java b/src/java/org/apache/cassandra/db/rows/Unfiltered.java
new file mode 100644
index 0000000..b1692e3
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/Unfiltered.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.security.MessageDigest;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.Clusterable;
+
+/**
+ * Unfiltered is the common class for the main constituent of an unfiltered partition.
+ * <p>
+ * In practice, an Unfiltered is either a row or a range tombstone marker. Unfiltereds
+ * are uniquely identified by their clustering information and can be sorted according
+ * to those.
+ */
+public interface Unfiltered extends Clusterable
+{
+ public enum Kind { ROW, RANGE_TOMBSTONE_MARKER };
+
+ /**
+ * The kind of the atom: either row or range tombstone marker.
+ */
+ public Kind kind();
+
+ /**
+ * Digest the atom using the provided {@code MessageDigest}.
+ *
+ * @param digest the {@code MessageDigest} to use.
+ */
+ public void digest(MessageDigest digest);
+
+ /**
+ * Validate the data of this atom.
+ *
+ * @param metadata the metadata for the table this atom is part of.
+ * @throws MarshalException if some of the data in this atom is
+ * invalid (some value is invalid for its column type, or some field
+ * is nonsensical).
+ */
+ public void validateData(CFMetaData metadata);
+
+ public String toString(CFMetaData metadata);
+ public String toString(CFMetaData metadata, boolean fullDetails);
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterator.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterator.java
new file mode 100644
index 0000000..a3ecf6d
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterator.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.util.Iterator;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+
+/**
+ * An iterator over the rows of a given partition that also includes deletion information.
+ * <p>
+ * An {@code UnfilteredRowIterator} contains some top-level partition information and is an
+ * iterator of {@code Unfiltered}, that is of either {@code Row} or {@code RangeTombstoneMarker}.
+ * An implementation of {@code UnfilteredRowIterator} <b>must</b> provide the following
+ * guarantees:
+ * 1. the returned {@code Unfiltered} must be in clustering order, or in reverse clustering
+ * order iff {@link #isReverseOrder} returns true.
+ * 2. the iterator should not shadow its own data. That is, no deletion
+ * (partition level deletion, row deletion, range tombstone, complex
+ * deletion) should delete anything else returned by the iterator (cell, row, ...).
+ * 3. every "start" range tombstone marker should have a corresponding "end" marker, and no other
+ * marker should be in-between this start-end pair of marker. Note that due to the
+ * previous rule this means that between a "start" and a corresponding "end" marker there
+ * can only be rows that are not deleted by the markers. Also note that when iterating
+ * in reverse order, "end" markers are returned before their "start" counterpart (i.e.
+ * "start" and "end" are always in the sense of the clustering order).
+ *
+ * Note further that the objects returned by next() are only valid until the
+ * next call to hasNext() or next(). If a consumer wants to keep a reference on
+ * the returned objects for longer than the iteration, it must make a copy of
+ * it explicitly.
+ */
+public interface UnfilteredRowIterator extends Iterator<Unfiltered>, AutoCloseable
+{
+ /**
+ * The metadata for the table this is an iterator on.
+ */
+ public CFMetaData metadata();
+
+ /**
+ * A subset of the columns for the (static and regular) rows returned by this iterator.
+ * Every row returned by this iterator must guarantee that it has only those columns.
+ */
+ public PartitionColumns columns();
+
+ /**
+ * Whether or not the atoms returned by this iterator are in reversed
+ * clustering order.
+ */
+ public boolean isReverseOrder();
+
+ /**
+ * The partition key of the partition this is an iterator over.
+ */
+ public DecoratedKey partitionKey();
+
+ /**
+ * The partition level deletion for the partition this iterates over.
+ */
+ public DeletionTime partitionLevelDeletion();
+
+ /**
+ * The static part corresponding to this partition (this can be an empty
+ * row).
+ */
+ public Row staticRow();
+
+ /**
+ * Return "statistics" about what is returned by this iterator. Those are used for
+ * performance reasons (for delta-encoding for instance) and code should not
+ * expect those to be exact.
+ */
+ public RowStats stats();
+
+ public void close();
+
+ /**
+ * Returns whether this iterator has no data (including no deletion data).
+ */
+ public default boolean isEmpty()
+ {
+ return partitionLevelDeletion().isLive()
+ && staticRow().isEmpty()
+ && !hasNext();
+ }
+}
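
The "copy before keeping" rule from the class comment, as a sketch (iter is assumed;
takeAlias() is the aliasing hook rows expose in this patch):

    List<Row> kept = new ArrayList<>();
    while (iter.hasNext())
    {
        Unfiltered unfiltered = iter.next();
        if (unfiltered.kind() == Unfiltered.Kind.ROW)
            kept.add(((Row) unfiltered).takeAlias());  // the raw object may be reused by the next next() call
    }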
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
new file mode 100644
index 0000000..13c09d4
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.io.IOError;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * Serialize/Deserialize an unfiltered row iterator.
+ *
+ * The serialization is composed of a header, followed by the rows and range tombstones of the iterator,
+ * serialized until we reach the end of the partition (see UnfilteredSerializer for details). The header itself
+ * is:
+ * <cfid><key><flags><s_header>[<partition_deletion>][<static_row>]
+ * where:
+ * <cfid> is the table cfid.
+ * <key> is the partition key.
+ * <flags> contains bit flags. Each flag is set if its corresponding bit is set. From rightmost
+ * bit to leftmost one, the flags are:
+ * - is empty: whether the iterator is empty. If so, nothing follows the <flags>
+ * - is reversed: whether the iterator is in reversed clustering order
+ * - has partition deletion: whether or not there is a <partition_deletion> following
+ * - has static row: whether or not there is a <static_row> following
+ * - has row estimate: whether or not there is a <row_estimate> following
+ * <s_header> is the SerializationHeader. More precisely it's
+ * <min_timestamp><min_localDelTime><min_ttl>[<static_columns>]<columns>
+ * where:
+ * - <min_timestamp> is the base timestamp used for delta-encoding timestamps
+ * - <min_localDelTime> is the base localDeletionTime used for delta-encoding local deletion times
+ * - <min_ttl> is the base ttl used for delta-encoding ttls
+ * - <static_columns> is the static columns if a static row is present. It's
+ * the number of columns as an unsigned short, followed by the column names.
+ * - <columns> is the columns of the rows of the iterator. It's serialized as <static_columns>.
+ * <partition_deletion> is the deletion time for the partition (delta-encoded)
+ * <static_row> is the static row for this partition as serialized by UnfilteredSerializer.
+ * <row_estimate> is the (potentially estimated) number of rows serialized. This is only used for
+ * the purpose of some sizing on the receiving end and should not be relied upon too strongly.
+ *
+ * !!! Please note that the serialized value depends on the schema and as such should not be used as is if
+ * it might be deserialized after the schema has changed !!!
+ * TODO: we should add a flag to include the relevant metadata in the header for commit log etc.....
+ */
+public class UnfilteredRowIteratorSerializer
+{
+ protected static final Logger logger = LoggerFactory.getLogger(UnfilteredRowIteratorSerializer.class);
+
+ private static final int IS_EMPTY = 0x01;
+ private static final int IS_REVERSED = 0x02;
+ private static final int HAS_PARTITION_DELETION = 0x04;
+ private static final int HAS_STATIC_ROW = 0x08;
+ private static final int HAS_ROW_ESTIMATE = 0x10;
+
+ public static final UnfilteredRowIteratorSerializer serializer = new UnfilteredRowIteratorSerializer();
+
+ public void serialize(UnfilteredRowIterator iterator, DataOutputPlus out, int version) throws IOException
+ {
+ serialize(iterator, out, version, -1);
+ }
+
+ public void serialize(UnfilteredRowIterator iterator, DataOutputPlus out, int version, int rowEstimate) throws IOException
+ {
+ SerializationHeader header = new SerializationHeader(iterator.metadata(),
+ iterator.columns(),
+ iterator.stats());
+ serialize(iterator, out, header, version, rowEstimate);
+ }
+
+ public void serialize(UnfilteredRowIterator iterator, DataOutputPlus out, SerializationHeader header, int version, int rowEstimate) throws IOException
+ {
+ CFMetaData.serializer.serialize(iterator.metadata(), out, version);
+ ByteBufferUtil.writeWithLength(iterator.partitionKey().getKey(), out);
+
+ int flags = 0;
+ if (iterator.isReverseOrder())
+ flags |= IS_REVERSED;
+
+ if (iterator.isEmpty())
+ {
+ out.writeByte((byte)(flags | IS_EMPTY));
+ return;
+ }
+
+ DeletionTime partitionDeletion = iterator.partitionLevelDeletion();
+ if (!partitionDeletion.isLive())
+ flags |= HAS_PARTITION_DELETION;
+ Row staticRow = iterator.staticRow();
+ boolean hasStatic = staticRow != Rows.EMPTY_STATIC_ROW;
+ if (hasStatic)
+ flags |= HAS_STATIC_ROW;
+
+ if (rowEstimate >= 0)
+ flags |= HAS_ROW_ESTIMATE;
+
+ out.writeByte((byte)flags);
+
+ SerializationHeader.serializer.serializeForMessaging(header, out, hasStatic);
+
+ if (!partitionDeletion.isLive())
+ writeDelTime(partitionDeletion, header, out);
+
+ if (hasStatic)
+ UnfilteredSerializer.serializer.serialize(staticRow, header, out, version);
+
+ if (rowEstimate >= 0)
+ out.writeInt(rowEstimate);
+
+ while (iterator.hasNext())
+ UnfilteredSerializer.serializer.serialize(iterator.next(), header, out, version);
+ UnfilteredSerializer.serializer.writeEndOfPartition(out);
+ }
+
+ // Please note that this consumes the iterator, and as such should not be called unless we have a simple way to
+ // recreate an iterator for both serialize and serializedSize, which is mostly only PartitionUpdate
+ public long serializedSize(UnfilteredRowIterator iterator, int version, int rowEstimate, TypeSizes sizes)
+ {
+ SerializationHeader header = new SerializationHeader(iterator.metadata(),
+ iterator.columns(),
+ iterator.stats());
+
+ assert rowEstimate >= 0;
+
+ long size = CFMetaData.serializer.serializedSize(iterator.metadata(), version, sizes)
+ + sizes.sizeofWithLength(iterator.partitionKey().getKey())
+ + 1; // flags
+
+ if (iterator.isEmpty())
+ return size;
+
+ DeletionTime partitionDeletion = iterator.partitionLevelDeletion();
+ Row staticRow = iterator.staticRow();
+ boolean hasStatic = staticRow != Rows.EMPTY_STATIC_ROW;
+
+ size += SerializationHeader.serializer.serializedSizeForMessaging(header, sizes, hasStatic);
+
+ if (!partitionDeletion.isLive())
+ size += delTimeSerializedSize(partitionDeletion, header, sizes);
+
+ if (hasStatic)
+ size += UnfilteredSerializer.serializer.serializedSize(staticRow, header, version, sizes);
+
+ if (rowEstimate >= 0)
+ size += sizes.sizeof(rowEstimate);
+
+ while (iterator.hasNext())
+ size += UnfilteredSerializer.serializer.serializedSize(iterator.next(), header, version, sizes);
+ size += UnfilteredSerializer.serializer.serializedSizeEndOfPartition(sizes);
+
+ return size;
+ }
+
+ public Header deserializeHeader(DataInput in, int version, SerializationHelper.Flag flag) throws IOException
+ {
+ CFMetaData metadata = CFMetaData.serializer.deserialize(in, version);
+ DecoratedKey key = StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithLength(in));
+ int flags = in.readUnsignedByte();
+ boolean isReversed = (flags & IS_REVERSED) != 0;
+ if ((flags & IS_EMPTY) != 0)
+ {
+ SerializationHeader sh = new SerializationHeader(metadata, PartitionColumns.NONE, RowStats.NO_STATS);
+ return new Header(sh, metadata, key, isReversed, true, null, null, 0);
+ }
+
+ boolean hasPartitionDeletion = (flags & HAS_PARTITION_DELETION) != 0;
+ boolean hasStatic = (flags & HAS_STATIC_ROW) != 0;
+ boolean hasRowEstimate = (flags & HAS_ROW_ESTIMATE) != 0;
+
+ SerializationHeader header = SerializationHeader.serializer.deserializeForMessaging(in, metadata, hasStatic);
+
+ DeletionTime partitionDeletion = hasPartitionDeletion ? readDelTime(in, header) : DeletionTime.LIVE;
+
+ Row staticRow = Rows.EMPTY_STATIC_ROW;
+ if (hasStatic)
+ staticRow = UnfilteredSerializer.serializer.deserializeStaticRow(in, header, new SerializationHelper(version, flag));
+
+ int rowEstimate = hasRowEstimate ? in.readInt() : -1;
+ return new Header(header, metadata, key, isReversed, false, partitionDeletion, staticRow, rowEstimate);
+ }
+
+ public void deserialize(DataInput in, SerializationHelper helper, SerializationHeader header, Row.Writer rowWriter, RangeTombstoneMarker.Writer markerWriter) throws IOException
+ {
+ while (UnfilteredSerializer.serializer.deserialize(in, header, helper, rowWriter, markerWriter) != null);
+ }
+
+ public UnfilteredRowIterator deserialize(final DataInput in, int version, SerializationHelper.Flag flag) throws IOException
+ {
+ final Header h = deserializeHeader(in, version, flag);
+
+ if (h.isEmpty)
+ return UnfilteredRowIterators.emptyIterator(h.metadata, h.key, h.isReversed);
+
+ final int clusteringSize = h.metadata.clusteringColumns().size();
+ final SerializationHelper helper = new SerializationHelper(version, flag);
+
+ return new AbstractUnfilteredRowIterator(h.metadata, h.key, h.partitionDeletion, h.sHeader.columns(), h.staticRow, h.isReversed, h.sHeader.stats())
+ {
+ private final ReusableRow row = new ReusableRow(clusteringSize, h.sHeader.columns().regulars, true, h.metadata.isCounter());
+ private final RangeTombstoneMarker.Builder markerBuilder = new RangeTombstoneMarker.Builder(clusteringSize);
+
+ protected Unfiltered computeNext()
+ {
+ try
+ {
+ Unfiltered.Kind kind = UnfilteredSerializer.serializer.deserialize(in, h.sHeader, helper, row.writer(), markerBuilder.reset());
+ if (kind == null)
+ return endOfData();
+
+ return kind == Unfiltered.Kind.ROW ? row : markerBuilder.build();
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
+ }
+ }
+ };
+ }
+
+ public static void writeDelTime(DeletionTime dt, SerializationHeader header, DataOutputPlus out) throws IOException
+ {
+ out.writeLong(header.encodeTimestamp(dt.markedForDeleteAt()));
+ out.writeInt(header.encodeDeletionTime(dt.localDeletionTime()));
+ }
+
+ public static long delTimeSerializedSize(DeletionTime dt, SerializationHeader header, TypeSizes sizes)
+ {
+ return sizes.sizeof(header.encodeTimestamp(dt.markedForDeleteAt()))
+ + sizes.sizeof(header.encodeDeletionTime(dt.localDeletionTime()));
+ }
+
+ public static DeletionTime readDelTime(DataInput in, SerializationHeader header) throws IOException
+ {
+ long markedAt = header.decodeTimestamp(in.readLong());
+ int localDelTime = header.decodeDeletionTime(in.readInt());
+ return new SimpleDeletionTime(markedAt, localDelTime);
+ }
+
+ public static void skipDelTime(DataInput in, SerializationHeader header) throws IOException
+ {
+ // Note that since we might use VINT, we shouldn't assume the size of a long or an int
+ in.readLong();
+ in.readInt();
+ }
+
+ public static class Header
+ {
+ public final SerializationHeader sHeader;
+ public final CFMetaData metadata;
+ public final DecoratedKey key;
+ public final boolean isReversed;
+ public final boolean isEmpty;
+ public final DeletionTime partitionDeletion;
+ public final Row staticRow;
+ public final int rowEstimate; // -1 if no estimate
+
+ private Header(SerializationHeader sHeader,
+ CFMetaData metadata,
+ DecoratedKey key,
+ boolean isReversed,
+ boolean isEmpty,
+ DeletionTime partitionDeletion,
+ Row staticRow,
+ int rowEstimate)
+ {
+ this.sHeader = sHeader;
+ this.metadata = metadata;
+ this.key = key;
+ this.isReversed = isReversed;
+ this.isEmpty = isEmpty;
+ this.partitionDeletion = partitionDeletion;
+ this.staticRow = staticRow;
+ this.rowEstimate = rowEstimate;
+ }
+
+ @Override
+ public String toString()
+ {
+ return String.format("{header=%s, table=%s.%s, key=%s, isReversed=%b, isEmpty=%b, del=%s, staticRow=%s, rowEstimate=%d}",
+ sHeader, metadata.ksName, metadata.cfName, key, isReversed, isEmpty, partitionDeletion, staticRow.toString(metadata), rowEstimate);
+ }
+ }
+}
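
To make the <flags> layout from the class comment concrete, a sketch using the constants
defined above:

    int flags = 0;
    flags |= IS_REVERSED;        // 0x02
    flags |= HAS_STATIC_ROW;     // 0x08
    flags |= HAS_ROW_ESTIMATE;   // 0x10
    // flags == 0x1a; IS_EMPTY (0x01) short-circuits everything else when set.

    boolean isReversed           = (flags & IS_REVERSED) != 0;            // true
    boolean hasPartitionDeletion = (flags & HAS_PARTITION_DELETION) != 0; // false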