You are viewing a plain-text version of this content. The hyperlink to its canonical version was not preserved in this plain-text rendering.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2014/02/05 18:51:48 UTC
[1/3] git commit: Fix partition and range deletes not triggering flush
Updated Branches:
refs/heads/trunk 58d1a4f81 -> fe4247e58
Fix partition and range deletes not triggering flush
patch by benedict; reviewed by slebresne for CASSANDRA-6655
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/adcb713d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/adcb713d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/adcb713d
Branch: refs/heads/trunk
Commit: adcb713d597302a868b6224a87ea6ce38e718e5d
Parents: 16efdf4
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Wed Feb 5 18:34:37 2014 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Wed Feb 5 18:34:37 2014 +0100
----------------------------------------------------------------------
CHANGES.txt | 3 +++
.../org/apache/cassandra/db/AtomicSortedColumns.java | 7 ++++++-
src/java/org/apache/cassandra/db/DeletionInfo.java | 14 ++++++++++++++
src/java/org/apache/cassandra/db/Memtable.java | 10 ++++++----
4 files changed, 29 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/adcb713d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0989dc4..cfdd148 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,3 +1,6 @@
+1.2.16
+ * Fix partition and range deletes not triggering flush (CASSANDRA-6655)
+
1.2.15
* Move handling of migration event source to solve bootstrap race (CASSANDRA-6648)
* Make sure compaction throughput value doesn't overflow with int math (CASSANDRA-6647)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/adcb713d/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AtomicSortedColumns.java b/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
index 9803544..d6c861b 100644
--- a/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
+++ b/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
@@ -194,7 +194,12 @@ public class AtomicSortedColumns implements ISortedColumns
{
sizeDelta = 0;
current = ref.get();
- DeletionInfo newDelInfo = current.deletionInfo.copy().add(cm.getDeletionInfo());
+ DeletionInfo newDelInfo = current.deletionInfo;
+ if (cm.getDeletionInfo().mayModify(newDelInfo))
+ {
+ newDelInfo = current.deletionInfo.copy().add(cm.getDeletionInfo());
+ sizeDelta += newDelInfo.dataSize() - current.deletionInfo.dataSize();
+ }
modified = new Holder(current.map.clone(), newDelInfo);
for (IColumn column : cm.getSortedColumns())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/adcb713d/src/java/org/apache/cassandra/db/DeletionInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DeletionInfo.java b/src/java/org/apache/cassandra/db/DeletionInfo.java
index e486eeb..91af9fd 100644
--- a/src/java/org/apache/cassandra/db/DeletionInfo.java
+++ b/src/java/org/apache/cassandra/db/DeletionInfo.java
@@ -216,6 +216,20 @@ public class DeletionInfo
return size + (ranges == null ? 0 : ranges.dataSize());
}
+ public int rangeCount()
+ {
+ return ranges == null ? 0 : ranges.size();
+ }
+
+ /**
+ * Whether this deletion info may modify the provided one if added to it.
+ */
+ public boolean mayModify(DeletionInfo delInfo)
+ {
+ return topLevel.markedForDeleteAt > delInfo.topLevel.markedForDeleteAt
+ || ranges == null;
+ }
+
@Override
public String toString()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/adcb713d/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 817561b..b229060 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -192,6 +192,7 @@ public class Memtable
{
ColumnFamily previous = columnFamilies.get(key);
+ long sizeDelta = 0;
if (previous == null)
{
// AtomicSortedColumns doesn't work for super columns (see #3821)
@@ -199,14 +200,15 @@ public class Memtable
// We'll add the columns later. This avoids wasting works if we get beaten in the putIfAbsent
previous = columnFamilies.putIfAbsent(new DecoratedKey(key.token, allocator.clone(key.key)), empty);
if (previous == null)
+ {
previous = empty;
+ sizeDelta += empty.deletionInfo().dataSize();
+ }
}
- long sizeDelta = previous.addAllWithSizeDelta(cf, allocator, localCopyFunction, indexer);
+ sizeDelta = previous.addAllWithSizeDelta(cf, allocator, localCopyFunction, indexer);
currentSize.addAndGet(sizeDelta);
- currentOperations.addAndGet((cf.getColumnCount() == 0)
- ? cf.isMarkedForDelete() ? 1 : 0
- : cf.getColumnCount());
+ currentOperations.addAndGet(cf.getColumnCount() + (cf.isMarkedForDelete() ? 1 : 0) + cf.deletionInfo().rangeCount());
}
// for debugging
[3/3] git commit: Merge branch 'cassandra-2.0' into trunk
Posted by sl...@apache.org.
Merge branch 'cassandra-2.0' into trunk
Conflicts:
CHANGES.txt
src/java/org/apache/cassandra/db/AtomicSortedColumns.java
src/java/org/apache/cassandra/db/Memtable.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/fe4247e5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/fe4247e5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/fe4247e5
Branch: refs/heads/trunk
Commit: fe4247e589714d9ea183187c0538b6446f16ffca
Parents: 58d1a4f 58e9481
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Wed Feb 5 18:51:40 2014 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Wed Feb 5 18:51:40 2014 +0100
----------------------------------------------------------------------
CHANGES.txt | 8 ++-----
.../apache/cassandra/db/AtomicBTreeColumns.java | 22 ++++++++++++--------
.../org/apache/cassandra/db/DeletionInfo.java | 14 +++++++++++++
src/java/org/apache/cassandra/db/Memtable.java | 4 +---
4 files changed, 30 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fe4247e5/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 0690c38,bba5f20..7d628b5
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,40 -1,7 +1,36 @@@
+2.1
+ * add listsnapshots command to nodetool (CASSANDRA-5742)
+ * Introduce AtomicBTreeColumns (CASSANDRA-6271)
+ * Multithreaded commitlog (CASSANDRA-3578)
+ * allocate fixed index summary memory pool and resample cold index summaries
+ to use less memory (CASSANDRA-5519)
+ * Removed multithreaded compaction (CASSANDRA-6142)
+ * Parallelize fetching rows for low-cardinality indexes (CASSANDRA-1337)
+ * change logging from log4j to logback (CASSANDRA-5883)
+ * switch to LZ4 compression for internode communication (CASSANDRA-5887)
+ * Stop using Thrift-generated Index* classes internally (CASSANDRA-5971)
+ * Remove 1.2 network compatibility code (CASSANDRA-5960)
+ * Remove leveled json manifest migration code (CASSANDRA-5996)
+ * Remove CFDefinition (CASSANDRA-6253)
+ * Use AtomicIntegerFieldUpdater in RefCountedMemory (CASSANDRA-6278)
+ * User-defined types for CQL3 (CASSANDRA-5590)
+ * Use of o.a.c.metrics in nodetool (CASSANDRA-5871, 6406)
+ * Batch read from OTC's queue and cleanup (CASSANDRA-1632)
+ * Secondary index support for collections (CASSANDRA-4511, 6383)
+ * SSTable metadata(Stats.db) format change (CASSANDRA-6356)
+ * Push composites support in the storage engine
+ (CASSANDRA-5417, CASSANDRA-6520)
+ * Add snapshot space used to cfstats (CASSANDRA-6231)
+ * Add cardinality estimator for key count estimation (CASSANDRA-5906)
+ * CF id is changed to be non-deterministic. Data dir/key cache are created
+ uniquely for CF id (CASSANDRA-5202)
+ * New counters implementation (CASSANDRA-6504)
+
+
2.0.6
- * Fix direct Memory on architectures that do not support unaligned long access
- (CASSANDRA-6628)
- * Let scrub optionally skip broken counter partitions (CASSANDRA-5930)
Merged from 1.2:
- * Move handling of migration event source to solve bootstrap race. (CASSANDRA-6648)
- * Make sure compaction throughput value doesn't overflow with int math (CASSANDRA-6647)
-
+ * Fix partition and range deletes not triggering flush (CASSANDRA-6655)
+
2.0.5
* Reduce garbage generated by bloom filter lookups (CASSANDRA-6609)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fe4247e5/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
index 238bb7c,0000000..fd7d4bc
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
+++ b/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
@@@ -1,457 -1,0 +1,461 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.util.AbstractCollection;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+
+import com.google.common.base.Function;
+import com.google.common.base.Functions;
+import com.google.common.collect.*;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.composites.CellNameType;
+import org.apache.cassandra.db.filter.ColumnSlice;
+import org.apache.cassandra.db.index.SecondaryIndexManager;
+import org.apache.cassandra.utils.ObjectSizes;
+import org.apache.cassandra.utils.btree.BTree;
+import org.apache.cassandra.utils.btree.BTreeSet;
+import org.apache.cassandra.utils.btree.UpdateFunction;
+import org.apache.cassandra.utils.memory.AbstractAllocator;
+
+import static org.apache.cassandra.db.index.SecondaryIndexManager.Updater;
+
+/**
+ * A thread-safe and atomic ISortedColumns implementation.
+ * Operations (in particular addAll) on this implemenation are atomic and
+ * isolated (in the sense of ACID). Typically a addAll is guaranteed that no
+ * other thread can see the state where only parts but not all columns have
+ * been added.
+ * <p/>
+ * WARNING: removing element through getSortedColumns().iterator() is *not* supported
+ */
+public class AtomicBTreeColumns extends ColumnFamily
+{
+ static final long HEAP_SIZE = ObjectSizes.measure(new AtomicBTreeColumns(CFMetaData.IndexCf, null))
+ + ObjectSizes.measure(new Holder(null, null));
+
+ private static final Function<Cell, CellName> NAME = new Function<Cell, CellName>()
+ {
+ public CellName apply(Cell column)
+ {
+ return column.name;
+ }
+ };
+
+ public static final Factory<AtomicBTreeColumns> factory = new Factory<AtomicBTreeColumns>()
+ {
+ public AtomicBTreeColumns create(CFMetaData metadata, boolean insertReversed)
+ {
+ if (insertReversed)
+ throw new IllegalArgumentException();
+ return new AtomicBTreeColumns(metadata);
+ }
+ };
+
+ private static final DeletionInfo LIVE = DeletionInfo.live();
+ private static final Holder EMPTY = new Holder(BTree.empty(), LIVE);
+
+ private volatile Holder ref;
+
+ private static final AtomicReferenceFieldUpdater<AtomicBTreeColumns, Holder> refUpdater = AtomicReferenceFieldUpdater.newUpdater(AtomicBTreeColumns.class, Holder.class, "ref");
+
+ private AtomicBTreeColumns(CFMetaData metadata)
+ {
+ this(metadata, EMPTY);
+ }
+
+ private AtomicBTreeColumns(CFMetaData metadata, Holder holder)
+ {
+ super(metadata);
+ this.ref = holder;
+ }
+
+ public CellNameType getComparator()
+ {
+ return metadata.comparator;
+ }
+
+ public Factory getFactory()
+ {
+ return factory;
+ }
+
+ public ColumnFamily cloneMe()
+ {
+ return new AtomicBTreeColumns(metadata, ref);
+ }
+
+ public DeletionInfo deletionInfo()
+ {
+ return ref.deletionInfo;
+ }
+
+ public void delete(DeletionTime delTime)
+ {
+ delete(new DeletionInfo(delTime));
+ }
+
+ protected void delete(RangeTombstone tombstone)
+ {
+ delete(new DeletionInfo(tombstone, getComparator()));
+ }
+
+ public void delete(DeletionInfo info)
+ {
+ if (info.isLive())
+ return;
+
+ // Keeping deletion info for max markedForDeleteAt value
+ while (true)
+ {
+ Holder current = ref;
+ DeletionInfo newDelInfo = current.deletionInfo.copy().add(info);
+ if (refUpdater.compareAndSet(this, current, current.with(newDelInfo)))
+ break;
+ }
+ }
+
+ public void setDeletionInfo(DeletionInfo newInfo)
+ {
+ ref = ref.with(newInfo);
+ }
+
+ public void purgeTombstones(int gcBefore)
+ {
+ while (true)
+ {
+ Holder current = ref;
+ if (!current.deletionInfo.hasPurgeableTombstones(gcBefore))
+ break;
+
+ DeletionInfo purgedInfo = current.deletionInfo.copy();
+ purgedInfo.purge(gcBefore);
+ if (refUpdater.compareAndSet(this, current, current.with(purgedInfo)))
+ break;
+ }
+ }
+
+ public void addAll(ColumnFamily cm, AbstractAllocator allocator, Function<Cell, Cell> transformation)
+ {
+ addAllWithSizeDelta(cm, allocator, transformation, SecondaryIndexManager.nullUpdater, new Delta());
+ }
+
+ // the function we provide to the btree utilities to perform any column replacements
+ private static final class ColumnUpdater implements UpdateFunction<Cell>
+ {
+ final AtomicBTreeColumns updating;
+ final Holder ref;
+ final AbstractAllocator allocator;
+ final Function<Cell, Cell> transform;
+ final Updater indexer;
+ final Delta delta;
+
+ private ColumnUpdater(AtomicBTreeColumns updating, Holder ref, AbstractAllocator allocator, Function<Cell, Cell> transform, Updater indexer, Delta delta)
+ {
+ this.updating = updating;
+ this.ref = ref;
+ this.allocator = allocator;
+ this.transform = transform;
+ this.indexer = indexer;
+ this.delta = delta;
+ }
+
+ public Cell apply(Cell inserted)
+ {
+ indexer.insert(inserted);
+ delta.insert(inserted);
+ return transform.apply(inserted);
+ }
+
+ public Cell apply(Cell existing, Cell update)
+ {
+ Cell reconciled = update.reconcile(existing, allocator);
+ indexer.update(existing, reconciled);
+ if (existing != reconciled)
+ delta.swap(existing, reconciled);
+ else
+ delta.abort(update);
+ return transform.apply(reconciled);
+ }
+
+ public boolean abortEarly()
+ {
+ return updating.ref != ref;
+ }
+
+ public void allocated(long heapSize)
+ {
+ delta.addHeapSize(heapSize);
+ }
+ }
+
+ private static Collection<Cell> transform(Comparator<Cell> cmp, ColumnFamily cf, Function<Cell, Cell> transformation, boolean sort)
+ {
+ Cell[] tmp = new Cell[cf.getColumnCount()];
+
+ int i = 0;
+ for (Cell c : cf)
+ tmp[i++] = transformation.apply(c);
+
+ if (sort)
+ Arrays.sort(tmp, cmp);
+
+ return Arrays.asList(tmp);
+ }
+
+ /**
+ * This is only called by Memtable.resolve, so only AtomicBTreeColumns needs to implement it.
+ *
+ * @return the difference in size seen after merging the given columns
+ */
+ public Delta addAllWithSizeDelta(final ColumnFamily cm, AbstractAllocator allocator, Function<Cell, Cell> transformation, Updater indexer, Delta delta)
+ {
+ boolean transformed = false;
+ Collection<Cell> insert;
+ if (cm instanceof UnsortedColumns)
+ {
+ insert = transform(metadata.comparator.columnComparator(), cm, transformation, true);
+ transformed = true;
+ }
+ else
+ insert = cm.getSortedColumns();
+
+ while (true)
+ {
+ Holder current = ref;
+
++ delta.reset();
+ DeletionInfo deletionInfo = cm.deletionInfo();
- if (deletionInfo.hasRanges())
++ if (deletionInfo.mayModify(current.deletionInfo))
+ {
- for (Iterator<Cell> iter : new Iterator[] { insert.iterator(), BTree.<Cell>slice(current.tree, true) })
++ if (deletionInfo.hasRanges())
+ {
- while (iter.hasNext())
++ for (Iterator<Cell> iter : new Iterator[] { insert.iterator(), BTree.<Cell>slice(current.tree, true) })
+ {
- Cell col = iter.next();
- if (deletionInfo.isDeleted(col))
- indexer.remove(col);
++ while (iter.hasNext())
++ {
++ Cell col = iter.next();
++ if (deletionInfo.isDeleted(col))
++ indexer.remove(col);
++ }
+ }
+ }
++
++ deletionInfo = current.deletionInfo.copy().add(deletionInfo);
++ delta.addHeapSize(deletionInfo.unsharedHeapSize() - current.deletionInfo.unsharedHeapSize());
+ }
+
- delta.reset();
- deletionInfo = current.deletionInfo.copy().add(deletionInfo);
- delta.addHeapSize(deletionInfo.unsharedHeapSize() - current.deletionInfo.unsharedHeapSize());
+ ColumnUpdater updater = new ColumnUpdater(this, current, allocator, transformation, indexer, delta);
+ Object[] tree = BTree.update(current.tree, metadata.comparator.columnComparator(), insert, true, updater);
+
+ if (tree != null && refUpdater.compareAndSet(this, current, new Holder(tree, deletionInfo)))
+ {
+ indexer.updateRowLevelIndexes();
+ return updater.delta;
+ }
+
+ if (!transformed)
+ {
+ // After failing once, transform Columns into a new collection to avoid repeatedly allocating Slab space
+ insert = transform(metadata.comparator.columnComparator(), cm, transformation, false);
+ transformed = true;
+ }
+ }
+
+ }
+
+ // no particular reason not to implement these next methods, we just haven't needed them yet
+
+ public void addColumn(Cell column, AbstractAllocator allocator)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public boolean replace(Cell oldColumn, Cell newColumn)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public void clear()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public Cell getColumn(CellName name)
+ {
+ return (Cell) BTree.find(ref.tree, asymmetricComparator(), name);
+ }
+
+ private Comparator<Object> asymmetricComparator()
+ {
+ final Comparator<? super CellName> cmp = metadata.comparator;
+ return new Comparator<Object>()
+ {
+ public int compare(Object o1, Object o2)
+ {
+ return cmp.compare((CellName) o1, ((Cell) o2).name);
+ }
+ };
+ }
+
+ public Iterable<CellName> getColumnNames()
+ {
+ return collection(false, NAME);
+ }
+
+ public Collection<Cell> getSortedColumns()
+ {
+ return collection(true, Functions.<Cell>identity());
+ }
+
+ public Collection<Cell> getReverseSortedColumns()
+ {
+ return collection(false, Functions.<Cell>identity());
+ }
+
+ private <V> Collection<V> collection(final boolean forwards, final Function<Cell, V> f)
+ {
+ final Holder ref = this.ref;
+ return new AbstractCollection<V>()
+ {
+ public Iterator<V> iterator()
+ {
+ return Iterators.transform(BTree.<Cell>slice(ref.tree, forwards), f);
+ }
+
+ public int size()
+ {
+ return BTree.slice(ref.tree, true).count();
+ }
+ };
+ }
+
+ public int getColumnCount()
+ {
+ return BTree.slice(ref.tree, true).count();
+ }
+
+ public Iterator<Cell> iterator(ColumnSlice[] slices)
+ {
+ return new ColumnSlice.NavigableSetIterator(new BTreeSet<>(ref.tree, getComparator().columnComparator()), slices);
+ }
+
+ public Iterator<Cell> reverseIterator(ColumnSlice[] slices)
+ {
+ return new ColumnSlice.NavigableSetIterator(new BTreeSet<>(ref.tree, getComparator().columnComparator()).descendingSet(), slices);
+ }
+
+ public boolean isInsertReversed()
+ {
+ return false;
+ }
+
+ private static class Holder
+ {
+ // This is a small optimization: DeletionInfo is mutable, but we know that we will always copy it in that class,
+ // so we can safely alias one DeletionInfo.live() reference and avoid some allocations.
+ final DeletionInfo deletionInfo;
+ // the btree of columns
+ final Object[] tree;
+
+ Holder(Object[] tree, DeletionInfo deletionInfo)
+ {
+ this.tree = tree;
+ this.deletionInfo = deletionInfo;
+ }
+
+ Holder with(DeletionInfo info)
+ {
+ return new Holder(this.tree, info);
+ }
+ }
+
+ // TODO: create a stack-allocation-friendly list to help optimise garbage for updates to rows with few columns
+
+ /**
+ * tracks the size changes made while merging a new group of cells in
+ */
+ public static final class Delta
+ {
+ private long dataSize;
+ private long heapSize;
+
+ // we track the discarded cells (cells that were in the btree, but replaced by new ones)
+ // separately from aborted ones (were part of an update but older than existing cells)
+ // since we need to reset the former when we race on the btree update, but not the latter
+ private List<Cell> discarded = new ArrayList<>();
+ private List<Cell> aborted;
+
+ protected void reset()
+ {
+ this.dataSize = 0;
+ this.heapSize = 0;
+ discarded.clear();
+ }
+
+ protected void addHeapSize(long heapSize)
+ {
+ this.heapSize += heapSize;
+ }
+
+ protected void swap(Cell old, Cell updated)
+ {
+ dataSize += updated.dataSize() - old.dataSize();
+ heapSize += updated.excessHeapSizeExcludingData() - old.excessHeapSizeExcludingData();
+ discarded.add(old);
+ }
+
+ protected void insert(Cell insert)
+ {
+ this.dataSize += insert.dataSize();
+ this.heapSize += insert.excessHeapSizeExcludingData();
+ }
+
+ private void abort(Cell neverUsed)
+ {
+ if (aborted == null)
+ aborted = new ArrayList<>();
+ aborted.add(neverUsed);
+ }
+
+ public long dataSize()
+ {
+ return dataSize;
+ }
+
+ public long excessHeapSize()
+ {
+ return heapSize;
+ }
+
+ public Iterable<Cell> reclaimed()
+ {
+ if (aborted == null)
+ return discarded;
+ return Iterables.concat(discarded, aborted);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fe4247e5/src/java/org/apache/cassandra/db/DeletionInfo.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fe4247e5/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/Memtable.java
index 49a3f92,01170d6..412a0a8
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@@ -190,20 -195,9 +190,18 @@@ public class Memtabl
}
}
- sizeDelta = previous.addAllWithSizeDelta(cf, allocator, localCopyFunction, indexer);
- currentSize.addAndGet(sizeDelta);
+ ContextAllocator contextAllocator = allocator.wrap(opGroup, cfs);
+ AtomicBTreeColumns.Delta delta = previous.addAllWithSizeDelta(cf, contextAllocator, contextAllocator, indexer, new AtomicBTreeColumns.Delta());
+ liveDataSize.addAndGet(delta.dataSize());
- currentOperations.addAndGet((cf.getColumnCount() == 0)
- ? cf.isMarkedForDelete() ? 1 : 0
- : cf.getColumnCount());
+ currentOperations.addAndGet(cf.getColumnCount() + (cf.isMarkedForDelete() ? 1 : 0) + cf.deletionInfo().rangeCount());
+
+ // allocate or free the delta in column overhead after the fact
+ for (Cell cell : delta.reclaimed())
+ {
+ cell.name.free(allocator);
+ allocator.free(cell.value);
+ }
+ allocator.allocate((int) delta.excessHeapSize(), opGroup);
}
// for debugging
[2/3] git commit: Merge branch 'cassandra-1.2' into cassandra-2.0
Posted by sl...@apache.org.
Merge branch 'cassandra-1.2' into cassandra-2.0
Conflicts:
CHANGES.txt
src/java/org/apache/cassandra/db/AtomicSortedColumns.java
src/java/org/apache/cassandra/db/DeletionInfo.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/58e94818
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/58e94818
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/58e94818
Branch: refs/heads/trunk
Commit: 58e948185e214dbdc68e4ce533edb4dfa5430b51
Parents: 49bb972 adcb713
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Wed Feb 5 18:42:00 2014 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Wed Feb 5 18:42:00 2014 +0100
----------------------------------------------------------------------
CHANGES.txt | 5 +++++
.../org/apache/cassandra/db/AtomicSortedColumns.java | 7 ++++++-
src/java/org/apache/cassandra/db/DeletionInfo.java | 14 ++++++++++++++
src/java/org/apache/cassandra/db/Memtable.java | 10 ++++++----
4 files changed, 31 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/58e94818/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 9599e56,cfdd148..bba5f20
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,29 -1,21 +1,34 @@@
-1.2.16
++2.0.6
++Merged from 1.2:
+ * Fix partition and range deletes not triggering flush (CASSANDRA-6655)
+
-1.2.15
- * Move handling of migration event source to solve bootstrap race (CASSANDRA-6648)
- * Make sure compaction throughput value doesn't overflow with int math (CASSANDRA-6647)
-
+
-1.2.14
- * Reverted code to limit CQL prepared statement cache by size (CASSANDRA-6592)
- * add cassandra.default_messaging_version property to allow easier
- upgrading from 1.1 (CASSANDRA-6619)
- * Allow executing CREATE statements multiple times (CASSANDRA-6471)
- * Don't send confusing info with timeouts (CASSANDRA-6491)
- * Don't resubmit counter mutation runnables internally (CASSANDRA-6427)
- * Don't drop local mutations without a hint (CASSANDRA-6510)
- * Don't allow null max_hint_window_in_ms (CASSANDRA-6419)
- * Validate SliceRange start and finish lengths (CASSANDRA-6521)
+2.0.5
+ * Reduce garbage generated by bloom filter lookups (CASSANDRA-6609)
+ * Add ks.cf names to tombstone logging (CASSANDRA-6597)
+ * Use LOCAL_QUORUM for LWT operations at LOCAL_SERIAL (CASSANDRA-6495)
+ * Wait for gossip to settle before accepting client connections (CASSANDRA-4288)
+ * Delete unfinished compaction incrementally (CASSANDRA-6086)
+ * Allow specifying custom secondary index options in CQL3 (CASSANDRA-6480)
+ * Improve replica pinning for cache efficiency in DES (CASSANDRA-6485)
+ * Fix LOCAL_SERIAL from thrift (CASSANDRA-6584)
+ * Don't special case received counts in CAS timeout exceptions (CASSANDRA-6595)
+ * Add support for 2.1 global counter shards (CASSANDRA-6505)
+ * Fix NPE when streaming connection is not yet established (CASSANDRA-6210)
+ * Avoid rare duplicate read repair triggering (CASSANDRA-6606)
+ * Fix paging discardFirst (CASSANDRA-6555)
+ * Fix ArrayIndexOutOfBoundsException in 2ndary index query (CASSANDRA-6470)
+ * Release sstables upon rebuilding 2i (CASSANDRA-6635)
+ * Add AbstractCompactionStrategy.startup() method (CASSANDRA-6637)
+ * SSTableScanner may skip rows during cleanup (CASSANDRA-6638)
+ * sstables from stalled repair sessions can resurrect deleted data (CASSANDRA-6503)
+ * Switch stress to use ITransportFactory (CASSANDRA-6641)
+ * Fix IllegalArgumentException during prepare (CASSANDRA-6592)
+ * Fix possible loss of 2ndary index entries during compaction (CASSANDRA-6517)
+ * Fix direct Memory on architectures that do not support unaligned long access
+ (CASSANDRA-6628)
+ * Let scrub optionally skip broken counter partitions (CASSANDRA-5930)
+Merged from 1.2:
* fsync compression metadata (CASSANDRA-6531)
* Validate CF existence on execution for prepared statement (CASSANDRA-6535)
* Add ability to throttle batchlog replay (CASSANDRA-6550)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/58e94818/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/AtomicSortedColumns.java
index 1c0bf1b,d6c861b..d3a979c
--- a/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
+++ b/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
@@@ -178,19 -194,15 +178,24 @@@ public class AtomicSortedColumns extend
{
sizeDelta = 0;
current = ref.get();
- DeletionInfo newDelInfo = current.deletionInfo.copy().add(cm.deletionInfo());
+ DeletionInfo newDelInfo = current.deletionInfo;
- if (cm.getDeletionInfo().mayModify(newDelInfo))
++ if (cm.deletionInfo().mayModify(newDelInfo))
+ {
- newDelInfo = current.deletionInfo.copy().add(cm.getDeletionInfo());
++ newDelInfo = current.deletionInfo.copy().add(cm.deletionInfo());
+ sizeDelta += newDelInfo.dataSize() - current.deletionInfo.dataSize();
+ }
modified = new Holder(current.map.clone(), newDelInfo);
- for (IColumn column : cm.getSortedColumns())
+ if (cm.deletionInfo().hasRanges())
+ {
+ for (Column currentColumn : Iterables.concat(current.map.values(), cm))
+ {
+ if (cm.deletionInfo().isDeleted(currentColumn))
+ indexer.remove(currentColumn);
+ }
+ }
+
+ for (Column column : cm)
{
sizeDelta += modified.addColumn(transformation.apply(column), allocator, indexer);
// bail early if we know we've been beaten
http://git-wip-us.apache.org/repos/asf/cassandra/blob/58e94818/src/java/org/apache/cassandra/db/DeletionInfo.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/DeletionInfo.java
index 13fc824,91af9fd..23f46bf
--- a/src/java/org/apache/cassandra/db/DeletionInfo.java
+++ b/src/java/org/apache/cassandra/db/DeletionInfo.java
@@@ -266,11 -216,20 +266,25 @@@ public class DeletionInf
return size + (ranges == null ? 0 : ranges.dataSize());
}
+ public boolean hasRanges()
+ {
+ return ranges != null && !ranges.isEmpty();
+ }
+
+ public int rangeCount()
+ {
- return ranges == null ? 0 : ranges.size();
++ return hasRanges() ? ranges.size() : 0;
+ }
+
+ /**
+ * Whether this deletion info may modify the provided one if added to it.
+ */
+ public boolean mayModify(DeletionInfo delInfo)
+ {
+ return topLevel.markedForDeleteAt > delInfo.topLevel.markedForDeleteAt
- || ranges == null;
++ || hasRanges();
+ }
+
@Override
public String toString()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/58e94818/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/Memtable.java
index 1ca7e39,b229060..01170d6
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@@ -180,22 -190,25 +180,24 @@@ public class Memtabl
private void resolve(DecoratedKey key, ColumnFamily cf, SecondaryIndexManager.Updater indexer)
{
- ColumnFamily previous = columnFamilies.get(key);
+ AtomicSortedColumns previous = rows.get(key);
+ long sizeDelta = 0;
if (previous == null)
{
- // AtomicSortedColumns doesn't work for super columns (see #3821)
- ColumnFamily empty = cf.cloneMeShallow(cf.isSuper() ? ThreadSafeSortedColumns.factory() : AtomicSortedColumns.factory(), false);
+ AtomicSortedColumns empty = cf.cloneMeShallow(AtomicSortedColumns.factory, false);
// We'll add the columns later. This avoids wasting works if we get beaten in the putIfAbsent
- previous = columnFamilies.putIfAbsent(new DecoratedKey(key.token, allocator.clone(key.key)), empty);
+ previous = rows.putIfAbsent(new DecoratedKey(key.token, allocator.clone(key.key)), empty);
if (previous == null)
+ {
previous = empty;
+ sizeDelta += empty.deletionInfo().dataSize();
+ }
}
- long sizeDelta = previous.addAllWithSizeDelta(cf, allocator, localCopyFunction, indexer);
+ sizeDelta = previous.addAllWithSizeDelta(cf, allocator, localCopyFunction, indexer);
currentSize.addAndGet(sizeDelta);
- currentOperations.addAndGet((cf.getColumnCount() == 0)
- ? cf.isMarkedForDelete() ? 1 : 0
- : cf.getColumnCount());
+ currentOperations.addAndGet(cf.getColumnCount() + (cf.isMarkedForDelete() ? 1 : 0) + cf.deletionInfo().rangeCount());
}
// for debugging