You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2015/06/30 12:47:44 UTC
[20/51] [partial] cassandra git commit: Storage engine refactor,
a.k.a CASSANDRA-8099
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/MemtableRowData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/MemtableRowData.java b/src/java/org/apache/cassandra/db/rows/MemtableRowData.java
new file mode 100644
index 0000000..cad0765
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/MemtableRowData.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.utils.ObjectSizes;
+import org.apache.cassandra.utils.memory.AbstractAllocator;
+
+/**
+ * Row data stored inside a memtable.
+ *
+ * This has methods like dataSize and unsharedHeapSizeExcludingData that are
+ * specific to memtables.
+ */
+public interface MemtableRowData extends Clusterable
+{
+ /**
+ * The columns of this row data.
+ */
+ public Columns columns();
+
+ /**
+ * The size of the data held by this row data. The unit is not stated in this file (presumably bytes -- TODO confirm).
+ */
+ public int dataSize();
+
+ // returns the size of the Row and all references on the heap, excluding any costs associated with byte arrays
+ // that would be allocated by a clone operation, as these will be accounted for by the allocator
+ public long unsharedHeapSizeExcludingData();
+
+ /**
+ * A {@link Row} that can be pointed at different {@code MemtableRowData} instances through {@link #setTo}.
+ */
+ public interface ReusableRow extends Row
+ {
+ /**
+ * Makes this row expose the provided row data.
+ *
+ * @param rowData the row data this row should now represent.
+ * @return the row after the update (the implementation visible in this file returns {@code this}).
+ */
+ public ReusableRow setTo(MemtableRowData rowData);
+ }
+
+ public class BufferRowData implements MemtableRowData
+ {
+ private static final long EMPTY_SIZE = ObjectSizes.measure(new BufferRowData(null, LivenessInfo.NONE, DeletionTime.LIVE, null));
+
+ private final Clustering clustering;
+ private final LivenessInfo livenessInfo;
+ private final DeletionTime deletion;
+ private final RowDataBlock dataBlock;
+
+ public BufferRowData(Clustering clustering, LivenessInfo livenessInfo, DeletionTime deletion, RowDataBlock dataBlock)
+ {
+ this.clustering = clustering;
+ this.livenessInfo = livenessInfo.takeAlias();
+ this.deletion = deletion.takeAlias();
+ this.dataBlock = dataBlock;
+ }
+
+ public Clustering clustering()
+ {
+ return clustering;
+ }
+
+ public Columns columns()
+ {
+ return dataBlock.columns();
+ }
+
+ public int dataSize()
+ {
+ return clustering.dataSize() + livenessInfo.dataSize() + deletion.dataSize() + dataBlock.dataSize();
+ }
+
+ public long unsharedHeapSizeExcludingData()
+ {
+ return EMPTY_SIZE
+ + (clustering == Clustering.STATIC_CLUSTERING ? 0 : ((BufferClustering)clustering).unsharedHeapSizeExcludingData())
+ + dataBlock.unsharedHeapSizeExcludingData();
+ }
+
+ public static ReusableRow createReusableRow()
+ {
+ return new BufferRow();
+ }
+
+ private static class BufferRow extends AbstractReusableRow implements ReusableRow
+ {
+ private BufferRowData rowData;
+
+ private BufferRow()
+ {
+ }
+
+ public ReusableRow setTo(MemtableRowData rowData)
+ {
+ assert rowData instanceof BufferRowData;
+ this.rowData = (BufferRowData)rowData;
+ return this;
+ }
+
+ protected RowDataBlock data()
+ {
+ return rowData.dataBlock;
+ }
+
+ protected int row()
+ {
+ return 0;
+ }
+
+ public Clustering clustering()
+ {
+ return rowData.clustering;
+ }
+
+ public LivenessInfo primaryKeyLivenessInfo()
+ {
+ return rowData.livenessInfo;
+ }
+
+ public DeletionTime deletion()
+ {
+ return rowData.deletion;
+ }
+ }
+ }
+
+ public class BufferClustering extends Clustering
+ {
+ private static final long EMPTY_SIZE = ObjectSizes.measure(new BufferClustering(0));
+
+ private final ByteBuffer[] values;
+
+ public BufferClustering(int size)
+ {
+ this.values = new ByteBuffer[size];
+ }
+
+ public void setClusteringValue(int i, ByteBuffer value)
+ {
+ values[i] = value;
+ }
+
+ public int size()
+ {
+ return values.length;
+ }
+
+ public ByteBuffer get(int i)
+ {
+ return values[i];
+ }
+
+ public ByteBuffer[] getRawValues()
+ {
+ return values;
+ }
+
+ public long unsharedHeapSizeExcludingData()
+ {
+ return EMPTY_SIZE + ObjectSizes.sizeOnHeapExcludingData(values);
+ }
+
+ @Override
+ public long unsharedHeapSize()
+ {
+ return EMPTY_SIZE + ObjectSizes.sizeOnHeapOf(values);
+ }
+
+ public Clustering takeAlias()
+ {
+ return this;
+ }
+ }
+
+ /**
+ * A {@code CellPath} whose values have been cloned through an {@code AbstractAllocator}.
+ */
+ public class BufferCellPath extends CellPath.SimpleCellPath
+ {
+ // Measured once on a path with no values.
+ private static final long EMPTY_SIZE = ObjectSizes.measure(new BufferCellPath(new ByteBuffer[0]));
+
+ private BufferCellPath(ByteBuffer[] values)
+ {
+ super(values);
+ }
+
+ /**
+ * Creates a copy of the provided path in which every value has been cloned by the provided allocator.
+ *
+ * @param path the path to copy.
+ * @param allocator the allocator used to clone each value of {@code path}.
+ * @return a new path with the same number of values as {@code path}, the i-th one being a clone of {@code path.get(i)}.
+ */
+ public static BufferCellPath clone(CellPath path, AbstractAllocator allocator)
+ {
+ int size = path.size();
+ ByteBuffer[] values = new ByteBuffer[size];
+ // Clone each element at its own index (cloning element 0 into every slot would corrupt multi-element paths).
+ for (int i = 0; i < size; i++)
+ values[i] = allocator.clone(path.get(i));
+ return new BufferCellPath(values);
+ }
+
+ public long unsharedHeapSizeExcludingData()
+ {
+ return EMPTY_SIZE + ObjectSizes.sizeOnHeapExcludingData(values);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java
new file mode 100644
index 0000000..b5ac19b
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+import java.util.Objects;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+
+/**
+ * A range tombstone marker for a single bound of a range tombstone, i.e. either its start or its end.
+ */
+public class RangeTombstoneBoundMarker extends AbstractRangeTombstoneMarker
+{
+ private final DeletionTime deletion;
+
+ /**
+ * Creates a marker for the provided bound, which must be of a "bound" kind (not a boundary).
+ */
+ public RangeTombstoneBoundMarker(RangeTombstone.Bound bound, DeletionTime deletion)
+ {
+ super(bound);
+ assert bound.kind().isBound();
+ this.deletion = deletion;
+ }
+
+ /**
+ * Creates a marker from a slice bound, reusing its kind and raw values.
+ */
+ public RangeTombstoneBoundMarker(Slice.Bound bound, DeletionTime deletion)
+ {
+ this(new RangeTombstone.Bound(bound.kind(), bound.getRawValues()), deletion);
+ }
+
+ public static RangeTombstoneBoundMarker inclusiveStart(ClusteringPrefix clustering, DeletionTime deletion)
+ {
+ RangeTombstone.Bound start = new RangeTombstone.Bound(RangeTombstone.Bound.Kind.INCL_START_BOUND, clustering.getRawValues());
+ return new RangeTombstoneBoundMarker(start, deletion);
+ }
+
+ public static RangeTombstoneBoundMarker inclusiveEnd(ClusteringPrefix clustering, DeletionTime deletion)
+ {
+ RangeTombstone.Bound end = new RangeTombstone.Bound(RangeTombstone.Bound.Kind.INCL_END_BOUND, clustering.getRawValues());
+ return new RangeTombstoneBoundMarker(end, deletion);
+ }
+
+ public static RangeTombstoneBoundMarker inclusiveOpen(boolean reversed, ByteBuffer[] boundValues, DeletionTime deletion)
+ {
+ return new RangeTombstoneBoundMarker(RangeTombstone.Bound.inclusiveOpen(reversed, boundValues), deletion);
+ }
+
+ public static RangeTombstoneBoundMarker exclusiveOpen(boolean reversed, ByteBuffer[] boundValues, DeletionTime deletion)
+ {
+ return new RangeTombstoneBoundMarker(RangeTombstone.Bound.exclusiveOpen(reversed, boundValues), deletion);
+ }
+
+ public static RangeTombstoneBoundMarker inclusiveClose(boolean reversed, ByteBuffer[] boundValues, DeletionTime deletion)
+ {
+ return new RangeTombstoneBoundMarker(RangeTombstone.Bound.inclusiveClose(reversed, boundValues), deletion);
+ }
+
+ public static RangeTombstoneBoundMarker exclusiveClose(boolean reversed, ByteBuffer[] boundValues, DeletionTime deletion)
+ {
+ return new RangeTombstoneBoundMarker(RangeTombstone.Bound.exclusiveClose(reversed, boundValues), deletion);
+ }
+
+ // A bound marker is never a boundary.
+ public boolean isBoundary()
+ {
+ return false;
+ }
+
+ /**
+ * The deletion time of the range tombstone this marker is a bound of.
+ */
+ public DeletionTime deletionTime()
+ {
+ return this.deletion;
+ }
+
+ public boolean isOpen(boolean reversed)
+ {
+ return bound.kind().isOpen(reversed);
+ }
+
+ public boolean isClose(boolean reversed)
+ {
+ return bound.kind().isClose(reversed);
+ }
+
+ /**
+ * The deletion time opened by this marker.
+ *
+ * @throws IllegalStateException if this marker is not an open one for the given direction.
+ */
+ public DeletionTime openDeletionTime(boolean reversed)
+ {
+ if (isOpen(reversed))
+ return deletion;
+
+ throw new IllegalStateException();
+ }
+
+ /**
+ * The deletion time closed by this marker.
+ *
+ * @throws IllegalStateException if this marker is an open one for the given direction.
+ */
+ public DeletionTime closeDeletionTime(boolean reversed)
+ {
+ if (!isOpen(reversed))
+ return deletion;
+
+ throw new IllegalStateException();
+ }
+
+ // Writes the bound, then the deletion, then signals the end of the marker.
+ public void copyTo(RangeTombstoneMarker.Writer writer)
+ {
+ copyBoundTo(writer);
+ writer.writeBoundDeletion(deletion);
+ writer.endOfMarker();
+ }
+
+ public void digest(MessageDigest digest)
+ {
+ bound.digest(digest);
+ deletion.digest(digest);
+ }
+
+ public String toString(CFMetaData metadata)
+ {
+ return "Marker " + bound.toString(metadata) + "@" + deletion.markedForDeleteAt();
+ }
+
+ @Override
+ public boolean equals(Object other)
+ {
+ if (other instanceof RangeTombstoneBoundMarker)
+ {
+ RangeTombstoneBoundMarker that = (RangeTombstoneBoundMarker)other;
+ return this.bound.equals(that.bound) && this.deletion.equals(that.deletion);
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(bound, deletion);
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java
new file mode 100644
index 0000000..1140d40
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+import java.util.Objects;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+
+/**
+ * A range tombstone marker that represents a boundary between 2 range tombstones (i.e. it closes one range and opens another).
+ * <p>
+ * The two deletion times are stored in clustering order: {@code endDeletion} is for the range that ends at this
+ * boundary and {@code startDeletion} for the range that starts at it. Methods taking a {@code reversed} flag
+ * translate between that storage and the "close"/"open" view of a (possibly reversed) iteration.
+ */
+public class RangeTombstoneBoundaryMarker extends AbstractRangeTombstoneMarker
+{
+ private final DeletionTime endDeletion;
+ private final DeletionTime startDeletion;
+
+ /**
+ * Creates a boundary marker.
+ *
+ * @param bound the bound of the marker, which must be of a boundary kind.
+ * @param endDeletion the deletion time of the range tombstone ending at this boundary (in clustering order).
+ * @param startDeletion the deletion time of the range tombstone starting at this boundary (in clustering order).
+ */
+ public RangeTombstoneBoundaryMarker(RangeTombstone.Bound bound, DeletionTime endDeletion, DeletionTime startDeletion)
+ {
+ super(bound);
+ assert bound.kind().isBoundary();
+ this.endDeletion = endDeletion;
+ this.startDeletion = startDeletion;
+ }
+
+ public static RangeTombstoneBoundaryMarker exclusiveCloseInclusiveOpen(boolean reversed, ByteBuffer[] boundValues, DeletionTime closeDeletion, DeletionTime openDeletion)
+ {
+ RangeTombstone.Bound bound = RangeTombstone.Bound.exclusiveCloseInclusiveOpen(reversed, boundValues);
+ // When reversed, what iteration closes is the range that starts here in clustering order (and vice versa).
+ DeletionTime endDeletion = reversed ? openDeletion : closeDeletion;
+ DeletionTime startDeletion = reversed ? closeDeletion : openDeletion;
+ return new RangeTombstoneBoundaryMarker(bound, endDeletion, startDeletion);
+ }
+
+ public static RangeTombstoneBoundaryMarker inclusiveCloseExclusiveOpen(boolean reversed, ByteBuffer[] boundValues, DeletionTime closeDeletion, DeletionTime openDeletion)
+ {
+ RangeTombstone.Bound bound = RangeTombstone.Bound.inclusiveCloseExclusiveOpen(reversed, boundValues);
+ // When reversed, what iteration closes is the range that starts here in clustering order (and vice versa).
+ DeletionTime endDeletion = reversed ? openDeletion : closeDeletion;
+ DeletionTime startDeletion = reversed ? closeDeletion : openDeletion;
+ return new RangeTombstoneBoundaryMarker(bound, endDeletion, startDeletion);
+ }
+
+ public boolean isBoundary()
+ {
+ return true;
+ }
+
+ /**
+ * The deletion time for the range tombstone this boundary ends (in clustering order).
+ */
+ public DeletionTime endDeletionTime()
+ {
+ return endDeletion;
+ }
+
+ /**
+ * The deletion time for the range tombstone this boundary starts (in clustering order).
+ */
+ public DeletionTime startDeletionTime()
+ {
+ return startDeletion;
+ }
+
+ /**
+ * The deletion time of the range this boundary closes when iterating in the given direction.
+ */
+ public DeletionTime closeDeletionTime(boolean reversed)
+ {
+ return reversed ? startDeletion : endDeletion;
+ }
+
+ /**
+ * The deletion time of the range this boundary opens when iterating in the given direction.
+ */
+ public DeletionTime openDeletionTime(boolean reversed)
+ {
+ return reversed ? endDeletion : startDeletion;
+ }
+
+ public boolean isOpen(boolean reversed)
+ {
+ // A boundary always opens one side
+ return true;
+ }
+
+ public boolean isClose(boolean reversed)
+ {
+ // A boundary always closes one side
+ return true;
+ }
+
+ /**
+ * Whether the provided close and open bounds should be represented by a single boundary marker.
+ */
+ public static boolean isBoundary(ClusteringComparator comparator, Slice.Bound close, Slice.Bound open)
+ {
+ if (!comparator.isOnSameClustering(close, open))
+ return false;
+
+ // If both bounds are exclusive, then it's not a boundary, otherwise it is one.
+ // Note that most code should never call this with 2 inclusive bounds: this would mean we had
+ // 2 RTs that were overlapping and RangeTombstoneList doesn't create that. However, old
+ // code was generating that so supporting this case helps dealing with backward compatibility.
+ return close.isInclusive() || open.isInclusive();
+ }
+
+ // Please note that isBoundary *must* have been called (and returned true) before this is called.
+ public static RangeTombstoneBoundaryMarker makeBoundary(boolean reversed, Slice.Bound close, Slice.Bound open, DeletionTime closeDeletion, DeletionTime openDeletion)
+ {
+ // With 2 inclusive bounds, the side with the superseding deletion keeps the inclusive part.
+ boolean isExclusiveClose = close.isExclusive() || (close.isInclusive() && open.isInclusive() && openDeletion.supersedes(closeDeletion));
+ return isExclusiveClose
+ ? exclusiveCloseInclusiveOpen(reversed, close.getRawValues(), closeDeletion, openDeletion)
+ : inclusiveCloseExclusiveOpen(reversed, close.getRawValues(), closeDeletion, openDeletion);
+ }
+
+ /**
+ * Creates the bound marker equivalent to the closing part of this boundary for the given direction.
+ * <p>
+ * The deletion is picked with {@link #closeDeletionTime} so that it matches the bound kind, which also depends
+ * on {@code reversed}: when reversed, the close bound is the start bound and must carry the start deletion.
+ */
+ public RangeTombstoneBoundMarker createCorrespondingCloseBound(boolean reversed)
+ {
+ return new RangeTombstoneBoundMarker(bound.withNewKind(bound.kind().closeBoundOfBoundary(reversed)), closeDeletionTime(reversed));
+ }
+
+ /**
+ * Creates the bound marker equivalent to the opening part of this boundary for the given direction.
+ * <p>
+ * The deletion is picked with {@link #openDeletionTime} so that it matches the bound kind, which also depends
+ * on {@code reversed}: when reversed, the open bound is the end bound and must carry the end deletion.
+ */
+ public RangeTombstoneBoundMarker createCorrespondingOpenBound(boolean reversed)
+ {
+ return new RangeTombstoneBoundMarker(bound.withNewKind(bound.kind().openBoundOfBoundary(reversed)), openDeletionTime(reversed));
+ }
+
+ // Writes the bound, then both deletions (end first, then start), then signals the end of the marker.
+ public void copyTo(RangeTombstoneMarker.Writer writer)
+ {
+ copyBoundTo(writer);
+ writer.writeBoundaryDeletion(endDeletion, startDeletion);
+ writer.endOfMarker();
+ }
+
+ public void digest(MessageDigest digest)
+ {
+ bound.digest(digest);
+ endDeletion.digest(digest);
+ startDeletion.digest(digest);
+ }
+
+ public String toString(CFMetaData metadata)
+ {
+ StringBuilder sb = new StringBuilder();
+ sb.append("Marker ");
+ sb.append(bound.toString(metadata));
+ sb.append("@").append(endDeletion.markedForDeleteAt()).append("-").append(startDeletion.markedForDeleteAt());
+ return sb.toString();
+ }
+
+ @Override
+ public boolean equals(Object other)
+ {
+ if(!(other instanceof RangeTombstoneBoundaryMarker))
+ return false;
+
+ RangeTombstoneBoundaryMarker that = (RangeTombstoneBoundaryMarker)other;
+ return this.bound.equals(that.bound)
+ && this.endDeletion.equals(that.endDeletion)
+ && this.startDeletion.equals(that.startDeletion);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(bound, endDeletion, startDeletion);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/RangeTombstoneMarker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/RangeTombstoneMarker.java b/src/java/org/apache/cassandra/db/rows/RangeTombstoneMarker.java
new file mode 100644
index 0000000..1a506d5
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/RangeTombstoneMarker.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+
+/**
+ * A marker for a range tombstone bound.
+ * <p>
+ * There are 2 types of markers: bounds (see {@link RangeTombstoneBoundMarker}) and boundaries (see {@link RangeTombstoneBoundaryMarker}).
+ */
+public interface RangeTombstoneMarker extends Unfiltered
+{
+ /**
+ * The bound this marker is for.
+ */
+ @Override
+ public RangeTombstone.Bound clustering();
+
+ /**
+ * Whether this marker is a boundary (it both closes a range tombstone and opens another) rather than a simple bound.
+ */
+ public boolean isBoundary();
+
+ /**
+ * Copies this marker to the provided writer. The implementations in this package write the bound first,
+ * then the deletion time(s), and finish by calling {@code endOfMarker()}.
+ */
+ public void copyTo(RangeTombstoneMarker.Writer writer);
+
+ // In the methods below, 'reversed' says whether iteration is done in reverse order, which decides
+ // which side of a marker counts as "open" and which as "close".
+ public boolean isOpen(boolean reversed);
+ public boolean isClose(boolean reversed);
+ // Note: the bound marker implementation throws IllegalStateException when asked for the open deletion time
+ // of a marker that is not open for that direction, or for the close deletion time of one that is open.
+ public DeletionTime openDeletionTime(boolean reversed);
+ public DeletionTime closeDeletionTime(boolean reversed);
+
+ /**
+ * Sink a marker is copied to: on top of what {@code Slice.Bound.Writer} receives, it gets the marker
+ * deletion time(s) and an end-of-marker notification.
+ */
+ public interface Writer extends Slice.Bound.Writer
+ {
+ // Called for a simple bound marker, with its single deletion time.
+ public void writeBoundDeletion(DeletionTime deletion);
+ // Called for a boundary marker, with the deletion of the range it ends followed by the one of the range it starts.
+ public void writeBoundaryDeletion(DeletionTime endDeletion, DeletionTime startDeletion);
+ // Called once everything about the marker has been written.
+ public void endOfMarker();
+ }
+
+ /**
+ * A {@link Writer} that records what is written to it and turns it into a marker with {@link #build}.
+ * <p>
+ * A builder can be reused for several markers by calling {@link #reset} between them.
+ */
+ public static class Builder implements Writer
+ {
+ // Clustering values written so far; only the first 'size' entries are meaningful.
+ private final ByteBuffer[] values;
+ private int size;
+
+ private RangeTombstone.Bound.Kind kind;
+ // For a bound, its deletion; for a boundary, the "end" deletion.
+ private DeletionTime firstDeletion;
+ // Only set for a boundary: the "start" deletion.
+ private DeletionTime secondDeletion;
+
+ /**
+ * @param maxClusteringSize the maximum number of clustering values that will be written for a single marker.
+ */
+ public Builder(int maxClusteringSize)
+ {
+ this.values = new ByteBuffer[maxClusteringSize];
+ }
+
+ public void writeClusteringValue(ByteBuffer value)
+ {
+ values[size++] = value;
+ }
+
+ public void writeBoundKind(RangeTombstone.Bound.Kind kind)
+ {
+ this.kind = kind;
+ }
+
+ public void writeBoundDeletion(DeletionTime deletion)
+ {
+ firstDeletion = deletion;
+ }
+
+ public void writeBoundaryDeletion(DeletionTime endDeletion, DeletionTime startDeletion)
+ {
+ firstDeletion = endDeletion;
+ secondDeletion = startDeletion;
+ }
+
+ public void endOfMarker()
+ {
+ }
+
+ /**
+ * Builds the marker corresponding to what has been written since creation or the last {@link #reset}.
+ * The bound kind must have been written. The built marker gets its own copy of the written values.
+ *
+ * @return a boundary marker if the written kind is a boundary, a bound marker otherwise.
+ */
+ public RangeTombstoneMarker build()
+ {
+ assert kind != null : "Nothing has been written";
+ if (kind.isBoundary())
+ return new RangeTombstoneBoundaryMarker(new RangeTombstone.Bound(kind, Arrays.copyOfRange(values, 0, size)), firstDeletion, secondDeletion);
+ else
+ return new RangeTombstoneBoundMarker(new RangeTombstone.Bound(kind, Arrays.copyOfRange(values, 0, size)), firstDeletion);
+ }
+
+ /**
+ * Clears everything written so far so that this builder can be used for a new marker.
+ *
+ * @return this builder.
+ */
+ public Builder reset()
+ {
+ Arrays.fill(values, null);
+ size = 0;
+ kind = null;
+ // Also forget the deletions, so that a reused builder can never hand the previous marker's
+ // deletion times to the next marker.
+ firstDeletion = null;
+ secondDeletion = null;
+ return this;
+ }
+ }
+
+ /**
+ * Utility class to help merging range tombstone markers coming from multiple inputs (UnfilteredRowIterators).
+ * <p>
+ * The assumption that each individual input must validate and that we must preserve in the output is that every
+ * open marker has a corresponding close marker with the exact same deletion info, and that there is no other range
+ * tombstone marker between those open and close markers (of course, there could be rows in between). In other words,
+ * for any {@code UnfilteredRowIterator}, you only ever have to remember the last open marker (if any) to have the
+ * full picture of what is deleted by range tombstones at any given point of iterating that iterator.
+ * <p>
+ * Note that this class can merge both forward and reverse iterators. To deal with reverse, we just reverse how we
+ * deal with open and close markers (in forward order, we'll get open-close, open-close, ..., while in reverse we'll
+ * get close-open, close-open, ...).
+ * <p>
+ * Usage, as far as this class shows: {@link #add} the markers to merge (one per input at most), call {@link #merge}
+ * and then {@link #clear} before dealing with the next markers. The open markers state is kept across those calls.
+ */
+ public static class Merger
+ {
+ // Boundaries sort like the bound that has their equivalent "inclusive" part and that's the main action we
+ // care about as far as merging goes. So MergedKind just groups those as the same case, and tells us whether
+ // we're dealing with an open or a close (based on whether we're dealing with reversed iterators or not).
+ // Really this enum is just a convenience for merging.
+ private enum MergedKind
+ {
+ INCL_OPEN, EXCL_CLOSE, EXCL_OPEN, INCL_CLOSE;
+
+ public static MergedKind forBound(RangeTombstone.Bound bound, boolean reversed)
+ {
+ switch (bound.kind())
+ {
+ case INCL_START_BOUND:
+ case EXCL_END_INCL_START_BOUNDARY:
+ return reversed ? INCL_CLOSE : INCL_OPEN;
+ case EXCL_END_BOUND:
+ return reversed ? EXCL_OPEN : EXCL_CLOSE;
+ case EXCL_START_BOUND:
+ return reversed ? EXCL_CLOSE : EXCL_OPEN;
+ case INCL_END_EXCL_START_BOUNDARY:
+ case INCL_END_BOUND:
+ return reversed ? INCL_OPEN : INCL_CLOSE;
+ }
+ throw new AssertionError();
+ }
+ }
+
+ private final CFMetaData metadata;
+ private final UnfilteredRowIterators.MergeListener listener;
+ private final DeletionTime partitionDeletion;
+ private final boolean reversed;
+
+ // The bound of the last marker added; it provides the kind and values of the merged marker.
+ private RangeTombstone.Bound bound;
+ // For each iterator, the marker to merge in the current round (or null if that iterator has none).
+ private final RangeTombstoneMarker[] markers;
+
+ // For each iterator, what is the currently open marker deletion time (or null if there is no open marker on that iterator)
+ private final DeletionTime[] openMarkers;
+ // The index in openMarkers of the "biggest" marker, the one with the biggest deletion time. Is < 0 iff there is no open
+ // marker on any iterator.
+ private int biggestOpenMarker = -1;
+
+ /**
+ * @param metadata the table metadata (only stored by the code visible here).
+ * @param size the number of inputs being merged.
+ * @param partitionDeletion the partition level deletion; open markers it supersedes are not open in the merged output.
+ * @param reversed whether the merged iterators are in reverse order.
+ * @param listener if not null, it is notified of every merged marker produced by {@link #merge}.
+ */
+ public Merger(CFMetaData metadata, int size, DeletionTime partitionDeletion, boolean reversed, UnfilteredRowIterators.MergeListener listener)
+ {
+ this.metadata = metadata;
+ this.listener = listener;
+ this.partitionDeletion = partitionDeletion;
+ this.reversed = reversed;
+
+ this.markers = new RangeTombstoneMarker[size];
+ this.openMarkers = new DeletionTime[size];
+ }
+
+ // Forgets the markers of the current round. Deliberately leaves openMarkers untouched: what is open on each
+ // input must survive from one round to the next.
+ public void clear()
+ {
+ Arrays.fill(markers, null);
+ }
+
+ // Registers the marker of the i-th input for the current round.
+ public void add(int i, RangeTombstoneMarker marker)
+ {
+ bound = marker.clustering();
+ markers[i] = marker;
+ }
+
+ /**
+ * Merges the markers added for the current round.
+ *
+ * @return the marker to issue on the merged stream, or {@code null} if those markers don't change what is
+ * deleted on the merged stream (in which case the listener is not notified).
+ */
+ public RangeTombstoneMarker merge()
+ {
+ /*
+ * Merging of range tombstones works this way:
+ * 1) We remember what is the currently open marker in the merged stream
+ * 2) We update our internal states of what range is opened on the input streams based on the new markers to merge
+ * 3) We compute what should be the state in the merge stream after 2)
+ * 4) We return what marker should be issued on the merged stream based on the difference between the state from 1) and 3)
+ */
+
+ DeletionTime previousDeletionTimeInMerged = currentOpenDeletionTimeInMerged();
+
+ updateOpenMarkers();
+
+ DeletionTime newDeletionTimeInMerged = currentOpenDeletionTimeInMerged();
+ if (previousDeletionTimeInMerged.equals(newDeletionTimeInMerged))
+ return null;
+
+ ByteBuffer[] values = bound.getRawValues();
+
+ // In each case below, we issue a simple bound if one side of the marker is live in the merged stream
+ // (nothing was open before, or nothing is open after), and a boundary otherwise.
+ RangeTombstoneMarker merged;
+ switch (MergedKind.forBound(bound, reversed))
+ {
+ case INCL_OPEN:
+ merged = previousDeletionTimeInMerged.isLive()
+ ? RangeTombstoneBoundMarker.inclusiveOpen(reversed, values, newDeletionTimeInMerged)
+ : RangeTombstoneBoundaryMarker.exclusiveCloseInclusiveOpen(reversed, values, previousDeletionTimeInMerged, newDeletionTimeInMerged);
+ break;
+ case EXCL_CLOSE:
+ merged = newDeletionTimeInMerged.isLive()
+ ? RangeTombstoneBoundMarker.exclusiveClose(reversed, values, previousDeletionTimeInMerged)
+ : RangeTombstoneBoundaryMarker.exclusiveCloseInclusiveOpen(reversed, values, previousDeletionTimeInMerged, newDeletionTimeInMerged);
+ break;
+ case EXCL_OPEN:
+ merged = previousDeletionTimeInMerged.isLive()
+ ? RangeTombstoneBoundMarker.exclusiveOpen(reversed, values, newDeletionTimeInMerged)
+ : RangeTombstoneBoundaryMarker.inclusiveCloseExclusiveOpen(reversed, values, previousDeletionTimeInMerged, newDeletionTimeInMerged);
+ break;
+ case INCL_CLOSE:
+ merged = newDeletionTimeInMerged.isLive()
+ ? RangeTombstoneBoundMarker.inclusiveClose(reversed, values, previousDeletionTimeInMerged)
+ : RangeTombstoneBoundaryMarker.inclusiveCloseExclusiveOpen(reversed, values, previousDeletionTimeInMerged, newDeletionTimeInMerged);
+ break;
+ default:
+ throw new AssertionError();
+ }
+
+ if (listener != null)
+ listener.onMergedRangeTombstoneMarkers(merged, markers);
+
+ return merged;
+ }
+
+ // The deletion time currently open in the merged stream, or DeletionTime.LIVE if nothing is open there.
+ private DeletionTime currentOpenDeletionTimeInMerged()
+ {
+ if (biggestOpenMarker < 0)
+ return DeletionTime.LIVE;
+
+ DeletionTime biggestDeletionTime = openMarkers[biggestOpenMarker];
+ // it's only open in the merged iterator if it's not shadowed by the partition level deletion
+ return partitionDeletion.supersedes(biggestDeletionTime) ? DeletionTime.LIVE : biggestDeletionTime.takeAlias();
+ }
+
+ // Applies the markers of the current round to the per-input open state, then recomputes biggestOpenMarker.
+ private void updateOpenMarkers()
+ {
+ for (int i = 0; i < markers.length; i++)
+ {
+ RangeTombstoneMarker marker = markers[i];
+ if (marker == null)
+ continue;
+
+ // Note that we can have boundaries that are both open and close, but in that case all we care about
+ // is what is the open deletion after the marker, so we favor the opening part in this case.
+ if (marker.isOpen(reversed))
+ openMarkers[i] = marker.openDeletionTime(reversed).takeAlias();
+ else
+ openMarkers[i] = null;
+ }
+
+ // Recompute what is now the biggest open marker
+ biggestOpenMarker = -1;
+ for (int i = 0; i < openMarkers.length; i++)
+ {
+ if (openMarkers[i] != null && (biggestOpenMarker < 0 || openMarkers[i].supersedes(openMarkers[biggestOpenMarker])))
+ biggestOpenMarker = i;
+ }
+ }
+
+ /**
+ * The deletion currently in effect on the merged stream: the open marker deletion if there is one,
+ * the partition level deletion otherwise.
+ */
+ public DeletionTime activeDeletion()
+ {
+ DeletionTime openMarker = currentOpenDeletionTimeInMerged();
+ // We only have an open marker in the merged stream if it's not shadowed by the partition deletion (which can be LIVE itself), so
+ // if we have an open marker, we know it's the "active" deletion for the merged stream.
+ return openMarker.isLive() ? partitionDeletion : openMarker;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/ReusableRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/ReusableRow.java b/src/java/org/apache/cassandra/db/rows/ReusableRow.java
new file mode 100644
index 0000000..0135afc
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/ReusableRow.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.db.*;
+
+/**
+ * A row holding a single row worth of data in its own {@code RowDataBlock}, and that is (re)populated
+ * through the {@code Row.Writer} returned by {@link #writer}.
+ */
+public class ReusableRow extends AbstractReusableRow
+{
+ private final ReusableClustering clustering;
+
+ private final ReusableLivenessInfo liveness = new ReusableLivenessInfo();
+
+ private DeletionTime deletion = DeletionTime.LIVE;
+
+ private final RowDataBlock data;
+ private final Writer writer;
+
+ /**
+ * @param clusteringSize the size given to the underlying {@code ReusableClustering}.
+ * @param columns the columns the underlying data block is created for.
+ * @param inOrderCells passed as is to the underlying {@code RowDataBlock.Writer}.
+ * @param isCounter passed as is to the underlying {@code RowDataBlock}.
+ */
+ public ReusableRow(int clusteringSize, Columns columns, boolean inOrderCells, boolean isCounter)
+ {
+ this.clustering = new ReusableClustering(clusteringSize);
+ // The data block is created for a single row.
+ this.data = new RowDataBlock(columns, 1, false, isCounter);
+ this.writer = new Writer(data, inOrderCells);
+ }
+
+ /**
+ * Resets this row and returns the writer to use to populate it again.
+ */
+ public Row.Writer writer()
+ {
+ Writer cleared = writer.reset();
+ return cleared;
+ }
+
+ public Clustering clustering()
+ {
+ return clustering;
+ }
+
+ public LivenessInfo primaryKeyLivenessInfo()
+ {
+ return liveness;
+ }
+
+ public DeletionTime deletion()
+ {
+ return deletion;
+ }
+
+ protected RowDataBlock data()
+ {
+ return data;
+ }
+
+ // This object only ever holds one row, which is thus the first of the data block.
+ protected int row()
+ {
+ return 0;
+ }
+
+ /**
+ * Writer that sends the cells to the data block (through its super class) and the clustering,
+ * liveness info and row deletion to the fields of the enclosing row.
+ */
+ private class Writer extends RowDataBlock.Writer
+ {
+ public Writer(RowDataBlock data, boolean inOrderCells)
+ {
+ super(data, inOrderCells);
+ }
+
+ public void writeClusteringValue(ByteBuffer buffer)
+ {
+ clustering.writer().writeClusteringValue(buffer);
+ }
+
+ public void writePartitionKeyLivenessInfo(LivenessInfo info)
+ {
+ ReusableRow.this.liveness.setTo(info);
+ }
+
+ public void writeRowDeletion(DeletionTime deletion)
+ {
+ ReusableRow.this.deletion = deletion;
+ }
+
+ // Clears the data block writer state as well as everything held by the enclosing row.
+ @Override
+ public Writer reset()
+ {
+ super.reset();
+ clustering.reset();
+ liveness.reset();
+ deletion = DeletionTime.LIVE;
+ return this;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/Row.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/Row.java b/src/java/org/apache/cassandra/db/rows/Row.java
new file mode 100644
index 0000000..545da7a
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/Row.java
@@ -0,0 +1,555 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import com.google.common.collect.Iterators;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.utils.MergeIterator;
+import org.apache.cassandra.utils.SearchIterator;
+
+/**
+ * Storage engine representation of a row.
+ *
+ * A row is identified by its clustering column values (it's an Unfiltered),
+ * has row level information (deletion and partition key liveness info (see below))
+ * and contains data (Cells) regarding the columns it contains.
+ *
+ * A row implements {@code WithLivenessInfo} and thus has a timestamp, ttl and
+ * local deletion time. That information does not apply to the row content, it
+ * applies to the partition key columns. In other words, the timestamp is the
+ * timestamp for the partition key columns: it is what allows to distinguish
+ * between a dead row, and a live row but for which only the partition key columns
+ * are set. The ttl and local deletion time information are for the case where
+ * a TTL is set on those partition key columns. Note however that a row can have
+ * live cells but no partition key columns timestamp, because said timestamp (and
+ * its corresponding ttl) is only set on INSERT (not UPDATE).
+ */
+public interface Row extends Unfiltered, Iterable<Cell>, Aliasable<Row>
+{
+ /**
+ * The clustering values for this row.
+ */
+ @Override
+ public Clustering clustering();
+
+ /**
+ * The columns this row contains.
+ *
+ * Note that this is actually a superset of the columns the row contains. The row
+ * may not have values for each of those columns, but it can't have values for other
+ * columns.
+ *
+ * @return a superset of the columns contained in this row.
+ */
+ public Columns columns();
+
+ /**
+ * The row deletion.
+ *
+ * This corresponds to the last row deletion done on this row.
+ *
+ * @return the row deletion.
+ */
+ public DeletionTime deletion();
+
+ /**
+ * Liveness information for the primary key columns of this row.
+ * <p>
+ * As a row is uniquely identified by its primary key, all its primary key columns
+ * share the same {@code LivenessInfo}. This liveness information is what allows us
+ * to distinguish between a dead row (it has no live cells and its primary key liveness
+ * info has no timestamp) and a live row but where all non PK columns are null (it has no
+ * live cells, but its primary key liveness has a timestamp). Please note that the ttl
+ * (and local deletion time) of the PK liveness information only apply to the
+ * liveness info timestamp, and not to the content of the row. Also note that because
+ * in practice there is no way to only delete the primary key columns (without deleting
+ * the row itself), the returned {@code LivenessInfo} can only have a local deletion time
+ * if it has a TTL.
+ * <p>
+ * Lastly, note that it is possible for a row to have live cells but no PK liveness
+ * info timestamp, because said timestamp is only set on {@code INSERT} (which makes sense
+ * in itself, see #6782) but live cells can be added through {@code UPDATE} even if the row
+ * wasn't pre-existing (which users are encouraged not to do, but we can't validate).
+ */
+ public LivenessInfo primaryKeyLivenessInfo();
+
+ /**
+ * Whether the row corresponds to a static row or not.
+ *
+ * @return whether the row corresponds to a static row or not.
+ */
+ public boolean isStatic();
+
+ /**
+ * Whether the row has no information whatsoever. This means no row infos
+ * (timestamp, ttl, deletion), no cells and no complex deletion info.
+ *
+ * @return {@code true} if the row has no data whatsoever, {@code false} otherwise.
+ */
+ public boolean isEmpty();
+
+ /**
+ * Whether the row has some live information (i.e. it's not just deletion information).
+ */
+ public boolean hasLiveData(int nowInSec);
+
+ /**
+ * Whether or not this row contains any deletion for a complex column. That is if
+ * there is at least one column for which {@code getDeletion} returns a non
+ * live deletion time.
+ */
+ public boolean hasComplexDeletion();
+
+ /**
+ * Returns a cell for a simple column.
+ *
+ * Calls to this method are allowed to return the same Cell object, and hence the returned
+ * object is only valid until the next getCell/getCells call on the same Row object. You will need
+ * to copy the returned data if you plan on using a reference to the Cell object
+ * longer than that.
+ *
+ * @param c the simple column for which to fetch the cell.
+ * @return the corresponding cell or {@code null} if the row has no such cell.
+ */
+ public Cell getCell(ColumnDefinition c);
+
+ /**
+ * Return a cell for a given complex column and cell path.
+ *
+ * Calls to this method are allowed to return the same Cell object, and hence the returned
+ * object is only valid until the next getCell/getCells call on the same Row object. You will need
+ * to copy the returned data if you plan on using a reference to the Cell object
+ * longer than that.
+ *
+ * @param c the complex column for which to fetch the cell.
+ * @param path the cell path for which to fetch the cell.
+ * @return the corresponding cell or {@code null} if the row has no such cell.
+ */
+ public Cell getCell(ColumnDefinition c, CellPath path);
+
+ /**
+ * Returns an iterator on the cells of a complex column c.
+ *
+ * Calls to this method are allowed to return the same iterator object, and
+ * hence the returned object is only valid until the next getCell/getCells call
+ * on the same Row object. You will need to copy the returned data if you
+ * plan on using a reference to the Cell object longer than that.
+ *
+ * @param c the complex column for which to fetch the cells.
+ * @return an iterator on the cells of complex column {@code c} or {@code null} if the row has no
+ * cells for that column.
+ */
+ public Iterator<Cell> getCells(ColumnDefinition c);
+
+ /**
+ * Deletion information for complex columns.
+ *
+ * @param c the complex column for which to fetch deletion info.
+ * @return the deletion time for complex column {@code c} in this row.
+ */
+ public DeletionTime getDeletion(ColumnDefinition c);
+
+ /**
+ * An iterator over the cells of this row.
+ *
+ * The iterator guarantees that for 2 rows of the same partition, columns
+ * are returned in a consistent order in the sense that if the cells for
+ * column c1 is returned before the cells for column c2 by the first iterator,
+ * it is also the case for the 2nd iterator.
+ *
+ * The object returned by a call to next() is only guaranteed to be valid until
+ * the next call to hasNext() or next(). If a consumer wants to keep a
+ * reference on the returned Cell objects for longer than the iteration, it must
+ * make a copy of it explicitly.
+ *
+ * @return an iterator over the cells of this row.
+ */
+ public Iterator<Cell> iterator();
+
+ /**
+ * An iterator to efficiently search data for a given column.
+ *
+ * @return a search iterator for the cells of this row.
+ */
+ public SearchIterator<ColumnDefinition, ColumnData> searchIterator();
+
+ /**
+ * Copy this row to the provided writer.
+ *
+ * @param writer the row writer to write this row to.
+ */
+ public void copyTo(Row.Writer writer);
+
+ public String toString(CFMetaData metadata, boolean fullDetails);
+
+ /**
+ * Interface for writing a row.
+ * <p>
+ * Clients of this interface should abide by the following assumptions:
+ * 1) if the row has a non empty clustering (it's not a static one and it doesn't belong to a table without
+ * clustering columns), then that clustering should be the first thing written (through
+ * {@link ClusteringPrefix.Writer#writeClusteringValue}).
+ * 2) for a given complex column, calls to {@link #writeCell} are performed consecutively (without
+ * any call to {@code writeCell} for another column intermingled) and in {@code CellPath} order.
+ * 3) {@link #endOfRow} is always called to end the writing of a given row.
+ */
+ public interface Writer extends ClusteringPrefix.Writer
+ {
+ /**
+ * Writes the liveness information for the partition key columns of this row.
+ *
+ * This call is optional: skipping it is equivalent to calling {@code writePartitionKeyLivenessInfo(LivenessInfo.NONE)}.
+ *
+ * @param info the liveness information for the partition key columns of the written row.
+ */
+ public void writePartitionKeyLivenessInfo(LivenessInfo info);
+
+ /**
+ * Writes the deletion information for this row.
+ *
+ * This call is optional and can be skipped if the row is not deleted.
+ *
+ * @param deletion the row deletion time, or {@code DeletionTime.LIVE} if the row isn't deleted.
+ */
+ public void writeRowDeletion(DeletionTime deletion);
+
+ /**
+ * Writes a cell to the writer.
+ *
+ * As mentioned above, cells for a given column should be added consecutively (and in {@code CellPath} order for complex columns).
+ *
+ * @param column the column for the written cell.
+ * @param isCounter whether or not this is a counter cell.
+ * @param value the value for the cell. For tombstones, which don't have values, this should be an empty buffer.
+ * @param info the cell liveness information.
+ * @param path the {@link CellPath} for complex cells and {@code null} for regular cells.
+ */
+ public void writeCell(ColumnDefinition column, boolean isCounter, ByteBuffer value, LivenessInfo info, CellPath path);
+
+ /**
+ * Writes a deletion for a complex column, that is one that applies to all cells of the complex column.
+ *
+ * @param column the (complex) column this is a deletion for.
+ * @param complexDeletion the deletion time.
+ */
+ public void writeComplexDeletion(ColumnDefinition column, DeletionTime complexDeletion);
+
+ /**
+ * Should be called to indicate that the row has been fully written.
+ */
+ public void endOfRow();
+ }
+
+ /**
+ * Utility class to help merging rows from multiple inputs (UnfilteredRowIterators).
+ */
+ public abstract static class Merger
+ {
+ private final CFMetaData metadata;
+ private final int nowInSec;
+ private final UnfilteredRowIterators.MergeListener listener;
+ private final Columns columns;
+
+ private Clustering clustering;
+ private final Row[] rows;
+ private int rowsToMerge;
+
+ private LivenessInfo rowInfo = LivenessInfo.NONE;
+ private DeletionTime rowDeletion = DeletionTime.LIVE;
+
+ private final Cell[] cells;
+ private final List<Iterator<Cell>> complexCells;
+ private final ComplexColumnReducer complexReducer = new ComplexColumnReducer();
+
+ // For the sake of the listener if there is one
+ private final DeletionTime[] complexDelTimes;
+
+ private boolean signaledListenerForRow;
+
+ public static Merger createStatic(CFMetaData metadata, int size, int nowInSec, Columns columns, UnfilteredRowIterators.MergeListener listener)
+ {
+ return new StaticMerger(metadata, size, nowInSec, columns, listener);
+ }
+
+ public static Merger createRegular(CFMetaData metadata, int size, int nowInSec, Columns columns, UnfilteredRowIterators.MergeListener listener)
+ {
+ return new RegularMerger(metadata, size, nowInSec, columns, listener);
+ }
+
+ protected Merger(CFMetaData metadata, int size, int nowInSec, Columns columns, UnfilteredRowIterators.MergeListener listener)
+ {
+ this.metadata = metadata;
+ this.nowInSec = nowInSec;
+ this.listener = listener;
+ this.columns = columns;
+ this.rows = new Row[size];
+ this.complexCells = new ArrayList<>(size);
+
+ this.cells = new Cell[size];
+ this.complexDelTimes = listener == null ? null : new DeletionTime[size];
+ }
+
+ public void clear()
+ {
+ Arrays.fill(rows, null);
+ Arrays.fill(cells, null);
+ if (complexDelTimes != null)
+ Arrays.fill(complexDelTimes, null);
+ complexCells.clear();
+ rowsToMerge = 0;
+
+ rowInfo = LivenessInfo.NONE;
+ rowDeletion = DeletionTime.LIVE;
+
+ signaledListenerForRow = false;
+ }
+
+ public void add(int i, Row row)
+ {
+ clustering = row.clustering();
+ rows[i] = row;
+ ++rowsToMerge;
+ }
+
+ protected abstract Row.Writer getWriter();
+ protected abstract Row getRow();
+
+ public Row merge(DeletionTime activeDeletion)
+ {
+ // If for this clustering we have only one row version and have no activeDeletion (i.e. nothing to filter out),
+ // then we can just return that single row (we also should have no listener)
+ if (rowsToMerge == 1 && activeDeletion.isLive() && listener == null)
+ {
+ for (int i = 0; i < rows.length; i++)
+ if (rows[i] != null)
+ return rows[i];
+ throw new AssertionError();
+ }
+
+ Row.Writer writer = getWriter();
+ Rows.writeClustering(clustering, writer);
+
+ for (int i = 0; i < rows.length; i++)
+ {
+ if (rows[i] == null)
+ continue;
+
+ rowInfo = rowInfo.mergeWith(rows[i].primaryKeyLivenessInfo());
+
+ if (rows[i].deletion().supersedes(rowDeletion))
+ rowDeletion = rows[i].deletion();
+ }
+
+ if (rowDeletion.supersedes(activeDeletion))
+ activeDeletion = rowDeletion;
+
+ if (activeDeletion.deletes(rowInfo))
+ rowInfo = LivenessInfo.NONE;
+
+ writer.writePartitionKeyLivenessInfo(rowInfo);
+ writer.writeRowDeletion(rowDeletion);
+
+ for (int i = 0; i < columns.simpleColumnCount(); i++)
+ {
+ ColumnDefinition c = columns.getSimple(i);
+ for (int j = 0; j < rows.length; j++)
+ cells[j] = rows[j] == null ? null : rows[j].getCell(c);
+
+ reconcileCells(activeDeletion, writer);
+ }
+
+ complexReducer.activeDeletion = activeDeletion;
+ complexReducer.writer = writer;
+ for (int i = 0; i < columns.complexColumnCount(); i++)
+ {
+ ColumnDefinition c = columns.getComplex(i);
+
+ DeletionTime maxComplexDeletion = DeletionTime.LIVE;
+ for (int j = 0; j < rows.length; j++)
+ {
+ if (rows[j] == null)
+ continue;
+
+ DeletionTime dt = rows[j].getDeletion(c);
+ if (complexDelTimes != null)
+ complexDelTimes[j] = dt;
+
+ if (dt.supersedes(maxComplexDeletion))
+ maxComplexDeletion = dt;
+ }
+
+ boolean overrideActive = maxComplexDeletion.supersedes(activeDeletion);
+ maxComplexDeletion = overrideActive ? maxComplexDeletion : DeletionTime.LIVE;
+ writer.writeComplexDeletion(c, maxComplexDeletion);
+ if (listener != null)
+ listener.onMergedComplexDeletion(c, maxComplexDeletion, complexDelTimes);
+
+ mergeComplex(overrideActive ? maxComplexDeletion : activeDeletion, c);
+ }
+ writer.endOfRow();
+
+ Row row = getRow();
+ // Because shadowed cells are skipped, the row could be empty. In which case
+ // we return null (we also don't want to signal anything in that case since that
+ // means everything in the row was shadowed and the listener will have been signalled
+ // for whatever shadows it).
+ if (row.isEmpty())
+ return null;
+
+ maybeSignalEndOfRow();
+ return row;
+ }
+
+ private void maybeSignalListenerForRow()
+ {
+ if (listener != null && !signaledListenerForRow)
+ {
+ listener.onMergingRows(clustering, rowInfo, rowDeletion, rows);
+ signaledListenerForRow = true;
+ }
+ }
+
+ private void maybeSignalListenerForCell(Cell merged, Cell[] versions)
+ {
+ if (listener != null)
+ {
+ maybeSignalListenerForRow();
+ listener.onMergedCells(merged, versions);
+ }
+ }
+
+ private void maybeSignalEndOfRow()
+ {
+ if (listener != null)
+ {
+ // If we haven't signaled the listener yet (we had no cells but some deletion info), do it now
+ maybeSignalListenerForRow();
+ listener.onRowDone();
+ }
+ }
+
+ private void reconcileCells(DeletionTime activeDeletion, Row.Writer writer)
+ {
+ Cell reconciled = null;
+ for (int j = 0; j < cells.length; j++)
+ {
+ Cell cell = cells[j];
+ if (cell != null && !activeDeletion.deletes(cell.livenessInfo()))
+ reconciled = Cells.reconcile(reconciled, cell, nowInSec);
+ }
+
+ if (reconciled != null)
+ {
+ reconciled.writeTo(writer);
+ maybeSignalListenerForCell(reconciled, cells);
+ }
+ }
+
+ private void mergeComplex(DeletionTime activeDeletion, ColumnDefinition c)
+ {
+ complexCells.clear();
+ for (int j = 0; j < rows.length; j++)
+ {
+ Row row = rows[j];
+ Iterator<Cell> iter = row == null ? null : row.getCells(c);
+ complexCells.add(iter == null ? Iterators.<Cell>emptyIterator() : iter);
+ }
+
+ complexReducer.column = c;
+ complexReducer.activeDeletion = activeDeletion;
+
+ // Note that we use the mergeIterator only to group cells to merge, but we
+ // write the result to the writer directly in the reducer, so all we care
+ // about is iterating over the result.
+ Iterator<Void> iter = MergeIterator.get(complexCells, c.cellComparator(), complexReducer);
+ while (iter.hasNext())
+ iter.next();
+ }
+
+ private class ComplexColumnReducer extends MergeIterator.Reducer<Cell, Void>
+ {
+ private DeletionTime activeDeletion;
+ private Row.Writer writer;
+ private ColumnDefinition column;
+
+ public void reduce(int idx, Cell current)
+ {
+ cells[idx] = current;
+ }
+
+ protected Void getReduced()
+ {
+ reconcileCells(activeDeletion, writer);
+ return null;
+ }
+
+ protected void onKeyChange()
+ {
+ Arrays.fill(cells, null);
+ }
+ }
+
+ private static class StaticMerger extends Merger
+ {
+ private final StaticRow.Builder builder;
+
+ private StaticMerger(CFMetaData metadata, int size, int nowInSec, Columns columns, UnfilteredRowIterators.MergeListener listener)
+ {
+ super(metadata, size, nowInSec, columns, listener);
+ this.builder = StaticRow.builder(columns, true, metadata.isCounter());
+ }
+
+ protected Row.Writer getWriter()
+ {
+ return builder;
+ }
+
+ protected Row getRow()
+ {
+ return builder.build();
+ }
+ }
+
+ private static class RegularMerger extends Merger
+ {
+ private final ReusableRow row;
+
+ private RegularMerger(CFMetaData metadata, int size, int nowInSec, Columns columns, UnfilteredRowIterators.MergeListener listener)
+ {
+ super(metadata, size, nowInSec, columns, listener);
+ this.row = new ReusableRow(metadata.clusteringColumns().size(), columns, true, metadata.isCounter());
+ }
+
+ protected Row.Writer getWriter()
+ {
+ return row.writer();
+ }
+
+ protected Row getRow()
+ {
+ return row;
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/RowAndTombstoneMergeIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/RowAndTombstoneMergeIterator.java b/src/java/org/apache/cassandra/db/rows/RowAndTombstoneMergeIterator.java
new file mode 100644
index 0000000..51383a2
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/RowAndTombstoneMergeIterator.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.util.Comparator;
+import java.util.Iterator;
+
+import com.google.common.collect.PeekingIterator;
+import com.google.common.collect.UnmodifiableIterator;
+
+import org.apache.cassandra.db.*;
+
+/**
+ * Merges an iterator of rows and an iterator of range tombstones into a single
+ * stream of {@code Unfiltered}: rows are returned as-is while each range tombstone
+ * is turned into an opening and a closing marker (or a single boundary marker when
+ * a tombstone closes exactly where the next one opens).
+ * <p>
+ * The iterator is reusable: call {@link #setTo} before each use.
+ */
+public class RowAndTombstoneMergeIterator extends UnmodifiableIterator<Unfiltered> implements PeekingIterator<Unfiltered>
+{
+    private final ClusteringComparator clusteringComparator;
+    // clusteringComparator, reversed when iterating in reverse order.
+    private final Comparator<Clusterable> comparator;
+    private final boolean reversed;
+
+    private Iterator<Row> rowIter;
+    private Row nextRow;
+
+    private Iterator<RangeTombstone> tombstoneIter;
+    private RangeTombstone nextTombstone;
+    // True once the open marker of nextTombstone was emitted but not yet its close marker.
+    private boolean inTombstone;
+
+    // The element computed by prepareNext() and not yet consumed, or null.
+    private Unfiltered next;
+
+    public RowAndTombstoneMergeIterator(ClusteringComparator comparator, boolean reversed)
+    {
+        this.clusteringComparator = comparator;
+        this.comparator = reversed ? comparator.reversed() : comparator;
+        this.reversed = reversed;
+    }
+
+    /**
+     * Points this iterator at new inputs and resets all iteration state.
+     *
+     * @param rowIter the rows to merge.
+     * @param tombstoneIter the range tombstones to merge.
+     * @return this iterator.
+     */
+    public RowAndTombstoneMergeIterator setTo(Iterator<Row> rowIter, Iterator<RangeTombstone> tombstoneIter)
+    {
+        this.rowIter = rowIter;
+        this.tombstoneIter = tombstoneIter;
+        this.nextRow = null;
+        this.nextTombstone = null;
+        this.next = null;
+        this.inTombstone = false;
+        return this;
+    }
+
+    /**
+     * Whether {@link #setTo} has been called with a non-null row iterator.
+     */
+    public boolean isSet()
+    {
+        return rowIter != null;
+    }
+
+    /**
+     * Computes the next element into {@code next} unless one is already pending.
+     * Leaves {@code next} null when both inputs are exhausted.
+     */
+    private void prepareNext()
+    {
+        if (next != null)
+            return;
+
+        if (nextTombstone == null && tombstoneIter.hasNext())
+            nextTombstone = tombstoneIter.next();
+        if (nextRow == null && rowIter.hasNext())
+            nextRow = rowIter.next();
+
+        if (nextTombstone == null)
+        {
+            // No tombstone left: only rows (if any) remain.
+            if (nextRow != null)
+                emitRow();
+        }
+        else if (nextRow == null)
+        {
+            // No row left: finish the current tombstone or start the next one.
+            if (inTombstone)
+                closeCurrentTombstone();
+            else
+                openNextTombstone();
+        }
+        else if (inTombstone)
+        {
+            // The close marker goes first only if it sorts strictly before the row.
+            if (comparator.compare(nextTombstone.deletedSlice().close(reversed), nextRow.clustering()) < 0)
+                closeCurrentTombstone();
+            else
+                emitRow();
+        }
+        else
+        {
+            // The open marker goes first only if it sorts strictly before the row.
+            if (comparator.compare(nextTombstone.deletedSlice().open(reversed), nextRow.clustering()) < 0)
+                openNextTombstone();
+            else
+                emitRow();
+        }
+    }
+
+    // Makes the pending row the next element.
+    private void emitRow()
+    {
+        next = nextRow;
+        nextRow = null;
+    }
+
+    // Emits the open marker of the pending tombstone; the tombstone stays pending until closed.
+    private void openNextTombstone()
+    {
+        inTombstone = true;
+        next = new RangeTombstoneBoundMarker(nextTombstone.deletedSlice().open(reversed), nextTombstone.deletionTime());
+    }
+
+    // Closes the pending tombstone and fetches the following one. If the following tombstone
+    // opens exactly where this one closes, a single boundary marker is emitted and we remain
+    // "in" a tombstone; otherwise a plain close marker is emitted.
+    private void closeCurrentTombstone()
+    {
+        RangeTombstone closing = nextTombstone;
+        nextTombstone = tombstoneIter.hasNext() ? tombstoneIter.next() : null;
+
+        if (nextTombstone != null
+            && RangeTombstoneBoundaryMarker.isBoundary(clusteringComparator, closing.deletedSlice().close(reversed), nextTombstone.deletedSlice().open(reversed)))
+        {
+            next = RangeTombstoneBoundaryMarker.makeBoundary(reversed,
+                                                             closing.deletedSlice().close(reversed),
+                                                             nextTombstone.deletedSlice().open(reversed),
+                                                             closing.deletionTime(),
+                                                             nextTombstone.deletionTime());
+        }
+        else
+        {
+            inTombstone = false;
+            next = new RangeTombstoneBoundMarker(closing.deletedSlice().close(reversed), closing.deletionTime());
+        }
+    }
+
+    public boolean hasNext()
+    {
+        prepareNext();
+        return next != null;
+    }
+
+    /**
+     * Returns and consumes the next element, or returns {@code null} if the inputs are exhausted.
+     */
+    public Unfiltered next()
+    {
+        prepareNext();
+        Unfiltered toReturn = next;
+        next = null;
+        return toReturn;
+    }
+
+    /**
+     * Returns the next element without consuming it: a subsequent call to
+     * {@link #next} or {@code peek} returns the same element. Returns {@code null}
+     * if the inputs are exhausted.
+     */
+    public Unfiltered peek()
+    {
+        prepareNext();
+        // Return the pending field directly; calling next() here would consume the element.
+        return next;
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/RowDataBlock.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/RowDataBlock.java b/src/java/org/apache/cassandra/db/rows/RowDataBlock.java
new file mode 100644
index 0000000..b1e2b13
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/RowDataBlock.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import com.google.common.collect.UnmodifiableIterator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.utils.ObjectSizes;
+
+/**
+ * A {@code RowDataBlock} holds data for one or more rows (of a given table). More precisely, it contains
+ * cell data and complex deletion data (for complex columns) and allows access to this data. Please note
+ * however that {@code RowDataBlock} only holds the data inside the row, it does not hold the data
+ * pertaining to the row itself: clustering, partition key liveness info and row deletion.
+ * <p>
+ * {@code RowDataBlock} is largely an implementation detail: it is only there to be reused by
+ * {@link AbstractPartitionData} and every concrete row implementation.
+ */
+public class RowDataBlock
+{
+    private static final Logger logger = LoggerFactory.getLogger(RowDataBlock.class);
+
+    private static final long EMPTY_SIZE = ObjectSizes.measure(new RowDataBlock(Columns.NONE, 0, false, false));
+
+    // We distinguish 2 sub-objects: SimpleRowDataBlock that contains the data for the simple columns only,
+    // and ComplexRowDataBlock that only contains data for complex columns. The reason for having 2 separate
+    // objects is that simple columns are much easier to handle since we have only a single cell per-object
+    // and thus having a more specialized object allows a simpler and more efficient handling.
+    // Either one is null when the columns have no simple (respectively complex) column.
+    final SimpleRowDataBlock simpleData;
+    final ComplexRowDataBlock complexData;
+
+    /**
+     * @param columns the columns of the rows stored in this block.
+     * @param rows forwarded to the underlying simple/complex blocks.
+     * @param sortable forwarded to the underlying complex block.
+     * @param isCounter forwarded to the underlying simple/complex blocks.
+     */
+    public RowDataBlock(Columns columns, int rows, boolean sortable, boolean isCounter)
+    {
+        this.simpleData = columns.hasSimple() ? new SimpleRowDataBlock(columns, rows, isCounter) : null;
+        this.complexData = columns.hasComplex() ? ComplexRowDataBlock.create(columns, rows, sortable, isCounter) : null;
+    }
+
+    /**
+     * The columns of this block, as reported by whichever sub-block exists, or
+     * {@code Columns.NONE} if there is neither simple nor complex data.
+     */
+    public Columns columns()
+    {
+        if (simpleData != null)
+            return simpleData.columns();
+        if (complexData != null)
+            return complexData.columns();
+        return Columns.NONE;
+    }
+
+    /**
+     * Return the cell value for a given column of a given row.
+     *
+     * @param row the row for which to return the cell value.
+     * @param column the column for which to return the cell value.
+     * @param path the cell path for which to return the cell value. Can be null for
+     * simple columns.
+     *
+     * @return the value of the cell of path {@code path} for {@code column} in row {@code row}, or
+     * {@code null} if there is no such cell.
+     */
+    public ByteBuffer getValue(int row, ColumnDefinition column, CellPath path)
+    {
+        if (column.isComplex())
+            return complexData.getValue(row, column, path);
+
+        Columns cols = columns();
+        int idx = cols.simpleIdx(column, 0);
+        assert idx >= 0;
+        // Simple cells are laid out row after row, with one slot per simple column.
+        return simpleData.data.value((row * cols.simpleColumnCount()) + idx);
+    }
+
+    /**
+     * Sets the cell value for a given column of a given row.
+     *
+     * @param row the row for which to set the cell value.
+     * @param column the column for which to set the cell value.
+     * @param path the cell path for which to set the cell value. Can be null for
+     * simple columns.
+     * @param value the value to set.
+     */
+    public void setValue(int row, ColumnDefinition column, CellPath path, ByteBuffer value)
+    {
+        if (column.isComplex())
+        {
+            complexData.setValue(row, column, path, value);
+            return;
+        }
+
+        Columns cols = columns();
+        int idx = cols.simpleIdx(column, 0);
+        assert idx >= 0;
+        simpleData.data.setValue((row * cols.simpleColumnCount()) + idx, value);
+    }
+
+    public static ReusableIterator reusableIterator()
+    {
+        return new ReusableIterator();
+    }
+
+    // Swap row i and j
+    public void swap(int i, int j)
+    {
+        if (simpleData != null)
+            simpleData.swap(i, j);
+        if (complexData != null)
+            complexData.swap(i, j);
+    }
+
+    // Merge row i into j
+    public void merge(int i, int j, int nowInSec)
+    {
+        if (simpleData != null)
+            simpleData.merge(i, j, nowInSec);
+        if (complexData != null)
+            complexData.merge(i, j, nowInSec);
+    }
+
+    // Move row i into j
+    public void move(int i, int j)
+    {
+        if (simpleData != null)
+            simpleData.move(i, j);
+        if (complexData != null)
+            complexData.move(i, j);
+    }
+
+    public boolean hasComplexDeletion(int row)
+    {
+        return complexData != null && complexData.hasComplexDeletion(row);
+    }
+
+    public long unsharedHeapSizeExcludingData()
+    {
+        long simpleSize = simpleData == null ? 0 : simpleData.unsharedHeapSizeExcludingData();
+        long complexSize = complexData == null ? 0 : complexData.unsharedHeapSizeExcludingData();
+        return EMPTY_SIZE + simpleSize + complexSize;
+    }
+
+    /**
+     * Computes a capacity large enough to hold index {@code idxToSet}, growing
+     * {@code currentCapacity} by roughly 1.5x at a time (starting from 4 when there
+     * is no capacity yet).
+     * <p>
+     * The growth is computed with {@code long} arithmetic so that it cannot overflow
+     * for very large capacities; the result is capped at {@code Integer.MAX_VALUE}.
+     *
+     * @param currentCapacity the current capacity; 0 (or a negative value) means no capacity yet.
+     * @param idxToSet the index that must fit in the returned capacity.
+     * @return {@code currentCapacity} if it already fits {@code idxToSet}, a larger capacity otherwise.
+     */
+    public static int computeNewCapacity(int currentCapacity, int idxToSet)
+    {
+        long newCapacity = currentCapacity <= 0 ? 4 : currentCapacity;
+        while (idxToSet >= newCapacity)
+            newCapacity = 1 + (newCapacity * 3) / 2;
+        return (int) Math.min(newCapacity, Integer.MAX_VALUE);
+    }
+
+    public int dataSize()
+    {
+        int simpleSize = simpleData == null ? 0 : simpleData.dataSize();
+        int complexSize = complexData == null ? 0 : complexData.dataSize();
+        return simpleSize + complexSize;
+    }
+
+    public void clear()
+    {
+        if (simpleData != null)
+            simpleData.clear();
+        if (complexData != null)
+            complexData.clear();
+    }
+
+    /**
+     * Base {@code Row.Writer} implementation taking care of the cell and complex
+     * deletion data by delegating to the writers of the simple and complex sub-blocks.
+     * Subclasses handle the row-level information (clustering, liveness, row deletion).
+     */
+    public abstract static class Writer implements Row.Writer
+    {
+        private final boolean inOrderCells;
+
+        // Index of the row being written: reset to 0 by reset() and incremented by endOfRow().
+        protected int row;
+
+        // Null when the block written to has no simple (respectively complex) data.
+        protected SimpleRowDataBlock.CellWriter simpleWriter;
+        protected ComplexRowDataBlock.CellWriter complexWriter;
+
+        protected Writer(boolean inOrderCells)
+        {
+            this.inOrderCells = inOrderCells;
+        }
+
+        protected Writer(RowDataBlock data, boolean inOrderCells)
+        {
+            this(inOrderCells);
+            updateWriter(data);
+        }
+
+        /**
+         * (Re)creates the cell writers so that they write into {@code data}.
+         */
+        protected void updateWriter(RowDataBlock data)
+        {
+            this.simpleWriter = data.simpleData == null ? null : data.simpleData.cellWriter(inOrderCells);
+            this.complexWriter = data.complexData == null ? null : data.complexData.cellWriter(inOrderCells);
+        }
+
+        /**
+         * Goes back to the first row and resets the underlying cell writers.
+         *
+         * @return this writer.
+         */
+        public Writer reset()
+        {
+            row = 0;
+
+            if (simpleWriter != null)
+                simpleWriter.reset();
+            if (complexWriter != null)
+                complexWriter.reset();
+
+            return this;
+        }
+
+        // Note: isCounter is not used by this implementation.
+        public void writeCell(ColumnDefinition column, boolean isCounter, ByteBuffer value, LivenessInfo info, CellPath path)
+        {
+            if (column.isComplex())
+                complexWriter.addCell(column, value, info, path);
+            else
+                simpleWriter.addCell(column, value, info);
+        }
+
+        public void writeComplexDeletion(ColumnDefinition c, DeletionTime complexDeletion)
+        {
+            // A live deletion carries no information: nothing to record.
+            if (complexDeletion.isLive())
+                return;
+
+            complexWriter.setComplexDeletion(c, complexDeletion);
+        }
+
+        public void endOfRow()
+        {
+            ++row;
+            if (simpleWriter != null)
+                simpleWriter.endOfRow();
+            if (complexWriter != null)
+                complexWriter.endOfRow();
+        }
+    }
+
+    /**
+     * Reusable iterator over the cells of one row of a block: the cells of the simple
+     * iterator are returned first, followed by those of the complex iterator.
+     */
+    static class ReusableIterator extends UnmodifiableIterator<Cell>
+    {
+        private final SimpleRowDataBlock.ReusableIterator simpleIterator;
+        private final ComplexRowDataBlock.ReusableIterator complexIterator;
+
+        public ReusableIterator()
+        {
+            this.simpleIterator = SimpleRowDataBlock.reusableIterator();
+            this.complexIterator = ComplexRowDataBlock.reusableIterator();
+        }
+
+        /**
+         * Points this iterator at row {@code row} of {@code dataBlock}.
+         *
+         * @return this iterator.
+         */
+        public ReusableIterator setTo(RowDataBlock dataBlock, int row)
+        {
+            simpleIterator.setTo(dataBlock.simpleData, row);
+            complexIterator.setTo(dataBlock.complexData, row);
+            return this;
+        }
+
+        public boolean hasNext()
+        {
+            return simpleIterator.hasNext() || complexIterator.hasNext();
+        }
+
+        public Cell next()
+        {
+            return simpleIterator.hasNext() ? simpleIterator.next() : complexIterator.next();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/RowIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/RowIterator.java b/src/java/org/apache/cassandra/db/rows/RowIterator.java
new file mode 100644
index 0000000..69994dd
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/RowIterator.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.util.Iterator;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+
+/**
+ * An iterator over the rows of a single partition.
+ *
+ * A RowIterator is an UnfilteredRowIterator from which all deletion information has
+ * been filtered out. As such, all cells of all rows returned by this iterator are,
+ * by definition, live, and hence code using a RowIterator doesn't have to worry
+ * about tombstones and other deletion information.
+ *
+ * Note that, as for UnfilteredRowIterator, the rows returned must be in clustering
+ * order (or reverse clustering order if isReverseOrder is true), and the Row objects
+ * returned by next() are only valid until the next call to hasNext() or next().
+ */
+public interface RowIterator extends Iterator<Row>, AutoCloseable
+{
+    /**
+     * The metadata of the table this iterator is on.
+     */
+    public CFMetaData metadata();
+
+    /**
+     * Whether the rows returned by this iterator are in reversed clustering order.
+     */
+    public boolean isReverseOrder();
+
+    /**
+     * A subset of the columns for the (static and regular) rows returned by this
+     * iterator. Every row returned by this iterator must guarantee that it has only
+     * those columns.
+     */
+    public PartitionColumns columns();
+
+    /**
+     * The partition key of the partition this is an iterator over.
+     */
+    public DecoratedKey partitionKey();
+
+    /**
+     * The static part corresponding to this partition (this can be an empty row).
+     */
+    public Row staticRow();
+
+    /**
+     * Closes this iterator. Redeclared without a throws clause, so callers do not
+     * have to handle the checked exception of {@link AutoCloseable#close()}.
+     */
+    public void close();
+
+    /**
+     * Returns whether this iterator has no data, that is, whether its static row is
+     * empty and it has no more rows to return.
+     */
+    public default boolean isEmpty()
+    {
+        // The static row is checked first; hasNext() is only consulted when it is empty.
+        if (!staticRow().isEmpty())
+            return false;
+
+        return !hasNext();
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/RowIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/RowIterators.java b/src/java/org/apache/cassandra/db/rows/RowIterators.java
new file mode 100644
index 0000000..a3bd913
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/RowIterators.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.util.*;
+import java.security.MessageDigest;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * Static helper methods for working with {@link RowIterator} instances.
+ */
+public abstract class RowIterators
+{
+    private static final Logger logger = LoggerFactory.getLogger(RowIterators.class);
+
+    // Static helpers only: not meant to be instantiated.
+    private RowIterators() {}
+
+    /**
+     * Builds a {@code PartitionUpdate} holding the content of the provided iterator.
+     * <p>
+     * The static row is copied unless it is the shared {@code Rows.EMPTY_STATIC_ROW}
+     * instance, then every remaining row is copied. The iterator is consumed by
+     * this call but is not closed.
+     */
+    public static PartitionUpdate toUpdate(RowIterator iterator)
+    {
+        CFMetaData metadata = iterator.metadata();
+        DecoratedKey key = iterator.partitionKey();
+        PartitionColumns columns = iterator.columns();
+        PartitionUpdate result = new PartitionUpdate(metadata, key, columns, 1);
+
+        // Identity comparison on purpose: only the shared empty instance is skipped.
+        if (iterator.staticRow() != Rows.EMPTY_STATIC_ROW)
+        {
+            iterator.staticRow().copyTo(result.staticWriter());
+        }
+
+        while (iterator.hasNext())
+        {
+            Row row = iterator.next();
+            row.copyTo(result.writer());
+        }
+
+        return result;
+    }
+
+    /**
+     * Feeds the content of the provided iterator to the provided digest: partition
+     * key, columns, iteration order flag, static row and then every remaining row,
+     * in that order. The iterator is consumed by this call but is not closed.
+     */
+    public static void digest(RowIterator iterator, MessageDigest digest)
+    {
+        // TODO: we're not computing the digest the same way old nodes do, so we'll need
+        // to pass the version we're computing the digest for and deal with that.
+
+        // Digest a duplicate so the position of the key's own buffer is left untouched.
+        digest.update(iterator.partitionKey().getKey().duplicate());
+        iterator.columns().digest(digest);
+        FBUtilities.updateWithBoolean(digest, iterator.isReverseOrder());
+        iterator.staticRow().digest(digest);
+
+        while (iterator.hasNext())
+        {
+            Row row = iterator.next();
+            row.digest(digest);
+        }
+    }
+
+    /**
+     * Returns an iterator with no rows at all for the provided table and partition
+     * key: it has no columns, its static row is {@code Rows.EMPTY_STATIC_ROW} and
+     * {@code hasNext()} is always false.
+     */
+    public static RowIterator emptyIterator(final CFMetaData cfm, final DecoratedKey partitionKey, final boolean isReverseOrder)
+    {
+        return new RowIterator()
+        {
+            @Override
+            public CFMetaData metadata()
+            {
+                return cfm;
+            }
+
+            @Override
+            public DecoratedKey partitionKey()
+            {
+                return partitionKey;
+            }
+
+            @Override
+            public boolean isReverseOrder()
+            {
+                return isReverseOrder;
+            }
+
+            @Override
+            public PartitionColumns columns()
+            {
+                return PartitionColumns.NONE;
+            }
+
+            @Override
+            public Row staticRow()
+            {
+                return Rows.EMPTY_STATIC_ROW;
+            }
+
+            @Override
+            public boolean hasNext()
+            {
+                return false;
+            }
+
+            @Override
+            public Row next()
+            {
+                throw new NoSuchElementException();
+            }
+
+            @Override
+            public void remove()
+            {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public void close()
+            {
+                // Nothing to release.
+            }
+        };
+    }
+
+    /**
+     * Wraps the provided iterator so that the rows it returns are logged, for
+     * debugging purposes. A header line describing the iterator is logged right
+     * away; the static row (when not empty) and each row are logged every time they
+     * are fetched through the returned wrapper.
+     * <p>
+     * Note that this is only meant for debugging, as it can produce a very large
+     * amount of output at INFO level.
+     *
+     * @param iterator the iterator to wrap.
+     * @param id an identifier prefixed to every logged line.
+     */
+    public static RowIterator loggingIterator(RowIterator iterator, final String id)
+    {
+        CFMetaData table = iterator.metadata();
+        Object[] headerArgs = new Object[]{ id,
+                                            table.ksName,
+                                            table.cfName,
+                                            table.getKeyValidator().getString(iterator.partitionKey().getKey()),
+                                            iterator.isReverseOrder() };
+        logger.info("[{}] Logging iterator on {}.{}, partition key={}, reversed={}", headerArgs);
+
+        return new WrappingRowIterator(iterator)
+        {
+            @Override
+            public Row staticRow()
+            {
+                Row fetched = super.staticRow();
+                if (!fetched.isEmpty())
+                {
+                    logger.info("[{}] {}", id, fetched.toString(metadata()));
+                }
+                return fetched;
+            }
+
+            @Override
+            public Row next()
+            {
+                Row fetched = super.next();
+                logger.info("[{}] {}", id, fetched.toString(metadata()));
+                return fetched;
+            }
+        };
+    }
+}