You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/09/18 21:02:50 UTC
svn commit: r816745 - in /incubator/cassandra/trunk:
src/java/org/apache/cassandra/db/ test/unit/org/apache/cassandra/db/
Author: jbellis
Date: Fri Sep 18 19:02:49 2009
New Revision: 816745
URL: http://svn.apache.org/viewvc?rev=816745&view=rev
Log:
serialize row outside of commitlog executor to improve parallelizability
patch by jbellis; reviewed by junrao for CASSANDRA-444
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryVerbHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RowTest.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryVerbHandler.java?rev=816745&r1=816744&r2=816745&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryVerbHandler.java Fri Sep 18 19:02:49 2009
@@ -18,37 +18,28 @@
package org.apache.cassandra.db;
-import org.apache.cassandra.db.RowMutationVerbHandler.RowMutationContext;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.utils.LogUtil;
+import org.apache.cassandra.io.DataInputBuffer;
+
import org.apache.log4j.Logger;
public class BinaryVerbHandler implements IVerbHandler
{
- private static Logger logger_ = Logger.getLogger(BinaryVerbHandler.class);
- /* We use this so that we can reuse the same row mutation context for the mutation. */
- private static ThreadLocal<RowMutationContext> tls_ = new InheritableThreadLocal<RowMutationContext>();
-
+ private static Logger logger_ = Logger.getLogger(BinaryVerbHandler.class);
+
public void doVerb(Message message)
{
byte[] bytes = message.getMessageBody();
- /* Obtain a Row Mutation Context from TLS */
- RowMutationContext rowMutationCtx = tls_.get();
- if ( rowMutationCtx == null )
- {
- rowMutationCtx = new RowMutationContext();
- tls_.set(rowMutationCtx);
- }
- rowMutationCtx.buffer_.reset(bytes, bytes.length);
-
+ DataInputBuffer buffer = new DataInputBuffer();
+ buffer.reset(bytes, bytes.length);
+
try
{
- RowMutationMessage rmMsg = RowMutationMessage.serializer().deserialize(rowMutationCtx.buffer_);
+ RowMutationMessage rmMsg = RowMutationMessage.serializer().deserialize(buffer);
RowMutation rm = rmMsg.getRowMutation();
- rowMutationCtx.row_.setKey(rm.key());
- rm.applyBinary(rowMutationCtx.row_);
-
+ rm.applyBinary();
}
catch ( Exception e )
{
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java?rev=816745&r1=816744&r2=816745&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java Fri Sep 18 19:02:49 2009
@@ -397,9 +397,9 @@
* of any problems. This way we can assume that the subsequent commit log
* entry will override the garbage left over by the previous write.
*/
- CommitLogContext add(final Row row) throws IOException
+ CommitLogContext add(Row row, DataOutputBuffer serializedRow) throws IOException
{
- Callable<CommitLogContext> task = new LogRecordAdder(row);
+ Callable<CommitLogContext> task = new LogRecordAdder(row, serializedRow);
try
{
@@ -559,27 +559,38 @@
class LogRecordAdder implements Callable<CommitLog.CommitLogContext>
{
- Row row;
+ final Row row;
+ final Object serializedRow;
- LogRecordAdder(Row row)
+ LogRecordAdder(Row row, DataOutputBuffer serializedRow)
{
this.row = row;
+ this.serializedRow = serializedRow;
}
public CommitLog.CommitLogContext call() throws Exception
{
long currentPosition = -1L;
- DataOutputBuffer cfBuffer = new DataOutputBuffer();
try
{
/* serialize the row */
- Row.serializer().serialize(row, cfBuffer);
currentPosition = logWriter_.getFilePointer();
CommitLogContext cLogCtx = new CommitLogContext(logFile_, currentPosition);
/* Update the header */
maybeUpdateHeader(row);
- logWriter_.writeLong(cfBuffer.getLength());
- logWriter_.write(cfBuffer.getData(), 0, cfBuffer.getLength());
+ if (serializedRow instanceof DataOutputBuffer)
+ {
+ DataOutputBuffer buffer = (DataOutputBuffer) serializedRow;
+ logWriter_.writeLong(buffer.getLength());
+ logWriter_.write(buffer.getData(), 0, buffer.getLength());
+ }
+ else
+ {
+ assert serializedRow instanceof byte[];
+ byte[] bytes = (byte[]) serializedRow;
+ logWriter_.writeLong(bytes.length);
+ logWriter_.write(bytes);
+ }
maybeRollLog();
return cLogCtx;
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=816745&r1=816744&r2=816745&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Fri Sep 18 19:02:49 2009
@@ -56,7 +56,7 @@
private String cfName_;
private long creationTime_;
// we use NBHM with manual locking, so reads are automatically threadsafe but write merging is serialized per key
- private Map<String, ColumnFamily> columnFamilies_ = new NonBlockingHashMap<String, ColumnFamily>();
+ private NonBlockingHashMap<String, ColumnFamily> columnFamilies_ = new NonBlockingHashMap<String, ColumnFamily>();
private Object[] keyLocks;
Memtable(String table, String cfName)
@@ -142,16 +142,19 @@
{
assert !isFrozen_; // not 100% foolproof but hell, it's an assert
isDirty_ = true;
- synchronized (keyLocks[Math.abs(key.hashCode() % keyLocks.length)])
- {
- resolve(key, columnFamily);
- }
+ resolve(key, columnFamily);
}
private void resolve(String key, ColumnFamily columnFamily)
{
- ColumnFamily oldCf = columnFamilies_.get(key);
- if ( oldCf != null )
+ ColumnFamily oldCf = columnFamilies_.putIfAbsent(key, columnFamily);
+ if (oldCf == null)
+ {
+ currentSize_.addAndGet(columnFamily.size() + key.length());
+ currentObjectCount_.addAndGet(columnFamily.getColumnCount());
+ return;
+ }
+ synchronized (keyLocks[Math.abs(key.hashCode() % keyLocks.length)])
{
int oldSize = oldCf.size();
int oldObjectCount = oldCf.getColumnCount();
@@ -162,12 +165,6 @@
resolveCount(oldObjectCount, newObjectCount);
oldCf.delete(columnFamily);
}
- else
- {
- columnFamilies_.put(key, columnFamily);
- currentSize_.addAndGet(columnFamily.size() + key.length());
- currentObjectCount_.addAndGet(columnFamily.getColumnCount());
- }
}
/** flush synchronously (in the current thread, not on the executor).
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java?rev=816745&r1=816744&r2=816745&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java Fri Sep 18 19:02:49 2009
@@ -32,6 +32,7 @@
import org.apache.log4j.Logger;
import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.io.DataOutputBuffer;
import org.apache.cassandra.utils.FBUtilities;
public class Row
@@ -45,17 +46,13 @@
return serializer;
}
- public Row(String table, String key) {
+ public Row(String table, String key)
+ {
assert table != null;
this.table_ = table;
this.key_ = key;
}
- // only for use by RMVH
- Row()
- {
- }
-
public String getTable() {
return table_;
}
@@ -191,6 +188,13 @@
columnFamilies_.clear();
}
+ public DataOutputBuffer getSerializedBuffer() throws IOException
+ {
+ DataOutputBuffer buffer = new DataOutputBuffer();
+ Row.serializer().serialize(this, buffer);
+ return buffer;
+ }
+
public String toString()
{
return "Row(" + key_ + " [" + StringUtils.join(columnFamilies_.values(), ", ") + ")]";
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java?rev=816745&r1=816744&r2=816745&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java Fri Sep 18 19:02:49 2009
@@ -24,7 +24,6 @@
import java.io.IOException;
import java.io.Serializable;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -40,7 +39,6 @@
import org.apache.cassandra.service.*;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.db.filter.QueryPath;
-import org.apache.cassandra.db.marshal.MarshalException;
import org.apache.cassandra.config.DatabaseDescriptor;
public class RowMutation implements Serializable
@@ -186,40 +184,27 @@
*/
public void apply() throws IOException
{
- Row row = new Row(table_, key_);
- apply(row);
+ Row row = createRow();
+ Table.open(table_).apply(row, row.getSerializedBuffer());
}
- /*
- * Allows RowMutationVerbHandler to optimize by re-using a single Row object.
- */
- void apply(Row emptyRow) throws IOException
+ private Row createRow()
{
- assert emptyRow.getColumnFamilies().size() == 0;
- Table table = Table.open(table_);
+ Row row = new Row(table_, key_);
for (String cfName : modifications_.keySet())
{
- assert table.isValidColumnFamily(cfName);
- emptyRow.addColumnFamily(modifications_.get(cfName));
+ row.addColumnFamily(modifications_.get(cfName));
}
- table.apply(emptyRow);
+ return row;
}
/*
* This is equivalent to calling commit. Applies the changes to
* the table that is obtained by calling Table.open().
*/
- void applyBinary(Row emptyRow) throws IOException, ExecutionException, InterruptedException
+ void applyBinary() throws IOException, ExecutionException, InterruptedException
{
- assert emptyRow.getColumnFamilies().size() == 0;
- Table table = Table.open(table_);
- Set<String> cfNames = modifications_.keySet();
- for (String cfName : cfNames)
- {
- assert table.isValidColumnFamily(cfName);
- emptyRow.addColumnFamily(modifications_.get(cfName));
- }
- table.load(emptyRow);
+ Table.open(table_).load(createRow());
}
public Message makeRowMutationMessage() throws IOException
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java?rev=816745&r1=816744&r2=816745&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java Fri Sep 18 19:02:49 2009
@@ -31,32 +31,17 @@
public class RowMutationVerbHandler implements IVerbHandler
{
- protected static class RowMutationContext
- {
- protected Row row_ = new Row();
- protected DataInputBuffer buffer_ = new DataInputBuffer();
- }
-
private static Logger logger_ = Logger.getLogger(RowMutationVerbHandler.class);
- /* We use this so that we can reuse the same row mutation context for the mutation. */
- private static ThreadLocal<RowMutationContext> tls_ = new InheritableThreadLocal<RowMutationContext>();
public void doVerb(Message message)
{
byte[] bytes = message.getMessageBody();
- /* Obtain a Row Mutation Context from TLS */
- RowMutationContext rowMutationCtx = tls_.get();
- if ( rowMutationCtx == null )
- {
- rowMutationCtx = new RowMutationContext();
- tls_.set(rowMutationCtx);
- }
-
- rowMutationCtx.buffer_.reset(bytes, bytes.length);
+ DataInputBuffer buffer = new DataInputBuffer();
+ buffer.reset(bytes, bytes.length);
try
{
- RowMutation rm = RowMutation.serializer().deserialize(rowMutationCtx.buffer_);
+ RowMutation rm = RowMutation.serializer().deserialize(buffer);
if (logger_.isDebugEnabled())
logger_.debug("Applying " + rm);
@@ -73,10 +58,7 @@
hintedMutation.apply();
}
- rowMutationCtx.row_.clear();
- rowMutationCtx.row_.setTable(rm.table());
- rowMutationCtx.row_.setKey(rm.key());
- rm.apply(rowMutationCtx.row_);
+ rm.apply();
WriteResponse response = new WriteResponse(rm.table(), rm.key(), true);
Message responseMessage = WriteResponse.makeWriteResponseMessage(message, response);
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=816745&r1=816744&r2=816745&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Fri Sep 18 19:02:49 2009
@@ -31,6 +31,7 @@
import org.apache.cassandra.io.DataInputBuffer;
import org.apache.cassandra.io.SSTableReader;
import org.apache.cassandra.io.SSTableWriter;
+import org.apache.cassandra.io.DataOutputBuffer;
import org.apache.cassandra.net.EndPoint;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
@@ -591,14 +592,14 @@
* Once this happens the data associated with the individual column families
* is also written to the column family store's memtable.
*/
- void apply(Row row) throws IOException
+ void apply(Row row, DataOutputBuffer serializedRow) throws IOException
{
HashMap<ColumnFamilyStore,Memtable> memtablesToFlush = new HashMap<ColumnFamilyStore, Memtable>();
flusherLock_.readLock().lock();
try
{
- CommitLog.open().add(row);
+ CommitLog.open().add(row, serializedRow);
for (ColumnFamily columnFamily : row.getColumnFamilies())
{
Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RowTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RowTest.java?rev=816745&r1=816744&r2=816745&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RowTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RowTest.java Fri Sep 18 19:02:49 2009
@@ -60,12 +60,12 @@
@Test
public void testRepair()
{
- Row row1 = new Row();
+ Row row1 = new Row("", "");
ColumnFamily cf1 = ColumnFamily.create("Keyspace1", "Standard1");
cf1.addColumn(column("one", "A", 0));
row1.addColumnFamily(cf1);
- Row row2 = new Row();
+ Row row2 = new Row("", "");
ColumnFamily cf2 = ColumnFamily.create("Keyspace1", "Standard1");
cf2.addColumn(column("one", "B", 1));
cf2.addColumn(column("two", "C", 1));