You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/03/27 03:19:01 UTC

svn commit: r758971 - in /incubator/cassandra/trunk/src/org/apache/cassandra/db: ColumnFamilyStore.java HintedHandOffManager.java Memtable.java MemtableManager.java Table.java

Author: jbellis
Date: Fri Mar 27 02:19:01 2009
New Revision: 758971

URL: http://svn.apache.org/viewvc?rev=758971&view=rev
Log:
add SuperColumn support to forceFlush.  split out recovery flushing into flushOnRecovery.

Modified:
    incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/HintedHandOffManager.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/MemtableManager.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/Table.java

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java?rev=758971&r1=758970&r2=758971&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java Fri Mar 27 02:19:01 2009
@@ -395,11 +395,11 @@
         binaryMemtable_.get().put(key, buffer);
     }
 
-    void forceFlush(boolean fRecovery) throws IOException
+    void forceFlush() throws IOException
     {
         //MemtableManager.instance().submit(getColumnFamilyName(), memtable_.get() , CommitLog.CommitLogContext.NULL);
         //memtable_.get().flush(true, CommitLog.CommitLogContext.NULL);
-        memtable_.get().forceflush(this, fRecovery);
+        memtable_.get().forceflush(this);
     }
 
     void forceFlushBinary() throws IOException
@@ -1512,4 +1512,14 @@
                 + totalBytesWritten + "   Total keys read ..." + totalkeysRead);
         return;
     }
+
+    public boolean isSuper()
+    {
+        return DatabaseDescriptor.getColumnType(getColumnFamilyName()).equals("Super");
+    }
+
+    public void flushMemtableOnRecovery() throws IOException
+    {
+        memtable_.get().flushOnRecovery();
+    }
 }

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/HintedHandOffManager.java?rev=758971&r1=758970&r2=758971&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/HintedHandOffManager.java Fri Mar 27 02:19:01 2009
@@ -142,7 +142,7 @@
             	if(hintedColumnFamily == null)
             	{
                     // Force flush now
-                    columnFamilyStore_.forceFlush(false);
+                    columnFamilyStore_.forceFlush();
             		return;
             	}
             	Collection<IColumn> keys = hintedColumnFamily.getAllColumns();
@@ -177,7 +177,7 @@
                 	}
             	}
                 // Force flush now
-                columnFamilyStore_.forceFlush(false);
+                columnFamilyStore_.forceFlush();
 
                 // Now do a major compaction
                 columnFamilyStore_.forceCompaction(null, null, 0, null);

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java?rev=758971&r1=758970&r2=758971&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java Fri Mar 27 02:19:01 2009
@@ -20,7 +20,12 @@
 
 import java.io.FileOutputStream;
 import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -124,7 +129,7 @@
             key_ = key;
             columnFamilyName_ = cfName;
         }
-        
+
         Getter(String key, String cfName, IFilter filter)
         {
             this(key, cfName);
@@ -133,7 +138,7 @@
 
         public ColumnFamily call()
         {
-        	ColumnFamily cf = getLocalCopy(key_, columnFamilyName_, filter_);            
+        	ColumnFamily cf = getLocalCopy(key_, columnFamilyName_, filter_);
             return cf;
         }
     }
@@ -179,7 +184,7 @@
     }
 
     /**
-     * Compares two Memtable based on creation time. 
+     * Compares two Memtable based on creation time.
      * @param rhs
      * @return
      */
@@ -283,27 +288,30 @@
     /*
      * This version is used to switch memtable and force flush.
     */
-    void forceflush(ColumnFamilyStore cfStore, boolean fRecovery) throws IOException
+    public void forceflush(ColumnFamilyStore cfStore) throws IOException
     {
-        if(!fRecovery)
+        RowMutation rm = new RowMutation(DatabaseDescriptor.getTables().get(0), flushKey_);
+
+        try
         {
-	    	RowMutation rm = new RowMutation(DatabaseDescriptor.getTables().get(0), flushKey_);
-	        try
-	        {
-	            rm.add(cfStore.columnFamily_ + ":Column","0".getBytes());
-	            rm.apply();
-	        }
-	        catch(ColumnFamilyNotDefinedException ex)
-	        {
-	            logger_.debug(LogUtil.throwableToString(ex));
-	        }
+            if (cfStore.isSuper())
+            {
+                rm.add(cfStore.getColumnFamilyName() + ":SC1:Column", "0".getBytes(), 0);
+            } else {
+                rm.add(cfStore.getColumnFamilyName() + ":Column", "0".getBytes(), 0);
+            }
+            rm.apply();
         }
-        else
+        catch(ColumnFamilyNotDefinedException ex)
         {
-        	flush(CommitLog.CommitLogContext.NULL);
+            logger_.debug(LogUtil.throwableToString(ex));
         }
     }
 
+    void flushOnRecovery() throws IOException {
+        flush(CommitLog.CommitLogContext.NULL);
+    }
+
     private void resolve(String key, ColumnFamily columnFamily)
     {
     	ColumnFamily oldCf = columnFamilies_.get(key);
@@ -397,7 +405,7 @@
     	}
     	return cf;
     }
-    
+
     ColumnFamily get(String key, String cfName, IFilter filter)
     {
     	printExecutorStats();
@@ -431,10 +439,6 @@
     	apartments_.get(cfName_).submit(deleter);
     }
 
-    /*
-     * param recoveryMode - indicates if this was invoked during
-     *                      recovery.
-    */
     void flush(CommitLog.CommitLogContext cLogCtx) throws IOException
     {
         ColumnFamilyStore cfStore = Table.open(table_).getColumnFamilyStore(cfName_);
@@ -451,18 +455,18 @@
         String directory = DatabaseDescriptor.getDataFileLocation();
         String filename = cfStore.getNextFileName();
         SSTable ssTable = new SSTable(directory, filename, pType);
-        switch (pType) 
+        switch (pType)
         {
             case OPHF:
                 flushForOrderPreservingPartitioner(ssTable, cfStore, cLogCtx);
                 break;
-                
+
             default:
                 flushForRandomPartitioner(ssTable, cfStore, cLogCtx);
                 break;
-        }        
+        }
     }
-    
+
     private void flushForRandomPartitioner(SSTable ssTable, ColumnFamilyStore cfStore, CommitLog.CommitLogContext cLogCtx) throws IOException
     {
         /* List of primary keys in sorted order */
@@ -489,7 +493,7 @@
         cfStore.storeLocation( ssTable.getDataFileLocation(), bf );
         buffer.close();
     }
-    
+
     private void flushForOrderPreservingPartitioner(SSTable ssTable, ColumnFamilyStore cfStore, CommitLog.CommitLogContext cLogCtx) throws IOException
     {
         List<String> keys = new ArrayList<String>( columnFamilies_.keySet() );

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/MemtableManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/MemtableManager.java?rev=758971&r1=758970&r2=758971&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/MemtableManager.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/MemtableManager.java Fri Mar 27 02:19:01 2009
@@ -27,10 +27,8 @@
 
 import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 import org.apache.cassandra.concurrent.ThreadFactoryImpl;
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.utils.LogUtil;
 import org.apache.log4j.Logger;
-import org.apache.cassandra.utils.*;
 
 /**
  * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/Table.java?rev=758971&r1=758970&r2=758971&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/Table.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/Table.java Fri Mar 27 02:19:01 2009
@@ -847,10 +847,15 @@
         Set<String> cfNames = columnFamilyStores_.keySet();
         for ( String cfName : cfNames )
         {
-            columnFamilyStores_.get(cfName).forceFlush(fRecovery);
+            if (fRecovery) {
+                columnFamilyStores_.get(cfName).flushMemtableOnRecovery();
+            } else {
+                columnFamilyStores_.get(cfName).forceFlush();
+            }
         }
     }
 
+
     void delete(Row row) throws IOException
     {
         String key = row.key();
@@ -895,7 +900,7 @@
     	            }
     	            else if(column.timestamp() == 3)
     	            {
-    	            	cfStore.forceFlush(false);
+    	            	cfStore.forceFlush();
     	            }
     	            else if(column.timestamp() == 4)
     	            {