You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/01/07 06:47:06 UTC
svn commit: r896756 - in
/incubator/cassandra/trunk/src/java/org/apache/cassandra:
db/ColumnFamily.java db/ColumnFamilySerializer.java db/Memtable.java
db/SuperColumn.java utils/FBUtilities.java
Author: jbellis
Date: Thu Jan 7 05:47:05 2010
New Revision: 896756
URL: http://svn.apache.org/viewvc?rev=896756&view=rev
Log:
replace sharded row locks with column-level locking
patch by jbellis; tested by Brandon Williams for CASSANDRA-658
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java?rev=896756&r1=896755&r2=896756&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java Thu Jan 7 05:47:05 2010
@@ -20,14 +20,14 @@
import java.util.Collection;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.ConcurrentSkipListMap;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.log4j.Logger;
@@ -35,6 +35,7 @@
import org.apache.cassandra.io.ICompactSerializer2;
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.utils.FBUtilities;
public final class ColumnFamily implements IColumnContainer
@@ -78,8 +79,8 @@
private String name_;
private transient ICompactSerializer2<IColumn> columnSerializer_;
- long markedForDeleteAt = Long.MIN_VALUE;
- int localDeletionTime = Integer.MIN_VALUE;
+ AtomicLong markedForDeleteAt = new AtomicLong(Long.MIN_VALUE);
+ AtomicInteger localDeletionTime = new AtomicInteger(Integer.MIN_VALUE);
private ConcurrentSkipListMap<byte[], IColumn> columns_;
public ColumnFamily(String cfName, String columnType, AbstractType comparator, AbstractType subcolumnComparator)
@@ -190,26 +191,23 @@
public void addColumn(IColumn column)
{
byte[] name = column.name();
- IColumn oldColumn = columns_.get(name);
+ IColumn oldColumn = columns_.putIfAbsent(name, column);
if (oldColumn != null)
{
if (oldColumn instanceof SuperColumn)
{
- int oldSize = oldColumn.size();
((SuperColumn) oldColumn).putColumn(column);
}
else
{
- if (((Column)oldColumn).comparePriority((Column)column) <= 0)
+ while (((Column) oldColumn).comparePriority((Column)column) <= 0)
{
- columns_.put(name, column);
+ if (columns_.replace(name, oldColumn, column))
+ break;
+ oldColumn = columns_.get(name);
}
}
}
- else
- {
- columns_.put(name, column);
- }
}
public IColumn getColumn(byte[] name)
@@ -237,21 +235,22 @@
columns_.remove(columnName);
}
+ @Deprecated // TODO this is a hack to set initial value outside constructor
public void delete(int localtime, long timestamp)
{
- localDeletionTime = localtime;
- markedForDeleteAt = timestamp;
+ localDeletionTime.set(localtime);
+ markedForDeleteAt.set(timestamp);
}
public void delete(ColumnFamily cf2)
{
- delete(Math.max(getLocalDeletionTime(), cf2.getLocalDeletionTime()),
- Math.max(getMarkedForDeleteAt(), cf2.getMarkedForDeleteAt()));
+ FBUtilities.atomicSetMax(localDeletionTime, cf2.getLocalDeletionTime());
+ FBUtilities.atomicSetMax(markedForDeleteAt, cf2.getMarkedForDeleteAt());
}
public boolean isMarkedForDelete()
{
- return markedForDeleteAt > Long.MIN_VALUE;
+ return markedForDeleteAt.get() > Long.MIN_VALUE;
}
/*
@@ -367,12 +366,12 @@
public long getMarkedForDeleteAt()
{
- return markedForDeleteAt;
+ return markedForDeleteAt.get();
}
public int getLocalDeletionTime()
{
- return localDeletionTime;
+ return localDeletionTime.get();
}
public String type()
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java?rev=896756&r1=896755&r2=896756&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java Thu Jan 7 05:47:05 2010
@@ -77,8 +77,8 @@
{
try
{
- dos.writeInt(columnFamily.localDeletionTime);
- dos.writeLong(columnFamily.markedForDeleteAt);
+ dos.writeInt(columnFamily.localDeletionTime.get());
+ dos.writeLong(columnFamily.markedForDeleteAt.get());
Collection<IColumn> columns = columnFamily.getSortedColumns();
dos.writeInt(columns.size());
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=896756&r1=896755&r2=896756&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Thu Jan 7 05:47:05 2010
@@ -53,9 +53,7 @@
private final String table_;
private final String cfName_;
private final long creationTime_;
- // we use NBHM with manual locking, so reads are automatically threadsafe but write merging is serialized per key
private final NonBlockingHashMap<DecoratedKey, ColumnFamily> columnFamilies_ = new NonBlockingHashMap<DecoratedKey, ColumnFamily>();
- private final Object[] keyLocks;
private final IPartitioner partitioner_ = StorageService.getPartitioner();
Memtable(String table, String cfName)
@@ -63,11 +61,6 @@
table_ = table;
cfName_ = cfName;
creationTime_ = System.currentTimeMillis();
- keyLocks = new Object[Runtime.getRuntime().availableProcessors() * 8];
- for (int i = 0; i < keyLocks.length; i++)
- {
- keyLocks[i] = new Object();
- }
}
public boolean isFlushed()
@@ -135,17 +128,15 @@
private void resolve(String key, ColumnFamily cf)
{
- DecoratedKey decoratedKey = partitioner_.decorateKey(key);
currentThroughput_.addAndGet(cf.size());
currentOperations.addAndGet(cf.getColumnCount());
+
+ DecoratedKey decoratedKey = partitioner_.decorateKey(key);
ColumnFamily oldCf = columnFamilies_.putIfAbsent(decoratedKey, cf);
if (oldCf == null)
return;
- synchronized (keyLocks[Math.abs(key.hashCode() % keyLocks.length)])
- {
- oldCf.resolve(cf);
- }
+ oldCf.resolve(cf);
}
// for debugging
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java?rev=896756&r1=896755&r2=896756&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java Thu Jan 7 05:47:05 2010
@@ -20,15 +20,17 @@
import java.io.*;
import java.util.Collection;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.ConcurrentSkipListMap;
import java.security.MessageDigest;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.log4j.Logger;
import org.apache.cassandra.io.ICompactSerializer2;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.utils.FBUtilities;
public final class SuperColumn implements IColumn, IColumnContainer
@@ -42,9 +44,8 @@
private byte[] name_;
private ConcurrentSkipListMap<byte[], IColumn> columns_;
- private int localDeletionTime = Integer.MIN_VALUE;
- private long markedForDeleteAt = Long.MIN_VALUE;
- private AtomicInteger size_ = new AtomicInteger(0);
+ private AtomicInteger localDeletionTime = new AtomicInteger(Integer.MIN_VALUE);
+ private AtomicLong markedForDeleteAt = new AtomicLong(Long.MIN_VALUE);
SuperColumn(byte[] name, AbstractType comparator)
{
@@ -67,21 +68,20 @@
public SuperColumn cloneMeShallow()
{
SuperColumn sc = new SuperColumn(name_, getComparator());
- sc.markForDeleteAt(localDeletionTime, markedForDeleteAt);
+ sc.markForDeleteAt(localDeletionTime.get(), markedForDeleteAt.get());
return sc;
}
-
public IColumn cloneMe()
{
SuperColumn sc = new SuperColumn(name_, new ConcurrentSkipListMap<byte[], IColumn>(columns_));
- sc.markForDeleteAt(localDeletionTime, markedForDeleteAt);
+ sc.markForDeleteAt(localDeletionTime.get(), markedForDeleteAt.get());
return sc;
}
public boolean isMarkedForDelete()
{
- return markedForDeleteAt > Long.MIN_VALUE;
+ return markedForDeleteAt.get() > Long.MIN_VALUE;
}
public byte[] name()
@@ -101,15 +101,17 @@
return column;
}
+ /**
+ * This calculates the exact size of the sub columns on the fly
+ */
public int size()
{
- /*
- * return the size of the individual columns
- * that make up the super column. This is an
- * APPROXIMATION of the size used only from the
- * Memtable.
- */
- return size_.get();
+ int size = 0;
+ for (IColumn subColumn : getSubColumns())
+ {
+ size += subColumn.serializedSize();
+ }
+ return size;
}
/**
@@ -122,20 +124,7 @@
* We need to keep the way we are calculating the column size in sync with the
* way we are calculating the size for the column family serializer.
*/
- return IColumn.UtfPrefix_ + name_.length + DBConstants.intSize_ + DBConstants.longSize_ + DBConstants.intSize_ + getSizeOfAllColumns();
- }
-
- /**
- * This calculates the exact size of the sub columns on the fly
- */
- int getSizeOfAllColumns()
- {
- int size = 0;
- for (IColumn subColumn : getSubColumns())
- {
- size += subColumn.serializedSize();
- }
- return size;
+ return IColumn.UtfPrefix_ + name_.length + DBConstants.intSize_ + DBConstants.longSize_ + DBConstants.intSize_ + size();
}
public void remove(byte[] columnName)
@@ -152,9 +141,9 @@
{
IColumn column = columns_.get(columnName);
assert column instanceof Column;
- if ( column != null )
- return column.timestamp();
- throw new IllegalArgumentException("Timestamp was requested for a column that does not exist.");
+ if ( column != null )
+ return column.timestamp();
+ throw new IllegalArgumentException("Timestamp was requested for a column that does not exist.");
}
public long mostRecentLiveChangeAt()
@@ -187,22 +176,15 @@
{
if (!(column instanceof Column))
throw new UnsupportedOperationException("A super column can only contain simple columns.");
- IColumn oldColumn = columns_.get(column.name());
- if ( oldColumn == null )
+ byte[] name = column.name();
+ IColumn oldColumn = columns_.putIfAbsent(name, column);
+ if (oldColumn != null)
{
- columns_.put(column.name(), column);
- size_.addAndGet(column.size());
- }
- else
- {
- if (((Column)oldColumn).comparePriority((Column)column) <= 0)
+ while (((Column)oldColumn).comparePriority((Column)column) <= 0)
{
- columns_.put(column.name(), column);
- int delta = (-1)*oldColumn.size();
- /* subtract the size of the oldColumn */
- size_.addAndGet(delta);
- /* add the size of the new column */
- size_.addAndGet(column.size());
+ if (columns_.replace(name, oldColumn, column))
+ break;
+ oldColumn = columns_.get(name);
}
}
}
@@ -219,10 +201,7 @@
{
addColumn(subColumn);
}
- if (column.getMarkedForDeleteAt() > markedForDeleteAt)
- {
- markForDeleteAt(column.getLocalDeletionTime(), column.getMarkedForDeleteAt());
- }
+ FBUtilities.atomicSetMax(markedForDeleteAt, column.getMarkedForDeleteAt());
}
public int getObjectCount()
@@ -230,13 +209,9 @@
return 1 + columns_.size();
}
- public long getMarkedForDeleteAt() {
- return markedForDeleteAt;
- }
-
- int getColumnCount()
+ public long getMarkedForDeleteAt()
{
- return columns_.size();
+ return markedForDeleteAt.get();
}
public IColumn diff(IColumn columnNew)
@@ -280,7 +255,7 @@
DataOutputBuffer buffer = new DataOutputBuffer();
try
{
- buffer.writeLong(markedForDeleteAt);
+ buffer.writeLong(markedForDeleteAt.get());
}
catch (IOException e)
{
@@ -312,13 +287,14 @@
public int getLocalDeletionTime()
{
- return localDeletionTime;
+ return localDeletionTime.get();
}
+ @Deprecated // TODO this is a hack to set initial value outside constructor
public void markForDeleteAt(int localDeleteTime, long timestamp)
{
- this.localDeletionTime = localDeleteTime;
- this.markedForDeleteAt = timestamp;
+ this.localDeletionTime.set(localDeleteTime);
+ this.markedForDeleteAt.set(timestamp);
}
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java?rev=896756&r1=896755&r2=896756&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java Thu Jan 7 05:47:05 2010
@@ -24,6 +24,8 @@
import java.net.UnknownHostException;
import java.security.MessageDigest;
import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;
@@ -318,4 +320,26 @@
}
});
}
+
+ public static void atomicSetMax(AtomicInteger atomic, int i)
+ {
+ int j;
+ while (true)
+ {
+ if ((j = atomic.getAndSet(i)) <= i)
+ break;
+ i = j;
+ }
+ }
+
+ public static void atomicSetMax(AtomicLong atomic, long i)
+ {
+ long j;
+ while (true)
+ {
+ if ((j = atomic.getAndSet(i)) <= i)
+ break;
+ i = j;
+ }
+ }
}