You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/01/07 06:47:06 UTC

svn commit: r896756 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra: db/ColumnFamily.java db/ColumnFamilySerializer.java db/Memtable.java db/SuperColumn.java utils/FBUtilities.java

Author: jbellis
Date: Thu Jan  7 05:47:05 2010
New Revision: 896756

URL: http://svn.apache.org/viewvc?rev=896756&view=rev
Log:
replace sharded row locks with column-level locking
patch by jbellis; tested by Brandon Williams for CASSANDRA-658

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java?rev=896756&r1=896755&r2=896756&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java Thu Jan  7 05:47:05 2010
@@ -20,14 +20,14 @@
 
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.SortedSet;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.log4j.Logger;
 
@@ -35,6 +35,7 @@
 import org.apache.cassandra.io.ICompactSerializer2;
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.utils.FBUtilities;
 
 
 public final class ColumnFamily implements IColumnContainer
@@ -78,8 +79,8 @@
     private String name_;
 
     private transient ICompactSerializer2<IColumn> columnSerializer_;
-    long markedForDeleteAt = Long.MIN_VALUE;
-    int localDeletionTime = Integer.MIN_VALUE;
+    AtomicLong markedForDeleteAt = new AtomicLong(Long.MIN_VALUE);
+    AtomicInteger localDeletionTime = new AtomicInteger(Integer.MIN_VALUE);
     private ConcurrentSkipListMap<byte[], IColumn> columns_;
 
     public ColumnFamily(String cfName, String columnType, AbstractType comparator, AbstractType subcolumnComparator)
@@ -190,26 +191,23 @@
     public void addColumn(IColumn column)
     {
         byte[] name = column.name();
-        IColumn oldColumn = columns_.get(name);
+        IColumn oldColumn = columns_.putIfAbsent(name, column);
         if (oldColumn != null)
         {
             if (oldColumn instanceof SuperColumn)
             {
-                int oldSize = oldColumn.size();
                 ((SuperColumn) oldColumn).putColumn(column);
             }
             else
             {
-                if (((Column)oldColumn).comparePriority((Column)column) <= 0)
+                while (((Column) oldColumn).comparePriority((Column)column) <= 0)
                 {
-                    columns_.put(name, column);
+                    if (columns_.replace(name, oldColumn, column))
+                        break;
+                    oldColumn = columns_.get(name);
                 }
             }
         }
-        else
-        {
-            columns_.put(name, column);
-        }
     }
 
     public IColumn getColumn(byte[] name)
@@ -237,21 +235,22 @@
     	columns_.remove(columnName);
     }
 
+    @Deprecated // TODO this is a hack to set initial value outside constructor
     public void delete(int localtime, long timestamp)
     {
-        localDeletionTime = localtime;
-        markedForDeleteAt = timestamp;
+        localDeletionTime.set(localtime);
+        markedForDeleteAt.set(timestamp);
     }
 
     public void delete(ColumnFamily cf2)
     {
-        delete(Math.max(getLocalDeletionTime(), cf2.getLocalDeletionTime()),
-               Math.max(getMarkedForDeleteAt(), cf2.getMarkedForDeleteAt()));
+        FBUtilities.atomicSetMax(localDeletionTime, cf2.getLocalDeletionTime());
+        FBUtilities.atomicSetMax(markedForDeleteAt, cf2.getMarkedForDeleteAt());
     }
 
     public boolean isMarkedForDelete()
     {
-        return markedForDeleteAt > Long.MIN_VALUE;
+        return markedForDeleteAt.get() > Long.MIN_VALUE;
     }
 
     /*
@@ -367,12 +366,12 @@
 
     public long getMarkedForDeleteAt()
     {
-        return markedForDeleteAt;
+        return markedForDeleteAt.get();
     }
 
     public int getLocalDeletionTime()
     {
-        return localDeletionTime;
+        return localDeletionTime.get();
     }
 
     public String type()

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java?rev=896756&r1=896755&r2=896756&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java Thu Jan  7 05:47:05 2010
@@ -77,8 +77,8 @@
     {
         try
         {
-            dos.writeInt(columnFamily.localDeletionTime);
-            dos.writeLong(columnFamily.markedForDeleteAt);
+            dos.writeInt(columnFamily.localDeletionTime.get());
+            dos.writeLong(columnFamily.markedForDeleteAt.get());
 
             Collection<IColumn> columns = columnFamily.getSortedColumns();
             dos.writeInt(columns.size());

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=896756&r1=896755&r2=896756&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Thu Jan  7 05:47:05 2010
@@ -53,9 +53,7 @@
     private final String table_;
     private final String cfName_;
     private final long creationTime_;
-    // we use NBHM with manual locking, so reads are automatically threadsafe but write merging is serialized per key
     private final NonBlockingHashMap<DecoratedKey, ColumnFamily> columnFamilies_ = new NonBlockingHashMap<DecoratedKey, ColumnFamily>();
-    private final Object[] keyLocks;
     private final IPartitioner partitioner_ = StorageService.getPartitioner();
 
     Memtable(String table, String cfName)
@@ -63,11 +61,6 @@
         table_ = table;
         cfName_ = cfName;
         creationTime_ = System.currentTimeMillis();
-        keyLocks = new Object[Runtime.getRuntime().availableProcessors() * 8];
-        for (int i = 0; i < keyLocks.length; i++)
-        {
-            keyLocks[i] = new Object();
-        }
     }
 
     public boolean isFlushed()
@@ -135,17 +128,15 @@
 
     private void resolve(String key, ColumnFamily cf)
     {
-        DecoratedKey decoratedKey = partitioner_.decorateKey(key);
         currentThroughput_.addAndGet(cf.size());
         currentOperations.addAndGet(cf.getColumnCount());
+
+        DecoratedKey decoratedKey = partitioner_.decorateKey(key);
         ColumnFamily oldCf = columnFamilies_.putIfAbsent(decoratedKey, cf);
         if (oldCf == null)
             return;
 
-        synchronized (keyLocks[Math.abs(key.hashCode() % keyLocks.length)])
-        {
-            oldCf.resolve(cf);
-        }
+        oldCf.resolve(cf);
     }
 
     // for debugging

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java?rev=896756&r1=896755&r2=896756&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java Thu Jan  7 05:47:05 2010
@@ -20,15 +20,17 @@
 
 import java.io.*;
 import java.util.Collection;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.security.MessageDigest;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.log4j.Logger;
 
 import org.apache.cassandra.io.ICompactSerializer2;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.utils.FBUtilities;
 
 
 public final class SuperColumn implements IColumn, IColumnContainer
@@ -42,9 +44,8 @@
 
     private byte[] name_;
     private ConcurrentSkipListMap<byte[], IColumn> columns_;
-    private int localDeletionTime = Integer.MIN_VALUE;
-	private long markedForDeleteAt = Long.MIN_VALUE;
-    private AtomicInteger size_ = new AtomicInteger(0);
+    private AtomicInteger localDeletionTime = new AtomicInteger(Integer.MIN_VALUE);
+	private AtomicLong markedForDeleteAt = new AtomicLong(Long.MIN_VALUE);
 
     SuperColumn(byte[] name, AbstractType comparator)
     {
@@ -67,21 +68,20 @@
     public SuperColumn cloneMeShallow()
     {
         SuperColumn sc = new SuperColumn(name_, getComparator());
-        sc.markForDeleteAt(localDeletionTime, markedForDeleteAt);
+        sc.markForDeleteAt(localDeletionTime.get(), markedForDeleteAt.get());
         return sc;
     }
 
-
     public IColumn cloneMe()
     {
         SuperColumn sc = new SuperColumn(name_, new ConcurrentSkipListMap<byte[], IColumn>(columns_));
-        sc.markForDeleteAt(localDeletionTime, markedForDeleteAt);
+        sc.markForDeleteAt(localDeletionTime.get(), markedForDeleteAt.get());
         return sc;
     }
 
 	public boolean isMarkedForDelete()
 	{
-		return markedForDeleteAt > Long.MIN_VALUE;
+		return markedForDeleteAt.get() > Long.MIN_VALUE;
 	}
 
     public byte[] name()
@@ -101,15 +101,17 @@
         return column;
     }
 
+    /**
+     * This calculates the exact size of the sub columns on the fly
+     */
     public int size()
     {
-        /*
-         * return the size of the individual columns
-         * that make up the super column. This is an
-         * APPROXIMATION of the size used only from the
-         * Memtable.
-        */
-        return size_.get();
+        int size = 0;
+        for (IColumn subColumn : getSubColumns())
+        {
+            size += subColumn.serializedSize();
+        }
+        return size;
     }
 
     /**
@@ -122,20 +124,7 @@
     	 * We need to keep the way we are calculating the column size in sync with the
     	 * way we are calculating the size for the column family serializer.
     	 */
-    	return IColumn.UtfPrefix_ + name_.length + DBConstants.intSize_ + DBConstants.longSize_ + DBConstants.intSize_ + getSizeOfAllColumns();
-    }
-
-    /**
-     * This calculates the exact size of the sub columns on the fly
-     */
-    int getSizeOfAllColumns()
-    {
-        int size = 0;
-        for (IColumn subColumn : getSubColumns())
-        {
-            size += subColumn.serializedSize();
-        }
-        return size;
+    	return IColumn.UtfPrefix_ + name_.length + DBConstants.intSize_ + DBConstants.longSize_ + DBConstants.intSize_ + size();
     }
 
     public void remove(byte[] columnName)
@@ -152,9 +141,9 @@
     {
     	IColumn column = columns_.get(columnName);
     	assert column instanceof Column;
-    	if ( column != null )
-    		return column.timestamp();
-    	throw new IllegalArgumentException("Timestamp was requested for a column that does not exist.");
+        if ( column != null )
+            return column.timestamp();
+        throw new IllegalArgumentException("Timestamp was requested for a column that does not exist.");
     }
 
     public long mostRecentLiveChangeAt()
@@ -187,22 +176,15 @@
     {
     	if (!(column instanceof Column))
     		throw new UnsupportedOperationException("A super column can only contain simple columns.");
-    	IColumn oldColumn = columns_.get(column.name());
-    	if ( oldColumn == null )
+        byte[] name = column.name();
+        IColumn oldColumn = columns_.putIfAbsent(name, column);
+    	if (oldColumn != null)
         {
-    		columns_.put(column.name(), column);
-            size_.addAndGet(column.size());
-        }
-    	else
-    	{
-    		if (((Column)oldColumn).comparePriority((Column)column) <= 0)
+    		while (((Column)oldColumn).comparePriority((Column)column) <= 0)
             {
-    			columns_.put(column.name(), column);
-                int delta = (-1)*oldColumn.size();
-                /* subtract the size of the oldColumn */
-                size_.addAndGet(delta);
-                /* add the size of the new column */
-                size_.addAndGet(column.size());
+    			if (columns_.replace(name, oldColumn, column))
+                    break;
+                oldColumn = columns_.get(name);
             }
     	}
     }
@@ -219,10 +201,7 @@
         {
         	addColumn(subColumn);
         }
-        if (column.getMarkedForDeleteAt() > markedForDeleteAt)
-        {
-            markForDeleteAt(column.getLocalDeletionTime(),  column.getMarkedForDeleteAt());
-        }
+        FBUtilities.atomicSetMax(markedForDeleteAt, column.getMarkedForDeleteAt());
     }
 
     public int getObjectCount()
@@ -230,13 +209,9 @@
     	return 1 + columns_.size();
     }
 
-    public long getMarkedForDeleteAt() {
-        return markedForDeleteAt;
-    }
-
-    int getColumnCount()
+    public long getMarkedForDeleteAt()
     {
-    	return columns_.size();
+        return markedForDeleteAt.get();
     }
 
     public IColumn diff(IColumn columnNew)
@@ -280,7 +255,7 @@
         DataOutputBuffer buffer = new DataOutputBuffer();
         try
         {
-            buffer.writeLong(markedForDeleteAt);
+            buffer.writeLong(markedForDeleteAt.get());
         }
         catch (IOException e)
         {
@@ -312,13 +287,14 @@
 
     public int getLocalDeletionTime()
     {
-        return localDeletionTime;
+        return localDeletionTime.get();
     }
 
+    @Deprecated // TODO this is a hack to set initial value outside constructor
     public void markForDeleteAt(int localDeleteTime, long timestamp)
     {
-        this.localDeletionTime = localDeleteTime;
-        this.markedForDeleteAt = timestamp;
+        this.localDeletionTime.set(localDeleteTime);
+        this.markedForDeleteAt.set(timestamp);
     }
 }
 

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java?rev=896756&r1=896755&r2=896756&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java Thu Jan  7 05:47:05 2010
@@ -24,6 +24,8 @@
 import java.net.UnknownHostException;
 import java.security.MessageDigest;
 import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.zip.DataFormatException;
 import java.util.zip.Deflater;
 import java.util.zip.Inflater;
@@ -318,4 +320,26 @@
             }
         });
     }
+
+    /** Atomically raises {@code atomic} to at least {@code i}; the stored value never decreases. */
+    public static void atomicSetMax(AtomicInteger atomic, int i)
+    {
+        // CAS retry instead of getAndSet, which would briefly publish a smaller value to readers
+        int j;
+        do
+        {
+            j = atomic.get();
+        } while (j < i && !atomic.compareAndSet(j, i));
+    }
+
+    /** Atomically raises {@code atomic} to at least {@code i}; the stored value never decreases. */
+    public static void atomicSetMax(AtomicLong atomic, long i)
+    {
+        // CAS retry instead of getAndSet, which would briefly publish a smaller value to readers
+        long j;
+        do
+        {
+            j = atomic.get();
+        } while (j < i && !atomic.compareAndSet(j, i));
+    }
 }