You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/09/18 21:02:50 UTC

svn commit: r816745 - in /incubator/cassandra/trunk: src/java/org/apache/cassandra/db/ test/unit/org/apache/cassandra/db/

Author: jbellis
Date: Fri Sep 18 19:02:49 2009
New Revision: 816745

URL: http://svn.apache.org/viewvc?rev=816745&view=rev
Log:
serialize row outside of commitlog executor to improve parallelizability
patch by jbellis; reviewed by junrao for CASSANDRA-444

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryVerbHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RowTest.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryVerbHandler.java?rev=816745&r1=816744&r2=816745&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryVerbHandler.java Fri Sep 18 19:02:49 2009
@@ -18,37 +18,28 @@
 
 package org.apache.cassandra.db;
 
-import org.apache.cassandra.db.RowMutationVerbHandler.RowMutationContext;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.utils.LogUtil;
+import org.apache.cassandra.io.DataInputBuffer;
+
 import org.apache.log4j.Logger;
 
 public class BinaryVerbHandler implements IVerbHandler
 {
-    private static Logger logger_ = Logger.getLogger(BinaryVerbHandler.class);    
-    /* We use this so that we can reuse the same row mutation context for the mutation. */
-    private static ThreadLocal<RowMutationContext> tls_ = new InheritableThreadLocal<RowMutationContext>();
-    
+    private static Logger logger_ = Logger.getLogger(BinaryVerbHandler.class);
+
     public void doVerb(Message message)
     { 
         byte[] bytes = message.getMessageBody();
-        /* Obtain a Row Mutation Context from TLS */
-        RowMutationContext rowMutationCtx = tls_.get();
-        if ( rowMutationCtx == null )
-        {
-            rowMutationCtx = new RowMutationContext();
-            tls_.set(rowMutationCtx);
-        }                
-        rowMutationCtx.buffer_.reset(bytes, bytes.length);
-        
+        DataInputBuffer buffer = new DataInputBuffer();
+        buffer.reset(bytes, bytes.length);
+
 	    try
 	    {
-            RowMutationMessage rmMsg = RowMutationMessage.serializer().deserialize(rowMutationCtx.buffer_);
+            RowMutationMessage rmMsg = RowMutationMessage.serializer().deserialize(buffer);
             RowMutation rm = rmMsg.getRowMutation();            	                
-            rowMutationCtx.row_.setKey(rm.key());
-            rm.applyBinary(rowMutationCtx.row_);
-	
+            rm.applyBinary();
 	    }        
 	    catch ( Exception e )
 	    {

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java?rev=816745&r1=816744&r2=816745&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java Fri Sep 18 19:02:49 2009
@@ -397,9 +397,9 @@
      * of any problems. This way we can assume that the subsequent commit log
      * entry will override the garbage left over by the previous write.
     */
-    CommitLogContext add(final Row row) throws IOException
+    CommitLogContext add(Row row, DataOutputBuffer serializedRow) throws IOException
     {
-        Callable<CommitLogContext> task = new LogRecordAdder(row);
+        Callable<CommitLogContext> task = new LogRecordAdder(row, serializedRow);
 
         try
         {
@@ -559,27 +559,38 @@
 
     class LogRecordAdder implements Callable<CommitLog.CommitLogContext>
     {
-        Row row;
+        final Row row;
+        final Object serializedRow;
 
-        LogRecordAdder(Row row)
+        LogRecordAdder(Row row, DataOutputBuffer serializedRow)
         {
             this.row = row;
+            this.serializedRow = serializedRow;
         }
 
         public CommitLog.CommitLogContext call() throws Exception
         {
             long currentPosition = -1L;
-            DataOutputBuffer cfBuffer = new DataOutputBuffer();
             try
             {
                 /* serialize the row */
-                Row.serializer().serialize(row, cfBuffer);
                 currentPosition = logWriter_.getFilePointer();
                 CommitLogContext cLogCtx = new CommitLogContext(logFile_, currentPosition);
                 /* Update the header */
                 maybeUpdateHeader(row);
-                logWriter_.writeLong(cfBuffer.getLength());
-                logWriter_.write(cfBuffer.getData(), 0, cfBuffer.getLength());
+                if (serializedRow instanceof DataOutputBuffer)
+                {
+                    DataOutputBuffer buffer = (DataOutputBuffer) serializedRow;
+                    logWriter_.writeLong(buffer.getLength());
+                    logWriter_.write(buffer.getData(), 0, buffer.getLength());
+                }
+                else
+                {
+                    assert serializedRow instanceof byte[];
+                    byte[] bytes = (byte[]) serializedRow;
+                    logWriter_.writeLong(bytes.length);
+                    logWriter_.write(bytes);
+                }
                 maybeRollLog();
                 return cLogCtx;
             }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=816745&r1=816744&r2=816745&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Fri Sep 18 19:02:49 2009
@@ -56,7 +56,7 @@
     private String cfName_;
     private long creationTime_;
     // we use NBHM with manual locking, so reads are automatically threadsafe but write merging is serialized per key
-    private Map<String, ColumnFamily> columnFamilies_ = new NonBlockingHashMap<String, ColumnFamily>();
+    private NonBlockingHashMap<String, ColumnFamily> columnFamilies_ = new NonBlockingHashMap<String, ColumnFamily>();
     private Object[] keyLocks;
 
     Memtable(String table, String cfName)
@@ -142,16 +142,19 @@
     {
         assert !isFrozen_; // not 100% foolproof but hell, it's an assert
         isDirty_ = true;
-        synchronized (keyLocks[Math.abs(key.hashCode() % keyLocks.length)])
-        {
-            resolve(key, columnFamily);
-        }
+        resolve(key, columnFamily);
     }
 
     private void resolve(String key, ColumnFamily columnFamily)
     {
-    	ColumnFamily oldCf = columnFamilies_.get(key);
-        if ( oldCf != null )
+        ColumnFamily oldCf = columnFamilies_.putIfAbsent(key, columnFamily);
+        if (oldCf == null)
+        {
+            currentSize_.addAndGet(columnFamily.size() + key.length());
+            currentObjectCount_.addAndGet(columnFamily.getColumnCount());
+            return;
+        }
+        synchronized (keyLocks[Math.abs(key.hashCode() % keyLocks.length)])
         {
             int oldSize = oldCf.size();
             int oldObjectCount = oldCf.getColumnCount();
@@ -162,12 +165,6 @@
             resolveCount(oldObjectCount, newObjectCount);
             oldCf.delete(columnFamily);
         }
-        else
-        {
-            columnFamilies_.put(key, columnFamily);
-            currentSize_.addAndGet(columnFamily.size() + key.length());
-            currentObjectCount_.addAndGet(columnFamily.getColumnCount());
-        }
     }
 
     /** flush synchronously (in the current thread, not on the executor).

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java?rev=816745&r1=816744&r2=816745&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java Fri Sep 18 19:02:49 2009
@@ -32,6 +32,7 @@
 import org.apache.log4j.Logger;
 
 import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.io.DataOutputBuffer;
 import org.apache.cassandra.utils.FBUtilities;
 
 public class Row
@@ -45,17 +46,13 @@
         return serializer;
     }
 
-    public Row(String table, String key) {
+    public Row(String table, String key)
+    {
         assert table != null;
         this.table_ = table;
         this.key_ = key;
     }
 
-    // only for use by RMVH
-    Row()
-    {
-    }
-
     public String getTable() {
         return table_;
     }
@@ -191,6 +188,13 @@
         columnFamilies_.clear();
     }
 
+    public DataOutputBuffer getSerializedBuffer() throws IOException
+    {
+        DataOutputBuffer buffer = new DataOutputBuffer();
+        Row.serializer().serialize(this, buffer);
+        return buffer;
+    }
+
     public String toString()
     {
         return "Row(" + key_ + " [" + StringUtils.join(columnFamilies_.values(), ", ") + ")]";

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java?rev=816745&r1=816744&r2=816745&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java Fri Sep 18 19:02:49 2009
@@ -24,7 +24,6 @@
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -40,7 +39,6 @@
 import org.apache.cassandra.service.*;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.db.filter.QueryPath;
-import org.apache.cassandra.db.marshal.MarshalException;
 import org.apache.cassandra.config.DatabaseDescriptor;
 
 public class RowMutation implements Serializable
@@ -186,40 +184,27 @@
     */
     public void apply() throws IOException
     {
-        Row row = new Row(table_, key_);
-        apply(row);
+        Row row = createRow();
+        Table.open(table_).apply(row, row.getSerializedBuffer());
     }
 
-    /*
-     * Allows RowMutationVerbHandler to optimize by re-using a single Row object.
-    */
-    void apply(Row emptyRow) throws IOException
+    private Row createRow()
     {
-        assert emptyRow.getColumnFamilies().size() == 0;
-        Table table = Table.open(table_);
+        Row row = new Row(table_, key_);
         for (String cfName : modifications_.keySet())
         {
-            assert table.isValidColumnFamily(cfName);
-            emptyRow.addColumnFamily(modifications_.get(cfName));
+            row.addColumnFamily(modifications_.get(cfName));
         }
-        table.apply(emptyRow);
+        return row;
     }
 
     /*
      * This is equivalent to calling commit. Applies the changes to
      * to the table that is obtained by calling Table.open().
     */
-    void applyBinary(Row emptyRow) throws IOException, ExecutionException, InterruptedException
+    void applyBinary() throws IOException, ExecutionException, InterruptedException
     {
-        assert emptyRow.getColumnFamilies().size() == 0;
-        Table table = Table.open(table_);
-        Set<String> cfNames = modifications_.keySet();
-        for (String cfName : cfNames)
-        {
-            assert table.isValidColumnFamily(cfName);
-            emptyRow.addColumnFamily(modifications_.get(cfName));
-        }
-        table.load(emptyRow);
+        Table.open(table_).load(createRow());
     }
 
     public Message makeRowMutationMessage() throws IOException

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java?rev=816745&r1=816744&r2=816745&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java Fri Sep 18 19:02:49 2009
@@ -31,32 +31,17 @@
 
 public class RowMutationVerbHandler implements IVerbHandler
 {
-    protected static class RowMutationContext
-    {
-        protected Row row_ = new Row();
-        protected DataInputBuffer buffer_ = new DataInputBuffer();
-    }
-
     private static Logger logger_ = Logger.getLogger(RowMutationVerbHandler.class);
-    /* We use this so that we can reuse the same row mutation context for the mutation. */
-    private static ThreadLocal<RowMutationContext> tls_ = new InheritableThreadLocal<RowMutationContext>();
 
     public void doVerb(Message message)
     {
         byte[] bytes = message.getMessageBody();
-        /* Obtain a Row Mutation Context from TLS */
-        RowMutationContext rowMutationCtx = tls_.get();
-        if ( rowMutationCtx == null )
-        {
-            rowMutationCtx = new RowMutationContext();
-            tls_.set(rowMutationCtx);
-        }
-
-        rowMutationCtx.buffer_.reset(bytes, bytes.length);
+        DataInputBuffer buffer = new DataInputBuffer();
+        buffer.reset(bytes, bytes.length);
 
         try
         {
-            RowMutation rm = RowMutation.serializer().deserialize(rowMutationCtx.buffer_);
+            RowMutation rm = RowMutation.serializer().deserialize(buffer);
             if (logger_.isDebugEnabled())
               logger_.debug("Applying " + rm);
 
@@ -73,10 +58,7 @@
                 hintedMutation.apply();
             }
 
-            rowMutationCtx.row_.clear();
-            rowMutationCtx.row_.setTable(rm.table());
-            rowMutationCtx.row_.setKey(rm.key());
-            rm.apply(rowMutationCtx.row_);
+            rm.apply();
 
             WriteResponse response = new WriteResponse(rm.table(), rm.key(), true);
             Message responseMessage = WriteResponse.makeWriteResponseMessage(message, response);

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=816745&r1=816744&r2=816745&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Fri Sep 18 19:02:49 2009
@@ -31,6 +31,7 @@
 import org.apache.cassandra.io.DataInputBuffer;
 import org.apache.cassandra.io.SSTableReader;
 import org.apache.cassandra.io.SSTableWriter;
+import org.apache.cassandra.io.DataOutputBuffer;
 import org.apache.cassandra.net.EndPoint;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
@@ -591,14 +592,14 @@
      * Once this happens the data associated with the individual column families
      * is also written to the column family store's memtable.
     */
-    void apply(Row row) throws IOException
+    void apply(Row row, DataOutputBuffer serializedRow) throws IOException
     {
         HashMap<ColumnFamilyStore,Memtable> memtablesToFlush = new HashMap<ColumnFamilyStore, Memtable>();
 
         flusherLock_.readLock().lock();
         try
         {
-            CommitLog.open().add(row);
+            CommitLog.open().add(row, serializedRow);
         
             for (ColumnFamily columnFamily : row.getColumnFamilies())
             {

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RowTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RowTest.java?rev=816745&r1=816744&r2=816745&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RowTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RowTest.java Fri Sep 18 19:02:49 2009
@@ -60,12 +60,12 @@
     @Test
     public void testRepair()
     {
-        Row row1 = new Row();
+        Row row1 = new Row("", "");
         ColumnFamily cf1 = ColumnFamily.create("Keyspace1", "Standard1");
         cf1.addColumn(column("one", "A", 0));
         row1.addColumnFamily(cf1);
 
-        Row row2 = new Row();
+        Row row2 = new Row("", "");
         ColumnFamily cf2 = ColumnFamily.create("Keyspace1", "Standard1");
         cf2.addColumn(column("one", "B", 1));
         cf2.addColumn(column("two", "C", 1));