You are viewing a plain-text version of this content. Its canonical link was the hyperlinked word "here" in the original page, and that link is not preserved in this text.
Posted to commits@cassandra.apache.org by al...@apache.org on 2009/03/27 06:39:42 UTC

svn commit: r759026 [2/3] - /incubator/cassandra/trunk/src/org/apache/cassandra/db/

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/CountFilter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/CountFilter.java?rev=759026&r1=759025&r2=759026&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/CountFilter.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/CountFilter.java Fri Mar 27 05:39:40 2009
@@ -21,6 +21,7 @@
 import java.io.IOException;
 import java.util.Collection;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.DataInputBuffer;
 import org.apache.cassandra.io.SSTable;
 
@@ -36,19 +37,19 @@
 {
 	private long countLimit_;
 	private boolean isDone_;
-
+	
 	CountFilter(int countLimit)
 	{
-		countLimit_ = countLimit;
+		countLimit_ = countLimit;		
 		isDone_ = false;
 	}
-
+	
 	public ColumnFamily filter(String cfNameParam, ColumnFamily columnFamily)
 	{
     	String[] values = RowMutation.getColumnAndColumnFamily(cfNameParam);
         if ( columnFamily == null )
             return columnFamily;
-
+        
 		String cfName = columnFamily.name();
 		ColumnFamily filteredCf = new ColumnFamily(cfName);
 		if( countLimit_ <= 0 )
@@ -61,7 +62,7 @@
     		Collection<IColumn> columns = columnFamily.getAllColumns();
     		for(IColumn column : columns)
     		{
-    			filteredCf.addColumn(column);
+    			filteredCf.addColumn(column.name(), column);
     			countLimit_--;
     			if( countLimit_ <= 0 )
     			{
@@ -70,14 +71,14 @@
     			}
     		}
 		}
-		else if(values.length == 2 && columnFamily.isSuper())
+		else if(values.length == 2 && DatabaseDescriptor.getColumnType(cfName).equals("Super"))
 		{
     		Collection<IColumn> columns = columnFamily.getAllColumns();
     		for(IColumn column : columns)
     		{
     			SuperColumn superColumn = (SuperColumn)column;
     			SuperColumn filteredSuperColumn = new SuperColumn(superColumn.name());
-				filteredCf.addColumn(filteredSuperColumn);
+				filteredCf.addColumn(filteredSuperColumn.name(), filteredSuperColumn);
         		Collection<IColumn> subColumns = superColumn.getSubColumns();
         		for(IColumn subColumn : subColumns)
         		{
@@ -90,14 +91,14 @@
 	    			}
         		}
     		}
-		}
-    	else
+		}    	
+    	else 
     	{
     		throw new UnsupportedOperationException();
     	}
 		return filteredCf;
 	}
-
+    
     public IColumn filter(IColumn column, DataInputStream dis) throws IOException
     {
 		countLimit_--;
@@ -107,7 +108,7 @@
 		}
 		return column;
     }
-
+	
 	public boolean isDone()
 	{
 		return isDone_;

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/DBManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/DBManager.java?rev=759026&r1=759025&r2=759026&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/DBManager.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/DBManager.java Fri Mar 27 05:39:40 2009
@@ -18,18 +18,27 @@
 
 package org.apache.cassandra.db;
 
+import java.io.File;
 import java.io.IOException;
+import java.math.BigInteger;
+import java.net.InetAddress;
 import java.util.Map;
+import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.DataOutputBuffer;
+import org.apache.cassandra.io.IFileWriter;
+import org.apache.cassandra.io.SequenceFile;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.BasicUtilities;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.GuidGenerator;
+import org.apache.cassandra.utils.HashingSchemes;
 
 
 /**
@@ -61,23 +70,23 @@
 
     public static class StorageMetadata
     {
-        private Token myToken;
+        private BigInteger storageId_;
         private int generation_;
 
-        StorageMetadata(Token storageId, int generation)
+        StorageMetadata(BigInteger storageId, int generation)
         {
-            myToken = storageId;
+            storageId_ = storageId;
             generation_ = generation;
         }
 
-        public Token getStorageId()
+        public BigInteger getStorageId()
         {
-            return myToken;
+            return storageId_;
         }
 
-        public void setStorageId(Token storageId)
+        public void setStorageId(BigInteger storageId)
         {
-            myToken = storageId;
+            storageId_ = storageId;
         }
 
         public int getGeneration()
@@ -88,7 +97,10 @@
 
     public DBManager() throws Throwable
     {
-        Set<String> tables = DatabaseDescriptor.getTableToColumnFamilyMap().keySet();
+        /* Read the configuration file */
+        Map<String, Map<String, CFMetaData>> tableToColumnFamilyMap = DatabaseDescriptor.init();
+        storeMetadata(tableToColumnFamilyMap);
+        Set<String> tables = tableToColumnFamilyMap.keySet();
         
         for (String table : tables)
         {
@@ -101,6 +113,47 @@
     }
 
     /*
+     * Create the metadata tables. This table has information about
+     * the table name and the column families that make up the table.
+     * Each column family also has an associated ID which is an int.
+    */
+    private static void storeMetadata(Map<String, Map<String, CFMetaData>> tableToColumnFamilyMap) throws Throwable
+    {
+        AtomicInteger idGenerator = new AtomicInteger(0);
+        Set<String> tables = tableToColumnFamilyMap.keySet();
+
+        for ( String table : tables )
+        {
+            Table.TableMetadata tmetadata = Table.TableMetadata.instance();
+            if ( tmetadata.isEmpty() )
+            {
+                tmetadata = Table.TableMetadata.instance();
+                /* Column families associated with this table */
+                Map<String, CFMetaData> columnFamilies = tableToColumnFamilyMap.get(table);
+
+                for (String columnFamily : columnFamilies.keySet())
+                {
+                    tmetadata.add(columnFamily, idGenerator.getAndIncrement(), DatabaseDescriptor.getColumnType(columnFamily));
+                }
+
+                /*
+                 * Here we add all the system related column families. 
+                */
+                /* Add the TableMetadata column family to this map. */
+                tmetadata.add(Table.TableMetadata.cfName_, idGenerator.getAndIncrement());
+                /* Add the LocationInfo column family to this map. */
+                tmetadata.add(SystemTable.cfName_, idGenerator.getAndIncrement());
+                /* Add the recycle column family to this map. */
+                tmetadata.add(Table.recycleBin_, idGenerator.getAndIncrement());
+                /* Add the Hints column family to this map. */
+                tmetadata.add(Table.hints_, idGenerator.getAndIncrement(), ColumnFamily.getColumnType("Super"));
+                tmetadata.apply();
+                idGenerator.set(0);
+            }
+        }
+    }
+
+    /*
      * This method reads the system table and retrieves the metadata
      * associated with this storage instance. Currently we store the
      * metadata in a Column Family called LocatioInfo which has two
@@ -114,17 +167,22 @@
         SystemTable sysTable = SystemTable.openSystemTable(SystemTable.name_);
         Row row = sysTable.get(FBUtilities.getHostName());
 
-        IPartitioner p = StorageService.getPartitioner();
+        Random random = new Random();
         if ( row == null )
         {
-            Token token = p.getDefaultToken();
+        	/* Generate a token for this Storage node */                       
+            String guid = GuidGenerator.guid();
+            BigInteger token = StorageService.hash(guid);
+            if ( token.signum() == -1 )
+                token = token.multiply(BigInteger.valueOf(-1L));
+
             int generation = 1;
 
             String key = FBUtilities.getHostName();
             row = new Row(key);
             ColumnFamily cf = new ColumnFamily(SystemTable.cfName_);
-            cf.addColumn(new Column(SystemTable.token_, p.getTokenFactory().toByteArray(token)));
-            cf.addColumn(new Column(SystemTable.generation_, BasicUtilities.intToByteArray(generation)) );
+            cf.addColumn(SystemTable.token_, new Column(SystemTable.token_, token.toByteArray()) );
+            cf.addColumn(SystemTable.generation_, new Column(SystemTable.generation_, BasicUtilities.intToByteArray(generation)) );
             row.addColumnFamily(cf);
             sysTable.apply(row);
             storageMetadata = new StorageMetadata( token, generation);
@@ -132,22 +190,22 @@
         else
         {
             /* we crashed and came back up need to bump generation # */
-        	Map<String, ColumnFamily> columnFamilies = row.getColumnFamilyMap();
+        	Map<String, ColumnFamily> columnFamilies = row.getColumnFamilies();
         	Set<String> cfNames = columnFamilies.keySet();
 
             for ( String cfName : cfNames )
             {
             	ColumnFamily columnFamily = columnFamilies.get(cfName);
 
-                IColumn tokenColumn = columnFamily.getColumn(SystemTable.token_);
-                Token token = p.getTokenFactory().fromByteArray(tokenColumn.value());
+                IColumn token = columnFamily.getColumn(SystemTable.token_);
+                BigInteger bi = new BigInteger( token.value() );
 
                 IColumn generation = columnFamily.getColumn(SystemTable.generation_);
                 int gen = BasicUtilities.byteArrayToInt(generation.value()) + 1;
 
                 Column generation2 = new Column("Generation", BasicUtilities.intToByteArray(gen), generation.timestamp() + 1);
-                columnFamily.addColumn(generation2);
-                storageMetadata = new StorageMetadata(token, gen);
+                columnFamily.addColumn("Generation", generation2);
+                storageMetadata = new StorageMetadata( bi, gen );
                 break;
             }
             sysTable.reset(row);

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/FileStruct.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/FileStruct.java?rev=759026&r1=759025&r2=759026&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/FileStruct.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/FileStruct.java Fri Mar 27 05:39:40 2009
@@ -19,189 +19,126 @@
 package org.apache.cassandra.db;
 
 import java.io.IOException;
-import java.util.Iterator;
+import java.math.BigInteger;
 
-import org.apache.cassandra.io.Coordinate;
+import org.apache.cassandra.continuations.Suspendable;
 import org.apache.cassandra.io.DataInputBuffer;
 import org.apache.cassandra.io.DataOutputBuffer;
 import org.apache.cassandra.io.IFileReader;
 import org.apache.cassandra.io.SSTable;
+import org.apache.cassandra.io.SequenceFile;
+import org.apache.cassandra.service.PartitionerType;
+import org.apache.cassandra.service.StorageService;
 
 
-public class FileStruct implements Comparable<FileStruct>, Iterable<String>
+public class FileStruct implements Comparable<FileStruct>
 {
+    IFileReader reader_;
+    String key_;        
+    DataInputBuffer bufIn_;
+    DataOutputBuffer bufOut_;
     
-    private String key = null;
-    private boolean exhausted = false;
-    private IFileReader reader;
-    private DataInputBuffer bufIn;
-    private DataOutputBuffer bufOut;
-
-    public FileStruct(IFileReader reader)
-    {
-        this.reader = reader;
-        bufIn = new DataInputBuffer();
-        bufOut = new DataOutputBuffer();
-    }
-
-    public String getFileName()
-    {
-        return reader.getFileName();
-    }
-
-    public void close() throws IOException
-    {
-        reader.close();
-    }
-
-    public boolean isExhausted()
+    public FileStruct()
     {
-        return exhausted;
     }
-
-    public DataInputBuffer getBufIn()
+    
+    public FileStruct(String file, int bufSize) throws IOException
     {
-        return bufIn;
+        bufIn_ = new DataInputBuffer();
+        bufOut_ = new DataOutputBuffer();
+        reader_ = SequenceFile.bufferedReader(file, bufSize);
+        long bytesRead = advance();
+        if ( bytesRead == -1L )
+            throw new IOException("Either the file is empty or EOF has been reached.");          
     }
-
+    
     public String getKey()
     {
-        return key;
-    }
-
-    public int compareTo(FileStruct f)
-    {
-        return key.compareTo(f.key);
-    }
-
-    // we don't use SequenceReader.seekTo, since that (sometimes) throws an exception
-    // if the key is not found.  unsure if this behavior is desired.
-    public void seekTo(String seekKey)
-    {
-        try
+        String key = key_;
+        if ( !key.equals(SSTable.blockIndexKey_) )
         {
-            Coordinate range = SSTable.getCoordinates(seekKey, reader);
-            reader.seek(range.end_);
-            long position = reader.getPositionFromBlockIndex(seekKey);
-            if (position == -1)
+            PartitionerType pType = StorageService.getPartitionerType();          
+            switch ( pType )
             {
-                reader.seek(range.start_);
-            }
-            else
-            {
-                reader.seek(position);
-            }
-
-            while (!exhausted)
-            {
-                getNextKey();
-                if (key.compareTo(seekKey) >= 0)
-                {
+                case OPHF:                
+                    break;
+                 
+                default:
+                    String[] peices = key.split(":");                    
+                    key = peices[1];                
                     break;
-                }
-            }
-        }
-        catch (IOException e)
-        {
-            throw new RuntimeException("corrupt sstable", e);
-        }
-    }
-
-    /*
-     * Read the next key from the data file, skipping block indexes.
-     * Caller must check isExhausted after each call to see if further
-     * reads are valid.
-     */
-    public void getNextKey()
-    {
-        if (exhausted)
-        {
-            throw new IndexOutOfBoundsException();
-        }
-
-        try
-        {
-            bufOut.reset();
-            if (reader.isEOF())
-            {
-                reader.close();
-                exhausted = true;
-                return;
-            }
-
-            long bytesread = reader.next(bufOut);
-            if (bytesread == -1)
-            {
-                reader.close();
-                exhausted = true;
-                return;
-            }
-
-            bufIn.reset(bufOut.getData(), bufOut.getLength());
-            key = bufIn.readUTF();
-            /* If the key we read is the Block Index Key then omit and read the next key. */
-            if (key.equals(SSTable.blockIndexKey_))
-            {
-                bufOut.reset();
-                bytesread = reader.next(bufOut);
-                if (bytesread == -1)
-                {
-                    reader.close();
-                    exhausted = true;
-                    return;
-                }
-                bufIn.reset(bufOut.getData(), bufOut.getLength());
-                key = bufIn.readUTF();
             }
         }
-        catch (IOException e)
-        {
-            throw new RuntimeException(e);
-        }
+        return key;
     }
-
-    public Iterator<String> iterator()
+    
+    public DataOutputBuffer getBuffer()
     {
-        return new FileStructIterator();
+        return bufOut_;
     }
-
-    private class FileStructIterator implements Iterator<String>
-    {
-        String saved;
-
-        public FileStructIterator()
-        {
-            if (getKey() == null && !isExhausted())
-            {
-                forward();
-            }
-        }
-
-        private void forward()
+    
+    public long advance() throws IOException
+    {        
+        long bytesRead = -1L;
+        bufOut_.reset();
+        /* advance and read the next key in the file. */           
+        if (reader_.isEOF())
         {
-            getNextKey();
-            saved = isExhausted() ? null : getKey();
+            reader_.close();
+            return bytesRead;
         }
-
-        public boolean hasNext()
+            
+        bytesRead = reader_.next(bufOut_);        
+        if (bytesRead == -1)
         {
-            return saved != null;
+            reader_.close();
+            return bytesRead;
         }
 
-        public String next()
+        bufIn_.reset(bufOut_.getData(), bufOut_.getLength());
+        key_ = bufIn_.readUTF();
+        /* If the key we read is the Block Index Key then omit and read the next key. */
+        if ( key_.equals(SSTable.blockIndexKey_) )
         {
-            if (saved == null)
+            bufOut_.reset();
+            bytesRead = reader_.next(bufOut_);
+            if (bytesRead == -1)
             {
-                throw new IndexOutOfBoundsException();
+                reader_.close();
+                return bytesRead;
             }
-            String key = saved;
-            forward();
-            return key;
+            bufIn_.reset(bufOut_.getData(), bufOut_.getLength());
+            key_ = bufIn_.readUTF();
         }
+        
+        return bytesRead;
+    }
 
-        public void remove()
-        {
-            throw new UnsupportedOperationException();
+    public int compareTo(FileStruct f)
+    {
+    	int value = 0;
+        PartitionerType pType = StorageService.getPartitionerType();
+        switch( pType )
+        {
+            case OPHF:
+                value = key_.compareTo(f.key_);                    
+                break;
+                
+            default:
+            	String lhs = key_.split(":")[0];            
+                BigInteger b = new BigInteger(lhs);
+                String rhs = f.key_.split(":")[0];
+                BigInteger b2 = new BigInteger(rhs);
+                value = b.compareTo(b2);
+                break;
         }
+        return value;
+    }
+    
+    public void close() throws IOException
+    {
+        bufIn_.close();
+        bufOut_.close();
+        reader_.close();
     }
 }

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/FileStructComparator.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/FileStructComparator.java?rev=759026&r1=759025&r2=759026&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/FileStructComparator.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/FileStructComparator.java Fri Mar 27 05:39:40 2009
@@ -6,6 +6,13 @@
 {
     public int compare(FileStruct f, FileStruct f2)
     {
-        return f.getFileName().compareTo(f2.getFileName());
+        return f.reader_.getFileName().compareTo(f2.reader_.getFileName());
+    }
+
+    public boolean equals(Object o)
+    {
+        if (!(o instanceof FileStructComparator))
+            return false;
+        return true;
     }
 }
\ No newline at end of file

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/HintedHandOffManager.java?rev=759026&r1=759025&r2=759026&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/HintedHandOffManager.java Fri Mar 27 05:39:40 2009
@@ -24,8 +24,6 @@
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
-import org.apache.log4j.Logger;
-
 import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
 import org.apache.cassandra.concurrent.ThreadFactoryImpl;
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -34,7 +32,11 @@
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.IComponentShutdown;
+import org.apache.cassandra.service.IResponseResolver;
+import org.apache.cassandra.service.QuorumResponseHandler;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.WriteResponseResolver;
+import org.apache.log4j.Logger;
 
 
 /**
@@ -108,14 +110,14 @@
         private void deleteEndPoint(String endpointAddress, String key) throws Exception
         {
         	RowMutation rm = new RowMutation(DatabaseDescriptor.getTables().get(0), key_);
-        	rm.delete(Table.hints_ + ":" + key + ":" + endpointAddress, System.currentTimeMillis());
+        	rm.delete(Table.hints_ + ":" + key + ":" + endpointAddress);
         	rm.apply();
         }
 
         private void deleteKey(String key) throws Exception
         {
         	RowMutation rm = new RowMutation(DatabaseDescriptor.getTables().get(0), key_);
-        	rm.delete(Table.hints_ + ":" + key, System.currentTimeMillis());
+        	rm.delete(Table.hints_ + ":" + key);
         	rm.apply();
         }
 
@@ -140,7 +142,7 @@
             	if(hintedColumnFamily == null)
             	{
                     // Force flush now
-                    columnFamilyStore_.forceFlush();
+                    columnFamilyStore_.forceFlush(false);
             		return;
             	}
             	Collection<IColumn> keys = hintedColumnFamily.getAllColumns();
@@ -175,7 +177,7 @@
                 	}
             	}
                 // Force flush now
-                columnFamilyStore_.forceFlush();
+                columnFamilyStore_.forceFlush(false);
 
                 // Now do a major compaction
                 columnFamilyStore_.forceCompaction(null, null, 0, null);

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/IColumn.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/IColumn.java?rev=759026&r1=759025&r2=759026&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/IColumn.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/IColumn.java Fri Mar 27 05:39:40 2009
@@ -29,7 +29,6 @@
 {
     public static short UtfPrefix_ = 2;
     public boolean isMarkedForDelete();
-    public long getMarkedForDeleteAt();
     public String name();
     public int size();
     public int serializedSize();
@@ -40,7 +39,10 @@
     public Collection<IColumn> getSubColumns();
     public IColumn getSubColumn(String columnName);
     public void addColumn(String name, IColumn column);
+    public void delete();
+    public void repair(IColumn column);
     public IColumn diff(IColumn column);
+    public boolean putColumn(IColumn column);
     public int getObjectCount();
-    public byte[] digest();
+    public byte[] digest();    
 }

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java?rev=759026&r1=759025&r2=759026&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java Fri Mar 27 05:39:40 2009
@@ -20,14 +20,7 @@
 
 import java.io.FileOutputStream;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.Iterator;
-import java.util.PriorityQueue;
+import java.util.*;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -38,16 +31,19 @@
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
-import org.apache.log4j.Logger;
-
 import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 import org.apache.cassandra.concurrent.ThreadFactoryImpl;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.DataOutputBuffer;
 import org.apache.cassandra.io.SSTable;
+import org.apache.cassandra.service.IComponentShutdown;
+import org.apache.cassandra.service.PartitionerType;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.BloomFilter;
 import org.apache.cassandra.utils.LogUtil;
-import org.apache.cassandra.utils.DestructivePQIterator;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.io.*;
+import org.apache.cassandra.utils.*;
 
 /**
  * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
@@ -82,6 +78,7 @@
     private Map<String, ColumnFamily> columnFamilies_ = new HashMap<String, ColumnFamily>();
     /* Lock and Condition for notifying new clients about Memtable switches */
     Lock lock_ = new ReentrantLock();
+    Condition condition_;
 
     Memtable(String table, String cfName) throws IOException
     {
@@ -96,6 +93,7 @@
                     ));
         }
 
+        condition_ = lock_.newCondition();
         table_ = table;
         cfName_ = cfName;
         creationTime_ = System.currentTimeMillis();
@@ -129,7 +127,7 @@
             key_ = key;
             columnFamilyName_ = cfName;
         }
-
+        
         Getter(String key, String cfName, IFilter filter)
         {
             this(key, cfName);
@@ -138,11 +136,29 @@
 
         public ColumnFamily call()
         {
-        	ColumnFamily cf = getLocalCopy(key_, columnFamilyName_, filter_);
+        	ColumnFamily cf = getLocalCopy(key_, columnFamilyName_, filter_);            
             return cf;
         }
     }
 
+    class Remover implements Runnable
+    {
+        private String key_;
+        private ColumnFamily columnFamily_;
+
+        Remover(String key, ColumnFamily columnFamily)
+        {
+            key_ = key;
+            columnFamily_ = columnFamily;
+        }
+
+        public void run()
+        {
+        	columnFamily_.delete();
+            columnFamilies_.put(key_, columnFamily_);
+        }
+    }
+    
     /**
      * Flushes the current memtable to disk.
      * 
@@ -166,7 +182,7 @@
     }
 
     /**
-     * Compares two Memtable based on creation time.
+     * Compares two Memtable based on creation time. 
      * @param rhs
      * @return
      */
@@ -196,6 +212,13 @@
         currentObjectCount_.addAndGet(newCount - oldCount);
     }
 
+    private boolean isLifetimeViolated()
+    {
+      /* Memtable lifetime in terms of milliseconds */
+      long lifetimeInMillis = DatabaseDescriptor.getMemtableLifetime() * 3600 * 1000;
+      return ( ( System.currentTimeMillis() - creationTime_ ) >= lifetimeInMillis );
+    }
+
     boolean isThresholdViolated(String key)
     {
     	boolean bVal = false;//isLifetimeViolated();
@@ -263,30 +286,27 @@
     /*
      * This version is used to switch memtable and force flush.
     */
-    public void forceflush(ColumnFamilyStore cfStore) throws IOException
+    void forceflush(ColumnFamilyStore cfStore, boolean fRecovery) throws IOException
     {
-        RowMutation rm = new RowMutation(DatabaseDescriptor.getTables().get(0), flushKey_);
-
-        try
+        if(!fRecovery)
         {
-            if (cfStore.isSuper())
-            {
-                rm.add(cfStore.getColumnFamilyName() + ":SC1:Column", "0".getBytes(), 0);
-            } else {
-                rm.add(cfStore.getColumnFamilyName() + ":Column", "0".getBytes(), 0);
-            }
-            rm.apply();
+	    	RowMutation rm = new RowMutation(DatabaseDescriptor.getTables().get(0), flushKey_);
+	        try
+	        {
+	            rm.add(cfStore.columnFamily_ + ":Column","0".getBytes());
+	            rm.apply();
+	        }
+	        catch(ColumnFamilyNotDefinedException ex)
+	        {
+	            logger_.debug(LogUtil.throwableToString(ex));
+	        }
         }
-        catch(ColumnFamilyNotDefinedException ex)
+        else
         {
-            logger_.debug(LogUtil.throwableToString(ex));
+        	flush(CommitLog.CommitLogContext.NULL);
         }
     }
 
-    void flushOnRecovery() throws IOException {
-        flush(CommitLog.CommitLogContext.NULL);
-    }
-
     private void resolve(String key, ColumnFamily columnFamily)
     {
     	ColumnFamily oldCf = columnFamilies_.get(key);
@@ -299,7 +319,6 @@
             int newObjectCount = oldCf.getColumnCount();
             resolveSize(oldSize, newSize);
             resolveCount(oldObjectCount, newObjectCount);
-            oldCf.delete(Math.max(oldCf.getMarkedForDeleteAt(), columnFamily.getMarkedForDeleteAt()));
         }
         else
         {
@@ -320,46 +339,68 @@
         	resolve(key, columnFamily);
     }
 
-    ColumnFamily getLocalCopy(String key, String columnFamilyColumn, IFilter filter)
+    ColumnFamily getLocalCopy(String key, String cfName, IFilter filter)
     {
-    	String[] values = RowMutation.getColumnAndColumnFamily(columnFamilyColumn);
+    	String[] values = RowMutation.getColumnAndColumnFamily(cfName);
     	ColumnFamily columnFamily = null;
         if(values.length == 1 )
         {
-        	columnFamily = columnFamilies_.get(key);
+        	columnFamily = columnFamilies_.get(key);        	
         }
         else
         {
         	ColumnFamily cFamily = columnFamilies_.get(key);
-        	if (cFamily == null) return null;
-
-        	if (values.length == 2) {
-                IColumn column = cFamily.getColumn(values[1]); // super or normal column
-                if (column != null )
-                {
-                    columnFamily = new ColumnFamily(cfName_);
-                    columnFamily.addColumn(column);
-                }
+        	if(cFamily == null)
+        		return null;
+        	IColumn column = null;
+        	if(values.length == 2)
+        	{
+        		column = cFamily.getColumn(values[1]);
+        		if(column != null )
+        		{
+        			columnFamily = new ColumnFamily(cfName_);
+        			columnFamily.addColumn(column.name(), column);
+        		}
         	}
-            else
-            {
-                assert values.length == 3;
-                SuperColumn superColumn = (SuperColumn)cFamily.getColumn(values[1]);
-                if (superColumn != null)
-                {
-                    IColumn subColumn = superColumn.getSubColumn(values[2]);
-                    if (subColumn != null)
-                    {
-                        columnFamily = new ColumnFamily(cfName_);
-                        columnFamily.addColumn(values[1] + ":" + values[2], subColumn.value(), subColumn.timestamp(), subColumn.isMarkedForDelete());
-                    }
-                }
+        	else
+        	{
+        		column = cFamily.getColumn(values[1]);
+        		if(column != null )
+        		{
+        			 
+        			IColumn subColumn = ((SuperColumn)column).getSubColumn(values[2]);
+        			if(subColumn != null)
+        			{
+	        			columnFamily = new ColumnFamily(cfName_);
+	            		columnFamily.createColumn(values[1] + ":" + values[2], subColumn.value(), subColumn.timestamp());
+        			}
+        		}
         	}
         }
         /* Filter unnecessary data from the column based on the provided filter */
-        return filter.filter(columnFamilyColumn, columnFamily);
+        return filter.filter(cfName, columnFamily);
     }
 
+    ColumnFamily get(String key, String cfName)
+    {
+    	printExecutorStats();
+    	Callable<ColumnFamily> call = new Getter(key, cfName);
+    	ColumnFamily cf = null;
+    	try
+    	{
+    		cf = apartments_.get(cfName_).submit(call).get();
+    	}
+    	catch ( ExecutionException ex )
+    	{
+    		logger_.debug(LogUtil.throwableToString(ex));
+    	}
+    	catch ( InterruptedException ex2 )
+    	{
+    		logger_.debug(LogUtil.throwableToString(ex2));
+    	}
+    	return cf;
+    }
+    
     ColumnFamily get(String key, String cfName, IFilter filter)
     {
     	printExecutorStats();
@@ -380,6 +421,23 @@
     	return cf;
     }
 
+    /*
+     * Although the method is named remove() we cannot remove the key
+     * from memtable. We add it to the memtable but mark it as deleted.
+     * The reason for this because we do not want a successive get()
+     * for the same key to scan the ColumnFamilyStore files for this key.
+    */
+    void remove(String key, ColumnFamily columnFamily) throws IOException
+    {
+    	printExecutorStats();
+    	Runnable deleter = new Remover(key, columnFamily);
+    	apartments_.get(cfName_).submit(deleter);
+    }
+
+    /*
+     * param recoveryMode - indicates if this was invoked during
+     *                      recovery.
+    */
     void flush(CommitLog.CommitLogContext cLogCtx) throws IOException
     {
         ColumnFamilyStore cfStore = Table.open(table_).getColumnFamilyStore(cfName_);
@@ -392,9 +450,51 @@
             return;
         }
 
+        PartitionerType pType = StorageService.getPartitionerType();
         String directory = DatabaseDescriptor.getDataFileLocation();
         String filename = cfStore.getNextFileName();
-        SSTable ssTable = new SSTable(directory, filename);
+        SSTable ssTable = new SSTable(directory, filename, pType);
+        switch (pType) 
+        {
+            case OPHF:
+                flushForOrderPreservingPartitioner(ssTable, cfStore, cLogCtx);
+                break;
+                
+            default:
+                flushForRandomPartitioner(ssTable, cfStore, cLogCtx);
+                break;
+        }        
+    }
+    
+    private void flushForRandomPartitioner(SSTable ssTable, ColumnFamilyStore cfStore, CommitLog.CommitLogContext cLogCtx) throws IOException
+    {
+        /* List of primary keys in sorted order */
+        List<PrimaryKey> pKeys = PrimaryKey.create( columnFamilies_.keySet() );
+        DataOutputBuffer buffer = new DataOutputBuffer();
+        /* Use this BloomFilter to decide if a key exists in a SSTable */
+        BloomFilter bf = new BloomFilter(pKeys.size(), 15);
+        for ( PrimaryKey pKey : pKeys )
+        {
+            buffer.reset();
+            ColumnFamily columnFamily = columnFamilies_.get(pKey.key());
+            if ( columnFamily != null )
+            {
+                /* serialize the cf with column indexes */
+                ColumnFamily.serializer2().serialize( columnFamily, buffer );                                
+                /* Now write the key and value to disk */
+                ssTable.append(pKey.key(), pKey.hash(), buffer);
+                bf.fill(pKey.key());
+                columnFamily.clear();
+            }
+        }
+        ssTable.close(bf);
+        cfStore.onMemtableFlush(cLogCtx);
+        cfStore.storeLocation( ssTable.getDataFileLocation(), bf );
+        buffer.close();
+    }
+    
+    private void flushForOrderPreservingPartitioner(SSTable ssTable, ColumnFamilyStore cfStore, CommitLog.CommitLogContext cLogCtx) throws IOException
+    {
         List<String> keys = new ArrayList<String>( columnFamilies_.keySet() );
         Collections.sort(keys);
         DataOutputBuffer buffer = new DataOutputBuffer();
@@ -407,7 +507,7 @@
             if ( columnFamily != null )
             {
                 /* serialize the cf with column indexes */
-                ColumnFamily.serializerWithIndexes().serialize( columnFamily, buffer );
+                ColumnFamily.serializer2().serialize( columnFamily, buffer );                                              
                 /* Now write the key and value to disk */
                 ssTable.append(key, buffer);
                 bf.fill(key);
@@ -418,13 +518,5 @@
         cfStore.onMemtableFlush(cLogCtx);
         cfStore.storeLocation( ssTable.getDataFileLocation(), bf );
         buffer.close();
-
-        columnFamilies_.clear();
     }
-
-    public Iterator<String> sortedKeyIterator()
-    {
-        return new DestructivePQIterator<String>(new PriorityQueue<String>(columnFamilies_.keySet()));
-    }
-
 }

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/MemtableManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/MemtableManager.java?rev=759026&r1=759025&r2=759026&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/MemtableManager.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/MemtableManager.java Fri Mar 27 05:39:40 2009
@@ -27,8 +27,10 @@
 
 import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 import org.apache.cassandra.concurrent.ThreadFactoryImpl;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.utils.LogUtil;
 import org.apache.log4j.Logger;
+import org.apache.cassandra.utils.*;
 
 /**
  * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
@@ -40,7 +42,7 @@
     private static Lock lock_ = new ReentrantLock();
     private static Logger logger_ = Logger.getLogger(MemtableManager.class);
     private ReentrantReadWriteLock rwLock_ = new ReentrantReadWriteLock(true);
-    public static MemtableManager instance()
+    static MemtableManager instance() 
     {
         if ( instance_ == null )
         {
@@ -157,22 +159,7 @@
     	}
     }
 
-    public List<Memtable> getUnflushedMemtables(String cfName)
-    {
-        rwLock_.readLock().lock();
-        try
-        {
-            List<Memtable> memtables = history_.get(cfName);
-            if (memtables != null)
-            {
-                return new ArrayList<Memtable>(memtables);
-            }
-            return Arrays.asList(new Memtable[0]);
-        }
-        finally
-        {
-            rwLock_.readLock().unlock();
-        }
-    }
+
+
 
 }

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/MinorCompactionManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/MinorCompactionManager.java?rev=759026&r1=759025&r2=759026&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/MinorCompactionManager.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/MinorCompactionManager.java Fri Mar 27 05:39:40 2009
@@ -87,6 +87,10 @@
             	columnFamilyStore_.doCompaction();
                 logger_.debug("Finished compaction ..."+columnFamilyStore_.columnFamily_);
             }
+            catch (IOException e)
+            {
+                logger_.debug( LogUtil.throwableToString(e) );
+            }
             catch (Throwable th)
             {
                 logger_.error( LogUtil.throwableToString(th) );
@@ -118,9 +122,16 @@
         public Boolean call()
         {
         	boolean result = true;
-            logger_.debug("Started  compaction ..."+columnFamilyStore_.columnFamily_);
-            result = columnFamilyStore_.doAntiCompaction(ranges_, target_,fileList_);
-            logger_.debug("Finished compaction ..."+columnFamilyStore_.columnFamily_);
+            try
+            {
+                logger_.debug("Started  compaction ..."+columnFamilyStore_.columnFamily_);
+                result = columnFamilyStore_.doAntiCompaction(ranges_, target_,fileList_);
+                logger_.debug("Finished compaction ..."+columnFamilyStore_.columnFamily_);
+            }
+            catch (IOException e)
+            {
+                logger_.debug( LogUtil.throwableToString(e) );
+            }
             return result;
         }
     }
@@ -169,6 +180,10 @@
             	columnFamilyStore_.doCleanupCompaction();
                 logger_.debug("Finished compaction ..."+columnFamilyStore_.columnFamily_);
             }
+            catch (IOException e)
+            {
+                logger_.debug( LogUtil.throwableToString(e) );
+            }
             catch (Throwable th)
             {
                 logger_.error( LogUtil.throwableToString(th) );
@@ -208,9 +223,14 @@
     public Future<Boolean> submit(ColumnFamilyStore columnFamilyStore, List<Range> ranges, EndPoint target, List<String> fileList)
     {
         return compactor_.submit( new FileCompactor2(columnFamilyStore, ranges, target, fileList) );
+    } 
+
+    public Future<Boolean> submit(ColumnFamilyStore columnFamilyStore, List<Range> ranges)
+    {
+        return compactor_.submit( new FileCompactor2(columnFamilyStore, ranges) );
     }
 
-    public void  submitMajor(ColumnFamilyStore columnFamilyStore, long skip)
+    public void  submitMajor(ColumnFamilyStore columnFamilyStore, List<Range> ranges, long skip)
     {
         compactor_.submit( new OnDemandCompactor(columnFamilyStore, skip) );
     }

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/NamesFilter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/NamesFilter.java?rev=759026&r1=759025&r2=759026&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/NamesFilter.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/NamesFilter.java Fri Mar 27 05:39:40 2009
@@ -32,13 +32,13 @@
 public class NamesFilter implements IFilter
 {
     /* list of column names to filter against. */
-    private List<String> names_;
-
+    private List<String> names_ = new ArrayList<String>();  
+    
     NamesFilter(List<String> names)
     {
-        names_ = new ArrayList<String>(names);
+        names_ = names;     
     }
-
+    
     public ColumnFamily filter(String cf, ColumnFamily columnFamily)
     {
         if ( columnFamily == null )
@@ -55,8 +55,8 @@
 			{
 		        if ( names_.contains(column.name()) )
 		        {
-		            names_.remove(column.name());
-					filteredCf.addColumn(column);
+		            names_.remove(column.name());            
+					filteredCf.addColumn(column.name(), column);
 		        }
 				if( isDone() )
 				{
@@ -64,20 +64,20 @@
 				}
 			}
 		}
-		else if ( values.length == 2 && DatabaseDescriptor.getColumnType(cfName).equals("Super"))
+		else if ( values.length == 2 && DatabaseDescriptor.getColumnType(cfName).equals("Super") )
 		{
     		Collection<IColumn> columns = columnFamily.getAllColumns();
     		for(IColumn column : columns)
     		{
     			SuperColumn superColumn = (SuperColumn)column;
     			SuperColumn filteredSuperColumn = new SuperColumn(superColumn.name());
-				filteredCf.addColumn(filteredSuperColumn);
+				filteredCf.addColumn(filteredSuperColumn.name(), filteredSuperColumn);
         		Collection<IColumn> subColumns = superColumn.getSubColumns();
         		for(IColumn subColumn : subColumns)
         		{
     		        if ( names_.contains(subColumn.name()) )
     		        {
-    		            names_.remove(subColumn.name());
+    		            names_.remove(subColumn.name());            
     		            filteredSuperColumn.addColumn(subColumn.name(), subColumn);
     		        }
     				if( isDone() )
@@ -87,28 +87,28 @@
     			}
     		}
 		}
-    	else
+    	else 
     	{
     		throw new UnsupportedOperationException();
     	}
 		return filteredCf;
     }
-
+    
     public IColumn filter(IColumn column, DataInputStream dis) throws IOException
-    {
+    {       
         String columnName = column.name();
         if ( names_.contains(columnName) )
         {
-            names_.remove(columnName);
+            names_.remove(columnName);            
         }
         else
         {
             column = null;
         }
-
+        
         return column;
     }
-
+    
     public boolean isDone()
     {
         return names_.isEmpty();

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/RecoveryManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/RecoveryManager.java?rev=759026&r1=759025&r2=759026&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/RecoveryManager.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/RecoveryManager.java Fri Mar 27 05:39:40 2009
@@ -93,6 +93,7 @@
     
     public static void main(String[] args) throws Throwable
     {
+        DatabaseDescriptor.init();
         long start = System.currentTimeMillis();
         RecoveryManager rm = RecoveryManager.instance();
         rm.doRecovery();  

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/Row.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/Row.java?rev=759026&r1=759025&r2=759026&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/Row.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/Row.java Fri Mar 27 05:39:40 2009
@@ -18,109 +18,132 @@
 
 package org.apache.cassandra.db;
 
+import java.io.DataInput;
 import java.io.DataInputStream;
+import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Hashtable;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.commons.lang.ArrayUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.log4j.Logger;
+import java.io.Serializable;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.io.*;
 
-public class Row
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+public class Row implements Serializable
 {
-    private static RowSerializer serializer_ = new RowSerializer();
-    private static Logger logger_ = Logger.getLogger(Row.class);
+    private static ICompactSerializer<Row> serializer_;
+	private static Logger logger_ = Logger.getLogger(Row.class);
 
-    static RowSerializer serializer()
+    static
     {
-        return serializer_;
+        serializer_ = new RowSerializer();
     }
 
-    private String key_;
+    static ICompactSerializer<Row> serializer()
+    {
+        return serializer_;
+    }
 
+    private String key_;    
     private Map<String, ColumnFamily> columnFamilies_ = new Hashtable<String, ColumnFamily>();
+    private transient AtomicInteger size_ = new AtomicInteger(0);
 
+    /* Ctor for JAXB */
     protected Row()
     {
     }
 
     public Row(String key)
     {
-        key_ = key;
+        key_ = key;        
     }
-
+    
     public String key()
     {
         return key_;
     }
-
+    
     void key(String key)
     {
         key_ = key;
     }
-
-    public Set<String> getColumnFamilyNames()
-    {
-        return columnFamilies_.keySet();
-    }
-
-    public Collection<ColumnFamily> getColumnFamilies()
+    
+    public ColumnFamily getColumnFamily(String cfName)
     {
-        return columnFamilies_.values();
+        return columnFamilies_.get(cfName);
     }
 
-    @Deprecated
-    // (use getColumnFamilies or getColumnFamilyNames)
-    public Map<String, ColumnFamily> getColumnFamilyMap()
+    public Map<String, ColumnFamily> getColumnFamilies()
     {
         return columnFamilies_;
     }
 
-    public ColumnFamily getColumnFamily(String cfName)
-    {
-        return columnFamilies_.get(cfName);
-    }
-
     void addColumnFamily(ColumnFamily columnFamily)
     {
         columnFamilies_.put(columnFamily.name(), columnFamily);
+        size_.addAndGet(columnFamily.size());
     }
 
     void removeColumnFamily(ColumnFamily columnFamily)
     {
         columnFamilies_.remove(columnFamily.name());
         int delta = (-1) * columnFamily.size();
+        size_.addAndGet(delta);
+    }
+
+    public int size()
+    {
+        return size_.get();
     }
 
     public boolean isEmpty()
     {
-        return (columnFamilies_.size() == 0);
+    	return ( columnFamilies_.size() == 0 );
     }
+    
+    /**
+     * This is used as oldRow.merge(newRow). Basically we take the newRow
+     * and merge it into the oldRow.
+    */
+    void merge(Row row)
+    {
+        Map<String, ColumnFamily> columnFamilies = row.getColumnFamilies();
+        Set<String> cfNames = columnFamilies.keySet();
 
-    /*
+        for ( String cfName : cfNames )
+        {
+            ColumnFamily cf = columnFamilies_.get(cfName);
+            if ( cf == null )
+                columnFamilies_.put(cfName, columnFamilies.get(cfName));
+            else
+            {
+                cf.merge(columnFamilies.get(cfName));
+            }
+        }
+    }
+
+    /**
      * This function will repair the current row with the input row
      * what that means is that if there are any differences between the 2 rows then
      * this fn will make the current row take the latest changes .
      */
     public void repair(Row row)
     {
-        Map<String, ColumnFamily> columnFamilies = row.getColumnFamilyMap();
+        Map<String, ColumnFamily> columnFamilies = row.getColumnFamilies();
         Set<String> cfNames = columnFamilies.keySet();
 
-        for (String cfName : cfNames)
+        for ( String cfName : cfNames )
         {
             ColumnFamily cf = columnFamilies_.get(cfName);
-            if (cf == null)
+            if ( cf == null )
             {
-                cf = new ColumnFamily(cfName);
+            	cf = new ColumnFamily(cfName);
                 columnFamilies_.put(cfName, cf);
             }
             cf.repair(columnFamilies.get(cfName));
@@ -128,7 +151,7 @@
 
     }
 
-    /*
+    /**
      * This function will calculate the difference between 2 rows
      * and return the resultant row. This assumes that the row that
      * is being submitted is a super set of the current row so
@@ -139,82 +162,78 @@
     public Row diff(Row row)
     {
         Row rowDiff = new Row(key_);
-        Map<String, ColumnFamily> columnFamilies = row.getColumnFamilyMap();
+    	Map<String, ColumnFamily> columnFamilies = row.getColumnFamilies();
         Set<String> cfNames = columnFamilies.keySet();
 
-        for (String cfName : cfNames)
+        for ( String cfName : cfNames )
         {
             ColumnFamily cf = columnFamilies_.get(cfName);
             ColumnFamily cfDiff = null;
-            if (cf == null)
-                rowDiff.getColumnFamilyMap().put(cfName, columnFamilies.get(cfName));
+            if ( cf == null )
+            	rowDiff.getColumnFamilies().put(cfName, columnFamilies.get(cfName));
             else
             {
-                cfDiff = cf.diff(columnFamilies.get(cfName));
-                if (cfDiff != null)
-                    rowDiff.getColumnFamilyMap().put(cfName, cfDiff);
+            	cfDiff = cf.diff(columnFamilies.get(cfName));
+            	if(cfDiff != null)
+            		rowDiff.getColumnFamilies().put(cfName, cfDiff);
             }
         }
-        if (rowDiff.getColumnFamilyMap().size() != 0)
-            return rowDiff;
+        if(rowDiff.getColumnFamilies().size() != 0)
+        	return rowDiff;
         else
-            return null;
+        	return null;
     }
-
+    
     public Row cloneMe()
     {
-        Row row = new Row(key_);
-        row.columnFamilies_ = new HashMap<String, ColumnFamily>(columnFamilies_);
-        return row;
+    	Row row = new Row(key_);
+    	row.columnFamilies_ = new HashMap<String, ColumnFamily>(columnFamilies_);
+    	return row;
     }
 
     public byte[] digest()
     {
         long start = System.currentTimeMillis();
-        Set<String> cfamilies = columnFamilies_.keySet();
-        byte[] xorHash = ArrayUtils.EMPTY_BYTE_ARRAY;
-        for (String cFamily : cfamilies)
-        {
-            if (xorHash.length == 0)
-            {
-                xorHash = columnFamilies_.get(cFamily).digest();
-            }
-            else
-            {
-                byte[] tmpHash = columnFamilies_.get(cFamily).digest();
-                xorHash = FBUtilities.xor(xorHash, tmpHash);
-            }
-        }
+    	Set<String> cfamilies = columnFamilies_.keySet();
+    	byte[] xorHash = new byte[0];
+    	byte[] tmpHash = new byte[0];
+    	for(String cFamily : cfamilies)
+    	{
+    		if(xorHash.length == 0)
+    		{
+    			xorHash = columnFamilies_.get(cFamily).digest();
+    		}
+    		else
+    		{
+    			tmpHash = columnFamilies_.get(cFamily).digest();
+    			xorHash = FBUtilities.xor(xorHash, tmpHash);
+    		}
+    	}
         logger_.info("DIGEST TIME: " + (System.currentTimeMillis() - start)
-                     + " ms.");
-        return xorHash;
+                + " ms.");
+    	return xorHash;
     }
-
+    
     void clear()
-    {
+    {        
         columnFamilies_.clear();
     }
-
-    public String toString()
-    {
-        return "Row(" + key_ + " [" + StringUtils.join(columnFamilies_.values(), ", ") + ")]";
-    }
 }
 
 class RowSerializer implements ICompactSerializer<Row>
 {
     public void serialize(Row row, DataOutputStream dos) throws IOException
     {
-        dos.writeUTF(row.key());
-        Map<String, ColumnFamily> columnFamilies = row.getColumnFamilyMap();
-        int size = columnFamilies.size();
+        dos.writeUTF(row.key());        
+        Map<String, ColumnFamily> columnFamilies = row.getColumnFamilies();
+        int size = columnFamilies.size();        
         dos.writeInt(size);
-
-        if (size > 0)
-        {
-            Set<String> cNames = columnFamilies.keySet();
-            for (String cName : cNames)
-            {
+        
+        if ( size > 0 )
+        {   
+        	Set<String> cNames = columnFamilies.keySet();
+            for ( String cName : cNames )
+            {                
                 ColumnFamily.serializer().serialize(columnFamilies.get(cName), dos);
             }
         }
@@ -222,13 +241,13 @@
 
     public Row deserialize(DataInputStream dis) throws IOException
     {
-        String key = dis.readUTF();
-        Row row = new Row(key);
+        String key = dis.readUTF();        
+        Row row = new Row(key);        
         int size = dis.readInt();
-
-        if (size > 0)
-        {
-            for (int i = 0; i < size; ++i)
+        
+        if ( size > 0 )
+        { 
+            for ( int i = 0; i < size; ++i )
             {
                 ColumnFamily cf = ColumnFamily.serializer().deserialize(dis);
                 row.addColumnFamily(cf);

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutation.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutation.java?rev=759026&r1=759025&r2=759026&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutation.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutation.java Fri Mar 27 05:39:40 2009
@@ -18,30 +18,13 @@
 
 package org.apache.cassandra.db;
 
-import java.io.ByteArrayOutputStream;
+import java.util.*;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-
-import org.apache.commons.lang.ArrayUtils;
-import org.apache.commons.lang.StringUtils;
 
 import org.apache.cassandra.io.ICompactSerializer;
-import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.service.batch_mutation_super_t;
-import org.apache.cassandra.service.batch_mutation_t;
-import org.apache.cassandra.service.column_t;
-import org.apache.cassandra.service.superColumn_t;
-import org.apache.cassandra.utils.FBUtilities;
 
 
 /**
@@ -49,107 +32,121 @@
  */
 
 public class RowMutation implements Serializable
-{
-    private static ICompactSerializer<RowMutation> serializer_;
-    public static final String HINT = "HINT";
-
+{      
+	private static ICompactSerializer<RowMutation> serializer_;	
+	
     static
     {
         serializer_ = new RowMutationSerializer();
-    }   
+    }
 
     static ICompactSerializer<RowMutation> serializer()
     {
         return serializer_;
     }
-
+        
     private String table_;
-    private String key_;
-    protected Map<String, ColumnFamily> modifications_ = new HashMap<String, ColumnFamily>();
-
+    private String key_;       
+    protected Map<String, ColumnFamily> modifications_ = new HashMap<String, ColumnFamily>();       
+    protected Map<String, ColumnFamily> deletions_ = new HashMap<String, ColumnFamily>();
+    
     /* Ctor for JAXB */
     private RowMutation()
     {
     }
-
+    
     public RowMutation(String table, String key)
     {
         table_ = table;
         key_ = key;
     }
-
+    
     public RowMutation(String table, Row row)
     {
         table_ = table;
         key_ = row.key();
-        for (ColumnFamily cf : row.getColumnFamilies())
+        Map<String, ColumnFamily> cfSet = row.getColumnFamilies();
+        Set<String> keyset = cfSet.keySet();
+        for(String cfName : keyset)
         {
-            add(cf);
+        	add(cfName, cfSet.get(cfName));
         }
     }
 
-    protected RowMutation(String table, String key, Map<String, ColumnFamily> modifications)
+    protected RowMutation(String table, String key, Map<String, ColumnFamily> modifications, Map<String, ColumnFamily> deletions)
     {
-        table_ = table;
-        key_ = key;
-        modifications_ = modifications;
+    	table_ = table;
+    	key_ = key;
+    	modifications_ = modifications;
+    	deletions_ = deletions;
     }
-
+    
     public static String[] getColumnAndColumnFamily(String cf)
     {
         return cf.split(":");
     }
-
+    
     String table()
     {
         return table_;
     }
-
+    
     public String key()
     {
         return key_;
     }
-
+    
     void addHints(String hint) throws IOException, ColumnFamilyNotDefinedException
-    {
+    {        
         String cfName = Table.hints_ + ":" + hint;
-        add(cfName, ArrayUtils.EMPTY_BYTE_ARRAY, 0);
+        add(cfName, new byte[0]);
     }
-
+    
     /*
      * Specify a column family name and the corresponding column
-     * family object.
+     * family object. 
      * param @ cf - column family name
      * param @ columnFamily - the column family.
     */
-    public void add(ColumnFamily columnFamily)
+    public void add(String cf, ColumnFamily columnFamily)
+    {       
+        modifications_.put(cf, columnFamily);
+    }
+    
+    /*
+     * Specify a column name and a corresponding value for
+     * the column. Column name is specified as <column family>:column.
+     * This will result in a ColumnFamily associated with
+     * <column family> as name and a Column with <column>
+     * as name.
+     * 
+     * param @ cf - column name as <column family>:<column>
+     * param @ value - value associated with the column
+    */
+    public void add(String cf, byte[] value) throws IOException, ColumnFamilyNotDefinedException
     {
-        if (modifications_.containsKey(columnFamily.name()))
-        {
-            throw new IllegalArgumentException("ColumnFamily " + columnFamily.name() + " is already being modified");
-        }
-        modifications_.put(columnFamily.name(), columnFamily);
+        add(cf, value, 0);
     }
-
+    
     /*
      * Specify a column name and a corresponding value for
      * the column. Column name is specified as <column family>:column.
      * This will result in a ColumnFamily associated with
      * <column family> as name and a Column with <column>
-     * as name. The columan can be further broken up
+     * as name. The columan can be further broken up 
      * as super column name : columnname  in case of super columns
-     *
+     * 
      * param @ cf - column name as <column family>:<column>
      * param @ value - value associated with the column
      * param @ timestamp - ts associated with this data.
     */
     public void add(String cf, byte[] value, long timestamp)
-    {
+    {        
         String[] values = RowMutation.getColumnAndColumnFamily(cf);
-
+       
         if ( values.length == 0 || values.length == 1 || values.length > 3 )
             throw new IllegalArgumentException("Column Family " + cf + " in invalid format. Must be in <column family>:<column> format.");
-
+        
         ColumnFamily columnFamily = modifications_.get(values[0]);
         if( values.length == 2 )
         {
@@ -157,7 +154,7 @@
             {
             	columnFamily = new ColumnFamily(values[0], ColumnFamily.getColumnType("Standard"));
             }
-        	columnFamily.addColumn(values[1], value, timestamp);
+        	columnFamily.createColumn(values[1], value, timestamp);
         }
         if( values.length == 3 )
         {
@@ -165,203 +162,172 @@
             {
             	columnFamily = new ColumnFamily(values[0], ColumnFamily.getColumnType("Super"));
             }
-        	columnFamily.addColumn(values[1]+ ":" + values[2], value, timestamp);
+        	columnFamily.createColumn(values[1]+ ":" + values[2], value, timestamp);
         }
         modifications_.put(values[0], columnFamily);
     }
-
-    public void delete(String columnFamilyColumn, long timestamp)
-    {
-        String[] values = RowMutation.getColumnAndColumnFamily(columnFamilyColumn);
-        String cfName = values[0];
-        if (modifications_.containsKey(cfName))
-        {
-            throw new IllegalArgumentException("ColumnFamily " + cfName + " is already being modified");
-        }
-
-        if (values.length == 0 || values.length > 3)
-            throw new IllegalArgumentException("Column Family " + columnFamilyColumn + " in invalid format. Must be in <column family>:<column> format.");
-
-        ColumnFamily columnFamily = modifications_.get(cfName);
-        if (columnFamily == null)
-            columnFamily = new ColumnFamily(cfName);
-        if (values.length == 2)
-        {
-            columnFamily.addColumn(values[1], ArrayUtils.EMPTY_BYTE_ARRAY, timestamp, true);
-        }
-        else if (values.length == 3)
+    
+    /*
+     * Specify a column name to be deleted. Column name is
+     * specified as <column family>:column. This will result
+     * in a ColumnFamily associated with <column family> as
+     * name and perhaps Column with <column> as name being
+     * marked as deleted.
+     * TODO : Delete is NOT correct as we do not know 
+     * the CF type so we need to fix that.
+     * param @ cf - column name as <column family>:<column>     
+    */
+    public void delete(String cf)
+    {        
+        String[] values = RowMutation.getColumnAndColumnFamily(cf);
+        
+        if ( values.length == 0 || values.length > 3 )
+            throw new IllegalArgumentException("Column Family " + cf + " in invalid format. Must be in <column family>:<column> format.");
+     
+        ColumnFamily columnFamily = modifications_.get(values[0]);
+        if ( columnFamily == null )
+            columnFamily = new ColumnFamily(values[0]);
+        if(values.length == 2 )
         {
-            columnFamily.addColumn(values[1] + ":" + values[2], ArrayUtils.EMPTY_BYTE_ARRAY, timestamp, true);
+	        columnFamily.createColumn( values[1]);
         }
-        else
+        if(values.length == 3 )
         {
-            assert values.length == 1;
-            columnFamily.delete(timestamp);
+	        columnFamily.createColumn( values[1] + ":" + values[2]);
         }
-        modifications_.put(cfName, columnFamily);
+        deletions_.put(values[0], columnFamily);
     }
-
-    /*
+    
+    /* 
      * This is equivalent to calling commit. Applies the changes to
      * to the table that is obtained by calling Table.open().
     */
     public void apply() throws IOException, ColumnFamilyNotDefinedException
-    {
+    {  
         Row row = new Row(key_);
-        apply(row);
-    }
-
-    /*
-     * Allows RowMutationVerbHandler to optimize by re-using a single Row object.
-    */
-    void apply(Row emptyRow) throws IOException, ColumnFamilyNotDefinedException
-    {
-        assert emptyRow.getColumnFamilyMap().size() == 0;
         Table table = Table.open(table_);
-        for (String cfName : modifications_.keySet())
-        {
-            if (!table.isValidColumnFamily(cfName))
+        Set<String> cfNames = modifications_.keySet();
+        for (String cfName : cfNames )
+        {        
+            if ( !table.isValidColumnFamily(cfName) )
+                throw new ColumnFamilyNotDefinedException("Column Family " + cfName + " has not been defined.");
+            row.addColumnFamily( modifications_.get(cfName) );            
+        }
+        table.apply(row);
+                
+        Set<String> cfNames2 = deletions_.keySet();
+        for (String cfName : cfNames2 )
+        {    
+            if ( !table.isValidColumnFamily(cfName) )
                 throw new ColumnFamilyNotDefinedException("Column Family " + cfName + " has not been defined.");
-            emptyRow.addColumnFamily(modifications_.get(cfName));
+            row.addColumnFamily( deletions_.get(cfName) );        
         }
-        table.apply(emptyRow);
+        if ( deletions_.size() > 0 )
+            table.delete(row);
     }
-
-    /*
+    
+    /* 
      * This is equivalent to calling commit. Applies the changes to
      * to the table that is obtained by calling Table.open().
     */
-    void load(Row row) throws IOException, ColumnFamilyNotDefinedException, ExecutionException, InterruptedException
-    {
-        Table table = Table.open(table_);
+    void apply(Row row) throws IOException, ColumnFamilyNotDefinedException
+    {                
+        Table table = Table.open(table_);        
         Set<String> cfNames = modifications_.keySet();
-        for (String cfName : cfNames)
-        {
-            if (!table.isValidColumnFamily(cfName))
+        for (String cfName : cfNames )
+        {        
+            if ( !table.isValidColumnFamily(cfName) )
                 throw new ColumnFamilyNotDefinedException("Column Family " + cfName + " has not been defined.");
-            row.addColumnFamily(modifications_.get(cfName));
+            row.addColumnFamily( modifications_.get(cfName) );            
         }
-        table.load(row);
-    }
-
-    public Message makeRowMutationMessage() throws IOException
-    {
-        return makeRowMutationMessage(StorageService.mutationVerbHandler_);
-    }
-
-    public Message makeRowMutationMessage(String verbHandlerName) throws IOException
-    {
-        ByteArrayOutputStream bos = new ByteArrayOutputStream();
-        DataOutputStream dos = new DataOutputStream(bos);
-        serializer().serialize(this, dos);
-        EndPoint local = StorageService.getLocalStorageEndPoint();
-        EndPoint from = (local != null) ? local : new EndPoint(FBUtilities.getHostName(), 7000);
-        return new Message(from, StorageService.mutationStage_, verbHandlerName, bos.toByteArray());
-    }
-
-    public static RowMutation getRowMutation(batch_mutation_t batchMutation)
-    {
-        RowMutation rm = new RowMutation(batchMutation.table,
-                                         batchMutation.key.trim());
-        for (String cfname : batchMutation.cfmap.keySet())
-        {
-            List<column_t> list = batchMutation.cfmap.get(cfname);
-            for (column_t columnData : list)
-            {
-                rm.add(cfname + ":" + columnData.columnName,
-                       columnData.value.getBytes(), columnData.timestamp);
-
-            }
+        table.apply(row);
+                
+        Set<String> cfNames2 = deletions_.keySet();
+        for (String cfName : cfNames2 )
+        {    
+            if ( !table.isValidColumnFamily(cfName) )
+                throw new ColumnFamilyNotDefinedException("Column Family " + cfName + " has not been defined.");
+            row.addColumnFamily( deletions_.get(cfName) );        
         }
-        return rm;
+        if ( deletions_.size() > 0 )
+            table.delete(row);
     }
-
-    public static RowMutation getRowMutation(batch_mutation_super_t batchMutationSuper)
-    {
-        RowMutation rm = new RowMutation(batchMutationSuper.table,
-                                         batchMutationSuper.key.trim());
-        Set keys = batchMutationSuper.cfmap.keySet();
-        Iterator keyIter = keys.iterator();
-        while (keyIter.hasNext())
-        {
-            Object key = keyIter.next(); // Get the next key.
-            List<superColumn_t> list = batchMutationSuper.cfmap.get(key);
-            for (superColumn_t superColumnData : list)
-            {
-                if (superColumnData.columns.size() != 0)
-                {
-                    for (column_t columnData : superColumnData.columns)
-                    {
-                        rm.add(key.toString() + ":" + superColumnData.name + ":" + columnData.columnName,
-                               columnData.value.getBytes(), columnData.timestamp);
-                    }
-                }
-                else
-                {
-                    rm.add(key.toString() + ":" + superColumnData.name, ArrayUtils.EMPTY_BYTE_ARRAY, 0);
-                }
-            }
+    
+    /* 
+     * This is equivalent to calling commit. Applies the changes to
+     * to the table that is obtained by calling Table.open().
+    */
+    void load(Row row) throws IOException, ColumnFamilyNotDefinedException
+    {                
+        Table table = Table.open(table_);        
+        Set<String> cfNames = modifications_.keySet();
+        for (String cfName : cfNames )
+        {        
+            if ( !table.isValidColumnFamily(cfName) )
+                throw new ColumnFamilyNotDefinedException("Column Family " + cfName + " has not been defined.");
+            row.addColumnFamily( modifications_.get(cfName) );            
         }
-        return rm;
-    }
-
-    public String toString()
-    {
-        return "RowMutation(" +
-               "key='" + key_ + '\'' +
-               ", modifications=[" + StringUtils.join(modifications_.values(), ", ") + "]" +
-               ')';
-    }
+        table.load(row);
+    }  
 }
 
 class RowMutationSerializer implements ICompactSerializer<RowMutation>
 {
-    private void freezeTheMaps(Map<String, ColumnFamily> map, DataOutputStream dos) throws IOException
-    {
-        int size = map.size();
+	private void freezeTheMaps(Map<String, ColumnFamily> map, DataOutputStream dos) throws IOException
+	{
+		int size = map.size();
         dos.writeInt(size);
-        if (size > 0)
-        {
+        if ( size > 0 )
+        {   
             Set<String> keys = map.keySet();
-            for (String key : keys)
-            {
-                dos.writeUTF(key);
+            for( String key : keys )
+            {                
+            	dos.writeUTF(key);
                 ColumnFamily cf = map.get(key);
-                if (cf != null)
+                if ( cf != null )
                 {
                     ColumnFamily.serializer().serialize(cf, dos);
-                }
+                }                
             }
         }
-    }
-
-    public void serialize(RowMutation rm, DataOutputStream dos) throws IOException
-    {
-        dos.writeUTF(rm.table());
-        dos.writeUTF(rm.key());
-
-        /* serialize the modifications_ in the mutation */
+	}
+	
+	public void serialize(RowMutation rm, DataOutputStream dos) throws IOException
+	{
+		dos.writeUTF(rm.table());
+		dos.writeUTF(rm.key());
+		
+		/* serialize the modifications_ in the mutation */
         freezeTheMaps(rm.modifications_, dos);
-    }
-
-    private Map<String, ColumnFamily> defreezeTheMaps(DataInputStream dis) throws IOException
-    {
-        Map<String, ColumnFamily> map = new HashMap<String, ColumnFamily>();
+        
+        /* serialize the deletions_ in the mutation */
+        freezeTheMaps(rm.deletions_, dos);
+	}
+	
+	private Map<String, ColumnFamily> defreezeTheMaps(DataInputStream dis) throws IOException
+	{
+		Map<String, ColumnFamily> map = new HashMap<String, ColumnFamily>();
         int size = dis.readInt();
-        for (int i = 0; i < size; ++i)
+        for ( int i = 0; i < size; ++i )
         {
-            String key = dis.readUTF();
+            String key = dis.readUTF();  
             ColumnFamily cf = ColumnFamily.serializer().deserialize(dis);
-            map.put(key, cf);
+            map.put(key, cf);            
         }
         return map;
-    }
-
+	}
+	
     public RowMutation deserialize(DataInputStream dis) throws IOException
     {
-        String table = dis.readUTF();
-        String key = dis.readUTF();
-        Map<String, ColumnFamily> modifications = defreezeTheMaps(dis);
-        return new RowMutation(table, key, modifications);
+    	String table = dis.readUTF();
+    	String key = dis.readUTF();
+    	
+    	/* Defreeze the modifications_ map */
+    	Map<String, ColumnFamily> modifications = defreezeTheMaps(dis);
+    	
+    	/* Defreeze the deletions_ map */
+    	Map<String, ColumnFamily> deletions = defreezeTheMaps(dis);
+    	
+    	return new RowMutation(table, key, modifications, deletions);
     }
 }

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutationVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutationVerbHandler.java?rev=759026&r1=759025&r2=759026&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutationVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutationVerbHandler.java Fri Mar 27 05:39:40 2009
@@ -46,14 +46,18 @@
         protected Row row_ = new Row();
         protected DataInputBuffer buffer_ = new DataInputBuffer();
     }
-
-    private static Logger logger_ = Logger.getLogger(RowMutationVerbHandler.class);
+    
+    private static Logger logger_ = Logger.getLogger(RowMutationVerbHandler.class);     
     /* We use this so that we can reuse the same row mutation context for the mutation. */
     private static ThreadLocal<RowMutationContext> tls_ = new InheritableThreadLocal<RowMutationContext>();
-
+    
     public void doVerb(Message message)
     {
-        byte[] bytes = (byte[]) message.getMessageBody()[0];
+        /* For DEBUG only. Printing queue length */                         
+        logger_.info( "ROW MUTATION STAGE: " + StageManager.getStageTaskCount(StorageService.mutationStage_) );
+        /* END DEBUG */
+            
+        byte[] bytes = (byte[])message.getMessageBody()[0];
         /* Obtain a Row Mutation Context from TLS */
         RowMutationContext rowMutationCtx = tls_.get();
         if ( rowMutationCtx == null )
@@ -61,47 +65,51 @@
             rowMutationCtx = new RowMutationContext();
             tls_.set(rowMutationCtx);
         }
-
-        rowMutationCtx.buffer_.reset(bytes, bytes.length);
-
+                
+        rowMutationCtx.buffer_.reset(bytes, bytes.length);        
+        
         try
         {
-            RowMutation rm = RowMutation.serializer().deserialize(rowMutationCtx.buffer_);
-            logger_.debug("Applying " + rm);
-
+            RowMutationMessage rmMsg = RowMutationMessage.serializer().deserialize(rowMutationCtx.buffer_);
+            RowMutation rm = rmMsg.getRowMutation();
             /* Check if there were any hints in this message */
-            byte[] hintedBytes = message.getHeader(RowMutation.HINT);
+            byte[] hintedBytes = message.getHeader(RowMutationMessage.hint_);            
             if ( hintedBytes != null && hintedBytes.length > 0 )
             {
             	EndPoint hint = EndPoint.fromBytes(hintedBytes);
-                logger_.debug("Adding hint for " + hint);
                 /* add necessary hints to this mutation */
-                RowMutation hintedMutation = new RowMutation(rm.table(), HintedHandOffManager.key_);
-                hintedMutation.addHints(rm.key() + ":" + hint.getHost());
-                hintedMutation.apply();
+                try
+                {
+                	RowMutation hintedMutation = new RowMutation(rm.table(), HintedHandOffManager.key_);
+                	hintedMutation.addHints(rm.key() + ":" + hint.getHost());
+                	hintedMutation.apply();
+                }
+                catch ( ColumnFamilyNotDefinedException ex )
+                {
+                    logger_.debug(LogUtil.throwableToString(ex));
+                }
             }
-
-            long start = System.currentTimeMillis();
-
-            rowMutationCtx.row_.clear();
+            
+            long start = System.currentTimeMillis(); 
+            
             rowMutationCtx.row_.key(rm.key());
             rm.apply(rowMutationCtx.row_);
-
-            long end = System.currentTimeMillis();
-
-            WriteResponse response = new WriteResponse(rm.table(), rm.key(), true);
-            Message responseMessage = WriteResponse.makeWriteResponseMessage(message, response);
-            logger_.debug("Mutation applied in " + (end - start) + "ms.  Sending response to " +  message.getFrom() + " for key :" + rm.key());
-            MessagingService.getMessagingInstance().sendOneWay(responseMessage, message.getFrom());
-        }
-        catch(ColumnFamilyNotDefinedException ex)
+            
+            long end = System.currentTimeMillis();                       
+            logger_.info("ROW MUTATION APPLY: " + (end - start) + " ms.");
+            
+            /*WriteResponseMessage writeResponseMessage = new WriteResponseMessage(rm.table(), rm.key(), true);
+            Message response = message.getReply( StorageService.getLocalStorageEndPoint(), new Object[]{writeResponseMessage} );
+            logger_.debug("Sending teh response to " +  message.getFrom() + " for key :" + rm.key());
+            MessagingService.getMessagingInstance().sendOneWay(response, message.getFrom());  */                    
+        }         
+        catch( ColumnFamilyNotDefinedException ex )
         {
-            // TODO shouldn't this be checked before it's sent to us?
-            logger_.warn("column family not defined, and no way to tell the client", ex);
-        }
-        catch (IOException e)
+            logger_.debug(LogUtil.throwableToString(ex));
+        }        
+        catch ( IOException e )
         {
-            logger_.error("Error in row mutation", e);
-        }
+            logger_.debug(LogUtil.throwableToString(e));            
+        }        
     }
 }

Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/SequentialScanner.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/SequentialScanner.java?rev=759026&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/SequentialScanner.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/SequentialScanner.java Fri Mar 27 05:39:40 2009
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.Queue;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.continuations.Suspendable;
+import org.apache.cassandra.db.ColumnFamilyNotDefinedException;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.FileStruct;
+import org.apache.cassandra.db.IScanner;
+import org.apache.cassandra.db.Row;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.io.SSTable;
+import org.apache.cassandra.io.SequenceFile;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.IAsyncResult;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.LogUtil;
+
+/**
+ * Scans through all the keys on disk for one table, Iterator style.
+ * Every SSTable of the table is opened and kept in a priority queue so
+ * that keys are served in ascending order (assuming FileStruct's natural
+ * ordering compares the files' current keys); a key stored in
+ * several SSTables is served only once. Usage is as follows:
+ * <pre>
+ * SequentialScanner scanner = new SequentialScanner("table");
+ * try
+ * {
+ *     while ( scanner.hasNext() )
+ *     {
+ *         Row row = scanner.next();
+ *         if ( row != null )
+ *         {
+ *             // Do something with the row
+ *         }
+ *     }
+ * }
+ * finally
+ * {
+ *     scanner.close();
+ * }
+ * </pre>
+ * next() returns null for keys it skips, so the null check is required.
+ *
+ * @author alakshman
+ */
+public class SequentialScanner implements IScanner<Row>
+{
+    private static Logger logger_ = Logger.getLogger( SequentialScanner.class );
+    /* Read buffer size, in bytes, handed to each FileStruct. */
+    private final static int bufSize_ = 1024*1024;
+
+    /* Table over which we want to perform a sequential scan. */
+    private String table_;
+    /* One FileStruct per SSTable that still has keys left to scan. */
+    private Queue<FileStruct> fsQueue_ = new PriorityQueue<FileStruct>();
+
+    /**
+     * Opens every SSTable of the given table for scanning.
+     *
+     * @param table name of the table to scan.
+     * @throws IOException propagated from listing or opening the SSTables;
+     *         files opened before the failure are closed again.
+     */
+    public SequentialScanner(String table) throws IOException
+    {
+        table_ = table;
+        List<String> allFiles = Table.open(table_).getAllSSTablesOnDisk();
+
+        boolean opened = false;
+        try
+        {
+            for ( String file : allFiles )
+            {
+                fsQueue_.add(new FileStruct(file, SequentialScanner.bufSize_));
+            }
+            opened = true;
+        }
+        finally
+        {
+            /* Do not leak the files opened before the failure. */
+            if ( !opened )
+                closeQuietly();
+        }
+    }
+
+    /**
+     * Determines if there is anything more to be scanned.
+     *
+     * @return true if more elements are remaining, else false.
+     */
+    public boolean hasNext() throws IOException
+    {
+        return !fsQueue_.isEmpty();
+    }
+
+    /**
+     * Returns the row for the smallest key not yet scanned, then moves
+     * every SSTable holding that key past it.
+     *
+     * @return the row for that key, or null when the key is skipped because
+     *         it is not in this node's primary range or is the SSTable
+     *         block-index key.
+     * @throws IllegalStateException if there is nothing left to scan.
+     * @throws IOException propagated from reading the table or the SSTables.
+     */
+    public Row next() throws IOException
+    {
+        if ( fsQueue_.isEmpty() )
+            throw new IllegalStateException("Nothing in the stream to scan.");
+
+        FileStruct fs = fsQueue_.poll();
+        String key = fs.getKey();
+
+        Row row = null;
+        // Process the key only if it is in the primary range and not a block index.
+        if ( StorageService.instance().isPrimary(key) && !key.equals(SSTable.blockIndexKey_) )
+        {
+            row = Table.open(table_).get(key);
+        }
+
+        doCorrections(key);
+        advanceOrClose(fs);
+        return row;
+    }
+
+    /**
+     * Moves past the given key in every other SSTable that also holds it,
+     * so that a key stored in several files is served only once.
+     *
+     * The duplicates are taken from the head of the queue: a PriorityQueue's
+     * iterator does not visit elements in priority order, so iterating over
+     * it could stop early and miss files positioned at this key.
+     *
+     * @param key key that was just served.
+     * @throws IOException propagated from advancing an SSTable.
+     */
+    private void doCorrections(String key) throws IOException
+    {
+        List<FileStruct> duplicates = new ArrayList<FileStruct>();
+        while ( !fsQueue_.isEmpty() && key.equals(fsQueue_.peek().getKey()) )
+        {
+            duplicates.add(fsQueue_.poll());
+        }
+
+        /* Advance only after all duplicates are out of the queue. */
+        for ( FileStruct fs : duplicates )
+        {
+            advanceOrClose(fs);
+        }
+    }
+
+    /*
+     * Moves the file to its next key and puts it back in the queue, or
+     * closes it when advance() returns -1 (treated as end of file), since
+     * close() only reaches files that are still queued.
+     */
+    private void advanceOrClose(FileStruct fs) throws IOException
+    {
+        long bytesRead = fs.advance();
+        if ( bytesRead != -1L )
+            fsQueue_.add(fs);
+        else
+            fs.close();
+    }
+
+    /**
+     * Closes every SSTable still being scanned. All of them are closed even
+     * if one fails; the first failure is rethrown afterwards.
+     *
+     * @throws IOException the first error raised while closing a file.
+     */
+    public void close() throws IOException
+    {
+        IOException firstError = null;
+        FileStruct fs;
+        while ( (fs = fsQueue_.poll()) != null )
+        {
+            try
+            {
+                fs.close();
+            }
+            catch ( IOException e )
+            {
+                if ( firstError == null )
+                    firstError = e;
+            }
+        }
+        if ( firstError != null )
+            throw firstError;
+    }
+
+    /*
+     * Closes the files queued so far after a failed construction, logging
+     * instead of throwing so the caller sees the original failure.
+     */
+    private void closeQuietly()
+    {
+        try
+        {
+            close();
+        }
+        catch ( IOException e )
+        {
+            logger_.warn("Error closing SSTables after a failed scanner initialization", e);
+        }
+    }
+
+    /**
+     * Not supported: a sequential scan has no keyed access.
+     *
+     * @throws UnsupportedOperationException always.
+     */
+    public void fetch(String key, String cf) throws IOException, ColumnFamilyNotDefinedException
+    {
+        throw new UnsupportedOperationException("This operation does not make sense in the SequentialScanner");
+    }
+}