You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/06/30 00:07:42 UTC

svn commit: r789464 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra/db: CommitLog.java RecoveryManager.java

Author: jbellis
Date: Mon Jun 29 22:07:41 2009
New Revision: 789464

URL: http://svn.apache.org/viewvc?rev=789464&view=rev
Log:
Don't remove commitlog files when replay fails; deleting them loses the chance to diagnose and fix the bug, as well as the data they contain.
patch by jbellis; reviewed by Jun Rao for CASSANDRA-264

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RecoveryManager.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java?rev=789464&r1=789463&r2=789464&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java Mon Jun 29 22:07:41 2009
@@ -256,72 +256,55 @@
         {
             // IFileReader reader = SequenceFile.bufferedReader(file.getAbsolutePath(), DatabaseDescriptor.getLogFileSizeThreshold());
             IFileReader reader = SequenceFile.reader(file.getAbsolutePath());
-            try
-            {
-                CommitLogHeader clHeader = readCommitLogHeader(reader);
-                /* seek to the lowest position */
-                int lowPos = CommitLogHeader.getLowestPosition(clHeader);
-                /*
-                 * If lowPos == 0 then we need to skip the processing of this
-                 * file.
-                */
-                if (lowPos == 0)
-                    break;
-                else
-                    reader.seek(lowPos);
+            CommitLogHeader clHeader = readCommitLogHeader(reader);
+            /* seek to the lowest position */
+            int lowPos = CommitLogHeader.getLowestPosition(clHeader);
+            /*
+             * If lowPos == 0 then we need to skip the processing of this
+             * file.
+            */
+            if (lowPos == 0)
+                break;
+            else
+                reader.seek(lowPos);
 
-                Set<Table> tablesRecovered = new HashSet<Table>();
+            Set<Table> tablesRecovered = new HashSet<Table>();
 
-                /* read the logs populate RowMutation and apply */
-                while ( !reader.isEOF() )
+            /* read the logs populate RowMutation and apply */
+            while ( !reader.isEOF() )
+            {
+                byte[] bytes = new byte[(int)reader.readLong()];
+                reader.readDirect(bytes);
+                bufIn.reset(bytes, bytes.length);
+
+                /* read the commit log entry */
+                Row row = Row.serializer().deserialize(bufIn);
+                Table table = Table.open(row.getTable());
+                tablesRecovered.add(table);
+                Collection<ColumnFamily> columnFamilies = new ArrayList<ColumnFamily>(row.getColumnFamilies());
+                /* remove column families that have already been flushed */
+                for (ColumnFamily columnFamily : columnFamilies)
                 {
-                    byte[] bytes = new byte[(int)reader.readLong()];
-                    reader.readDirect(bytes);
-                    bufIn.reset(bytes, bytes.length);
-
-                    /* read the commit log entry */
-                    try
-                    {                        
-                        Row row = Row.serializer().deserialize(bufIn);
-                        Table table = Table.open(row.getTable());
-                        tablesRecovered.add(table);
-                        Collection<ColumnFamily> columnFamilies = new ArrayList<ColumnFamily>(row.getColumnFamilies());
-                        /* remove column families that have already been flushed */
-                        for (ColumnFamily columnFamily : columnFamilies)
-                        {
-                        	/* TODO: Remove this to not process Hints */
-                        	if ( !DatabaseDescriptor.isApplicationColumnFamily(columnFamily.name()) )
-                        	{
-                        		row.removeColumnFamily(columnFamily);
-                        		continue;
-                        	}	
-                            int id = table.getColumnFamilyId(columnFamily.name());
-                            if ( !clHeader.isDirty(id) || reader.getCurrentPosition() < clHeader.getPosition(id) )
-                                row.removeColumnFamily(columnFamily);
-                        }
-                        if ( !row.isEmpty() )
-                        {                            
-                        	table.applyNow(row);
-                        }
-                    }
-                    catch ( IOException e )
+                    /* TODO: Remove this to not process Hints */
+                    if ( !DatabaseDescriptor.isApplicationColumnFamily(columnFamily.name()) )
                     {
-                        logger_.error("Unexpected error reading " + file.getName() + "; attempting to continue with the next entry", e);
+                        row.removeColumnFamily(columnFamily);
+                        continue;
                     }
+                    int id = table.getColumnFamilyId(columnFamily.name());
+                    if ( !clHeader.isDirty(id) || reader.getCurrentPosition() < clHeader.getPosition(id) )
+                        row.removeColumnFamily(columnFamily);
                 }
-                reader.close();
-                /* apply the rows read -- success will result in the CL file being discarded */
-                for (Table table : tablesRecovered)
+                if ( !row.isEmpty() )
                 {
-                    table.flush(true);
+                    table.applyNow(row);
                 }
             }
-            catch (Throwable th)
+            reader.close();
+            /* apply the rows read -- success will result in the CL file being discarded */
+            for (Table table : tablesRecovered)
             {
-                logger_.error("Fatal error reading " + file.getName(), th);
-                /* close the reader and delete this commit log. */
-                reader.close();
-                FileUtils.delete(new File[]{ file });
+                table.flush(true);
             }
         }
     }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RecoveryManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RecoveryManager.java?rev=789464&r1=789463&r2=789464&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RecoveryManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RecoveryManager.java Mon Jun 29 22:07:41 2009
@@ -24,6 +24,7 @@
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.utils.FileUtils;
 import org.apache.log4j.Logger;
+import org.apache.commons.lang.StringUtils;
 
 
 /**
@@ -54,7 +55,11 @@
     public static void doRecovery() throws IOException
     {
         File[] files = getListofCommitLogs();
+        if (files.length == 0)
+            return;
+
         Arrays.sort(files, new FileUtils.FileComparator());
+        logger_.info("Replaying " + StringUtils.join(files, ", "));
         new CommitLog(true).recover(files);
         FileUtils.delete(files);
     }