You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2015/07/22 18:05:34 UTC
[05/15] cassandra git commit: Simplify some 8099's implementations
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/rows/Rows.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/Rows.java b/src/java/org/apache/cassandra/db/rows/Rows.java
index 76dcf60..122f7d3 100644
--- a/src/java/org/apache/cassandra/db/rows/Rows.java
+++ b/src/java/org/apache/cassandra/db/rows/Rows.java
@@ -20,14 +20,12 @@ package org.apache.cassandra.db.rows;
import java.util.*;
import com.google.common.collect.Iterators;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import com.google.common.collect.PeekingIterator;
-import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.partitions.PartitionStatisticsCollector;
import org.apache.cassandra.db.index.SecondaryIndexManager;
-import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.SearchIterator;
/**
@@ -35,110 +33,198 @@ import org.apache.cassandra.utils.SearchIterator;
*/
public abstract class Rows
{
- private static final Logger logger = LoggerFactory.getLogger(Rows.class);
-
- private Rows() {}
-
- public static final Row EMPTY_STATIC_ROW = new AbstractRow()
+ // TODO: we could have that in a more generic place...
+ private static final SearchIterator<ColumnDefinition, ColumnData> EMPTY_SEARCH_ITERATOR = new SearchIterator<ColumnDefinition, ColumnData>()
{
- public Columns columns()
+ public boolean hasNext()
{
- return Columns.NONE;
+ return false;
}
- public LivenessInfo primaryKeyLivenessInfo()
+ public ColumnData next(ColumnDefinition column)
{
- return LivenessInfo.NONE;
+ return null;
}
+ };
- public DeletionTime deletion()
- {
- return DeletionTime.LIVE;
- }
+ private Rows() {}
- public boolean isEmpty()
- {
- return true;
- }
+ public static final Row EMPTY_STATIC_ROW = ArrayBackedRow.emptyRow(Clustering.STATIC_CLUSTERING);
- public boolean hasComplexDeletion()
+ public static Row.Builder copy(Row row, Row.Builder builder)
+ {
+ builder.newRow(row.clustering());
+ builder.addPrimaryKeyLivenessInfo(row.primaryKeyLivenessInfo());
+ builder.addRowDeletion(row.deletion());
+ for (ColumnData cd : row)
{
- return false;
+ if (cd.column().isSimple())
+ {
+ builder.addCell((Cell)cd);
+ }
+ else
+ {
+ ComplexColumnData complexData = (ComplexColumnData)cd;
+ builder.addComplexDeletion(complexData.column(), complexData.complexDeletion());
+ for (Cell cell : complexData)
+ builder.addCell(cell);
+ }
}
+ return builder;
+ }
- public Clustering clustering()
- {
- return Clustering.STATIC_CLUSTERING;
- }
+ /**
+ * Collect statistics on a given row.
+ *
+ * @param row the row for which to collect stats.
+ * @param collector the stats collector.
+ * @return the total number of cells in {@code row}.
+ */
+ public static int collectStats(Row row, PartitionStatisticsCollector collector)
+ {
+ assert !row.isEmpty();
- public Cell getCell(ColumnDefinition c)
- {
- return null;
- }
+ collector.update(row.primaryKeyLivenessInfo());
+ collector.update(row.deletion());
- public Cell getCell(ColumnDefinition c, CellPath path)
+ int columnCount = 0;
+ int cellCount = 0;
+ for (ColumnData cd : row)
{
- return null;
- }
+ if (cd.column().isSimple())
+ {
+ ++columnCount;
+ ++cellCount;
+ Cells.collectStats((Cell)cd, collector);
+ }
+ else
+ {
+ ComplexColumnData complexData = (ComplexColumnData)cd;
+ collector.update(complexData.complexDeletion());
+ if (complexData.hasCells())
+ {
+ ++columnCount;
+ for (Cell cell : complexData)
+ {
+ ++cellCount;
+ Cells.collectStats(cell, collector);
+ }
+ }
+ }
- public Iterator<Cell> getCells(ColumnDefinition c)
- {
- return null;
}
+ collector.updateColumnSetPerRow(columnCount);
+ return cellCount;
+ }
- public DeletionTime getDeletion(ColumnDefinition c)
+ /**
+ * Given the result ({@code merged}) of merging multiple {@code inputs}, signals the difference between
+ * each input and {@code merged} to {@code diffListener}.
+ *
+ * @param merged the result of merging {@code inputs}.
+ * @param columns a superset of all the columns in any of {@code merged}/{@code inputs}.
+ * @param inputs the inputs whose merge yielded {@code merged}.
+ * @param diffListener the listener to which to signal the differences between the inputs and the merged
+ * result.
+ */
+ public static void diff(Row merged, Columns columns, Row[] inputs, RowDiffListener diffListener)
+ {
+ Clustering clustering = merged.clustering();
+ LivenessInfo mergedInfo = merged.primaryKeyLivenessInfo().isEmpty() ? null : merged.primaryKeyLivenessInfo();
+ DeletionTime mergedDeletion = merged.deletion().isLive() ? null : merged.deletion();
+ for (int i = 0; i < inputs.length; i++)
{
- return DeletionTime.LIVE;
+ Row input = inputs[i];
+ LivenessInfo inputInfo = input == null || input.primaryKeyLivenessInfo().isEmpty() ? null : input.primaryKeyLivenessInfo();
+ DeletionTime inputDeletion = input == null || input.deletion().isLive() ? null : input.deletion();
+
+ if (mergedInfo != null || inputInfo != null)
+ diffListener.onPrimaryKeyLivenessInfo(i, clustering, mergedInfo, inputInfo);
+ if (mergedDeletion != null || inputDeletion != null)
+ diffListener.onDeletion(i, clustering, mergedDeletion, inputDeletion);
}
- public Iterator<Cell> iterator()
+ SearchIterator<ColumnDefinition, ColumnData> mergedIterator = merged.searchIterator();
+ List<SearchIterator<ColumnDefinition, ColumnData>> inputIterators = new ArrayList<>(inputs.length);
+
+ for (Row row : inputs)
+ inputIterators.add(row == null ? EMPTY_SEARCH_ITERATOR : row.searchIterator());
+
+ Iterator<ColumnDefinition> simpleColumns = columns.simpleColumns();
+ while (simpleColumns.hasNext())
{
- return Iterators.<Cell>emptyIterator();
+ ColumnDefinition column = simpleColumns.next();
+ Cell mergedCell = (Cell)mergedIterator.next(column);
+ for (int i = 0; i < inputs.length; i++)
+ {
+ Cell inputCell = (Cell)inputIterators.get(i).next(column);
+ if (mergedCell != null || inputCell != null)
+ diffListener.onCell(i, clustering, mergedCell, inputCell);
+ }
}
- public SearchIterator<ColumnDefinition, ColumnData> searchIterator()
+ Iterator<ColumnDefinition> complexColumns = columns.complexColumns();
+ while (complexColumns.hasNext())
{
- return new SearchIterator<ColumnDefinition, ColumnData>()
+ ColumnDefinition column = complexColumns.next();
+ ComplexColumnData mergedData = (ComplexColumnData)mergedIterator.next(column);
+ // Doing one input at a time is not the most efficient, but it's a lot simpler for now
+ for (int i = 0; i < inputs.length; i++)
{
- public boolean hasNext()
+ ComplexColumnData inputData = (ComplexColumnData)inputIterators.get(i).next(column);
+ if (mergedData == null)
{
- return false;
+ if (inputData == null)
+ continue;
+
+ // Everything in inputData has been shadowed
+ if (!inputData.complexDeletion().isLive())
+ diffListener.onComplexDeletion(i, clustering, column, null, inputData.complexDeletion());
+ for (Cell inputCell : inputData)
+ diffListener.onCell(i, clustering, null, inputCell);
}
-
- public ColumnData next(ColumnDefinition column)
+ else if (inputData == null)
{
- return null;
+ // Everything in mergedData is new (this input had no data for the column)
+ if (!mergedData.complexDeletion().isLive())
+ diffListener.onComplexDeletion(i, clustering, column, mergedData.complexDeletion(), null);
+ for (Cell mergedCell : mergedData)
+ diffListener.onCell(i, clustering, mergedCell, null);
}
- };
- }
-
- public Kind kind()
- {
- return Unfiltered.Kind.ROW;
- }
-
- public Row takeAlias()
- {
- return this;
+ else
+ {
+ PeekingIterator<Cell> mergedCells = Iterators.peekingIterator(mergedData.iterator());
+ PeekingIterator<Cell> inputCells = Iterators.peekingIterator(inputData.iterator());
+ while (mergedCells.hasNext() && inputCells.hasNext())
+ {
+ int cmp = column.cellPathComparator().compare(mergedCells.peek().path(), inputCells.peek().path());
+ if (cmp == 0)
+ diffListener.onCell(i, clustering, mergedCells.next(), inputCells.next());
+ else if (cmp < 0)
+ diffListener.onCell(i, clustering, mergedCells.next(), null);
+ else // cmp > 0
+ diffListener.onCell(i, clustering, null, inputCells.next());
+ }
+ while (mergedCells.hasNext())
+ diffListener.onCell(i, clustering, mergedCells.next(), null);
+ while (inputCells.hasNext())
+ diffListener.onCell(i, clustering, null, inputCells.next());
+ }
+ }
}
- };
-
- public interface SimpleMergeListener
- {
- public void onAdded(Cell newCell);
- public void onRemoved(Cell removedCell);
- public void onUpdated(Cell existingCell, Cell updatedCell);
}
- public static void writeClustering(Clustering clustering, Row.Writer writer)
+ public static Row merge(Row row1, Row row2, int nowInSec)
{
- for (int i = 0; i < clustering.size(); i++)
- writer.writeClusteringValue(clustering.get(i));
+ Columns mergedColumns = row1.columns().mergeTo(row2.columns());
+ Row.Builder builder = ArrayBackedRow.sortedBuilder(mergedColumns);
+ merge(row1, row2, mergedColumns, builder, nowInSec, SecondaryIndexManager.nullUpdater);
+ return builder.build();
}
- public static void merge(Row row1, Row row2, Columns mergedColumns, Row.Writer writer, int nowInSec)
+ public static void merge(Row row1, Row row2, Columns mergedColumns, Row.Builder builder, int nowInSec)
{
- merge(row1, row2, mergedColumns, writer, nowInSec, SecondaryIndexManager.nullUpdater);
+ merge(row1, row2, mergedColumns, builder, nowInSec, SecondaryIndexManager.nullUpdater);
}
// Merge rows in memtable
@@ -146,26 +232,26 @@ public abstract class Rows
public static long merge(Row existing,
Row update,
Columns mergedColumns,
- Row.Writer writer,
+ Row.Builder builder,
int nowInSec,
SecondaryIndexManager.Updater indexUpdater)
{
Clustering clustering = existing.clustering();
- writeClustering(clustering, writer);
+ builder.newRow(clustering);
LivenessInfo existingInfo = existing.primaryKeyLivenessInfo();
LivenessInfo updateInfo = update.primaryKeyLivenessInfo();
- LivenessInfo mergedInfo = existingInfo.mergeWith(updateInfo);
+ LivenessInfo mergedInfo = existingInfo.supersedes(updateInfo) ? existingInfo : updateInfo;
long timeDelta = Math.abs(existingInfo.timestamp() - mergedInfo.timestamp());
DeletionTime deletion = existing.deletion().supersedes(update.deletion()) ? existing.deletion() : update.deletion();
if (deletion.deletes(mergedInfo))
- mergedInfo = LivenessInfo.NONE;
+ mergedInfo = LivenessInfo.EMPTY;
- writer.writePartitionKeyLivenessInfo(mergedInfo);
- writer.writeRowDeletion(deletion);
+ builder.addPrimaryKeyLivenessInfo(mergedInfo);
+ builder.addRowDeletion(deletion);
indexUpdater.maybeIndex(clustering, mergedInfo.timestamp(), mergedInfo.ttl(), deletion);
@@ -178,7 +264,7 @@ public abstract class Rows
existingCell,
updateCell,
deletion,
- writer,
+ builder,
nowInSec,
indexUpdater));
}
@@ -186,20 +272,22 @@ public abstract class Rows
for (int i = 0; i < mergedColumns.complexColumnCount(); i++)
{
ColumnDefinition c = mergedColumns.getComplex(i);
- DeletionTime existingDt = existing.getDeletion(c);
- DeletionTime updateDt = update.getDeletion(c);
+ ComplexColumnData existingData = existing.getComplexColumnData(c);
+ ComplexColumnData updateData = update.getComplexColumnData(c);
+
+ DeletionTime existingDt = existingData == null ? DeletionTime.LIVE : existingData.complexDeletion();
+ DeletionTime updateDt = updateData == null ? DeletionTime.LIVE : updateData.complexDeletion();
DeletionTime maxDt = existingDt.supersedes(updateDt) ? existingDt : updateDt;
if (maxDt.supersedes(deletion))
- writer.writeComplexDeletion(c, maxDt);
+ builder.addComplexDeletion(c, maxDt);
else
maxDt = deletion;
- Iterator<Cell> existingCells = existing.getCells(c);
- Iterator<Cell> updateCells = update.getCells(c);
- timeDelta = Math.min(timeDelta, Cells.reconcileComplex(clustering, c, existingCells, updateCells, maxDt, writer, nowInSec, indexUpdater));
+ Iterator<Cell> existingCells = existingData == null ? null : existingData.iterator();
+ Iterator<Cell> updateCells = updateData == null ? null : updateData.iterator();
+ timeDelta = Math.min(timeDelta, Cells.reconcileComplex(clustering, c, existingCells, updateCells, maxDt, builder, nowInSec, indexUpdater));
}
- writer.endOfRow();
return timeDelta;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/rows/SerializationHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/SerializationHelper.java b/src/java/org/apache/cassandra/db/rows/SerializationHelper.java
index 56b993c..6b4bc2e 100644
--- a/src/java/org/apache/cassandra/db/rows/SerializationHelper.java
+++ b/src/java/org/apache/cassandra/db/rows/SerializationHelper.java
@@ -18,12 +18,13 @@
package org.apache.cassandra.db.rows;
import java.nio.ByteBuffer;
+import java.util.*;
+import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.context.CounterContext;
import org.apache.cassandra.db.filter.ColumnFilter;
-import org.apache.cassandra.utils.ByteBufferUtil;
public class SerializationHelper
{
@@ -38,100 +39,90 @@ public class SerializationHelper
* when we must ensure that deserializing and reserializing the
* result yield the exact same bytes. Streaming uses this.
*/
- public static enum Flag
+ public enum Flag
{
- LOCAL, FROM_REMOTE, PRESERVE_SIZE;
+ LOCAL, FROM_REMOTE, PRESERVE_SIZE
}
private final Flag flag;
public final int version;
- private final ReusableLivenessInfo livenessInfo = new ReusableLivenessInfo();
-
- // The currently read row liveness infos (timestamp, ttl and localDeletionTime).
- private long rowTimestamp;
- private int rowTTL;
- private int rowLocalDeletionTime;
-
private final ColumnFilter columnsToFetch;
private ColumnFilter.Tester tester;
- public SerializationHelper(int version, Flag flag, ColumnFilter columnsToFetch)
+ private final Map<ByteBuffer, CFMetaData.DroppedColumn> droppedColumns;
+ private CFMetaData.DroppedColumn currentDroppedComplex;
+
+
+ public SerializationHelper(CFMetaData metadata, int version, Flag flag, ColumnFilter columnsToFetch)
{
this.flag = flag;
this.version = version;
this.columnsToFetch = columnsToFetch;
+ this.droppedColumns = metadata.getDroppedColumns();
}
- public SerializationHelper(int version, Flag flag)
+ public SerializationHelper(CFMetaData metadata, int version, Flag flag)
{
- this(version, flag, null);
+ this(metadata, version, flag, null);
}
- public void writePartitionKeyLivenessInfo(Row.Writer writer, long timestamp, int ttl, int localDeletionTime)
+ public Columns fetchedStaticColumns(SerializationHeader header)
{
- livenessInfo.setTo(timestamp, ttl, localDeletionTime);
- writer.writePartitionKeyLivenessInfo(livenessInfo);
-
- rowTimestamp = timestamp;
- rowTTL = ttl;
- rowLocalDeletionTime = localDeletionTime;
+ return columnsToFetch == null ? header.columns().statics : columnsToFetch.fetchedColumns().statics;
}
- public long getRowTimestamp()
+ public Columns fetchedRegularColumns(SerializationHeader header)
{
- return rowTimestamp;
+ return columnsToFetch == null ? header.columns().regulars : columnsToFetch.fetchedColumns().regulars;
}
- public int getRowTTL()
+ public boolean includes(ColumnDefinition column)
{
- return rowTTL;
+ return columnsToFetch == null || columnsToFetch.includes(column);
}
- public int getRowLocalDeletionTime()
+ public boolean includes(CellPath path)
{
- return rowLocalDeletionTime;
+ return path == null || tester == null || tester.includes(path);
}
- public boolean includes(ColumnDefinition column)
+ public boolean canSkipValue(ColumnDefinition column)
{
- return columnsToFetch == null || columnsToFetch.includes(column);
+ return columnsToFetch != null && columnsToFetch.canSkipValue(column);
}
- public boolean canSkipValue(ColumnDefinition column)
+ public boolean canSkipValue(CellPath path)
{
- return columnsToFetch != null && columnsToFetch.canSkipValue(column);
+ return path != null && tester != null && tester.canSkipValue(path);
}
public void startOfComplexColumn(ColumnDefinition column)
{
this.tester = columnsToFetch == null ? null : columnsToFetch.newTester(column);
+ this.currentDroppedComplex = droppedColumns.get(column.name.bytes);
}
- public void endOfComplexColumn(ColumnDefinition column)
+ public void endOfComplexColumn()
{
this.tester = null;
}
- public void writeCell(Row.Writer writer,
- ColumnDefinition column,
- boolean isCounter,
- ByteBuffer value,
- long timestamp,
- int localDelTime,
- int ttl,
- CellPath path)
+ public boolean isDropped(Cell cell, boolean isComplex)
{
- livenessInfo.setTo(timestamp, ttl, localDelTime);
+ CFMetaData.DroppedColumn dropped = isComplex ? currentDroppedComplex : droppedColumns.get(cell.column().name.bytes);
+ return dropped != null && cell.timestamp() <= dropped.droppedTime;
+ }
- if (isCounter && ((flag == Flag.FROM_REMOTE || (flag == Flag.LOCAL && CounterContext.instance().shouldClearLocal(value)))))
- value = CounterContext.instance().clearAllLocal(value);
+ public boolean isDroppedComplexDeletion(DeletionTime complexDeletion)
+ {
+ return currentDroppedComplex != null && complexDeletion.markedForDeleteAt() <= currentDroppedComplex.droppedTime;
+ }
- if (!column.isComplex() || tester == null || tester.includes(path))
- {
- if (tester != null && tester.canSkipValue(path))
- value = ByteBufferUtil.EMPTY_BYTE_BUFFER;
- writer.writeCell(column, isCounter, value, livenessInfo, path);
- }
+ public ByteBuffer maybeClearCounterValue(ByteBuffer value)
+ {
+ return flag == Flag.FROM_REMOTE || (flag == Flag.LOCAL && CounterContext.instance().shouldClearLocal(value))
+ ? CounterContext.instance().clearAllLocal(value)
+ : value;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/rows/SimpleRowDataBlock.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/SimpleRowDataBlock.java b/src/java/org/apache/cassandra/db/rows/SimpleRowDataBlock.java
deleted file mode 100644
index 08f37fd..0000000
--- a/src/java/org/apache/cassandra/db/rows/SimpleRowDataBlock.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.rows;
-
-import java.nio.ByteBuffer;
-
-import com.google.common.collect.UnmodifiableIterator;
-
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.utils.ObjectSizes;
-
-/**
- * Holds cells data for the simple columns of one or more rows.
- * <p>
- * In practice, a {@code SimpleRowDataBlock} contains a single {@code CellData} "array" and
- * the (simple) columns for which the {@code SimplerowDataBlock} has data for. The cell for
- * a row i and a column c is stored in the {@code CellData} at index 'i * index(c)'.
- * <p>
- * This does mean that we store cells in a "dense" way: if column doesn't have a cell for a
- * given row, the correspond index in the cell data array will simple have a {@code null} value.
- * We might want to switch to a more sparse encoding in the future but we keep it simple for
- * now (having a sparse encoding make things a tad more complex because we need to be able to
- * swap the cells for 2 given rows as seen in ComplexRowDataBlock).
- */
-public class SimpleRowDataBlock
-{
- private static final long EMPTY_SIZE = ObjectSizes.measure(new SimpleRowDataBlock(Columns.NONE, 0, false));
-
- final Columns columns;
- final CellData data;
-
- public SimpleRowDataBlock(Columns columns, int rows, boolean isCounter)
- {
- this.columns = columns;
- this.data = new CellData(rows * columns.simpleColumnCount(), isCounter);
- }
-
- public Columns columns()
- {
- return columns;
- }
-
- // Swap row i and j
- public void swap(int i, int j)
- {
- int s = columns.simpleColumnCount();
- for (int k = 0; k < s; k++)
- data.swapCell(i * s + k, j * s + k);
- }
-
- // Merge row i into j
- public void merge(int i, int j, int nowInSec)
- {
- int s = columns.simpleColumnCount();
- for (int k = 0; k < s; k++)
- data.mergeCell(i * s + k, j * s + k, nowInSec);
- }
-
- // Move row i into j
- public void move(int i, int j)
- {
- int s = columns.simpleColumnCount();
- for (int k = 0; k < s; k++)
- data.moveCell(i * s + k, j * s + k);
- }
-
- public long unsharedHeapSizeExcludingData()
- {
- return EMPTY_SIZE + data.unsharedHeapSizeExcludingData();
- }
-
- public int dataSize()
- {
- return data.dataSize();
- }
-
- public CellWriter cellWriter(boolean inOrderCells)
- {
- return new CellWriter(inOrderCells);
- }
-
- public static CellData.ReusableCell reusableCell()
- {
- return new CellData.ReusableCell();
- }
-
- public static ReusableIterator reusableIterator()
- {
- return new ReusableIterator();
- }
-
- public void clear()
- {
- data.clear();
- }
-
- static class ReusableIterator extends UnmodifiableIterator<Cell>
- {
- private SimpleRowDataBlock dataBlock;
- private final CellData.ReusableCell cell = new CellData.ReusableCell();
-
- private int base;
- private int column;
-
- private ReusableIterator()
- {
- }
-
- public ReusableIterator setTo(SimpleRowDataBlock dataBlock, int row)
- {
- this.dataBlock = dataBlock;
- this.base = dataBlock == null ? -1 : row * dataBlock.columns.simpleColumnCount();
- this.column = 0;
- return this;
- }
-
- public boolean hasNext()
- {
- if (dataBlock == null)
- return false;
-
- int columnCount = dataBlock.columns.simpleColumnCount();
- // iterate over column until we find one with data
- while (column < columnCount && !dataBlock.data.hasCell(base + column))
- ++column;
-
- return column < columnCount;
- }
-
- public Cell next()
- {
- cell.setTo(dataBlock.data, dataBlock.columns.getSimple(column), base + column);
- ++column;
- return cell;
- }
- }
-
- public class CellWriter
- {
- private final boolean inOrderCells;
-
- private int base;
- private int lastColumnIdx;
-
- public CellWriter(boolean inOrderCells)
- {
- this.inOrderCells = inOrderCells;
- }
-
- public void addCell(ColumnDefinition column, ByteBuffer value, LivenessInfo info)
- {
- int fromIdx = inOrderCells ? lastColumnIdx : 0;
- lastColumnIdx = columns.simpleIdx(column, fromIdx);
- assert lastColumnIdx >= 0 : "Cannot find column " + column.name + " in " + columns + " from " + fromIdx;
- int idx = base + lastColumnIdx;
- data.setCell(idx, value, info);
- }
-
- public void reset()
- {
- base = 0;
- lastColumnIdx = 0;
- data.clear();
- }
-
- public void endOfRow()
- {
- base += columns.simpleColumnCount();
- lastColumnIdx = 0;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/rows/StaticRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/StaticRow.java b/src/java/org/apache/cassandra/db/rows/StaticRow.java
deleted file mode 100644
index 2ad9fb4..0000000
--- a/src/java/org/apache/cassandra/db/rows/StaticRow.java
+++ /dev/null
@@ -1,193 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.rows;
-
-import java.nio.ByteBuffer;
-import java.util.Iterator;
-
-import org.apache.cassandra.db.*;
-
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.utils.SearchIterator;
-
-public class StaticRow extends AbstractRow
-{
- private final DeletionTime deletion;
- private final RowDataBlock data;
-
- private StaticRow(DeletionTime deletion, RowDataBlock data)
- {
- this.deletion = deletion.takeAlias();
- this.data = data;
- }
-
- public Columns columns()
- {
- return data.columns();
- }
-
- public Cell getCell(ColumnDefinition c)
- {
- assert !c.isComplex();
- if (data.simpleData == null)
- return null;
-
- int idx = columns().simpleIdx(c, 0);
- if (idx < 0)
- return null;
-
- return SimpleRowDataBlock.reusableCell().setTo(data.simpleData.data, c, idx);
- }
-
- public Cell getCell(ColumnDefinition c, CellPath path)
- {
- assert c.isComplex();
-
- ComplexRowDataBlock dataBlock = data.complexData;
- if (dataBlock == null)
- return null;
-
- int idx = dataBlock.cellIdx(0, c, path);
- if (idx < 0)
- return null;
-
- return SimpleRowDataBlock.reusableCell().setTo(dataBlock.cellData(0), c, idx);
- }
-
- public Iterator<Cell> getCells(ColumnDefinition c)
- {
- assert c.isComplex();
- return ComplexRowDataBlock.reusableComplexCells().setTo(data.complexData, 0, c);
- }
-
- public boolean hasComplexDeletion()
- {
- return data.hasComplexDeletion(0);
- }
-
- public DeletionTime getDeletion(ColumnDefinition c)
- {
- assert c.isComplex();
- if (data.complexData == null)
- return DeletionTime.LIVE;
-
- int idx = data.complexData.complexDeletionIdx(0, c);
- return idx < 0
- ? DeletionTime.LIVE
- : ComplexRowDataBlock.complexDeletionCursor().setTo(data.complexData.complexDelTimes, idx);
- }
-
- public Iterator<Cell> iterator()
- {
- return RowDataBlock.reusableIterator().setTo(data, 0);
- }
-
- public SearchIterator<ColumnDefinition, ColumnData> searchIterator()
- {
- return new SearchIterator<ColumnDefinition, ColumnData>()
- {
- private int simpleIdx = 0;
-
- public boolean hasNext()
- {
- // TODO: we can do better, but we expect users to no rely on this anyway
- return true;
- }
-
- public ColumnData next(ColumnDefinition column)
- {
- if (column.isComplex())
- {
- // TODO: this is sub-optimal
-
- Iterator<Cell> cells = getCells(column);
- return cells == null ? null : new ColumnData(column, null, cells, getDeletion(column));
- }
- else
- {
- simpleIdx = columns().simpleIdx(column, simpleIdx);
- assert simpleIdx >= 0;
-
- Cell cell = SimpleRowDataBlock.reusableCell().setTo(data.simpleData.data, column, simpleIdx);
- ++simpleIdx;
- return cell == null ? null : new ColumnData(column, cell, null, null);
- }
- }
- };
- }
-
- public Row takeAlias()
- {
- return this;
- }
-
- public Clustering clustering()
- {
- return Clustering.STATIC_CLUSTERING;
- }
-
- public LivenessInfo primaryKeyLivenessInfo()
- {
- return LivenessInfo.NONE;
- }
-
- public DeletionTime deletion()
- {
- return deletion;
- }
-
- public static Builder builder(Columns columns, boolean inOrderCells, boolean isCounter)
- {
- return new Builder(columns, inOrderCells, isCounter);
- }
-
- public static class Builder extends RowDataBlock.Writer
- {
- private final RowDataBlock data;
- private DeletionTime deletion = DeletionTime.LIVE;
-
- public Builder(Columns columns, boolean inOrderCells, boolean isCounter)
- {
- super(inOrderCells);
- this.data = new RowDataBlock(columns, 1, false, isCounter);
- updateWriter(data);
- }
-
- public void writeClusteringValue(ByteBuffer buffer)
- {
- throw new UnsupportedOperationException();
- }
-
- public void writePartitionKeyLivenessInfo(LivenessInfo info)
- {
- // Static rows are special and don't really have an existence unless they have live cells,
- // so we shouldn't have any partition key liveness info.
- assert info.equals(LivenessInfo.NONE);
- }
-
- public void writeRowDeletion(DeletionTime deletion)
- {
- this.deletion = deletion;
- }
-
- public StaticRow build()
- {
- return new StaticRow(deletion, data);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/rows/TombstoneFilteringRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/TombstoneFilteringRow.java b/src/java/org/apache/cassandra/db/rows/TombstoneFilteringRow.java
deleted file mode 100644
index a6167ea..0000000
--- a/src/java/org/apache/cassandra/db/rows/TombstoneFilteringRow.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.rows;
-
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.db.*;
-
-public class TombstoneFilteringRow extends FilteringRow
-{
- private final int nowInSec;
-
- public TombstoneFilteringRow(int nowInSec)
- {
- this.nowInSec = nowInSec;
- }
-
- @Override
- protected boolean include (LivenessInfo info)
- {
- return info.isLive(nowInSec);
- }
-
- @Override
- protected boolean include(DeletionTime dt)
- {
- return false;
- }
-
- @Override
- protected boolean include(Cell cell)
- {
- return cell.isLive(nowInSec);
- }
-
- @Override
- protected boolean include(ColumnDefinition c, DeletionTime dt)
- {
- return false;
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/rows/Unfiltered.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/Unfiltered.java b/src/java/org/apache/cassandra/db/rows/Unfiltered.java
index b1692e3..ba03741 100644
--- a/src/java/org/apache/cassandra/db/rows/Unfiltered.java
+++ b/src/java/org/apache/cassandra/db/rows/Unfiltered.java
@@ -57,4 +57,14 @@ public interface Unfiltered extends Clusterable
public String toString(CFMetaData metadata);
public String toString(CFMetaData metadata, boolean fullDetails);
+
+ default boolean isRow()
+ {
+ return kind() == Kind.ROW;
+ }
+
+ default boolean isRangeTombstoneMarker()
+ {
+ return kind() == Kind.RANGE_TOMBSTONE_MARKER;
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
index 8abd228..129ed50 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.db.rows;
-import java.io.DataInput;
import java.io.IOException;
import java.io.IOError;
@@ -30,7 +29,6 @@ import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
/**
* Serialize/Deserialize an unfiltered row iterator.
@@ -38,7 +36,7 @@ import org.apache.cassandra.utils.FBUtilities;
* The serialization is composed of a header, follows by the rows and range tombstones of the iterator serialized
* until we read the end of the partition (see UnfilteredSerializer for details). The header itself
* is:
- * <cfid><key><flags><s_header>[<partition_deletion>][<static_row>]
+ * <cfid><key><flags><s_header>[<partition_deletion>][<static_row>][<row_estimate>]
* where:
* <cfid> is the table cfid.
* <key> is the partition key.
@@ -49,23 +47,17 @@ import org.apache.cassandra.utils.FBUtilities;
* - has partition deletion: whether or not there is a <partition_deletion> following
* - has static row: whether or not there is a <static_row> following
* - has row estimate: whether or not there is a <row_estimate> following
- * <s_header> is the SerializationHeader. More precisely it's
- * <min_timetamp><min_localDelTime><min_ttl>[<static_columns>]<columns>
- * where:
- * - <min_timestamp> is the base timestamp used for delta-encoding timestamps
- * - <min_localDelTime> is the base localDeletionTime used for delta-encoding local deletion times
- * - <min_ttl> is the base localDeletionTime used for delta-encoding ttls
- * - <static_columns> is the static columns if a static row is present. It's
- * the number of columns as an unsigned short, followed by the column names.
- * - <columns> is the columns of the rows of the iterator. It's serialized as <static_columns>.
+ * <s_header> is the {@code SerializationHeader}. It contains in particular the columns contained in the serialized
+ * iterator as well as other information necessary to decode the serialized rows
+ * (see {@code SerializationHeader.Serializer} for details).
* <partition_deletion> is the deletion time for the partition (delta-encoded)
* <static_row> is the static row for this partition as serialized by UnfilteredSerializer.
- * <row_estimate> is the (potentially estimated) number of rows serialized. This is only use for
- * the purpose of some sizing on the receiving end and should not be relied upon too strongly.
+ * <row_estimate> is the (potentially estimated) number of rows serialized. This is only used for
+ * the purpose of sizing on the receiving end and should not be relied upon too strongly.
*
- * !!! Please note that the serialized value depends on the schema and as such should not be used as is if
- * it might be deserialized after the schema as changed !!!
- * TODO: we should add a flag to include the relevant metadata in the header for commit log etc.....
+ * Please note that the format described above is the on-wire format. On-disk, the format is basically the
+ * same, but the header is written once per sstable, not once per partition. Further, the actual rows and
+ * range tombstones are not written using this class, but rather by {@link ColumnIndex}.
*/
public class UnfilteredRowIteratorSerializer
{
@@ -79,11 +71,13 @@ public class UnfilteredRowIteratorSerializer
public static final UnfilteredRowIteratorSerializer serializer = new UnfilteredRowIteratorSerializer();
+ // Should only be used for the on-wire format.
public void serialize(UnfilteredRowIterator iterator, DataOutputPlus out, int version) throws IOException
{
serialize(iterator, out, version, -1);
}
+ // Should only be used for the on-wire format.
public void serialize(UnfilteredRowIterator iterator, DataOutputPlus out, int version, int rowEstimate) throws IOException
{
SerializationHeader header = new SerializationHeader(iterator.metadata(),
@@ -92,6 +86,7 @@ public class UnfilteredRowIteratorSerializer
serialize(iterator, out, header, version, rowEstimate);
}
+ // Should only be used for the on-wire format.
public void serialize(UnfilteredRowIterator iterator, DataOutputPlus out, SerializationHeader header, int version, int rowEstimate) throws IOException
{
CFMetaData.serializer.serialize(iterator.metadata(), out, version);
@@ -129,7 +124,7 @@ public class UnfilteredRowIteratorSerializer
UnfilteredSerializer.serializer.serialize(staticRow, header, out, version);
if (rowEstimate >= 0)
- out.writeInt(rowEstimate);
+ out.writeVInt(rowEstimate);
while (iterator.hasNext())
UnfilteredSerializer.serializer.serialize(iterator.next(), header, out, version);
@@ -137,7 +132,7 @@ public class UnfilteredRowIteratorSerializer
}
// Please note that this consume the iterator, and as such should not be called unless we have a simple way to
- // recreate an iterator for both serialize and serializedSize, which is mostly only PartitionUpdate
+ // recreate an iterator for both serialize and serializedSize, which is mostly only PartitionUpdate/ArrayBackedCachedPartition.
public long serializedSize(UnfilteredRowIterator iterator, int version, int rowEstimate)
{
SerializationHeader header = new SerializationHeader(iterator.metadata(),
@@ -166,7 +161,7 @@ public class UnfilteredRowIteratorSerializer
size += UnfilteredSerializer.serializer.serializedSize(staticRow, header, version);
if (rowEstimate >= 0)
- size += TypeSizes.sizeof(rowEstimate);
+ size += TypeSizes.sizeofVInt(rowEstimate);
while (iterator.hasNext())
size += UnfilteredSerializer.serializer.serializedSize(iterator.next(), header, version);
@@ -197,41 +192,29 @@ public class UnfilteredRowIteratorSerializer
Row staticRow = Rows.EMPTY_STATIC_ROW;
if (hasStatic)
- staticRow = UnfilteredSerializer.serializer.deserializeStaticRow(in, header, new SerializationHelper(version, flag));
+ staticRow = UnfilteredSerializer.serializer.deserializeStaticRow(in, header, new SerializationHelper(metadata, version, flag));
- int rowEstimate = hasRowEstimate ? in.readInt() : -1;
+ int rowEstimate = hasRowEstimate ? (int)in.readVInt() : -1;
return new Header(header, metadata, key, isReversed, false, partitionDeletion, staticRow, rowEstimate);
}
- public void deserialize(DataInput in, SerializationHelper helper, SerializationHeader header, Row.Writer rowWriter, RangeTombstoneMarker.Writer markerWriter) throws IOException
+ public UnfilteredRowIterator deserialize(DataInputPlus in, int version, SerializationHelper.Flag flag, Header header) throws IOException
{
- while (UnfilteredSerializer.serializer.deserialize(in, header, helper, rowWriter, markerWriter) != null);
- }
-
- public UnfilteredRowIterator deserialize(final DataInputPlus in, int version, SerializationHelper.Flag flag) throws IOException
- {
- final Header h = deserializeHeader(in, version, flag);
-
- if (h.isEmpty)
- return UnfilteredRowIterators.emptyIterator(h.metadata, h.key, h.isReversed);
-
- final int clusteringSize = h.metadata.clusteringColumns().size();
- final SerializationHelper helper = new SerializationHelper(version, flag);
+ if (header.isEmpty)
+ return UnfilteredRowIterators.emptyIterator(header.metadata, header.key, header.isReversed);
- return new AbstractUnfilteredRowIterator(h.metadata, h.key, h.partitionDeletion, h.sHeader.columns(), h.staticRow, h.isReversed, h.sHeader.stats())
+ final SerializationHelper helper = new SerializationHelper(header.metadata, version, flag);
+ final SerializationHeader sHeader = header.sHeader;
+ return new AbstractUnfilteredRowIterator(header.metadata, header.key, header.partitionDeletion, sHeader.columns(), header.staticRow, header.isReversed, sHeader.stats())
{
- private final ReusableRow row = new ReusableRow(clusteringSize, h.sHeader.columns().regulars, true, h.metadata.isCounter());
- private final RangeTombstoneMarker.Builder markerBuilder = new RangeTombstoneMarker.Builder(clusteringSize);
+ private final Row.Builder builder = ArrayBackedRow.sortedBuilder(sHeader.columns().regulars);
protected Unfiltered computeNext()
{
try
{
- Unfiltered.Kind kind = UnfilteredSerializer.serializer.deserialize(in, h.sHeader, helper, row.writer(), markerBuilder.reset());
- if (kind == null)
- return endOfData();
-
- return kind == Unfiltered.Kind.ROW ? row : markerBuilder.build();
+ Unfiltered unfiltered = UnfilteredSerializer.serializer.deserialize(in, sHeader, helper, builder);
+ return unfiltered == null ? endOfData() : unfiltered;
}
catch (IOException e)
{
@@ -241,30 +224,34 @@ public class UnfilteredRowIteratorSerializer
};
}
+ public UnfilteredRowIterator deserialize(DataInputPlus in, int version, SerializationHelper.Flag flag) throws IOException
+ {
+ return deserialize(in, version, flag, deserializeHeader(in, version, flag));
+ }
+
public static void writeDelTime(DeletionTime dt, SerializationHeader header, DataOutputPlus out) throws IOException
{
- out.writeLong(header.encodeTimestamp(dt.markedForDeleteAt()));
- out.writeInt(header.encodeDeletionTime(dt.localDeletionTime()));
+ out.writeVInt(header.encodeTimestamp(dt.markedForDeleteAt()));
+ out.writeVInt(header.encodeDeletionTime(dt.localDeletionTime()));
}
public static long delTimeSerializedSize(DeletionTime dt, SerializationHeader header)
{
- return TypeSizes.sizeof(header.encodeTimestamp(dt.markedForDeleteAt()))
- + TypeSizes.sizeof(header.encodeDeletionTime(dt.localDeletionTime()));
+ return TypeSizes.sizeofVInt(header.encodeTimestamp(dt.markedForDeleteAt()))
+ + TypeSizes.sizeofVInt(header.encodeDeletionTime(dt.localDeletionTime()));
}
- public static DeletionTime readDelTime(DataInput in, SerializationHeader header) throws IOException
+ public static DeletionTime readDelTime(DataInputPlus in, SerializationHeader header) throws IOException
{
- long markedAt = header.decodeTimestamp(in.readLong());
- int localDelTime = header.decodeDeletionTime(in.readInt());
- return new SimpleDeletionTime(markedAt, localDelTime);
+ long markedAt = header.decodeTimestamp(in.readVInt());
+ int localDelTime = header.decodeDeletionTime((int)in.readVInt());
+ return new DeletionTime(markedAt, localDelTime);
}
- public static void skipDelTime(DataInput in, SerializationHeader header) throws IOException
+ public static void skipDelTime(DataInputPlus in, SerializationHeader header) throws IOException
{
- // Note that since we might use VINT, we shouldn't assume the size of a long or an int
- in.readLong();
- in.readInt();
+ in.readVInt();
+ in.readVInt();
}
public static class Header
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
index 2c71cf3..6b6ec67 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.db.rows;
-import java.nio.ByteBuffer;
import java.util.*;
import java.security.MessageDigest;
@@ -26,13 +25,10 @@ import org.slf4j.LoggerFactory;
import com.google.common.collect.AbstractIterator;
import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.io.sstable.CorruptSSTableException;
import org.apache.cassandra.serializers.MarshalException;
-import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.IMergeIterator;
import org.apache.cassandra.utils.MergeIterator;
@@ -49,13 +45,9 @@ public abstract class UnfilteredRowIterators
public interface MergeListener
{
- public void onMergePartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions);
-
- public void onMergingRows(Clustering clustering, LivenessInfo mergedInfo, DeletionTime mergedDeletion, Row[] versions);
- public void onMergedComplexDeletion(ColumnDefinition c, DeletionTime mergedComplexDeletion, DeletionTime[] versions);
- public void onMergedCells(Cell mergedCell, Cell[] versions);
- public void onRowDone();
+ public void onMergedPartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions);
+ public void onMergedRows(Row merged, Columns columns, Row[] versions);
public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions);
public void close();
@@ -87,14 +79,13 @@ public abstract class UnfilteredRowIterators
}
/**
- * Returns an iterator that is the result of merging other iterators, and using
+ * Returns an iterator that is the result of merging other iterators, and (optionally) using
* specific MergeListener.
*
* Note that this method assumes that there is at least 2 iterators to merge.
*/
public static UnfilteredRowIterator merge(List<UnfilteredRowIterator> iterators, int nowInSec, MergeListener mergeListener)
{
- assert mergeListener != null;
return UnfilteredRowMergeIterator.create(iterators, nowInSec, mergeListener);
}
@@ -175,10 +166,7 @@ public abstract class UnfilteredRowIterators
while (iterator.hasNext())
{
Unfiltered unfiltered = iterator.next();
- if (unfiltered.kind() == Unfiltered.Kind.ROW)
- ((Row) unfiltered).digest(digest);
- else
- ((RangeTombstoneMarker) unfiltered).digest(digest);
+ unfiltered.digest(digest);
}
}
@@ -198,12 +186,12 @@ public abstract class UnfilteredRowIterators
&& iter1.staticRow().equals(iter2.staticRow());
return new AbstractUnfilteredRowIterator(iter1.metadata(),
- iter1.partitionKey(),
- iter1.partitionLevelDeletion(),
- iter1.columns(),
- iter1.staticRow(),
- iter1.isReverseOrder(),
- iter1.stats())
+ iter1.partitionKey(),
+ iter1.partitionLevelDeletion(),
+ iter1.columns(),
+ iter1.staticRow(),
+ iter1.isReverseOrder(),
+ iter1.stats())
{
protected Unfiltered computeNext()
{
@@ -230,155 +218,35 @@ public abstract class UnfilteredRowIterators
public static UnfilteredRowIterator cloningIterator(UnfilteredRowIterator iterator, final AbstractAllocator allocator)
{
- return new WrappingUnfilteredRowIterator(iterator)
+ return new AlteringUnfilteredRowIterator(iterator)
{
- private final CloningRow cloningRow = new CloningRow();
- private final RangeTombstoneMarker.Builder markerBuilder = new RangeTombstoneMarker.Builder(iterator.metadata().comparator.size());
-
- public Unfiltered next()
- {
- Unfiltered next = super.next();
- return next.kind() == Unfiltered.Kind.ROW
- ? cloningRow.setTo((Row)next)
- : clone((RangeTombstoneMarker)next);
- }
-
- private RangeTombstoneMarker clone(RangeTombstoneMarker marker)
- {
- markerBuilder.reset();
+ private Row.Builder regularBuilder;
- RangeTombstone.Bound bound = marker.clustering();
- for (int i = 0; i < bound.size(); i++)
- markerBuilder.writeClusteringValue(allocator.clone(bound.get(i)));
- markerBuilder.writeBoundKind(bound.kind());
- if (marker.isBoundary())
- {
- RangeTombstoneBoundaryMarker bm = (RangeTombstoneBoundaryMarker)marker;
- markerBuilder.writeBoundaryDeletion(bm.endDeletionTime(), bm.startDeletionTime());
- }
- else
- {
- markerBuilder.writeBoundDeletion(((RangeTombstoneBoundMarker)marker).deletionTime());
- }
- markerBuilder.endOfMarker();
- return markerBuilder.build();
- }
-
- class CloningRow extends WrappingRow
+ @Override
+ protected Row computeNextStatic(Row row)
{
- private final CloningClustering cloningClustering = new CloningClustering();
- private final CloningCell cloningCell = new CloningCell();
-
- protected Cell filterCell(Cell cell)
- {
- return cloningCell.setTo(cell);
- }
-
- @Override
- public Clustering clustering()
- {
- return cloningClustering.setTo(super.clustering());
- }
+ Row.Builder staticBuilder = allocator.cloningArrayBackedRowBuilder(columns().statics);
+ return Rows.copy(row, staticBuilder).build();
}
- class CloningClustering extends Clustering
+ @Override
+ protected Row computeNext(Row row)
{
- private Clustering wrapped;
-
- public Clustering setTo(Clustering wrapped)
- {
- this.wrapped = wrapped;
- return this;
- }
-
- public int size()
- {
- return wrapped.size();
- }
-
- public ByteBuffer get(int i)
- {
- ByteBuffer value = wrapped.get(i);
- return value == null ? null : allocator.clone(value);
- }
+ if (regularBuilder == null)
+ regularBuilder = allocator.cloningArrayBackedRowBuilder(columns().regulars);
- public ByteBuffer[] getRawValues()
- {
- throw new UnsupportedOperationException();
- }
+ return Rows.copy(row, regularBuilder).build();
}
- class CloningCell extends AbstractCell
+ @Override
+ protected RangeTombstoneMarker computeNext(RangeTombstoneMarker marker)
{
- private Cell wrapped;
-
- public Cell setTo(Cell wrapped)
- {
- this.wrapped = wrapped;
- return this;
- }
-
- public ColumnDefinition column()
- {
- return wrapped.column();
- }
-
- public boolean isCounterCell()
- {
- return wrapped.isCounterCell();
- }
-
- public ByteBuffer value()
- {
- return allocator.clone(wrapped.value());
- }
-
- public LivenessInfo livenessInfo()
- {
- return wrapped.livenessInfo();
- }
-
- public CellPath path()
- {
- CellPath path = wrapped.path();
- if (path == null)
- return null;
-
- assert path.size() == 1;
- return CellPath.create(allocator.clone(path.get(0)));
- }
+ return marker.copy(allocator);
}
};
}
/**
- * Turns the given iterator into an update.
- *
- * Warning: this method does not close the provided iterator, it is up to
- * the caller to close it.
- */
- public static PartitionUpdate toUpdate(UnfilteredRowIterator iterator)
- {
- PartitionUpdate update = new PartitionUpdate(iterator.metadata(), iterator.partitionKey(), iterator.columns(), 1);
-
- update.addPartitionDeletion(iterator.partitionLevelDeletion());
-
- if (iterator.staticRow() != Rows.EMPTY_STATIC_ROW)
- iterator.staticRow().copyTo(update.staticWriter());
-
- while (iterator.hasNext())
- {
- Unfiltered unfiltered = iterator.next();
- if (unfiltered.kind() == Unfiltered.Kind.ROW)
- ((Row) unfiltered).copyTo(update.writer());
- else
- ((RangeTombstoneMarker) unfiltered).copyTo(update.markerWriter(iterator.isReverseOrder()));
- }
-
- return update;
- }
-
- /**
* Validate that the data of the provided iterator is valid, that is that the values
* it contains are valid for the type they represent, and more generally that the
* infos stored are sensible.
@@ -393,15 +261,34 @@ public abstract class UnfilteredRowIterators
*/
public static UnfilteredRowIterator withValidation(UnfilteredRowIterator iterator, final String filename)
{
- return new WrappingUnfilteredRowIterator(iterator)
+ return new AlteringUnfilteredRowIterator(iterator)
{
- public Unfiltered next()
+ @Override
+ protected Row computeNextStatic(Row row)
+ {
+ validate(row);
+ return row;
+ }
+
+ @Override
+ protected Row computeNext(Row row)
+ {
+ validate(row);
+ return row;
+ }
+
+ @Override
+ protected RangeTombstoneMarker computeNext(RangeTombstoneMarker marker)
+ {
+ validate(marker);
+ return marker;
+ }
+
+ private void validate(Unfiltered unfiltered)
{
- Unfiltered next = super.next();
try
{
- next.validateData(metadata());
- return next;
+ unfiltered.validateData(iterator.metadata());
}
catch (MarshalException me)
{
@@ -412,56 +299,6 @@ public abstract class UnfilteredRowIterators
}
/**
- * Convert all expired cells to equivalent tombstones.
- * <p>
- * Once a cell expires, it acts exactly as a tombstone and this until it is purged. But in particular that
- * means we don't care about the value of an expired cell, and it is thus equivalent but more efficient to
- * replace the expired cell by an equivalent tombstone (that has no value).
- *
- * @param iterator the iterator in which to conver expired cells.
- * @param nowInSec the current time to use to decide if a cell is expired.
- * @return an iterator that returns the same data than {@code iterator} but with all expired cells converted
- * to equivalent tombstones.
- */
- public static UnfilteredRowIterator convertExpiredCellsToTombstones(UnfilteredRowIterator iterator, final int nowInSec)
- {
- return new FilteringRowIterator(iterator)
- {
- protected FilteringRow makeRowFilter()
- {
- return new FilteringRow()
- {
- @Override
- protected Cell filterCell(Cell cell)
- {
- Cell filtered = super.filterCell(cell);
- if (filtered == null)
- return null;
-
- LivenessInfo info = filtered.livenessInfo();
- if (info.hasTTL() && !filtered.isLive(nowInSec))
- {
- // The column is now expired, we can safely return a simple tombstone. Note that as long as the expiring
- // column and the tombstone put together live longer than GC grace seconds, we'll fulfil our responsibility
- // to repair. See discussion at
- // http://cassandra-user-incubator-apache-org.3065146.n2.nabble.com/repair-compaction-and-tombstone-rows-td7583481.html
- return Cells.create(filtered.column(),
- filtered.isCounterCell(),
- ByteBufferUtil.EMPTY_BYTE_BUFFER,
- SimpleLivenessInfo.forDeletion(info.timestamp(), info.localDeletionTime() - info.ttl()),
- filtered.path());
- }
- else
- {
- return filtered;
- }
- }
- };
- }
- };
- }
-
- /**
* Wraps the provided iterator so it logs the returned atoms for debugging purposes.
* <p>
* Note that this is only meant for debugging as this can log a very large amount of
@@ -478,26 +315,28 @@ public abstract class UnfilteredRowIterators
iterator.isReverseOrder(),
iterator.partitionLevelDeletion().markedForDeleteAt());
- return new WrappingUnfilteredRowIterator(iterator)
+ return new AlteringUnfilteredRowIterator(iterator)
{
@Override
- public Row staticRow()
+ protected Row computeNextStatic(Row row)
{
- Row row = super.staticRow();
if (!row.isEmpty())
logger.info("[{}] {}", id, row.toString(metadata(), fullDetails));
return row;
}
@Override
- public Unfiltered next()
+ protected Row computeNext(Row row)
{
- Unfiltered next = super.next();
- if (next.kind() == Unfiltered.Kind.ROW)
- logger.info("[{}] {}", id, ((Row)next).toString(metadata(), fullDetails));
- else
- logger.info("[{}] {}", id, ((RangeTombstoneMarker)next).toString(metadata()));
- return next;
+ logger.info("[{}] {}", id, row.toString(metadata(), fullDetails));
+ return row;
+ }
+
+ @Override
+ protected RangeTombstoneMarker computeNext(RangeTombstoneMarker marker)
+ {
+ logger.info("[{}] {}", id, marker.toString(metadata()));
+ return marker;
}
};
}
@@ -526,10 +365,10 @@ public abstract class UnfilteredRowIterators
reversed,
mergeStats(iterators));
- this.listener = listener;
this.mergeIterator = MergeIterator.get(iterators,
reversed ? metadata.comparator.reversed() : metadata.comparator,
- new MergeReducer(metadata, iterators.size(), reversed, nowInSec));
+ new MergeReducer(iterators.size(), reversed, nowInSec, listener));
+ this.listener = listener;
}
private static UnfilteredRowMergeIterator create(List<UnfilteredRowIterator> iterators, int nowInSec, MergeListener listener)
@@ -591,7 +430,7 @@ public abstract class UnfilteredRowIterators
delTime = iterDeletion;
}
if (listener != null && !delTime.isLive())
- listener.onMergePartitionLevelDeletion(delTime, versions);
+ listener.onMergedPartitionLevelDeletion(delTime, versions);
return delTime;
}
@@ -605,14 +444,19 @@ public abstract class UnfilteredRowIterators
if (columns.isEmpty())
return Rows.EMPTY_STATIC_ROW;
- Row.Merger merger = Row.Merger.createStatic(metadata, iterators.size(), nowInSec, columns, listener);
+ if (iterators.stream().allMatch(iter -> iter.staticRow().isEmpty()))
+ return Rows.EMPTY_STATIC_ROW;
+
+ Row.Merger merger = new Row.Merger(iterators.size(), nowInSec, columns);
for (int i = 0; i < iterators.size(); i++)
merger.add(i, iterators.get(i).staticRow());
- // Note that we should call 'takeAlias' on the result in theory, but we know that we
- // won't reuse the merger and so that it's ok not to.
Row merged = merger.merge(partitionDeletion);
- return merged == null ? Rows.EMPTY_STATIC_ROW : merged;
+ if (merged == null)
+ merged = Rows.EMPTY_STATIC_ROW;
+ if (listener != null)
+ listener.onMergedRows(merged, columns, merger.mergedRows());
+ return merged;
}
private static PartitionColumns collectColumns(List<UnfilteredRowIterator> iterators)
@@ -659,26 +503,26 @@ public abstract class UnfilteredRowIterators
listener.close();
}
- /**
- * Specific reducer for merge operations that rewrite the same reusable
- * row every time. This also skip cells shadowed by range tombstones when writing.
- */
private class MergeReducer extends MergeIterator.Reducer<Unfiltered, Unfiltered>
{
+ private final MergeListener listener;
+
private Unfiltered.Kind nextKind;
private final Row.Merger rowMerger;
private final RangeTombstoneMarker.Merger markerMerger;
- private MergeReducer(CFMetaData metadata, int size, boolean reversed, int nowInSec)
+ private MergeReducer(int size, boolean reversed, int nowInSec, MergeListener listener)
{
- this.rowMerger = Row.Merger.createRegular(metadata, size, nowInSec, columns().regulars, listener);
- this.markerMerger = new RangeTombstoneMarker.Merger(metadata, size, partitionLevelDeletion(), reversed, listener);
+ this.rowMerger = new Row.Merger(size, nowInSec, columns().regulars);
+ this.markerMerger = new RangeTombstoneMarker.Merger(size, partitionLevelDeletion(), reversed);
+ this.listener = listener;
}
@Override
public boolean trivialReduceIsTrivial()
{
+ // If we have a listener, we must signal it even when we have a single version
return listener == null;
}
@@ -693,9 +537,20 @@ public abstract class UnfilteredRowIterators
protected Unfiltered getReduced()
{
- return nextKind == Unfiltered.Kind.ROW
- ? rowMerger.merge(markerMerger.activeDeletion())
- : markerMerger.merge();
+ if (nextKind == Unfiltered.Kind.ROW)
+ {
+ Row merged = rowMerger.merge(markerMerger.activeDeletion());
+ if (listener != null)
+ listener.onMergedRows(merged == null ? ArrayBackedRow.emptyRow(rowMerger.mergedClustering()) : merged, columns().regulars, rowMerger.mergedRows());
+ return merged;
+ }
+ else
+ {
+ RangeTombstoneMarker merged = markerMerger.merge();
+ if (merged != null && listener != null)
+ listener.onMergedRangeTombstoneMarkers(merged, markerMerger.mergedMarkers());
+ return merged;
+ }
}
protected void onKeyChange()
@@ -712,13 +567,11 @@ public abstract class UnfilteredRowIterators
{
private final UnfilteredRowIterator iter;
private final int nowInSec;
- private final TombstoneFilteringRow filter;
public FilteringIterator(UnfilteredRowIterator iter, int nowInSec)
{
this.iter = iter;
this.nowInSec = nowInSec;
- this.filter = new TombstoneFilteringRow(nowInSec);
}
public CFMetaData metadata()
@@ -744,7 +597,11 @@ public abstract class UnfilteredRowIterators
public Row staticRow()
{
Row row = iter.staticRow();
- return row.isEmpty() ? row : new TombstoneFilteringRow(nowInSec).setTo(row);
+ if (row.isEmpty())
+ return Rows.EMPTY_STATIC_ROW;
+
+ row = row.purge(DeletionPurger.PURGE_ALL, nowInSec);
+ return row == null ? Rows.EMPTY_STATIC_ROW : row;
}
protected Row computeNext()
@@ -752,11 +609,11 @@ public abstract class UnfilteredRowIterators
while (iter.hasNext())
{
Unfiltered next = iter.next();
- if (next.kind() != Unfiltered.Kind.ROW)
+ if (next.isRangeTombstoneMarker())
continue;
- Row row = filter.setTo((Row)next);
- if (!row.isEmpty())
+ Row row = ((Row)next).purge(DeletionPurger.PURGE_ALL, nowInSec);
+ if (row != null)
return row;
}
return endOfData();