You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2012/07/03 18:50:32 UTC
[2/3] git commit: use data size ratio in liveRatio instead of live
size : serialized throughput patch by jbellis;
reviewed by slebresne for CASSANDRA-4399
use data size ratio in liveRatio instead of live size : serialized throughput
patch by jbellis; reviewed by slebresne for CASSANDRA-4399
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/67dec69f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/67dec69f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/67dec69f
Branch: refs/heads/trunk
Commit: 67dec69f53d2bfd3818fea4ede40e5d5a6b2356b
Parents: 8674784
Author: Jonathan Ellis <jb...@apache.org>
Authored: Mon Jul 2 01:40:38 2012 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Jul 3 11:44:46 2012 -0500
----------------------------------------------------------------------
.../cassandra/db/AbstractColumnContainer.java | 8 ++-
.../db/AbstractThreadUnsafeSortedColumns.java | 6 +++
.../apache/cassandra/db/AtomicSortedColumns.java | 31 +++++++++++++-
.../org/apache/cassandra/db/ISortedColumns.java | 7 +++
src/java/org/apache/cassandra/db/Memtable.java | 24 +++++------
5 files changed, 57 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/67dec69f/src/java/org/apache/cassandra/db/AbstractColumnContainer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AbstractColumnContainer.java b/src/java/org/apache/cassandra/db/AbstractColumnContainer.java
index c7922b1..c35c63c 100644
--- a/src/java/org/apache/cassandra/db/AbstractColumnContainer.java
+++ b/src/java/org/apache/cassandra/db/AbstractColumnContainer.java
@@ -84,9 +84,11 @@ public abstract class AbstractColumnContainer implements IColumnContainer, IIter
columns.maybeResetDeletionTimes(gcBefore);
}
- /**
- * We need to go through each column in the column container and resolve it before adding
- */
+ public long addAllWithSizeDelta(AbstractColumnContainer cc, Allocator allocator, Function<IColumn, IColumn> transformation)
+ {
+ return columns.addAllWithSizeDelta(cc.columns, allocator, transformation);
+ }
+
public void addAll(AbstractColumnContainer cc, Allocator allocator, Function<IColumn, IColumn> transformation)
{
columns.addAll(cc.columns, allocator, transformation);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/67dec69f/src/java/org/apache/cassandra/db/AbstractThreadUnsafeSortedColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AbstractThreadUnsafeSortedColumns.java b/src/java/org/apache/cassandra/db/AbstractThreadUnsafeSortedColumns.java
index b09b5ee..1360336 100644
--- a/src/java/org/apache/cassandra/db/AbstractThreadUnsafeSortedColumns.java
+++ b/src/java/org/apache/cassandra/db/AbstractThreadUnsafeSortedColumns.java
@@ -93,6 +93,12 @@ public abstract class AbstractThreadUnsafeSortedColumns implements ISortedColumn
// having to care about the deletion infos
protected abstract void addAllColumns(ISortedColumns columns, Allocator allocator, Function<IColumn, IColumn> transformation);
+ public long addAllWithSizeDelta(ISortedColumns cm, Allocator allocator, Function<IColumn, IColumn> transformation)
+ {
+ // sizeDelta is only needed by memtable updates which should not be using thread-unsafe containers
+ throw new UnsupportedOperationException();
+ }
+
public void addAll(ISortedColumns columns, Allocator allocator, Function<IColumn, IColumn> transformation)
{
addAllColumns(columns, allocator, transformation);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/67dec69f/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AtomicSortedColumns.java b/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
index 5fdc0f6..9cb44d2 100644
--- a/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
+++ b/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
@@ -154,6 +154,11 @@ public class AtomicSortedColumns implements ISortedColumns
public void addAll(ISortedColumns cm, Allocator allocator, Function<IColumn, IColumn> transformation)
{
+ addAllWithSizeDelta(cm, allocator, transformation);
+ }
+
+ public long addAllWithSizeDelta(ISortedColumns cm, Allocator allocator, Function<IColumn, IColumn> transformation)
+ {
/*
* This operation needs atomicity and isolation. To that end, we
* add the new column to a copy of the map (a cheap O(1) snapTree
@@ -166,9 +171,12 @@ public class AtomicSortedColumns implements ISortedColumns
* we bail early, avoiding unnecessary work if possible.
*/
Holder current, modified;
+ long sizeDelta;
+
main_loop:
do
{
+ sizeDelta = 0;
current = ref.get();
DeletionInfo newDelInfo = current.deletionInfo;
if (newDelInfo.markedForDeleteAt < cm.getDeletionInfo().markedForDeleteAt)
@@ -177,13 +185,15 @@ public class AtomicSortedColumns implements ISortedColumns
for (IColumn column : cm.getSortedColumns())
{
- modified.addColumn(transformation.apply(column), allocator);
+ sizeDelta += modified.addColumn(transformation.apply(column), allocator);
// bail early if we know we've been beaten
if (ref.get() != current)
continue main_loop;
}
}
while (!ref.compareAndSet(current, modified));
+
+ return sizeDelta;
}
public boolean replace(IColumn oldColumn, IColumn newColumn)
@@ -329,16 +339,26 @@ public class AtomicSortedColumns implements ISortedColumns
return new Holder(new SnapTreeMap<ByteBuffer, IColumn>(map.comparator()), deletionInfo);
}
- void addColumn(IColumn column, Allocator allocator)
+ long addColumn(IColumn column, Allocator allocator)
{
ByteBuffer name = column.name();
IColumn oldColumn;
- while ((oldColumn = map.putIfAbsent(name, column)) != null)
+ long sizeDelta = 0;
+ while (true)
{
+ oldColumn = map.putIfAbsent(name, column);
+ if (oldColumn == null)
+ {
+ sizeDelta += column.serializedSize();
+ break;
+ }
+
if (oldColumn instanceof SuperColumn)
{
assert column instanceof SuperColumn;
+ long previousSize = oldColumn.serializedSize();
((SuperColumn) oldColumn).putColumn((SuperColumn)column, allocator);
+ sizeDelta += oldColumn.serializedSize() - previousSize;
break; // Delegated to SuperColumn
}
else
@@ -346,12 +366,17 @@ public class AtomicSortedColumns implements ISortedColumns
// calculate reconciled col from old (existing) col and new col
IColumn reconciledColumn = column.reconcile(oldColumn, allocator);
if (map.replace(name, oldColumn, reconciledColumn))
+ {
+ sizeDelta += reconciledColumn.serializedSize() - oldColumn.serializedSize();
break;
+ }
// We failed to replace column due to a concurrent update or a concurrent removal. Keep trying.
// (Currently, concurrent removal should not happen (only updates), but let us support that anyway.)
}
}
+
+ return sizeDelta;
}
void retainAll(ISortedColumns columns)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/67dec69f/src/java/org/apache/cassandra/db/ISortedColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ISortedColumns.java b/src/java/org/apache/cassandra/db/ISortedColumns.java
index 1200544..00c8ea5 100644
--- a/src/java/org/apache/cassandra/db/ISortedColumns.java
+++ b/src/java/org/apache/cassandra/db/ISortedColumns.java
@@ -68,6 +68,13 @@ public interface ISortedColumns extends IIterableColumns
* add(c);
* </code>
* but is potentially faster.
+ *
+ * @return the difference in size seen after merging the given columns
+ */
+ public long addAllWithSizeDelta(ISortedColumns cm, Allocator allocator, Function<IColumn, IColumn> transformation);
+
+ /**
+ * Adds the columns without necessarily computing the size delta
*/
public void addAll(ISortedColumns cm, Allocator allocator, Function<IColumn, IColumn> transformation);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/67dec69f/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index f1dcc56..347b997 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -80,7 +80,7 @@ public class Memtable
volatile static Memtable activelyMeasuring;
private volatile boolean isFrozen;
- private final AtomicLong currentThroughput = new AtomicLong(0);
+ private final AtomicLong currentSize = new AtomicLong(0);
private final AtomicLong currentOperations = new AtomicLong(0);
// We index the memtable by RowPosition only for the purpose of being able
@@ -122,12 +122,12 @@ public class Memtable
{
// 25% fudge factor on the base throughput * liveRatio calculation. (Based on observed
// pre-slabbing behavior -- not sure what accounts for this. May have changed with introduction of slabbing.)
- return (long) (currentThroughput.get() * cfs.liveRatio * 1.25);
+ return (long) (currentSize.get() * cfs.liveRatio * 1.25);
}
public long getSerializedSize()
{
- return currentThroughput.get();
+ return currentSize.get();
}
public long getOperations()
@@ -190,7 +190,7 @@ public class Memtable
deepSize += meter.measureDeep(entry.getKey()) + meter.measureDeep(entry.getValue());
objects += entry.getValue().getColumnCount();
}
- double newRatio = (double) deepSize / currentThroughput.get();
+ double newRatio = (double) deepSize / currentSize.get();
if (newRatio < MIN_SANE_LIVE_RATIO)
{
@@ -226,12 +226,6 @@ public class Memtable
private void resolve(DecoratedKey key, ColumnFamily cf)
{
- currentThroughput.addAndGet(cf.size());
- currentOperations.addAndGet((cf.getColumnCount() == 0)
- ? cf.isMarkedForDelete() ? 1 : 0
- : cf.getColumnCount());
-
-
ColumnFamily previous = columnFamilies.get(key);
if (previous == null)
@@ -244,7 +238,11 @@ public class Memtable
previous = empty;
}
- previous.addAll(cf, allocator, localCopyFunction);
+ long sizeDelta = previous.addAllWithSizeDelta(cf, allocator, localCopyFunction);
+ currentSize.addAndGet(sizeDelta);
+ currentOperations.addAndGet((cf.getColumnCount() == 0)
+ ? cf.isMarkedForDelete() ? 1 : 0
+ : cf.getColumnCount());
}
// for debugging
@@ -274,7 +272,7 @@ public class Memtable
}
long estimatedSize = (long) ((keySize // index entries
+ keySize // keys in data file
- + currentThroughput.get()) // data
+ + currentSize.get()) // data
* 1.2); // bloom filter and row index overhead
SSTableReader ssTable;
// errors when creating the writer that may leave empty temp files.
@@ -325,7 +323,7 @@ public class Memtable
public String toString()
{
return String.format("Memtable-%s@%s(%s/%s serialized/live bytes, %s ops)",
- cfs.getColumnFamilyName(), hashCode(), currentThroughput, getLiveSize(), currentOperations);
+ cfs.getColumnFamilyName(), hashCode(), currentSize, getLiveSize(), currentOperations);
}
/**