You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@cassandra.apache.org by be...@apache.org on 2015/07/02 11:37:26 UTC

[6/6] cassandra git commit: Merge branch 'cassandra-2.2' into trunk

Merge branch 'cassandra-2.2' into trunk

Conflicts:
	src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
	src/java/org/apache/cassandra/utils/memory/HeapPool.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/dea6ab1b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/dea6ab1b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/dea6ab1b

Branch: refs/heads/trunk
Commit: dea6ab1b769943eedaeb590d545d7c476c4a2466
Parents: 03f72ac 99f7ce9
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Thu Jul 2 10:34:46 2015 +0100
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Thu Jul 2 10:34:46 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  | 22 +++++-
 src/java/org/apache/cassandra/db/Memtable.java  | 15 ++--
 .../db/partitions/AtomicBTreePartition.java     |  2 +-
 .../org/apache/cassandra/utils/FBUtilities.java | 10 +++
 .../utils/memory/MemtableAllocator.java         | 39 +++++++----
 .../cassandra/utils/memory/MemtablePool.java    | 73 ++++++++++++--------
 .../utils/memory/NativeAllocatorTest.java       | 18 ++++-
 8 files changed, 131 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/dea6ab1b/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dea6ab1b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dea6ab1b/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/Memtable.java
index e82c35e,6e4802f..b6341aa
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@@ -253,35 -256,23 +253,36 @@@ public class Memtable implements Compar
      public String toString()
      {
          return String.format("Memtable-%s@%s(%s serialized bytes, %s ops, %.0f%%/%.0f%% of on/off-heap limit)",
-                              cfs.name, hashCode(), liveDataSize, currentOperations, 100 * allocator.onHeap().ownershipRatio(), 100 * allocator.offHeap().ownershipRatio());
+                              cfs.name, hashCode(), FBUtilities.prettyPrintMemory(liveDataSize.get()), currentOperations,
+                              100 * allocator.onHeap().ownershipRatio(), 100 * allocator.offHeap().ownershipRatio());
      }
  
 -    /**
 -     * @param startWith Include data in the result from and including this key and to the end of the memtable
 -     * @return An iterator of entries with the data from the start key
 -     */
 -    public Iterator<Map.Entry<DecoratedKey, ColumnFamily>> getEntryIterator(final RowPosition startWith, final RowPosition stopAt)
 +    public UnfilteredPartitionIterator makePartitionIterator(final ColumnFilter columnFilter, final DataRange dataRange, final boolean isForThrift)
      {
 -        return new Iterator<Map.Entry<DecoratedKey, ColumnFamily>>()
 -        {
 -            private Iterator<? extends Map.Entry<? extends RowPosition, AtomicBTreeColumns>> iter = stopAt.isMinimum()
 -                    ? rows.tailMap(startWith).entrySet().iterator()
 -                    : rows.subMap(startWith, true, stopAt, true).entrySet().iterator();
 +        AbstractBounds<PartitionPosition> keyRange = dataRange.keyRange();
 +
 +        boolean startIsMin = keyRange.left.isMinimum();
 +        boolean stopIsMin = keyRange.right.isMinimum();
 +
 +        boolean isBound = keyRange instanceof Bounds;
 +        boolean includeStart = isBound || keyRange instanceof IncludingExcludingBounds;
 +        boolean includeStop = isBound || keyRange instanceof Range;
 +        Map<PartitionPosition, AtomicBTreePartition> subMap;
 +        if (startIsMin)
 +            subMap = stopIsMin ? partitions : partitions.headMap(keyRange.right, includeStop);
 +        else
 +            subMap = stopIsMin
 +                   ? partitions.tailMap(keyRange.left, includeStart)
 +                   : partitions.subMap(keyRange.left, includeStart, keyRange.right, includeStop);
 +
 +        final Iterator<Map.Entry<PartitionPosition, AtomicBTreePartition>> iter = subMap.entrySet().iterator();
  
 -            private Map.Entry<? extends RowPosition, ? extends ColumnFamily> currentEntry;
 +        return new AbstractUnfilteredPartitionIterator()
 +        {
 +            public boolean isForThrift()
 +            {
 +                return isForThrift;
 +            }
  
              public boolean hasNext()
              {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dea6ab1b/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
index 4edd707,0000000..777e7b4
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
@@@ -1,819 -1,0 +1,819 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.db.partitions;
 +
 +import java.nio.ByteBuffer;
 +import java.util.ArrayList;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 +
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +import com.google.common.collect.AbstractIterator;
 +import com.google.common.collect.Iterators;
 +
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.config.ColumnDefinition;
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.rows.*;
 +import org.apache.cassandra.db.filter.ColumnFilter;
 +import org.apache.cassandra.db.index.SecondaryIndexManager;
 +import org.apache.cassandra.utils.btree.BTree;
 +import org.apache.cassandra.utils.btree.BTreeSearchIterator;
 +import org.apache.cassandra.utils.btree.UpdateFunction;
 +import org.apache.cassandra.utils.ObjectSizes;
 +import org.apache.cassandra.utils.FBUtilities;
 +import org.apache.cassandra.utils.SearchIterator;
 +import org.apache.cassandra.utils.concurrent.OpOrder;
 +import org.apache.cassandra.utils.concurrent.Locks;
 +import org.apache.cassandra.utils.memory.MemtableAllocator;
 +import org.apache.cassandra.utils.memory.HeapAllocator;
 +import org.apache.cassandra.service.StorageService;
 +import static org.apache.cassandra.db.index.SecondaryIndexManager.Updater;
 +
 +/**
 + * A thread-safe and atomic Partition implementation.
 + *
 + * Operations (in particular addAll) on this implementation are atomic and
 + * isolated (in the sense of ACID). In particular, an addAll guarantees that no
 + * other thread can observe a state where only some, but not all, of its rows
 + * have been added.
 + */
 +public class AtomicBTreePartition implements Partition
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(AtomicBTreePartition.class);
 +
 +    // Size reported by ObjectSizes.measure for a partition built from fake metadata and a dummy key.
 +    // NOTE(review): static fields are initialised in declaration order, so EMPTY (declared below) is still null while
 +    // this instance is constructed, and its ref field is therefore null; presumably fine if measure() does not
 +    // follow references - confirm before reordering these fields.
 +    public static final long EMPTY_SIZE = ObjectSizes.measure(new AtomicBTreePartition(CFMetaData.createFake("keyspace", "table"),
 +                                                                                       StorageService.getPartitioner().decorateKey(ByteBuffer.allocate(1)),
 +                                                                                       null));
 +
 +    // Reserved values for wasteTracker field. These values must not be consecutive (see avoidReservedValues)
 +    private static final int TRACKER_NEVER_WASTED = 0;
 +    private static final int TRACKER_PESSIMISTIC_LOCKING = Integer.MAX_VALUE;
 +
 +    // The granularity with which we track wasted allocation/work; we round up
 +    private static final int ALLOCATION_GRANULARITY_BYTES = 1024;
 +    // The number of bytes we may waste in excess of our acceptable realtime rate of waste (defined below) before
 +    // switching this partition to pessimistic locking
 +    private static final long EXCESS_WASTE_BYTES = 10 * 1024 * 1024L;
 +    // EXCESS_WASTE_BYTES in units of ALLOCATION_GRANULARITY_BYTES: the width, in clock ticks, of the tracking window
 +    private static final int EXCESS_WASTE_OFFSET = (int) (EXCESS_WASTE_BYTES / ALLOCATION_GRANULARITY_BYTES);
 +    // Note this is a shift, because dividing a long time and then picking the low 32 bits doesn't give correct rollover behavior
 +    private static final int CLOCK_SHIFT = 17;
 +    // CLOCK_GRANULARITY = 1ns << CLOCK_SHIFT == 2^17ns ~= 131us ~= (1/7.63)ms
 +
 +    /**
 +     * (clock + allocation) granularity are combined to give us an acceptable (waste) allocation rate that is defined by
 +     * the passage of real time of ALLOCATION_GRANULARITY_BYTES/CLOCK_GRANULARITY, or in this case 7.63KiB/ms, or 7.45MiB/s
 +     *
 +     * wasteTracker holds a clock-tick value kept within EXCESS_WASTE_OFFSET before the current time. Whenever we waste
 +     * bytes we advance it by our waste if it is within this window, and otherwise reset it to the start of the window
 +     * plus our waste. If it would reach the current time, the waste limit has been exceeded and it is set to
 +     * TRACKER_PESSIMISTIC_LOCKING instead.
 +     */
 +    private volatile int wasteTracker = TRACKER_NEVER_WASTED;
 +
 +    private static final AtomicIntegerFieldUpdater<AtomicBTreePartition> wasteTrackerUpdater = AtomicIntegerFieldUpdater.newUpdater(AtomicBTreePartition.class, "wasteTracker");
 +
 +    private static final DeletionInfo LIVE = DeletionInfo.live();
 +    // This is a small optimization: DeletionInfo is mutable, but we know that we will always copy it in this class
 +    // before modifying it, so we can safely alias one DeletionInfo.live() reference and avoid some allocations.
 +    private static final Holder EMPTY = new Holder(BTree.empty(), LIVE, null, RowStats.NO_STATS);
 +
 +    private final CFMetaData metadata;
 +    private final DecoratedKey partitionKey;
 +    private final MemtableAllocator allocator;
 +
 +    // Current state of the partition; replaced as a whole via compare-and-set (see refUpdater)
 +    private volatile Holder ref;
 +
 +    private static final AtomicReferenceFieldUpdater<AtomicBTreePartition, Holder> refUpdater = AtomicReferenceFieldUpdater.newUpdater(AtomicBTreePartition.class, Holder.class, "ref");
 +
 +    /**
 +     * Creates a partition for {@code partitionKey} whose state starts out as the shared {@code EMPTY} holder.
 +     *
 +     * @param metadata the table this partition belongs to
 +     * @param partitionKey the key of this partition
 +     * @param allocator the memtable allocator used for rows stored in this partition
 +     */
 +    public AtomicBTreePartition(CFMetaData metadata, DecoratedKey partitionKey, MemtableAllocator allocator)
 +    {
 +        this.ref = EMPTY;
 +        this.allocator = allocator;
 +        this.partitionKey = partitionKey;
 +        this.metadata = metadata;
 +    }
 +
 +    /**
 +     * Returns true if this partition's deletion info is live and it holds neither clustered rows nor a static row.
 +     */
 +    public boolean isEmpty()
 +    {
 +        // Read the volatile ref once so that all three checks look at the same snapshot (as searchIterator does),
 +        // rather than potentially at three different holders.
 +        Holder current = ref;
 +        return current.deletionInfo.isLive() && BTree.isEmpty(current.tree) && current.staticRow == null;
 +    }
 +
 +    /** Returns the metadata of the table this partition belongs to. */
 +    public CFMetaData metadata()
 +    {
 +        return this.metadata;
 +    }
 +
 +    /** Returns the key identifying this partition. */
 +    public DecoratedKey partitionKey()
 +    {
 +        return this.partitionKey;
 +    }
 +
 +    /** Returns the partition-level deletion of the current state of this partition. */
 +    public DeletionTime partitionLevelDeletion()
 +    {
 +        DeletionInfo currentDeletion = ref.deletionInfo;
 +        return currentDeletion.getPartitionDeletion();
 +    }
 +
 +    /**
 +     * Returns every column of the table: an in-memtable partition may contain any of them, so this is the
 +     * conservative answer.
 +     */
 +    public PartitionColumns columns()
 +    {
 +        return this.metadata.partitionColumns();
 +    }
 +
 +    public boolean hasRows()
 +    {
 +        return !BTree.isEmpty(ref.tree);
 +    }
 +
 +    /** Returns the row statistics accumulated in the current state of this partition. */
 +    public RowStats stats()
 +    {
 +        Holder current = ref;
 +        return current.stats;
 +    }
 +
 +    /**
 +     * Returns the row with the given clustering (all columns selected), or null if there is none.
 +     */
 +    public Row getRow(Clustering clustering)
 +    {
 +        ColumnFilter allColumns = ColumnFilter.selection(columns());
 +        Row found = searchIterator(allColumns, false).next(clustering);
 +        if (found == null)
 +            return null;
 +
 +        // For the static clustering the search iterator never returns null, only a possibly empty row; report a
 +        // missing static row as null for consistency with regular clusterings.
 +        if (clustering == Clustering.STATIC_CLUSTERING && found.isEmpty())
 +            return null;
 +
 +        return found;
 +    }
 +
 +    /**
 +     * Returns an iterator for looking up individual rows by clustering in a snapshot of this partition: the holder
 +     * current when this method is called. Updates installed afterwards are not visible through it.
 +     *
 +     * Returned rows are restricted to {@code columns}, and data shadowed by the partition deletion or by a covering
 +     * range tombstone is filtered out. The same row/filter objects are reused between calls to {@code next}, so
 +     * callers presumably must not hold on to a returned row across calls - TODO confirm against SearchIterator.
 +     *
 +     * @param columns the columns to return
 +     * @param reversed whether lookups proceed in reverse clustering order
 +     */
 +    public SearchIterator<Clustering, Row> searchIterator(final ColumnFilter columns, final boolean reversed)
 +    {
 +        // TODO: we could optimize comparison for "NativeRow" à la #6755
 +        final Holder current = ref;
 +        return new SearchIterator<Clustering, Row>()
 +        {
 +            private final SearchIterator<Clustering, MemtableRowData> rawIter = new BTreeSearchIterator<>(current.tree, metadata.comparator, !reversed);
 +            // Reused for every lookup: the raw row view and the filtering wrapper returned to the caller.
 +            private final MemtableRowData.ReusableRow row = allocator.newReusableRow();
 +            private final ReusableFilteringRow filter = new ReusableFilteringRow(columns.fetchedColumns().regulars, columns);
 +            private final long partitionDeletion = current.deletionInfo.getPartitionDeletion().markedForDeleteAt();
 +
 +            public boolean hasNext()
 +            {
 +                return rawIter.hasNext();
 +            }
 +
 +            public Row next(Clustering key)
 +            {
 +                // The static row lives outside the BTree.
 +                if (key == Clustering.STATIC_CLUSTERING)
 +                    return makeStatic(columns, current, allocator);
 +
 +                MemtableRowData data = rawIter.next(key);
 +                // We also need to find if there is a range tombstone covering this key
 +                RangeTombstone rt = current.deletionInfo.rangeCovering(key);
 +
 +                if (data == null)
 +                {
 +                    // If we have a range tombstone but no data, "fake" the RT by returning a row deletion
 +                    // corresponding to the tombstone (only if the tombstone is newer than the partition deletion).
 +                    if (rt != null && rt.deletionTime().markedForDeleteAt() > partitionDeletion)
 +                        return filter.setRowDeletion(rt.deletionTime()).setTo(emptyDeletedRow(key, rt.deletionTime()));
 +                    return null;
 +                }
 +
 +                row.setTo(data);
 +
 +                // Clear any row deletion left over from a previous lookup.
 +                filter.setRowDeletion(null);
 +                if (rt == null || rt.deletionTime().markedForDeleteAt() < partitionDeletion)
 +                {
 +                    // Only the partition deletion can shadow this row's data.
 +                    filter.setDeletionTimestamp(partitionDeletion);
 +                }
 +                else
 +                {
 +                    // The covering tombstone is at least as recent as the partition deletion, so it is the shadowing bound.
 +                    filter.setDeletionTimestamp(rt.deletionTime().markedForDeleteAt());
 +                    // If we have a range tombstone covering that row and it's bigger than the row deletion itself, then
 +                    // we replace the row deletion by the tombstone deletion as a way to return the tombstone.
 +                    if (rt.deletionTime().supersedes(row.deletion()))
 +                        filter.setRowDeletion(rt.deletionTime());
 +                }
 +
 +                return filter.setTo(row);
 +            }
 +        };
 +    }
 +
 +    /**
 +     * Builds a row for {@code clustering} that carries no cells and no primary key liveness, only the row deletion
 +     * {@code deletion}. searchIterator uses it to expose a range tombstone covering a clustering that has no data.
 +     */
 +    private static Row emptyDeletedRow(Clustering clustering, DeletionTime deletion)
 +    {
 +        return new AbstractRow()
 +        {
 +            public Clustering clustering()
 +            {
 +                return clustering;
 +            }
 +
 +            public DeletionTime deletion()
 +            {
 +                return deletion;
 +            }
 +
 +            public LivenessInfo primaryKeyLivenessInfo()
 +            {
 +                return LivenessInfo.NONE;
 +            }
 +
 +            public Columns columns()
 +            {
 +                return Columns.NONE;
 +            }
 +
 +            public boolean isEmpty()
 +            {
 +                return true;
 +            }
 +
 +            public boolean hasComplexDeletion()
 +            {
 +                return false;
 +            }
 +
 +            // There are no cells at all, so every per-column lookup comes back empty.
 +            public Cell getCell(ColumnDefinition c)
 +            {
 +                return null;
 +            }
 +
 +            public Cell getCell(ColumnDefinition c, CellPath path)
 +            {
 +                return null;
 +            }
 +
 +            public Iterator<Cell> getCells(ColumnDefinition c)
 +            {
 +                return null;
 +            }
 +
 +            public DeletionTime getDeletion(ColumnDefinition c)
 +            {
 +                return DeletionTime.LIVE;
 +            }
 +
 +            public Iterator<Cell> iterator()
 +            {
 +                return Iterators.<Cell>emptyIterator();
 +            }
 +
 +            public SearchIterator<ColumnDefinition, ColumnData> searchIterator()
 +            {
 +                return new SearchIterator<ColumnDefinition, ColumnData>()
 +                {
 +                    public boolean hasNext()
 +                    {
 +                        return false;
 +                    }
 +
 +                    public ColumnData next(ColumnDefinition column)
 +                    {
 +                        return null;
 +                    }
 +                };
 +            }
 +
 +            // This anonymous row declares no state of its own, so it can serve as its own alias.
 +            public Row takeAlias()
 +            {
 +                return this;
 +            }
 +        };
 +    }
 +
 +    /** Returns an iterator over the whole partition: all columns, every slice, in forward clustering order. */
 +    public UnfilteredRowIterator unfilteredIterator()
 +    {
 +        ColumnFilter allColumns = ColumnFilter.selection(columns());
 +        return unfilteredIterator(allColumns, Slices.ALL, false);
 +    }
 +
 +    public UnfilteredRowIterator unfilteredIterator(ColumnFilter selection, Slices slices, boolean reversed)
 +    {
 +        if (slices.size() == 0)
 +        {
 +            Holder current = ref;
 +            DeletionTime partitionDeletion = current.deletionInfo.getPartitionDeletion();
 +            if (selection.fetchedColumns().statics.isEmpty() && partitionDeletion.isLive())
 +                return UnfilteredRowIterators.emptyIterator(metadata, partitionKey, reversed);
 +
 +            return new AbstractUnfilteredRowIterator(metadata,
 +                                                     partitionKey,
 +                                                     partitionDeletion,
 +                                                     selection.fetchedColumns(),
 +                                                     makeStatic(selection, current, allocator),
 +                                                     reversed,
 +                                                     current.stats)
 +            {
 +                protected Unfiltered computeNext()
 +                {
 +                    return endOfData();
 +                }
 +            };
 +        }
 +
 +        return slices.size() == 1
 +             ? new SingleSliceIterator(metadata, partitionKey, ref, selection, slices.get(0), reversed, allocator)
 +             : new SlicesIterator(metadata, partitionKey, ref, selection, slices, reversed, allocator);
 +    }
 +
 +    /**
 +     * Returns the static row of {@code holder}, restricted to the statics fetched by {@code selection} and hiding data
 +     * not newer than the partition deletion. Returns {@code Rows.EMPTY_STATIC_ROW} if no statics are selected or the
 +     * partition has no static row.
 +     */
 +    private static Row makeStatic(ColumnFilter selection, Holder holder, MemtableAllocator allocator)
 +    {
 +        Columns wantedStatics = selection.fetchedColumns().statics;
 +        boolean nothingToReturn = wantedStatics.isEmpty() || holder.staticRow == null;
 +        if (nothingToReturn)
 +            return Rows.EMPTY_STATIC_ROW;
 +
 +        ReusableFilteringRow filtered = new ReusableFilteringRow(wantedStatics, selection);
 +        filtered.setDeletionTimestamp(holder.deletionInfo.getPartitionDeletion().markedForDeleteAt());
 +        Row staticData = allocator.newReusableRow().setTo(holder.staticRow);
 +        return filtered.setTo(staticData);
 +    }
 +
 +    /**
 +     * A reusable {@link FilteringRow} that keeps only the given columns and the cells selected by a
 +     * {@link ColumnFilter}. It drops liveness info and deletions that are not newer than a configurable deletion
 +     * timestamp, which callers in this class set to the partition deletion or a covering range tombstone.
 +     */
 +    private static class ReusableFilteringRow extends FilteringRow
 +    {
 +        private final Columns columns;
 +        private final ColumnFilter selection;
 +        // Data whose timestamp is at or below this value is treated as shadowed and filtered out.
 +        private long deletionTimestamp;
 +
 +        // Used by searchIterator in case the row is covered by a tombstone: when non-null it replaces the deletion
 +        // reported by the wrapped row.
 +        private DeletionTime rowDeletion;
 +
 +        public ReusableFilteringRow(Columns columns, ColumnFilter selection)
 +        {
 +            this.columns = columns;
 +            this.selection = selection;
 +        }
 +
 +        public ReusableFilteringRow setDeletionTimestamp(long timestamp)
 +        {
 +            this.deletionTimestamp = timestamp;
 +            return this;
 +        }
 +
 +        public ReusableFilteringRow setRowDeletion(DeletionTime rowDeletion)
 +        {
 +            this.rowDeletion = rowDeletion;
 +            return this;
 +        }
 +
 +        @Override
 +        public DeletionTime deletion()
 +        {
 +            return rowDeletion == null ? super.deletion() : rowDeletion;
 +        }
 +
 +        /** Whether something written at {@code timestamp} is strictly newer than the current deletion timestamp. */
 +        private boolean isNewerThanDeletion(long timestamp)
 +        {
 +            return timestamp > deletionTimestamp;
 +        }
 +
 +        @Override
 +        protected boolean include(LivenessInfo info)
 +        {
 +            return isNewerThanDeletion(info.timestamp());
 +        }
 +
 +        @Override
 +        protected boolean include(ColumnDefinition def)
 +        {
 +            return columns.contains(def);
 +        }
 +
 +        @Override
 +        protected boolean include(DeletionTime dt)
 +        {
 +            return isNewerThanDeletion(dt.markedForDeleteAt());
 +        }
 +
 +        @Override
 +        protected boolean include(ColumnDefinition c, DeletionTime dt)
 +        {
 +            return isNewerThanDeletion(dt.markedForDeleteAt());
 +        }
 +
 +        @Override
 +        protected boolean include(Cell cell)
 +        {
 +            return selection.includes(cell);
 +        }
 +    }
 +
 +    /**
 +     * Iterates one slice of a {@code Holder} snapshot. The rows of the BTree within the slice are merged with the range
 +     * tombstones of the deletion info, and every row is passed through a reusable column/deletion filter.
 +     */
 +    private static class SingleSliceIterator extends AbstractUnfilteredRowIterator
 +    {
 +        // Merged stream of raw rows and range tombstone markers within the slice.
 +        private final Iterator<Unfiltered> iterator;
 +        // Filtering view reused for every returned row; its deletion timestamp follows the currently open tombstone.
 +        private final ReusableFilteringRow row;
 +
 +        private SingleSliceIterator(CFMetaData metadata,
 +                                    DecoratedKey key,
 +                                    Holder holder,
 +                                    ColumnFilter selection,
 +                                    Slice slice,
 +                                    boolean isReversed,
 +                                    MemtableAllocator allocator)
 +        {
 +            super(metadata,
 +                  key,
 +                  holder.deletionInfo.getPartitionDeletion(),
 +                  selection.fetchedColumns(),
 +                  makeStatic(selection, holder, allocator),
 +                  isReversed,
 +                  holder.stats);
 +
 +            Iterator<Row> rowIter = rowIter(metadata,
 +                                            holder,
 +                                            slice,
 +                                            !isReversed,
 +                                            allocator);
 +
 +            this.iterator = new RowAndTombstoneMergeIterator(metadata.comparator, isReversed)
 +                            .setTo(rowIter, holder.deletionInfo.rangeIterator(slice, isReversed));
 +
 +            // Initially only the partition deletion can shadow rows; computeNext adjusts this as tombstones open and close.
 +            this.row = new ReusableFilteringRow(selection.fetchedColumns().regulars, selection)
 +                       .setDeletionTimestamp(partitionLevelDeletion.markedForDeleteAt());
 +        }
 +
 +        /**
 +         * Returns the rows of {@code holder} within {@code slice}, in forward or reverse clustering order. The same
 +         * reusable row instance is handed out by every call to next().
 +         */
 +        private Iterator<Row> rowIter(CFMetaData metadata,
 +                                      Holder holder,
 +                                      Slice slice,
 +                                      boolean forwards,
 +                                      final MemtableAllocator allocator)
 +        {
 +            // BOTTOM/TOP are mapped to null, presumably meaning "unbounded" to BTree.slice - TODO confirm.
 +            Slice.Bound start = slice.start() == Slice.Bound.BOTTOM ? null : slice.start();
 +            Slice.Bound end = slice.end() == Slice.Bound.TOP ? null : slice.end();
 +            final Iterator<MemtableRowData> dataIter = BTree.slice(holder.tree, metadata.comparator, start, true, end, true, forwards);
 +            return new AbstractIterator<Row>()
 +            {
 +                private final MemtableRowData.ReusableRow row = allocator.newReusableRow();
 +
 +                protected Row computeNext()
 +                {
 +                    return dataIter.hasNext() ? row.setTo(dataIter.next()) : endOfData();
 +                }
 +            };
 +        }
 +
 +        protected Unfiltered computeNext()
 +        {
 +            while (iterator.hasNext())
 +            {
 +                Unfiltered next = iterator.next();
 +                if (next.kind() == Unfiltered.Kind.ROW)
 +                {
 +                    row.setTo((Row)next);
 +                    // Skip rows with nothing left once filtered.
 +                    if (!row.isEmpty())
 +                        return row;
 +                }
 +                else
 +                {
 +                    RangeTombstoneMarker marker = (RangeTombstoneMarker)next;
 +
 +                    // Rows after this marker are shadowed by the newer of the partition deletion and the tombstone the
 +                    // marker leaves open in our iteration direction (if any).
 +                    long deletion = partitionLevelDeletion().markedForDeleteAt();
 +                    if (marker.isOpen(isReverseOrder()))
 +                        deletion = Math.max(deletion, marker.openDeletionTime(isReverseOrder()).markedForDeleteAt());
 +                    row.setDeletionTimestamp(deletion);
 +                    return marker;
 +                }
 +            }
 +            return endOfData();
 +        }
 +    }
 +
 +    /**
 +     * Iterates several slices of a {@code Holder} snapshot by chaining one SingleSliceIterator per slice. The slices
 +     * are visited last-to-first when iterating in reverse order.
 +     */
 +    public static class SlicesIterator extends AbstractUnfilteredRowIterator
 +    {
 +        private final Holder holder;
 +        private final ColumnFilter selection;
 +        private final Slices slices;
 +        private final MemtableAllocator allocator;
 +
 +        // How many slices have been opened so far, and the iterator of the slice being consumed (null if none).
 +        private int idx;
 +        private UnfilteredRowIterator currentSlice;
 +
 +        private SlicesIterator(CFMetaData metadata,
 +                               DecoratedKey key,
 +                               Holder holder,
 +                               ColumnFilter selection,
 +                               Slices slices,
 +                               boolean isReversed,
 +                               MemtableAllocator allocator)
 +        {
 +            super(metadata,
 +                  key,
 +                  holder.deletionInfo.getPartitionDeletion(),
 +                  selection.fetchedColumns(),
 +                  makeStatic(selection, holder, allocator),
 +                  isReversed,
 +                  holder.stats);
 +            this.holder = holder;
 +            this.selection = selection;
 +            this.slices = slices;
 +            this.allocator = allocator;
 +        }
 +
 +        /** Opens the {@code position}-th slice in iteration order (slices are visited backwards when reversed). */
 +        private UnfilteredRowIterator openSlice(int position)
 +        {
 +            int sliceIdx = isReverseOrder ? slices.size() - position - 1 : position;
 +            return new SingleSliceIterator(metadata, partitionKey, holder, selection, slices.get(sliceIdx), isReverseOrder, allocator);
 +        }
 +
 +        protected Unfiltered computeNext()
 +        {
 +            while (true)
 +            {
 +                if (currentSlice != null)
 +                {
 +                    if (currentSlice.hasNext())
 +                        return currentSlice.next();
 +                    currentSlice = null;
 +                }
 +
 +                if (idx >= slices.size())
 +                    return endOfData();
 +
 +                currentSlice = openSlice(idx);
 +                idx++;
 +            }
 +        }
 +    }
 +
 +    /**
 +     * Adds a given update to this in-memtable partition.
 +     *
 +     * The update is applied optimistically: a new {@code Holder} is built from the current one and installed with a
 +     * compare-and-set, retrying when another writer got there first. If the partition is already in pessimistic mode,
 +     * or too much work is wasted on failed attempts, the thread takes this object's monitor for its remaining attempts.
 +     *
 +     * @param update the partition update to merge in
 +     * @param writeOp the write operation group; passed through to the RowUpdater
 +     * @param indexer secondary index updater; its row-level indexes are updated once the new state has been installed
 +     * @return an array containing first the difference in size seen after merging the updates, and second the minimum
 +     * time delta between updates.
 +     */
 +    public long[] addAllWithSizeDelta(final PartitionUpdate update, OpOrder.Group writeOp, Updater indexer)
 +    {
 +        RowUpdater updater = new RowUpdater(this, allocator, writeOp, indexer);
 +        // The update's deletion info is copied to the heap at most once, however many attempts are needed.
 +        DeletionInfo inputDeletionInfoCopy = null;
 +
 +        boolean monitorOwned = false;
 +        try
 +        {
 +            if (usePessimisticLocking())
 +            {
 +                Locks.monitorEnterUnsafe(this);
 +                monitorOwned = true;
 +            }
 +            while (true)
 +            {
 +                // Snapshot the current state; everything below builds a replacement for it.
 +                Holder current = ref;
 +                updater.ref = current;
 +                updater.reset();
 +
 +                DeletionInfo deletionInfo;
 +                if (update.deletionInfo().mayModify(current.deletionInfo))
 +                {
 +                    if (inputDeletionInfoCopy == null)
 +                        inputDeletionInfoCopy = update.deletionInfo().copy(HeapAllocator.instance);
 +
 +                    // Merge into a private copy so the deletion info shared by `current` is never mutated.
 +                    deletionInfo = current.deletionInfo.copy().add(inputDeletionInfoCopy);
 +                    updater.allocated(deletionInfo.unsharedHeapSize() - current.deletionInfo.unsharedHeapSize());
 +                }
 +                else
 +                {
 +                    deletionInfo = current.deletionInfo;
 +                }
 +
 +                Row newStatic = update.staticRow();
 +                MemtableRowData staticRow = newStatic == Rows.EMPTY_STATIC_ROW
 +                                          ? current.staticRow
 +                                          : (current.staticRow == null ? updater.apply(newStatic) : updater.apply(current.staticRow, newStatic));
 +                Object[] tree = BTree.<Clusterable, Row, MemtableRowData>update(current.tree, update.metadata().comparator, update, update.rowCount(), updater);
 +                RowStats newStats = current.stats.mergeWith(update.stats());
 +
 +                // Install the new state only if nobody replaced `current` meanwhile (a null tree is treated as a failed attempt).
 +                if (tree != null && refUpdater.compareAndSet(this, current, new Holder(tree, deletionInfo, staticRow, newStats)))
 +                {
 +                    indexer.updateRowLevelIndexes();
 +                    updater.finish();
 +                    return new long[]{ updater.dataSize, updater.colUpdateTimeDelta };
 +                }
 +                else if (!monitorOwned)
 +                {
 +                    // Lost the race: decide whether to fall back to pessimistic locking for the remaining attempts,
 +                    // counting this attempt's heap allocation (presumably updater.heapSize) as waste.
 +                    boolean shouldLock = usePessimisticLocking();
 +                    if (!shouldLock)
 +                    {
 +                        shouldLock = updateWastedAllocationTracker(updater.heapSize);
 +                    }
 +                    if (shouldLock)
 +                    {
 +                        Locks.monitorEnterUnsafe(this);
 +                        monitorOwned = true;
 +                    }
 +                }
 +            }
 +        }
 +        finally
 +        {
 +            // Always release the monitor if we took it, whichever way we leave the loop.
 +            if (monitorOwned)
 +                Locks.monitorExitUnsafe(this);
 +        }
 +
 +    }
 +
 +    public boolean usePessimisticLocking()
 +    {
 +        return wasteTracker == TRACKER_PESSIMISTIC_LOCKING;
 +    }
 +
 +    /**
 +     * Update the wasted allocation tracker state based on newly wasted allocation information.
 +     *
 +     * @param wastedBytes the number of bytes wasted by this thread
 +     * @return true if the caller should now proceed with pessimistic locking because the waste limit has been reached
 +     */
 +    private boolean updateWastedAllocationTracker(long wastedBytes)
 +    {
 +        // Early check for huge allocation that exceeds the limit
 +        if (wastedBytes < EXCESS_WASTE_BYTES)
 +        {
 +            // We round up to ensure work < granularity are still accounted for
 +            int wastedAllocation = ((int) (wastedBytes + ALLOCATION_GRANULARITY_BYTES - 1)) / ALLOCATION_GRANULARITY_BYTES;
 +
 +            int oldTrackerValue;
 +            // Retry until our waste is recorded via CAS, the limit is hit, or another thread has already switched this
 +            // partition to pessimistic locking.
 +            while (TRACKER_PESSIMISTIC_LOCKING != (oldTrackerValue = wasteTracker))
 +            {
 +                // Note this time value has an arbitrary offset, but is a constant rate 32 bit counter (that may wrap)
 +                int time = (int) (System.nanoTime() >>> CLOCK_SHIFT);
 +                // Position of the tracker relative to now (negative = in the past), using int subtraction so that
 +                // counter wrap-around is tolerated.
 +                int delta = oldTrackerValue - time;
 +                // Restart from the beginning of the allowed window if we have never wasted, if the tracker is not
 +                // behind now, or if it has fallen further behind than the window.
 +                if (oldTrackerValue == TRACKER_NEVER_WASTED || delta >= 0 || delta < -EXCESS_WASTE_OFFSET)
 +                    delta = -EXCESS_WASTE_OFFSET;
 +                delta += wastedAllocation;
 +                // The tracker would reach the current time: the acceptable waste rate has been exceeded.
 +                if (delta >= 0)
 +                    break;
 +                if (wasteTrackerUpdater.compareAndSet(this, oldTrackerValue, avoidReservedValues(time + delta)))
 +                    return false;
 +            }
 +        }
 +        // We have definitely reached our waste limit so set the state if it isn't already
 +        wasteTrackerUpdater.set(this, TRACKER_PESSIMISTIC_LOCKING);
 +        // And tell the caller to proceed with pessimistic locking
 +        return true;
 +    }
 +
 +    /**
 +     * Moves a computed tracker value off the two sentinel values, so a clock-derived value is never mistaken
 +     * for TRACKER_NEVER_WASTED or TRACKER_PESSIMISTIC_LOCKING. Any other value is returned unchanged.
 +     * NOTE(review): this assumes the two sentinels are not consecutive integers. Otherwise
 +     * {@code wasteTracker + 1} could land on the other sentinel. Confirm against their definitions.
 +     */
 +    private static int avoidReservedValues(int wasteTracker)
 +    {
 +        if (wasteTracker == TRACKER_NEVER_WASTED || wasteTracker == TRACKER_PESSIMISTIC_LOCKING)
 +            return wasteTracker + 1;
 +        return wasteTracker;
 +    }
 +
 +    /**
 +     * Holds one version of the partition's state: the btree of rows, its deletion info, the static row and stats.
 +     * All fields are final. {@link #with(DeletionInfo)} returns a copy with different deletion info.
 +     * RowUpdater.abortEarly() compares Holder instances by identity.
 +     */
 +    private static final class Holder
 +    {
 +        final DeletionInfo deletionInfo;
 +        // the btree of rows
 +        final Object[] tree;
 +        final MemtableRowData staticRow;
 +        final RowStats stats;
 +
 +        Holder(Object[] tree, DeletionInfo deletionInfo, MemtableRowData staticRow, RowStats stats)
 +        {
 +            this.tree = tree;
 +            this.deletionInfo = deletionInfo;
 +            this.staticRow = staticRow;
 +            this.stats = stats;
 +        }
 +
 +        // Returns a new Holder sharing this one's tree, static row and stats, but with the given deletion info.
 +        Holder with(DeletionInfo info)
 +        {
 +            return new Holder(this.tree, info, this.staticRow, this.stats);
 +        }
 +    }
 +
 +    // the function we provide to the btree utilities to perform any column replacements
 +    //
 +    // One instance accumulates the effects of a single update attempt:
 +    //  - every row it allocates is recorded in 'inserted', so reset() can reclaim it immediately if the
 +    //    attempt is abandoned;
 +    //  - replaced rows are passed to reclaimer.reclaim();
 +    //  - finish() commits the reclaimer and reset() cancels it.
 +    // Presumably the release of replaced rows is deferred until commit; confirm in MemtableAllocator.DataReclaimer.
 +    private static final class RowUpdater implements UpdateFunction<Row, MemtableRowData>
 +    {
 +        final AtomicBTreePartition updating;
 +        final MemtableAllocator allocator;
 +        final OpOrder.Group writeOp;
 +        final Updater indexer;
 +        // captured once in the constructor; used for cell liveness checks and for Rows.merge
 +        final int nowInSec;
 +        // The partition state this attempt was computed against. abortEarly() compares it by identity with
 +        // updating.ref. It is assigned outside this class.
 +        Holder ref;
 +        // running totals for the current attempt; reset() zeroes both
 +        long dataSize;
 +        long heapSize;
 +        // minimum of the values returned by Rows.merge in apply(existing, update)
 +        // NOTE(review): reset() does not restore this to Long.MAX_VALUE, so a value from an abandoned attempt can
 +        // carry into the retry. Confirm whether that is intended.
 +        long colUpdateTimeDelta = Long.MAX_VALUE;
 +        // reusable row, pointed at the existing data via setTo() before each merge
 +        final MemtableRowData.ReusableRow row;
 +        final MemtableAllocator.DataReclaimer reclaimer;
 +        final MemtableAllocator.RowAllocator rowAllocator;
 +        // rows allocated by this attempt, created lazily; each is reclaimed immediately by reset()
 +        List<MemtableRowData> inserted; // TODO: replace with walk of aborted BTree
 +
 +        // Captures the current time and obtains the per-update helpers (reusable row, reclaimer, row allocator)
 +        // from the allocator. The row allocator is bound to the partition's metadata and the write op group.
 +        private RowUpdater(AtomicBTreePartition updating, MemtableAllocator allocator, OpOrder.Group writeOp, Updater indexer)
 +        {
 +            this.updating = updating;
 +            this.allocator = allocator;
 +            this.writeOp = writeOp;
 +            this.indexer = indexer;
 +            this.nowInSec = FBUtilities.nowInSeconds();
 +            this.row = allocator.newReusableRow();
 +            this.reclaimer = allocator.reclaimer();
 +            this.rowAllocator = allocator.newRowAllocator(updating.metadata(), writeOp);
 +        }
 +
 +        /**
 +         * Copies {@code insert} into a row obtained from rowAllocator and passes it to the secondary index updater.
 +         * Its data size and unshared heap size (excluding data) are added to the running totals, and the copy is
 +         * recorded so reset() can reclaim it.
 +         */
 +        public MemtableRowData apply(Row insert)
 +        {
 +            rowAllocator.allocateNewRow(insert.clustering().size(), insert.columns(), insert.isStatic());
 +            insert.copyTo(rowAllocator);
 +            MemtableRowData data = rowAllocator.allocatedRowData();
 +
 +            insertIntoIndexes(insert);
 +
 +            this.dataSize += data.dataSize();
 +            this.heapSize += data.unsharedHeapSizeExcludingData();
 +            if (inserted == null)
 +                inserted = new ArrayList<>();
 +            inserted.add(data);
 +            return data;
 +        }
 +
 +        /**
 +         * Merges {@code update} into {@code existing} over the union of both column sets, writing the result into
 +         * a row from rowAllocator.
 +         *
 +         * Rows.merge also receives the indexer, presumably to maintain secondary indexes during the merge. The size
 +         * differences against {@code existing} are added to the running totals. The result is recorded for
 +         * reset(), and {@code existing} is handed to discard().
 +         */
 +        public MemtableRowData apply(MemtableRowData existing, Row update)
 +        {
 +            Columns mergedColumns = existing.columns().mergeTo(update.columns());
 +            rowAllocator.allocateNewRow(update.clustering().size(), mergedColumns, update.isStatic());
 +
 +            colUpdateTimeDelta = Math.min(colUpdateTimeDelta, Rows.merge(row.setTo(existing), update, mergedColumns, rowAllocator, nowInSec, indexer));
 +
 +            MemtableRowData reconciled = rowAllocator.allocatedRowData();
 +
 +            dataSize += reconciled.dataSize() - existing.dataSize();
 +            heapSize += reconciled.unsharedHeapSizeExcludingData() - existing.unsharedHeapSizeExcludingData();
 +            if (inserted == null)
 +                inserted = new ArrayList<>();
 +            inserted.add(reconciled);
 +            discard(existing);
 +
 +            return reconciled;
 +        }
 +
 +        // Indexes a newly inserted row. Nothing is done when the indexer is nullUpdater. Otherwise the primary
 +        // key columns, then every cell, are offered to the indexer.
 +        private void insertIntoIndexes(Row toInsert)
 +        {
 +            if (indexer == SecondaryIndexManager.nullUpdater)
 +                return;
 +
 +            maybeIndexPrimaryKeyColumns(toInsert);
 +            Clustering clustering = toInsert.clustering();
 +            for (Cell cell : toInsert)
 +                indexer.insert(clustering, cell);
 +        }
 +
 +        private void maybeIndexPrimaryKeyColumns(Row row)
 +        {
 +            // We want to update a primary key index with the most up-to-date info contained in the inserted row (if only for
 +            // backward compatibility). Note that if there is an index but not a partition key or clustering column one, we're
 +            // wasting this work. We might be able to avoid that if row indexing was pushed into the index updater.
 +            //
 +            // Start from the row's primary key liveness info. Any live cell with a strictly newer timestamp replaces
 +            // the timestamp and ttl, so ties keep the primary key liveness values.
 +            long timestamp = row.primaryKeyLivenessInfo().timestamp();
 +            int ttl = row.primaryKeyLivenessInfo().ttl();
 +
 +            for (Cell cell : row)
 +            {
 +                long cellTimestamp = cell.livenessInfo().timestamp();
 +                if (cell.isLive(nowInSec))
 +                {
 +                    if (cellTimestamp > timestamp)
 +                    {
 +                        timestamp = cellTimestamp;
 +                        ttl = cell.livenessInfo().ttl();
 +                    }
 +                }
 +            }
 +
 +            indexer.maybeIndex(row.clustering(), timestamp, ttl, row.deletion());
 +        }
 +
 +        // Abandons the current attempt: zeroes the size totals, immediately reclaims every row this attempt
 +        // allocated, clears the list, and calls reclaimer.cancel().
 +        protected void reset()
 +        {
 +            this.dataSize = 0;
 +            this.heapSize = 0;
 +            if (inserted != null)
 +            {
 +                for (MemtableRowData row : inserted)
 +                    abort(row);
 +                inserted.clear();
 +            }
 +            reclaimer.cancel();
 +        }
 +
 +        // Reclaims a row allocated by this attempt via reclaimer.reclaimImmediately().
 +        protected void abort(MemtableRowData abort)
 +        {
 +            reclaimer.reclaimImmediately(abort);
 +        }
 +
 +        // Hands a replaced row to reclaimer.reclaim(), as opposed to the reclaimImmediately() used by abort().
 +        protected void discard(MemtableRowData discard)
 +        {
 +            reclaimer.reclaim(discard);
 +        }
 +
 +        // Returns true once the partition's ref is no longer the Holder this attempt was computed against.
 +        public boolean abortEarly()
 +        {
 +            return updating.ref != ref;
 +        }
 +
 +        // Adds heap usage reported by the caller (presumably allocations made by the btree code itself) to heapSize.
 +        public void allocated(long heapSize)
 +        {
 +            this.heapSize += heapSize;
 +        }
 +
 +        // Completes a successful attempt: charges the accumulated heapSize to the on-heap allocator and commits
 +        // the reclaimer.
 +        protected void finish()
 +        {
-             allocator.onHeap().allocate(heapSize, writeOp);
++            allocator.onHeap().adjust(heapSize, writeOp);
 +            reclaimer.commit();
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dea6ab1b/src/java/org/apache/cassandra/utils/FBUtilities.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dea6ab1b/src/java/org/apache/cassandra/utils/memory/MemtableAllocator.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dea6ab1b/test/unit/org/apache/cassandra/utils/memory/NativeAllocatorTest.java
----------------------------------------------------------------------