You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/09/24 21:55:14 UTC

svn commit: r818607 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra/db: CommitLog.java RowMutation.java RowMutationVerbHandler.java Table.java

Author: jbellis
Date: Thu Sep 24 19:55:10 2009
New Revision: 818607

URL: http://svn.apache.org/viewvc?rev=818607&view=rev
Log:
CommitLog and Table.apply/applyNow/load paths use RMs directly instead of converting to Rows first.  patch by Sandeep Tata; reviewed by jbellis for CASSANDRA-456

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java?rev=818607&r1=818606&r2=818607&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java Thu Sep 24 19:55:10 2009
@@ -317,27 +317,27 @@
                 bufIn.reset(bytes, bytes.length);
 
                 /* read the commit log entry */
-                Row row = Row.serializer().deserialize(bufIn);
+                RowMutation rm = RowMutation.serializer().deserialize(bufIn);
                 if (logger_.isDebugEnabled())
                     logger_.debug(String.format("replaying mutation for %s.%s: %s",
-                                                row.getTable(),
-                                                row.key(),
-                                                "{" + StringUtils.join(row.getColumnFamilies(), ", ") + "}"));
-                Table table = Table.open(row.getTable());
+                                                rm.getTable(),
+                                                rm.key(),
+                                                "{" + StringUtils.join(rm.getColumnFamilies(), ", ") + "}"));
+                Table table = Table.open(rm.getTable());
                 tablesRecovered.add(table);
-                Collection<ColumnFamily> columnFamilies = new ArrayList<ColumnFamily>(row.getColumnFamilies());
+                Collection<ColumnFamily> columnFamilies = new ArrayList<ColumnFamily>(rm.getColumnFamilies());
                 /* remove column families that have already been flushed */
                 for (ColumnFamily columnFamily : columnFamilies)
                 {
                     int id = table.getColumnFamilyId(columnFamily.name());
                     if (!clHeader.isDirty(id) || reader.getFilePointer() < clHeader.getPosition(id))
                     {
-                        row.removeColumnFamily(columnFamily);
+                        rm.removeColumnFamily(columnFamily);
                     }
                 }
-                if (!row.isEmpty())
+                if (!rm.isEmpty())
                 {
-                    table.applyNow(row);
+                    table.applyNow(rm);
                 }
             }
             reader.close();
@@ -353,10 +353,10 @@
      * Update the header of the commit log if a new column family
      * is encountered for the first time.
     */
-    private void maybeUpdateHeader(Row row) throws IOException
+    private void maybeUpdateHeader(RowMutation rm) throws IOException
     {
-        Table table = Table.open(row.getTable());
-        for (ColumnFamily columnFamily : row.getColumnFamilies())
+        Table table = Table.open(rm.getTable());
+        for (ColumnFamily columnFamily : rm.getColumnFamilies())
         {
             int id = table.getColumnFamilyId(columnFamily.name());
             if (!clHeader_.isDirty(id) || (clHeader_.isDirty(id) && clHeader_.getPosition(id) == 0))
@@ -396,9 +396,9 @@
      * of any problems. This way we can assume that the subsequent commit log
      * entry will override the garbage left over by the previous write.
     */
-    void add(Row row, DataOutputBuffer serializedRow) throws IOException
+    void add(RowMutation rowMutation, DataOutputBuffer serializedRow) throws IOException
     {
-        Callable<CommitLogContext> task = new LogRecordAdder(row, serializedRow);
+        Callable<CommitLogContext> task = new LogRecordAdder(rowMutation, serializedRow);
 
         try
         {
@@ -558,12 +558,12 @@
 
     class LogRecordAdder implements Callable<CommitLog.CommitLogContext>
     {
-        final Row row;
+        final RowMutation rowMutation;
         final Object serializedRow;
 
-        LogRecordAdder(Row row, DataOutputBuffer serializedRow)
+        LogRecordAdder(RowMutation rm, DataOutputBuffer serializedRow)
         {
-            this.row = row;
+            this.rowMutation = rm;
             this.serializedRow = serializedRow;
         }
 
@@ -576,7 +576,7 @@
                 currentPosition = logWriter_.getFilePointer();
                 CommitLogContext cLogCtx = new CommitLogContext(logFile_, currentPosition);
                 /* Update the header */
-                maybeUpdateHeader(row);
+                maybeUpdateHeader(rowMutation);
                 if (serializedRow instanceof DataOutputBuffer)
                 {
                     DataOutputBuffer buffer = (DataOutputBuffer) serializedRow;

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java?rev=818607&r1=818606&r2=818607&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java Thu Sep 24 19:55:10 2009
@@ -23,6 +23,7 @@
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -33,6 +34,7 @@
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.lang.StringUtils;
 
+import org.apache.cassandra.io.DataOutputBuffer;
 import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.net.EndPoint;
 import org.apache.cassandra.net.Message;
@@ -83,7 +85,7 @@
         modifications_ = modifications;
     }
 
-    public String table()
+    public String getTable()
     {
         return table_;
     }
@@ -97,6 +99,11 @@
     {
         return modifications_.keySet();
     }
+    
+    public Collection<ColumnFamily> getColumnFamilies()
+    {
+        return modifications_.values();
+    }
 
     void addHints(String key, String host) throws IOException
     {
@@ -119,6 +126,17 @@
         modifications_.put(columnFamily.name(), columnFamily);
     }
 
+    /** should only be called by commitlog replay code */
+    public void removeColumnFamily(ColumnFamily columnFamily)
+    {
+        modifications_.remove(columnFamily.name());
+    }
+    
+    public boolean isEmpty()
+    {
+        return modifications_.isEmpty();
+    }
+
     /*
      * Specify a column name and a corresponding value for
      * the column. Column name is specified as <column family>:column.
@@ -183,19 +201,8 @@
      * to the table that is obtained by calling Table.open().
     */
     public void apply() throws IOException
-    {
-        Row row = createRow();
-        Table.open(table_).apply(row, row.getSerializedBuffer());
-    }
-
-    private Row createRow()
-    {
-        Row row = new Row(table_, key_);
-        for (String cfName : modifications_.keySet())
-        {
-            row.addColumnFamily(modifications_.get(cfName));
-        }
-        return row;
+    {   
+        Table.open(table_).apply(this, this.getSerializedBuffer());
     }
 
     /*
@@ -204,7 +211,7 @@
     */
     void applyBinary() throws IOException, ExecutionException, InterruptedException
     {
-        Table.open(table_).load(createRow());
+        Table.open(table_).load(this);
     }
 
     public Message makeRowMutationMessage() throws IOException
@@ -247,6 +254,13 @@
         }
         return rm;
     }
+    
+    public DataOutputBuffer getSerializedBuffer() throws IOException
+    {
+        DataOutputBuffer buffer = new DataOutputBuffer();
+        RowMutation.serializer().serialize(this, buffer);
+        return buffer;
+    }
 
     public String toString()
     {
@@ -281,7 +295,7 @@
 
     public void serialize(RowMutation rm, DataOutputStream dos) throws IOException
     {
-        dos.writeUTF(rm.table());
+        dos.writeUTF(rm.getTable());
         dos.writeUTF(rm.key());
 
         /* serialize the modifications_ in the mutation */

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java?rev=818607&r1=818606&r2=818607&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java Thu Sep 24 19:55:10 2009
@@ -53,14 +53,14 @@
                 if (logger_.isDebugEnabled())
                   logger_.debug("Adding hint for " + hint);
                 /* add necessary hints to this mutation */
-                RowMutation hintedMutation = new RowMutation(Table.SYSTEM_TABLE, rm.table());
+                RowMutation hintedMutation = new RowMutation(Table.SYSTEM_TABLE, rm.getTable());
                 hintedMutation.addHints(rm.key(), hint.getHost());
                 hintedMutation.apply();
             }
 
             rm.apply();
 
-            WriteResponse response = new WriteResponse(rm.table(), rm.key(), true);
+            WriteResponse response = new WriteResponse(rm.getTable(), rm.key(), true);
             Message responseMessage = WriteResponse.makeWriteResponseMessage(message, response);
             if (logger_.isDebugEnabled())
               logger_.debug(rm + " applied.  Sending response to " + message.getMessageId() + "@" + message.getFrom());

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=818607&r1=818606&r2=818607&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Thu Sep 24 19:55:10 2009
@@ -594,20 +594,20 @@
      * Once this happens the data associated with the individual column families
      * is also written to the column family store's memtable.
     */
-    void apply(Row row, DataOutputBuffer serializedRow) throws IOException
+    void apply(RowMutation mutation, DataOutputBuffer serializedMutation) throws IOException
     {
         HashMap<ColumnFamilyStore,Memtable> memtablesToFlush = new HashMap<ColumnFamilyStore, Memtable>(2);
 
         flusherLock_.readLock().lock();
         try
         {
-            CommitLog.open().add(row, serializedRow);
+            CommitLog.open().add(mutation, serializedMutation);
         
-            for (ColumnFamily columnFamily : row.getColumnFamilies())
+            for (ColumnFamily columnFamily : mutation.getColumnFamilies())
             {
                 Memtable memtableToFlush;
                 ColumnFamilyStore cfStore = columnFamilyStores_.get(columnFamily.name());
-                if ((memtableToFlush=cfStore.apply(row.key(), columnFamily)) != null)
+                if ((memtableToFlush=cfStore.apply(mutation.key(), columnFamily)) != null)
                     memtablesToFlush.put(cfStore, memtableToFlush);
             }
         }
@@ -621,7 +621,7 @@
             entry.getKey().switchMemtable(entry.getValue());
     }
 
-    void applyNow(Row row) throws IOException
+    void applyNow(RowMutation row) throws IOException
     {
         String key = row.key();
         for (ColumnFamily columnFamily : row.getColumnFamilies())
@@ -647,11 +647,11 @@
     }
 
     // for binary load path.  skips commitlog.
-    void load(Row row) throws IOException
+    void load(RowMutation rowMutation) throws IOException
     {
-        String key = row.key();
+        String key = rowMutation.key();
                 
-        for (ColumnFamily columnFamily : row.getColumnFamilies())
+        for (ColumnFamily columnFamily : rowMutation.getColumnFamilies())
         {
             Collection<IColumn> columns = columnFamily.getSortedColumns();
             for (IColumn column : columns)
@@ -660,7 +660,6 @@
                 cfStore.applyBinary(key, column.value());
             }
         }
-        row.clear();
     }
 
     public SortedSet<String> getApplicationColumnFamilies()