You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/06/18 22:22:10 UTC

svn commit: r786243 - in /incubator/cassandra/trunk: src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/ test/unit/org/apache/cassandra/db/

Author: jbellis
Date: Thu Jun 18 20:22:10 2009
New Revision: 786243

URL: http://svn.apache.org/viewvc?rev=786243&view=rev
Log:
add CommitLog and RecoveryManager tests.  patch by jbellis; reviewed by goffinet for CASSANDRA-237

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RecoveryManager.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CommitLogTest.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=786243&r1=786242&r2=786243&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Thu Jun 18 20:22:10 2009
@@ -24,10 +24,7 @@
 
 import org.apache.log4j.Logger;
 
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.Table;
-import org.apache.cassandra.db.TypeInfo;
-import org.apache.cassandra.db.SystemTable;
+import org.apache.cassandra.db.*;
 import org.apache.cassandra.utils.FileUtils;
 import org.apache.cassandra.utils.XMLUtils;
 import org.w3c.dom.Node;
@@ -63,7 +60,6 @@
     private static int currentIndex_ = 0;
     private static String logFileDirectory_;
     private static String bootstrapFileDirectory_;
-    private static int logRotationThreshold_ = 128*1024*1024;
     private static boolean fastSync_ = false;
     private static boolean rackAware_ = false;
     private static int threadsPerPool_ = 4;
@@ -293,7 +289,7 @@
             /* threshold after which commit log should be rotated. */
             String value = xmlUtils.getNodeValue("/Storage/CommitLogRotationThresholdInMB");
             if ( value != null)
-                logRotationThreshold_ = Integer.parseInt(value) * 1024 * 1024;
+                CommitLog.setSegmentSize(Integer.parseInt(value) * 1024 * 1024);
 
             /* fast sync option */
             value = xmlUtils.getNodeValue("/Storage/CommitLogFastSync");
@@ -743,11 +739,6 @@
         bootstrapFileDirectory_ = bfLocation;
     }
 
-    public static int getLogFileSizeThreshold()
-    {
-        return logRotationThreshold_;
-    }
-
     public static String getLogFileLocation()
     {
         return logFileDirectory_;

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=786243&r1=786242&r2=786243&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Thu Jun 18 20:22:10 2009
@@ -1661,4 +1661,17 @@
             lock_.readLock().unlock();
         }
     }
+
+    void clearUnsafe()
+    {
+        lock_.writeLock().lock();
+        try
+        {
+            memtable_.clearUnsafe();
+        }
+        finally
+        {
+            lock_.writeLock().unlock();
+        }
+    }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java?rev=786243&r1=786242&r2=786243&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java Thu Jun 18 20:22:10 2009
@@ -61,7 +61,7 @@
  */
 public class CommitLog
 {
-    private static final int bufSize_ = 128*1024*1024;
+    private static volatile int SEGMENT_SIZE = 128*1024*1024; // roll after log gets this big
     private static Map<String, CommitLog> instances_ = new HashMap<String, CommitLog>();
     private static Lock lock_ = new ReentrantLock();
     private static Logger logger_ = Logger.getLogger(CommitLog.class);
@@ -112,6 +112,16 @@
         }
     }
 
+    public static void setSegmentSize(int size)
+    {
+        SEGMENT_SIZE = size;
+    }
+
+    static int getSegmentCount()
+    {
+        return clHeaders_.size();
+    }
+
     static long getCreationTime(String file)
     {
         String[] entries = FBUtilities.strip(file, "-.");
@@ -134,9 +144,7 @@
     {        
         if ( DatabaseDescriptor.isFastSync() )
         {
-            /* Add this to the threshold */
-            int bufSize = 4*1024*1024;
-            return SequenceFile.fastWriter(file, CommitLog.bufSize_ + bufSize);
+            return SequenceFile.fastWriter(file, 4*1024*1024);
         }
         else
             return SequenceFile.writer(file);
@@ -178,9 +186,6 @@
     private CommitLogHeader clHeader_;
     private IFileWriter logWriter_;
     private long commitHeaderStartPos_;
-    /* Force rollover the commit log on the next insert */
-    private boolean forcedRollOver_ = false;
-
 
     /*
      * Generates a file name of the format CommitLog-<table>-<timestamp>.log in the
@@ -456,8 +461,7 @@
             /* Update the header */
             updateHeader(row);
             logWriter_.append(table_, cfBuffer);
-            fileSize = logWriter_.getFileSize();                       
-            checkThresholdAndRollLog(fileSize);            
+            checkThresholdAndRollLog();
         }
         catch (IOException e)
         {
@@ -573,50 +577,38 @@
         }
     }
 
-    private void checkThresholdAndRollLog( long fileSize )
+    private void checkThresholdAndRollLog()
     {
         try
         {
-            if ( fileSize >= DatabaseDescriptor.getLogFileSizeThreshold() || forcedRollOver_ )
+            if (logWriter_.getFileSize() >= SEGMENT_SIZE)
             {
-                if ( logWriter_.getFileSize() >= DatabaseDescriptor.getLogFileSizeThreshold() || forcedRollOver_ )
-                {
-	                /* Rolls the current log file over to a new one. */
-	                setNextFileName();
-	                String oldLogFile = logWriter_.getFileName();
-	                //history_.add(oldLogFile);
-	                logWriter_.close();
-	
-	                /* point reader/writer to a new commit log file. */
-	                // logWriter_ = SequenceFile.writer(logFile_);
-	                logWriter_ = CommitLog.createWriter(logFile_);
-	                /* squirrel away the old commit log header */
-	                clHeaders_.put(oldLogFile, new CommitLogHeader( clHeader_ ));
-	                /*
-	                 * We need to zero out positions because the positions in
-	                 * the old file do not make sense in the new one.
-	                */
-	                clHeader_.zeroPositions();
-	                writeCommitLogHeader(clHeader_.toByteArray(), false);
-	                // Get the list of files in commit log directory if it is greater than a certain number  
-	                // Force flush all the column families that way we ensure that a slowly populated column family is not screwing up 
-	                // by accumulating the commit logs .
-                }
+                /* Rolls the current log file over to a new one. */
+                setNextFileName();
+                String oldLogFile = logWriter_.getFileName();
+                //history_.add(oldLogFile);
+                logWriter_.close();
+
+                /* point reader/writer to a new commit log file. */
+                // logWriter_ = SequenceFile.writer(logFile_);
+                logWriter_ = CommitLog.createWriter(logFile_);
+                /* squirrel away the old commit log header */
+                clHeaders_.put(oldLogFile, new CommitLogHeader(clHeader_));
+                /*
+                 * We need to zero out positions because the positions in
+                 * the old file do not make sense in the new one.
+                */
+                clHeader_.zeroPositions();
+                writeCommitLogHeader(clHeader_.toByteArray(), false);
+                // Get the list of files in commit log directory if it is greater than a certain number
+                // Force flush all the column families that way we ensure that a slowly populated column family is not screwing up
+                // by accumulating the commit logs .
             }
         }
-        catch ( IOException e )
+        catch (IOException e)
         {
             logger_.info(LogUtil.throwableToString(e));
         }
-        finally
-        {
-        	forcedRollOver_ = false;
-        }
-    }
-
-    public void setForcedRollOver()
-    {
-    	forcedRollOver_ = true;
     }
 
     public static void main(String[] args) throws Throwable

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=786243&r1=786242&r2=786243&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Thu Jun 18 20:22:10 2009
@@ -388,4 +388,9 @@
             }
         };
     }
+
+    public void clearUnsafe()
+    {
+        columnFamilies_.clear();
+    }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RecoveryManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RecoveryManager.java?rev=786243&r1=786242&r2=786243&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RecoveryManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RecoveryManager.java Thu Jun 18 20:22:10 2009
@@ -70,7 +70,7 @@
         return tableToCommitLogs;
     }
     
-    public void doRecovery() throws IOException
+    public static void doRecovery() throws IOException
     {
         File[] files = getListofCommitLogs();
         Map<String, List<File>> tableToCommitLogs = getListOFCommitLogsPerTable();
@@ -78,7 +78,7 @@
         FileUtils.delete(files);
     }
     
-    private void recoverEachTable(Map<String, List<File>> tableToCommitLogs) throws IOException
+    private static void recoverEachTable(Map<String, List<File>> tableToCommitLogs) throws IOException
     {
         Comparator<File> fCmp = new FileUtils.FileComparator();
         Set<String> tables = tableToCommitLogs.keySet();
@@ -90,12 +90,4 @@
             clog.recover(clogs);
         }
     }
-    
-    public static void main(String[] args) throws Throwable
-    {
-        long start = System.currentTimeMillis();
-        RecoveryManager rm = RecoveryManager.instance();
-        rm.doRecovery();  
-        logger_.debug( "Time taken : " + (System.currentTimeMillis() - start) + " ms.");
-    }
 }

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CommitLogTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CommitLogTest.java?rev=786243&r1=786242&r2=786243&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CommitLogTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CommitLogTest.java Thu Jun 18 20:22:10 2009
@@ -19,6 +19,7 @@
 package org.apache.cassandra.db;
 
 import java.io.IOException;
+import java.util.concurrent.ExecutionException;
 
 import org.junit.Test;
 
@@ -27,32 +28,33 @@
 public class CommitLogTest extends CleanupHelper
 {
     @Test
-    public void testMain() throws IOException {
-        // TODO this is useless, since it assumes we have a working set of commit logs to parse
-        /*
-        File logDir = new File(DatabaseDescriptor.getLogFileLocation());
-        File[] files = logDir.listFiles();
-        Arrays.sort( files, new FileUtils.FileComparator() );
+    public void testCleanup() throws IOException, ExecutionException, InterruptedException
+    {
+        assert CommitLog.getSegmentCount() == 0;
+        CommitLog.setSegmentSize(1000);
+
+        Table table = Table.open("Table1");
+        ColumnFamilyStore store1 = table.getColumnFamilyStore("Standard1");
+        ColumnFamilyStore store2 = table.getColumnFamilyStore("Standard2");
+        RowMutation rm;
+        byte[] value = new byte[501];
 
-        byte[] bytes = new byte[CommitLogHeader.size(Integer.parseInt(args[0]))];
-        for ( File file : files )
+        // add data.  use relatively large values to force quick segment creation since we have a low flush threshold in the test config.
+        for (int i = 0; i < 10; i++)
         {
-            CommitLog clog = new CommitLog( file );
-            clog.readCommitLogHeader(file.getAbsolutePath(), bytes);
-            DataInputBuffer bufIn = new DataInputBuffer();
-            bufIn.reset(bytes, 0, bytes.length);
-            CommitLogHeader clHeader = CommitLogHeader.serializer().deserialize(bufIn);
-
-            StringBuilder sb = new StringBuilder("");
-            for ( byte b : bytes )
-            {
-                sb.append(b);
-                sb.append(" ");
-            }
-
-            System.out.println("FILE:" + file);
-            System.out.println(clHeader.toString());
+            rm = new RowMutation("Table1", "key1");
+            rm.add("Standard1:Column1", value, 0);
+            rm.add("Standard2:Column1", value, 0);
+            rm.apply();
         }
-        */
+        assert CommitLog.getSegmentCount() > 1;
+
+        // nothing should get removed after flushing just Standard1
+        store1.forceBlockingFlush();
+        assert CommitLog.getSegmentCount() > 1;
+
+        // after flushing Standard2 we should be able to clean out all segments
+        store2.forceBlockingFlush();
+        assert CommitLog.getSegmentCount() == 1;
     }
 }

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java?rev=786243&r1=786242&r2=786243&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java Thu Jun 18 20:22:10 2009
@@ -19,17 +19,40 @@
 package org.apache.cassandra.db;
 
 import java.io.IOException;
+import java.util.concurrent.ExecutionException;
 
 import org.junit.Test;
 
 import org.apache.cassandra.CleanupHelper;
+import static org.apache.cassandra.db.TableTest.assertColumns;
 
 public class RecoveryManagerTest extends CleanupHelper
 {
     @Test
-    public void testDoRecovery() throws IOException {
+    public void testNothing() throws IOException {
         // TODO nothing to recover
         RecoveryManager rm = RecoveryManager.instance();
         rm.doRecovery();  
     }
+
+    @Test
+    public void testSomething() throws IOException, ExecutionException, InterruptedException
+    {
+        Table table1 = Table.open("Table1");
+
+        RowMutation rm;
+        ColumnFamily cf;
+
+        rm = new RowMutation("Table1", "keymulti");
+        cf = new ColumnFamily("Standard1", "Standard");
+        cf.addColumn(new Column("col1", "val1".getBytes(), 1L));
+        rm.add(cf);
+        rm.apply();
+
+        table1.getColumnFamilyStore("Standard1").clearUnsafe();
+
+        RecoveryManager.doRecovery();
+
+        assertColumns(table1.get("keymulti", "Standard1"), "col1");
+    }
 }

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java?rev=786243&r1=786242&r2=786243&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java Thu Jun 18 20:22:10 2009
@@ -349,7 +349,7 @@
         assertEquals(new String(cfres.getColumn("col1992").value()), "vvvvvvvvvvvvvvvv1992");
     }
 
-    private void assertColumns(ColumnFamily columnFamily, String... columnNames)
+    public static void assertColumns(ColumnFamily columnFamily, String... columnNames)
     {
         assertNotNull(columnFamily);
         SortedSet<IColumn> columns = columnFamily.getAllColumns();