You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/03/27 17:39:18 UTC

svn commit: r759223 - in /incubator/cassandra/trunk/src/org/apache/cassandra: db/ service/

Author: jbellis
Date: Fri Mar 27 16:39:17 2009
New Revision: 759223

URL: http://svn.apache.org/viewvc?rev=759223&view=rev
Log:
finish remove support.  Split CFS.resolve() into resolve(), which combines ColumnFamilies, and removeDeleted(), which takes a single ColumnFamily and returns a new one with deleted IColumns removed. Keep deletion information around until removeDeleted is called so that deletion information can properly suppress older IColumns.

Modified:
    incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/HintedHandOffManager.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutation.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutationVerbHandler.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/Table.java
    incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java
    incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageProxy.java

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java?rev=759223&r1=759222&r2=759223&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java Fri Mar 27 16:39:17 2009
@@ -18,15 +18,27 @@
 
 package org.apache.cassandra.db;
 
-import java.io.*;
+import java.io.File;
+import java.io.IOException;
 import java.math.BigInteger;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.StringTokenizer;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.log4j.Logger;
+
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.io.DataInputBuffer;
@@ -40,7 +52,6 @@
 import org.apache.cassandra.utils.BloomFilter;
 import org.apache.cassandra.utils.FileUtils;
 import org.apache.cassandra.utils.LogUtil;
-import org.apache.log4j.Logger;
 
 /**
  * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
@@ -431,66 +442,46 @@
         binaryMemtable_.get().put(key, buffer);
     }
 
+    public ColumnFamily getColumnFamily(String key, String columnFamilyColumn, IFilter filter) throws IOException
+    {
+        List<ColumnFamily> columnFamilies = getColumnFamilies(key, columnFamilyColumn, filter);
+        return resolveAndRemoveDeleted(columnFamilies);
+    }
+
     /**
      *
      * Get the column family in the most efficient order.
      * 1. Memtable
      * 2. Sorted list of files
      */
-    public ColumnFamily getColumnFamily(String key, String cf, IFilter filter) throws IOException
+    List<ColumnFamily> getColumnFamilies(String key, String columnFamilyColumn, IFilter filter) throws IOException
     {
-    	List<ColumnFamily> columnFamilies = new ArrayList<ColumnFamily>();
-    	ColumnFamily columnFamily = null;
-    	long start = System.currentTimeMillis();
-        /* Get the ColumnFamily from Memtable */
-    	getColumnFamilyFromCurrentMemtable(key, cf, filter, columnFamilies);
-        if(columnFamilies.size() != 0)
+        List<ColumnFamily> columnFamilies = getMemoryColumnFamilies(key, columnFamilyColumn, filter);
+        if (columnFamilies.size() == 0 || !filter.isDone())
         {
-	        if(filter.isDone())
-	        	return columnFamilies.get(0);
+            long start = System.currentTimeMillis();
+            getColumnFamilyFromDisk(key, columnFamilyColumn, columnFamilies, filter);
+            logger_.debug("DISK TIME: " + (System.currentTimeMillis() - start) + " ms.");
         }
-        /* Check if MemtableManager has any historical information */
-        MemtableManager.instance().getColumnFamily(key, columnFamily_, cf, filter, columnFamilies);
-        if(columnFamilies.size() != 0)
-        {
-        	columnFamily = resolve(columnFamilies);
-	        if(filter.isDone())
-	        	return columnFamily;
-	        columnFamilies.clear();
-	        columnFamilies.add(columnFamily);
-        }
-        getColumnFamilyFromDisk(key, cf, columnFamilies, filter);
-        logger_.debug("DISK TIME: " + (System.currentTimeMillis() - start)
-                + " ms.");
-        columnFamily = resolve(columnFamilies);
-       
-        return columnFamily;
+        return columnFamilies;
     }
-    
-    public ColumnFamily getColumnFamilyFromMemory(String key, String cf, IFilter filter) 
+
+    private List<ColumnFamily> getMemoryColumnFamilies(String key, String columnFamilyColumn, IFilter filter)
     {
         List<ColumnFamily> columnFamilies = new ArrayList<ColumnFamily>();
-        ColumnFamily columnFamily = null;
-        long start = System.currentTimeMillis();
         /* Get the ColumnFamily from Memtable */
-        getColumnFamilyFromCurrentMemtable(key, cf, filter, columnFamilies);
-        if(columnFamilies.size() != 0)
+        getColumnFamilyFromCurrentMemtable(key, columnFamilyColumn, filter, columnFamilies);
+        if (columnFamilies.size() == 0 || !filter.isDone())
         {
-            if(filter.isDone())
-                return columnFamilies.get(0);
-        }
-        /* Check if MemtableManager has any historical information */
-        MemtableManager.instance().getColumnFamily(key, columnFamily_, cf, filter, columnFamilies);
-        if(columnFamilies.size() != 0)
-        {
-            columnFamily = resolve(columnFamilies);
-            if(filter.isDone())
-                return columnFamily;
-            columnFamilies.clear();
-            columnFamilies.add(columnFamily);
+            /* Check if MemtableManager has any historical information */
+            MemtableManager.instance().getColumnFamily(key, columnFamily_, columnFamilyColumn, filter, columnFamilies);
         }
-        columnFamily = resolve(columnFamilies);
-        return columnFamily;
+        return columnFamilies;
+    }
+
+    public ColumnFamily getColumnFamilyFromMemory(String key, String columnFamilyColumn, IFilter filter)
+    {
+        return resolveAndRemoveDeleted(getMemoryColumnFamilies(key, columnFamilyColumn, filter));
     }
 
     /**
@@ -530,33 +521,14 @@
             long start = System.currentTimeMillis();
             if (columnFamily != null)
             {
-	            /*
-	             * TODO
-	             * By using the filter before removing deleted columns 
-	             * we have a efficient implementation of timefilter 
-	             * but for count filter this can return wrong results 
-	             * we need to take care of that later.
-	             */
-                /* suppress columns marked for delete */
-                Map<String, IColumn> columns = columnFamily.getColumns();
-                Set<String> cNames = columns.keySet();
-
-                for (String cName : cNames)
-                {
-                    IColumn column = columns.get(cName);
-                    if (column.isMarkedForDelete())
-                        columns.remove(cName);
-                }
                 columnFamilies.add(columnFamily);
                 if(filter.isDone())
                 {
                 	break;
                 }
             }
-            logger_.debug("DISK Data structure population  TIME: " + (System.currentTimeMillis() - start)
-                    + " ms.");
+            logger_.debug("DISK Data structure population  TIME: " + (System.currentTimeMillis() - start) + " ms.");
         }
-        files.clear();  	
     }
 
 
@@ -570,12 +542,11 @@
 		if (bufIn.getLength() == 0)
 			return null;
         start = System.currentTimeMillis();
-        ColumnFamily columnFamily = null;
-       	columnFamily = ColumnFamily.serializer().deserialize(bufIn, cf, filter);
+        ColumnFamily columnFamily = ColumnFamily.serializer().deserialize(bufIn, cf, filter);
 		logger_.debug("DISK Deserialize TIME: " + (System.currentTimeMillis() - start) + " ms.");
 		if (columnFamily == null)
-			return columnFamily;
-		return (!columnFamily.isMarkedForDelete()) ? columnFamily : null;
+			return null;
+		return columnFamily;
 	}
 
     private void getColumnFamilyFromCurrentMemtable(String key, String cf, IFilter filter, List<ColumnFamily> columnFamilies)
@@ -584,20 +555,66 @@
         ColumnFamily columnFamily = memtable_.get().get(key, cf, filter);
         if (columnFamily != null)
         {
-            if (!columnFamily.isMarkedForDelete())
-                columnFamilies.add(columnFamily);
+            columnFamilies.add(columnFamily);
         }
     }
     
-    private ColumnFamily resolve(List<ColumnFamily> columnFamilies)
+    /** merge all columnFamilies into a single instance, with only the newest versions of columns preserved. */
+    static ColumnFamily resolve(List<ColumnFamily> columnFamilies)
     {
         int size = columnFamilies.size();
         if (size == 0)
-            return null;        
-        ColumnFamily cf = columnFamilies.get(0);
-        for ( int i = 1; i < size ; ++i )
-        {
-            cf.addColumns(columnFamilies.get(i));
+            return null;
+
+        // start from nothing so that we don't include potential deleted columns from the first instance
+        String cfname = columnFamilies.get(0).name();
+        ColumnFamily cf = new ColumnFamily(cfname);
+
+        // merge
+        for (ColumnFamily cf2 : columnFamilies)
+        {
+            assert cf.name().equals(cf2.name());
+            cf.addColumns(cf2);
+            cf.delete(Math.max(cf.getMarkedForDeleteAt(), cf2.getMarkedForDeleteAt()));
+        }
+        return cf;
+    }
+
+    /** like resolve, but leaves the resolved CF as the only item in the list */
+    private static void merge(List<ColumnFamily> columnFamilies)
+    {
+        ColumnFamily cf = resolve(columnFamilies);
+        columnFamilies.clear();
+        columnFamilies.add(cf);
+    }
+
+    private static ColumnFamily resolveAndRemoveDeleted(List<ColumnFamily> columnFamilies) {
+        ColumnFamily cf = resolve(columnFamilies);
+        return removeDeleted(cf);
+    }
+
+    static ColumnFamily removeDeleted(ColumnFamily cf) {
+        if (cf == null) {
+            return null;
+        }
+        for (String cname : new ArrayList<String>(cf.getColumns().keySet())) {
+            IColumn c = cf.getColumns().get(cname);
+            if (c instanceof SuperColumn) {
+                long min_timestamp = Math.max(c.getMarkedForDeleteAt(), cf.getMarkedForDeleteAt());
+                // don't operate directly on the supercolumn, it could be the one in the memtable
+                cf.remove(cname);
+                IColumn sc = new SuperColumn(cname);
+                for (IColumn subColumn : c.getSubColumns()) {
+                    if (!subColumn.isMarkedForDelete() && subColumn.timestamp() >= min_timestamp) {
+                        sc.addColumn(subColumn.name(), subColumn);
+                    }
+                }
+                if (sc.getSubColumns().size() > 0) {
+                    cf.addColumn(sc);
+                }
+            } else if (c.isMarkedForDelete() || c.timestamp() < cf.getMarkedForDeleteAt()) {
+                cf.remove(cname);
+            }
         }
         return cf;
     }
@@ -610,8 +627,7 @@
      */
     void applyNow(String key, ColumnFamily columnFamily) throws IOException
     {
-        if (!columnFamily.isMarkedForDelete())
-            memtable_.get().putOnRecovery(key, columnFamily);
+         memtable_.get().putOnRecovery(key, columnFamily);
     }
 
     /*
@@ -1125,14 +1141,7 @@
 	                                if(columnFamilies.size() > 1)
 	                                {
 	    		                        // Now merge the 2 column families
-	    			                    columnFamily = resolve(columnFamilies);
-	    			                    columnFamilies.clear();
-	    			                    if( columnFamily != null)
-	    			                    {
-		    			                    // add the merged columnfamily back to the list
-		    			                    columnFamilies.add(columnFamily);
-	    			                    }
-
+                                        merge(columnFamilies);
 	                                }
 			                        // deserialize into column families
 			                        columnFamilies.add(ColumnFamily.serializer().deserialize(filestruct.bufIn_));
@@ -1144,7 +1153,7 @@
 		                    	}
 		                    }
 		                    // Now after merging all crap append to the sstable
-		                    columnFamily = resolve(columnFamilies);
+		                    columnFamily = resolveAndRemoveDeleted(columnFamilies);
 		                    columnFamilies.clear();
 		                    if( columnFamily != null )
 		                    {
@@ -1375,15 +1384,7 @@
 	                                // We want to add only 2 and resolve them right there in order to save on memory footprint
 	                                if(columnFamilies.size() > 1)
 	                                {
-	    		                        // Now merge the 2 column families
-	    			                    columnFamily = resolve(columnFamilies);
-	    			                    columnFamilies.clear();
-	    			                    if( columnFamily != null)
-	    			                    {
-		    			                    // add the merged columnfamily back to the list
-		    			                    columnFamilies.add(columnFamily);
-	    			                    }
-
+	    		                        merge(columnFamilies);
 	                                }
 			                        // deserialize into column families                                    
 			                        columnFamilies.add(ColumnFamily.serializer().deserialize(filestruct.bufIn_));
@@ -1394,7 +1395,7 @@
 		                    	}
 		                    }
 		                    // Now after merging all crap append to the sstable
-		                    columnFamily = resolve(columnFamilies);
+		                    columnFamily = resolveAndRemoveDeleted(columnFamilies);
 		                    columnFamilies.clear();
 		                    if( columnFamily != null )
 		                    {

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/HintedHandOffManager.java?rev=759223&r1=759222&r2=759223&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/HintedHandOffManager.java Fri Mar 27 16:39:17 2009
@@ -24,6 +24,8 @@
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.apache.log4j.Logger;
+
 import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
 import org.apache.cassandra.concurrent.ThreadFactoryImpl;
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -32,11 +34,7 @@
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.IComponentShutdown;
-import org.apache.cassandra.service.IResponseResolver;
-import org.apache.cassandra.service.QuorumResponseHandler;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.service.WriteResponseResolver;
-import org.apache.log4j.Logger;
 
 
 /**
@@ -110,14 +108,14 @@
         private void deleteEndPoint(String endpointAddress, String key) throws Exception
         {
         	RowMutation rm = new RowMutation(DatabaseDescriptor.getTables().get(0), key_);
-        	rm.delete(Table.hints_ + ":" + key + ":" + endpointAddress);
+        	rm.delete(Table.hints_ + ":" + key + ":" + endpointAddress, System.currentTimeMillis());
         	rm.apply();
         }
 
         private void deleteKey(String key) throws Exception
         {
         	RowMutation rm = new RowMutation(DatabaseDescriptor.getTables().get(0), key_);
-        	rm.delete(Table.hints_ + ":" + key);
+        	rm.delete(Table.hints_ + ":" + key, System.currentTimeMillis());
         	rm.apply();
         }
 

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java?rev=759223&r1=759222&r2=759223&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java Fri Mar 27 16:39:17 2009
@@ -307,6 +307,7 @@
             int newObjectCount = oldCf.getColumnCount();
             resolveSize(oldSize, newSize);
             resolveCount(oldObjectCount, newObjectCount);
+            oldCf.delete(Math.max(oldCf.getMarkedForDeleteAt(), columnFamily.getMarkedForDeleteAt()));
         }
         else
         {

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutation.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutation.java?rev=759223&r1=759222&r2=759223&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutation.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutation.java Fri Mar 27 16:39:17 2009
@@ -23,10 +23,15 @@
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.Serializable;
+import java.io.ByteArrayOutputStream;
 
 import org.apache.commons.lang.ArrayUtils;
 
 import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
 
 
 /**
@@ -34,9 +39,10 @@
  */
 
 public class RowMutation implements Serializable
-{      
-	private static ICompactSerializer<RowMutation> serializer_;	
-	
+{
+	private static ICompactSerializer<RowMutation> serializer_;
+    public static final String HINT = "HINT";
+
     static
     {
         serializer_ = new RowMutationSerializer();
@@ -46,23 +52,23 @@
     {
         return serializer_;
     }
-        
+
     private String table_;
-    private String key_;       
-    protected Map<String, ColumnFamily> modifications_ = new HashMap<String, ColumnFamily>();       
+    private String key_;
+    protected Map<String, ColumnFamily> modifications_ = new HashMap<String, ColumnFamily>();
     protected Map<String, ColumnFamily> deletions_ = new HashMap<String, ColumnFamily>();
-    
+
     /* Ctor for JAXB */
     private RowMutation()
     {
     }
-    
+
     public RowMutation(String table, String key)
     {
         table_ = table;
         key_ = key;
     }
-    
+
     public RowMutation(String table, Row row)
     {
         table_ = table;
@@ -82,46 +88,46 @@
     	modifications_ = modifications;
     	deletions_ = deletions;
     }
-    
+
     public static String[] getColumnAndColumnFamily(String cf)
     {
         return cf.split(":");
     }
-    
+
     String table()
     {
         return table_;
     }
-    
+
     public String key()
     {
         return key_;
     }
-    
+
     void addHints(String hint) throws IOException, ColumnFamilyNotDefinedException
-    {        
+    {
         String cfName = Table.hints_ + ":" + hint;
         add(cfName, new byte[0]);
     }
-    
+
     /*
      * Specify a column family name and the corresponding column
-     * family object. 
+     * family object.
      * param @ cf - column family name
      * param @ columnFamily - the column family.
     */
     public void add(String cf, ColumnFamily columnFamily)
-    {       
+    {
         modifications_.put(cf, columnFamily);
     }
-    
+
     /*
      * Specify a column name and a corresponding value for
      * the column. Column name is specified as <column family>:column.
      * This will result in a ColumnFamily associated with
      * <column family> as name and a Column with <column>
      * as name.
-     * 
+     *
      * param @ cf - column name as <column family>:<column>
      * param @ value - value associated with the column
     */
@@ -129,26 +135,26 @@
     {
         add(cf, value, 0);
     }
-    
+
     /*
      * Specify a column name and a corresponding value for
      * the column. Column name is specified as <column family>:column.
      * This will result in a ColumnFamily associated with
      * <column family> as name and a Column with <column>
-     * as name. The columan can be further broken up 
+     * as name. The columan can be further broken up
      * as super column name : columnname  in case of super columns
-     * 
+     *
      * param @ cf - column name as <column family>:<column>
      * param @ value - value associated with the column
      * param @ timestamp - ts associated with this data.
     */
     public void add(String cf, byte[] value, long timestamp)
-    {        
+    {
         String[] values = RowMutation.getColumnAndColumnFamily(cf);
-       
+
         if ( values.length == 0 || values.length == 1 || values.length > 3 )
             throw new IllegalArgumentException("Column Family " + cf + " in invalid format. Must be in <column family>:<column> format.");
-        
+
         ColumnFamily columnFamily = modifications_.get(values[0]);
         if( values.length == 2 )
         {
@@ -169,72 +175,95 @@
         modifications_.put(values[0], columnFamily);
     }
 
-    /*
-     * Specify a column name to be deleted. Column name is
-     * specified as <column family>:column. This will result
-     * in a ColumnFamily associated with <column family> as
-     * name and perhaps Column with <column> as name being
-     * marked as deleted.
-     * TODO : Delete is NOT correct as we do not know
-     * the CF type so we need to fix that.
-     * param @ cf - column name as <column family>:<column>
-    */
-    public void delete(String columnFamilyColumn)
+    public void delete(String columnFamilyColumn, long timestamp)
     {
-        throw new UnsupportedOperationException();
+        String[] values = RowMutation.getColumnAndColumnFamily(columnFamilyColumn);
+        String cfName = values[0];
+        if (modifications_.containsKey(cfName))
+        {
+            throw new IllegalArgumentException("ColumnFamily " + cfName + " is already being modified");
+        }
+
+        if (values.length == 0 || values.length > 3)
+            throw new IllegalArgumentException("Column Family " + columnFamilyColumn + " in invalid format. Must be in <column family>:<column> format.");
+
+        ColumnFamily columnFamily = modifications_.get(cfName);
+        if (columnFamily == null)
+            columnFamily = new ColumnFamily(cfName);
+        if (values.length == 2)
+        {
+            columnFamily.addColumn(values[1], ArrayUtils.EMPTY_BYTE_ARRAY, timestamp, true);
+        }
+        else if (values.length == 3)
+        {
+            columnFamily.addColumn(values[1] + ":" + values[2], ArrayUtils.EMPTY_BYTE_ARRAY, timestamp, true);
+        }
+        else
+        {
+            assert values.length == 1;
+            columnFamily.delete(timestamp);
+        }
+        modifications_.put(cfName, columnFamily);
     }
 
+
     /*
      * This is equivalent to calling commit. Applies the changes to
      * to the table that is obtained by calling Table.open().
     */
     public void apply() throws IOException, ColumnFamilyNotDefinedException
-    {  
+    {
         Row row = new Row(key_);
-        Table table = Table.open(table_);
-        Set<String> cfNames = modifications_.keySet();
-        for (String cfName : cfNames )
-        {        
-            if ( !table.isValidColumnFamily(cfName) )
-                throw new ColumnFamilyNotDefinedException("Column Family " + cfName + " has not been defined.");
-            row.addColumnFamily( modifications_.get(cfName) );            
-        }
-        table.apply(row);
+        apply(row);
     }
-    
-    /* 
-     * This is equivalent to calling commit. Applies the changes to
-     * to the table that is obtained by calling Table.open().
+
+    /*
+     * Allows RowMutationVerbHandler to optimize by re-using a single Row object.
     */
-    void apply(Row row) throws IOException, ColumnFamilyNotDefinedException
-    {                
-        Table table = Table.open(table_);        
-        Set<String> cfNames = modifications_.keySet();
-        for (String cfName : cfNames )
-        {        
-            if ( !table.isValidColumnFamily(cfName) )
+    void apply(Row emptyRow) throws IOException, ColumnFamilyNotDefinedException
+    {
+        assert emptyRow.getColumnFamilyMap().size() == 0;
+        Table table = Table.open(table_);
+        for (String cfName : modifications_.keySet())
+        {
+            if (!table.isValidColumnFamily(cfName))
                 throw new ColumnFamilyNotDefinedException("Column Family " + cfName + " has not been defined.");
-            row.addColumnFamily( modifications_.get(cfName) );            
+            emptyRow.addColumnFamily(modifications_.get(cfName));
         }
-        table.apply(row);
+        table.apply(emptyRow);
     }
-    
-    /* 
+
+    /*
      * This is equivalent to calling commit. Applies the changes to
      * to the table that is obtained by calling Table.open().
     */
     void load(Row row) throws IOException, ColumnFamilyNotDefinedException
-    {                
-        Table table = Table.open(table_);        
+    {
+        Table table = Table.open(table_);
         Set<String> cfNames = modifications_.keySet();
         for (String cfName : cfNames )
-        {        
+        {
             if ( !table.isValidColumnFamily(cfName) )
                 throw new ColumnFamilyNotDefinedException("Column Family " + cfName + " has not been defined.");
-            row.addColumnFamily( modifications_.get(cfName) );            
+            row.addColumnFamily( modifications_.get(cfName) );
         }
         table.load(row);
-    }  
+    }
+
+    public Message makeRowMutationMessage() throws IOException
+    {
+        return makeRowMutationMessage(StorageService.mutationVerbHandler_);
+    }
+
+    public Message makeRowMutationMessage(String verbHandlerName) throws IOException
+    {
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        DataOutputStream dos = new DataOutputStream(bos);
+        serializer().serialize(this, dos);
+        EndPoint local = StorageService.getLocalStorageEndPoint();
+        EndPoint from = (local != null) ? local : new EndPoint(FBUtilities.getHostName(), 7000);
+        return new Message(from, StorageService.mutationStage_, verbHandlerName, bos.toByteArray());
+    }
 }
 
 class RowMutationSerializer implements ICompactSerializer<RowMutation>
@@ -244,56 +273,56 @@
 		int size = map.size();
         dos.writeInt(size);
         if ( size > 0 )
-        {   
+        {
             Set<String> keys = map.keySet();
             for( String key : keys )
-            {                
+            {
             	dos.writeUTF(key);
                 ColumnFamily cf = map.get(key);
                 if ( cf != null )
                 {
                     ColumnFamily.serializer().serialize(cf, dos);
-                }                
+                }
             }
         }
 	}
-	
+
 	public void serialize(RowMutation rm, DataOutputStream dos) throws IOException
 	{
 		dos.writeUTF(rm.table());
 		dos.writeUTF(rm.key());
-		
+
 		/* serialize the modifications_ in the mutation */
         freezeTheMaps(rm.modifications_, dos);
-        
+
         /* serialize the deletions_ in the mutation */
         freezeTheMaps(rm.deletions_, dos);
 	}
-	
+
 	private Map<String, ColumnFamily> defreezeTheMaps(DataInputStream dis) throws IOException
 	{
 		Map<String, ColumnFamily> map = new HashMap<String, ColumnFamily>();
         int size = dis.readInt();
         for ( int i = 0; i < size; ++i )
         {
-            String key = dis.readUTF();  
+            String key = dis.readUTF();
             ColumnFamily cf = ColumnFamily.serializer().deserialize(dis);
-            map.put(key, cf);            
+            map.put(key, cf);
         }
         return map;
 	}
-	
+
     public RowMutation deserialize(DataInputStream dis) throws IOException
     {
     	String table = dis.readUTF();
     	String key = dis.readUTF();
-    	
+
     	/* Defreeze the modifications_ map */
     	Map<String, ColumnFamily> modifications = defreezeTheMaps(dis);
-    	
+
     	/* Defreeze the deletions_ map */
     	Map<String, ColumnFamily> deletions = defreezeTheMaps(dis);
-    	
+
     	return new RowMutation(table, key, modifications, deletions);
     }
 }

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutationVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutationVerbHandler.java?rev=759223&r1=759222&r2=759223&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutationVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutationVerbHandler.java Fri Mar 27 16:39:17 2009
@@ -53,7 +53,7 @@
 
     public void doVerb(Message message)
     {
-        byte[] bytes = (byte[])message.getMessageBody()[0];
+        byte[] bytes = (byte[]) message.getMessageBody()[0];
         /* Obtain a Row Mutation Context from TLS */
         RowMutationContext rowMutationCtx = tls_.get();
         if ( rowMutationCtx == null )
@@ -70,7 +70,7 @@
             logger_.debug("Applying " + rm);
 
             /* Check if there were any hints in this message */
-            byte[] hintedBytes = message.getHeader(RowMutationMessage.hint_);
+            byte[] hintedBytes = message.getHeader(RowMutation.HINT);
             if ( hintedBytes != null && hintedBytes.length > 0 )
             {
             	EndPoint hint = EndPoint.fromBytes(hintedBytes);
@@ -83,6 +83,7 @@
 
             long start = System.currentTimeMillis();
 
+            rowMutationCtx.row_.clear();
             rowMutationCtx.row_.key(rm.key());
             rm.apply(rowMutationCtx.row_);
 

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/Table.java?rev=759223&r1=759222&r2=759223&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/Table.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/Table.java Fri Mar 27 16:39:17 2009
@@ -808,22 +808,18 @@
      * Once this happens the data associated with the individual column families
      * is also written to the column family store's memtable.
     */
-    public void apply(Row row) throws IOException
-    {        
-        String key = row.key();
+    void apply(Row row) throws IOException
+    {
         /* Add row to the commit log. */
         long start = System.currentTimeMillis();
-               
         CommitLog.CommitLogContext cLogCtx = CommitLog.open(table_).add(row);
-        Map<String, ColumnFamily> columnFamilies = row.getColumnFamilyMap();
-        Set<String> cNames = columnFamilies.keySet();
-        for ( String cName : cNames )
+
+        for (ColumnFamily columnFamily : row.getColumnFamilies())
         {
-        	ColumnFamily columnFamily = columnFamilies.get(cName);
             ColumnFamilyStore cfStore = columnFamilyStores_.get(columnFamily.name());
-            cfStore.apply( key, columnFamily, cLogCtx);            
+            cfStore.apply(row.key(), columnFamily, cLogCtx);
         }
-        row.clear();
+
         long timeTaken = System.currentTimeMillis() - start;
         dbAnalyticsSource_.updateWriteStatistics(timeTaken);
     }

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java?rev=759223&r1=759222&r2=759223&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java Fri Mar 27 16:39:17 2009
@@ -576,6 +576,19 @@
 		throw new UnsupportedOperationException();
 	}
 
+    public boolean remove(String tablename, String key, String columnFamily_column, long timestamp, int block_for)
+	{
+        logger_.debug("remove");
+        RowMutation rm = new RowMutation(tablename, key.trim());
+        rm.delete(columnFamily_column, timestamp);
+        if (block_for > 0) {
+            return StorageProxy.insertBlocking(rm);
+        } else {
+            StorageProxy.insert(rm);
+            return true;
+        }
+	}
+
     public List<superColumn_t> get_slice_super_by_names(String tablename, String key, String columnFamily, List<String> superColumnNames) throws CassandraException, TException
     {
 		ArrayList<superColumn_t> retlist = new ArrayList<superColumn_t>();

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageProxy.java?rev=759223&r1=759222&r2=759223&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageProxy.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageProxy.java Fri Mar 27 16:39:17 2009
@@ -26,6 +26,9 @@
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+
+import org.apache.commons.lang.StringUtils;
+
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ReadMessage;
 import org.apache.cassandra.db.ReadResponseMessage;
@@ -53,28 +56,28 @@
      * sent over the wire to N replicas where some of the replicas
      * may be hints.
      */
-    private static Map<EndPoint, Message> createWriteMessages(RowMutationMessage rmMessage, Map<EndPoint, EndPoint> endpointMap) throws IOException
+    private static Map<EndPoint, Message> createWriteMessages(RowMutation rm, Map<EndPoint, EndPoint> endpointMap) throws IOException
     {
-        Map<EndPoint, Message> messageMap = new HashMap<EndPoint, Message>();
-        Message message = RowMutationMessage.makeRowMutationMessage(rmMessage);
-        
-        Set<EndPoint> targets = endpointMap.keySet();
-        for( EndPoint target : targets )
-        {
-            EndPoint hint = endpointMap.get(target);
+		Map<EndPoint, Message> messageMap = new HashMap<EndPoint, Message>();
+		Message message = rm.makeRowMutationMessage();
+
+		for (Map.Entry<EndPoint, EndPoint> entry : endpointMap.entrySet())
+		{
+            EndPoint target = entry.getKey();
+            EndPoint hint = entry.getValue();
             if ( !target.equals(hint) )
-            {
-                Message hintedMessage = RowMutationMessage.makeRowMutationMessage(rmMessage);
-                hintedMessage.addHeader(RowMutationMessage.hint_, EndPoint.toBytes(hint) );
-                logger_.debug("Sending the hint of " + target.getHost() + " to " + hint.getHost());
-                messageMap.put(target, hintedMessage);
-            }
-            else
-            {
-                messageMap.put(target, message);
-            }
-        }
-        return messageMap;
+			{
+				Message hintedMessage = rm.makeRowMutationMessage();
+				hintedMessage.addHeader(RowMutation.HINT, EndPoint.toBytes(hint) );
+				logger_.debug("Sending the hint of " + target.getHost() + " to " + hint.getHost());
+				messageMap.put(target, hintedMessage);
+			}
+			else
+			{
+				messageMap.put(target, message);
+			}
+		}
+		return messageMap;
     }
     
     /**
@@ -82,38 +85,65 @@
      * across all replicas. This method will take care
      * of the possibility of a replica being down and hint
      * the data across to some other replica. 
-     * @param RowMutation the mutation to be applied 
-     *                    across the replicas
+     * @param rm the mutation to be applied across the replicas
     */
     public static void insert(RowMutation rm)
-    {
+	{
         /*
          * Get the N nodes from storage service where the data needs to be
          * replicated
          * Construct a message for write
          * Send them asynchronously to the replicas.
         */
+        assert rm.key() != null;
+
+		try
+		{
+			Map<EndPoint, EndPoint> endpointMap = StorageService.instance().getNStorageEndPointMap(rm.key());
+			// TODO: throw a thrift exception if we do not have N nodes
+			Map<EndPoint, Message> messageMap = createWriteMessages(rm, endpointMap);
+            logger_.debug("insert writing to [" + StringUtils.join(messageMap.keySet(), ", ") + "]");
+			for (Map.Entry<EndPoint, Message> entry : messageMap.entrySet())
+			{
+				MessagingService.getMessagingInstance().sendOneWay(entry.getValue(), entry.getKey());
+			}
+		}
+        catch (Exception e)
+        {
+            logger_.error( LogUtil.throwableToString(e) );
+        }
+        return;
+    }
+
+    public static boolean insertBlocking(RowMutation rm)
+    {
+        assert rm.key() != null;
+
         try
         {
-            logger_.debug(" insert");
-            Map<EndPoint, EndPoint> endpointMap = StorageService.instance().getNStorageEndPointMap(rm.key());
+            Message message = rm.makeRowMutationMessage();
+
+            IResponseResolver<Boolean> writeResponseResolver = new WriteResponseResolver();
+            QuorumResponseHandler<Boolean> quorumResponseHandler = new QuorumResponseHandler<Boolean>(
+                    DatabaseDescriptor.getReplicationFactor(),
+                    writeResponseResolver);
+            EndPoint[] endpoints = StorageService.instance().getNStorageEndPoint(rm.key());
+            logger_.debug("insertBlocking writing to [" + StringUtils.join(endpoints, ", ") + "]");
             // TODO: throw a thrift exception if we do not have N nodes
-            RowMutationMessage rmMsg = new RowMutationMessage(rm); 
-            /* Create the write messages to be sent */
-            Map<EndPoint, Message> messageMap = createWriteMessages(rmMsg, endpointMap);
-            Set<EndPoint> endpoints = messageMap.keySet();
-            for(EndPoint endpoint : endpoints)
-            {
-                MessagingService.getMessagingInstance().sendOneWay(messageMap.get(endpoint), endpoint);
-            }
+
+            MessagingService.getMessagingInstance().sendRR(message, endpoints, quorumResponseHandler);
+            return quorumResponseHandler.get();
+
+            // TODO: if the result is false that means the writes to all the
+            // servers failed hence we need to throw an exception or return an
+            // error back to the client so that it can take appropriate action.
         }
         catch (Exception e)
         {
-            logger_.info( LogUtil.throwableToString(e) );
+            logger_.error( LogUtil.throwableToString(e) );
+            return false;
         }
-        return;
     }
-
     
     private static Map<String, Message> constructMessages(Map<String, ReadMessage> readMessages) throws IOException
     {