You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2012/07/03 18:50:32 UTC

[3/3] git commit: use data size ratio in liveRatio instead of live size : serialized throughput patch by jbellis; reviewed by slebresne for CASSANDRA-4399

use data size ratio in liveRatio instead of live size : serialized throughput
patch by jbellis; reviewed by slebresne for CASSANDRA-4399


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/67dec69f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/67dec69f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/67dec69f

Branch: refs/heads/cassandra-1.1
Commit: 67dec69f53d2bfd3818fea4ede40e5d5a6b2356b
Parents: 8674784
Author: Jonathan Ellis <jb...@apache.org>
Authored: Mon Jul 2 01:40:38 2012 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Jul 3 11:44:46 2012 -0500

----------------------------------------------------------------------
 .../cassandra/db/AbstractColumnContainer.java      |    8 ++-
 .../db/AbstractThreadUnsafeSortedColumns.java      |    6 +++
 .../apache/cassandra/db/AtomicSortedColumns.java   |   31 +++++++++++++-
 .../org/apache/cassandra/db/ISortedColumns.java    |    7 +++
 src/java/org/apache/cassandra/db/Memtable.java     |   24 +++++------
 5 files changed, 57 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/67dec69f/src/java/org/apache/cassandra/db/AbstractColumnContainer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AbstractColumnContainer.java b/src/java/org/apache/cassandra/db/AbstractColumnContainer.java
index c7922b1..c35c63c 100644
--- a/src/java/org/apache/cassandra/db/AbstractColumnContainer.java
+++ b/src/java/org/apache/cassandra/db/AbstractColumnContainer.java
@@ -84,9 +84,11 @@ public abstract class AbstractColumnContainer implements IColumnContainer, IIter
         columns.maybeResetDeletionTimes(gcBefore);
     }
 
-    /**
-     * We need to go through each column in the column container and resolve it before adding
-     */
+    public long addAllWithSizeDelta(AbstractColumnContainer cc, Allocator allocator, Function<IColumn, IColumn> transformation)
+    {
+        return columns.addAllWithSizeDelta(cc.columns, allocator, transformation);
+    }
+
     public void addAll(AbstractColumnContainer cc, Allocator allocator, Function<IColumn, IColumn> transformation)
     {
         columns.addAll(cc.columns, allocator, transformation);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/67dec69f/src/java/org/apache/cassandra/db/AbstractThreadUnsafeSortedColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AbstractThreadUnsafeSortedColumns.java b/src/java/org/apache/cassandra/db/AbstractThreadUnsafeSortedColumns.java
index b09b5ee..1360336 100644
--- a/src/java/org/apache/cassandra/db/AbstractThreadUnsafeSortedColumns.java
+++ b/src/java/org/apache/cassandra/db/AbstractThreadUnsafeSortedColumns.java
@@ -93,6 +93,12 @@ public abstract class AbstractThreadUnsafeSortedColumns implements ISortedColumn
     // having to care about the deletion infos
     protected abstract void addAllColumns(ISortedColumns columns, Allocator allocator, Function<IColumn, IColumn> transformation);
 
+    public long addAllWithSizeDelta(ISortedColumns cm, Allocator allocator, Function<IColumn, IColumn> transformation)
+    {
+        // sizeDelta is only needed by memtable updates which should not be using thread-unsafe containers
+        throw new UnsupportedOperationException();
+    }
+
     public void addAll(ISortedColumns columns, Allocator allocator, Function<IColumn, IColumn> transformation)
     {
         addAllColumns(columns, allocator, transformation);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/67dec69f/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AtomicSortedColumns.java b/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
index 5fdc0f6..9cb44d2 100644
--- a/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
+++ b/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
@@ -154,6 +154,11 @@ public class AtomicSortedColumns implements ISortedColumns
 
     public void addAll(ISortedColumns cm, Allocator allocator, Function<IColumn, IColumn> transformation)
     {
+        addAllWithSizeDelta(cm, allocator, transformation);
+    }
+
+    public long addAllWithSizeDelta(ISortedColumns cm, Allocator allocator, Function<IColumn, IColumn> transformation)
+    {
         /*
          * This operation needs atomicity and isolation. To that end, we
          * add the new column to a copy of the map (a cheap O(1) snapTree
@@ -166,9 +171,12 @@ public class AtomicSortedColumns implements ISortedColumns
          * we bail early, avoiding unnecessary work if possible.
          */
         Holder current, modified;
+        long sizeDelta;
+
         main_loop:
         do
         {
+            sizeDelta = 0;
             current = ref.get();
             DeletionInfo newDelInfo = current.deletionInfo;
             if (newDelInfo.markedForDeleteAt < cm.getDeletionInfo().markedForDeleteAt)
@@ -177,13 +185,15 @@ public class AtomicSortedColumns implements ISortedColumns
 
             for (IColumn column : cm.getSortedColumns())
             {
-                modified.addColumn(transformation.apply(column), allocator);
+                sizeDelta += modified.addColumn(transformation.apply(column), allocator);
                 // bail early if we know we've been beaten
                 if (ref.get() != current)
                     continue main_loop;
             }
         }
         while (!ref.compareAndSet(current, modified));
+
+        return sizeDelta;
     }
 
     public boolean replace(IColumn oldColumn, IColumn newColumn)
@@ -329,16 +339,26 @@ public class AtomicSortedColumns implements ISortedColumns
             return new Holder(new SnapTreeMap<ByteBuffer, IColumn>(map.comparator()), deletionInfo);
         }
 
-        void addColumn(IColumn column, Allocator allocator)
+        long addColumn(IColumn column, Allocator allocator)
         {
             ByteBuffer name = column.name();
             IColumn oldColumn;
-            while ((oldColumn = map.putIfAbsent(name, column)) != null)
+            long sizeDelta = 0;
+            while (true)
             {
+                oldColumn = map.putIfAbsent(name, column);
+                if (oldColumn == null)
+                {
+                    sizeDelta += column.serializedSize();
+                    break;
+                }
+
                 if (oldColumn instanceof SuperColumn)
                 {
                     assert column instanceof SuperColumn;
+                    long previousSize = oldColumn.serializedSize();
                     ((SuperColumn) oldColumn).putColumn((SuperColumn)column, allocator);
+                    sizeDelta += oldColumn.serializedSize() - previousSize;
                     break;  // Delegated to SuperColumn
                 }
                 else
@@ -346,12 +366,17 @@ public class AtomicSortedColumns implements ISortedColumns
                     // calculate reconciled col from old (existing) col and new col
                     IColumn reconciledColumn = column.reconcile(oldColumn, allocator);
                     if (map.replace(name, oldColumn, reconciledColumn))
+                    {
+                        sizeDelta += reconciledColumn.serializedSize() - oldColumn.serializedSize();
                         break;
+                    }
 
                     // We failed to replace column due to a concurrent update or a concurrent removal. Keep trying.
                     // (Currently, concurrent removal should not happen (only updates), but let us support that anyway.)
                 }
             }
+
+            return sizeDelta;
         }
 
         void retainAll(ISortedColumns columns)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/67dec69f/src/java/org/apache/cassandra/db/ISortedColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ISortedColumns.java b/src/java/org/apache/cassandra/db/ISortedColumns.java
index 1200544..00c8ea5 100644
--- a/src/java/org/apache/cassandra/db/ISortedColumns.java
+++ b/src/java/org/apache/cassandra/db/ISortedColumns.java
@@ -68,6 +68,13 @@ public interface ISortedColumns extends IIterableColumns
      *      add(c);
      *   </code>
      *  but is potentially faster.
+     *
+     *  @return the difference in size seen after merging the given columns
+     */
+    public long addAllWithSizeDelta(ISortedColumns cm, Allocator allocator, Function<IColumn, IColumn> transformation);
+
+    /**
+     * Adds the columns without necessarily computing the size delta
      */
     public void addAll(ISortedColumns cm, Allocator allocator, Function<IColumn, IColumn> transformation);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/67dec69f/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index f1dcc56..347b997 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -80,7 +80,7 @@ public class Memtable
     volatile static Memtable activelyMeasuring;
 
     private volatile boolean isFrozen;
-    private final AtomicLong currentThroughput = new AtomicLong(0);
+    private final AtomicLong currentSize = new AtomicLong(0);
     private final AtomicLong currentOperations = new AtomicLong(0);
 
     // We index the memtable by RowPosition only for the purpose of being able
@@ -122,12 +122,12 @@ public class Memtable
     {
         // 25% fudge factor on the base throughput * liveRatio calculation.  (Based on observed
         // pre-slabbing behavior -- not sure what accounts for this. May have changed with introduction of slabbing.)
-        return (long) (currentThroughput.get() * cfs.liveRatio * 1.25);
+        return (long) (currentSize.get() * cfs.liveRatio * 1.25);
     }
 
     public long getSerializedSize()
     {
-        return currentThroughput.get();
+        return currentSize.get();
     }
 
     public long getOperations()
@@ -190,7 +190,7 @@ public class Memtable
                         deepSize += meter.measureDeep(entry.getKey()) + meter.measureDeep(entry.getValue());
                         objects += entry.getValue().getColumnCount();
                     }
-                    double newRatio = (double) deepSize / currentThroughput.get();
+                    double newRatio = (double) deepSize / currentSize.get();
 
                     if (newRatio < MIN_SANE_LIVE_RATIO)
                     {
@@ -226,12 +226,6 @@ public class Memtable
 
     private void resolve(DecoratedKey key, ColumnFamily cf)
     {
-        currentThroughput.addAndGet(cf.size());
-        currentOperations.addAndGet((cf.getColumnCount() == 0)
-                                    ? cf.isMarkedForDelete() ? 1 : 0
-                                    : cf.getColumnCount());
-
-
         ColumnFamily previous = columnFamilies.get(key);
 
         if (previous == null)
@@ -244,7 +238,11 @@ public class Memtable
                 previous = empty;
         }
 
-        previous.addAll(cf, allocator, localCopyFunction);
+        long sizeDelta = previous.addAllWithSizeDelta(cf, allocator, localCopyFunction);
+        currentSize.addAndGet(sizeDelta);
+        currentOperations.addAndGet((cf.getColumnCount() == 0)
+                                    ? cf.isMarkedForDelete() ? 1 : 0
+                                    : cf.getColumnCount());
     }
 
     // for debugging
@@ -274,7 +272,7 @@ public class Memtable
         }
         long estimatedSize = (long) ((keySize // index entries
                                       + keySize // keys in data file
-                                      + currentThroughput.get()) // data
+                                      + currentSize.get()) // data
                                      * 1.2); // bloom filter and row index overhead
         SSTableReader ssTable;
         // errors when creating the writer that may leave empty temp files.
@@ -325,7 +323,7 @@ public class Memtable
     public String toString()
     {
         return String.format("Memtable-%s@%s(%s/%s serialized/live bytes, %s ops)",
-                             cfs.getColumnFamilyName(), hashCode(), currentThroughput, getLiveSize(), currentOperations);
+                             cfs.getColumnFamilyName(), hashCode(), currentSize, getLiveSize(), currentOperations);
     }
 
     /**