You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/06/30 00:07:42 UTC
svn commit: r789464 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra/db: CommitLog.java RecoveryManager.java
Author: jbellis
Date: Mon Jun 29 22:07:41 2009
New Revision: 789464
URL: http://svn.apache.org/viewvc?rev=789464&view=rev
Log:
Don't remove commitlog files when replay fails; deleting them loses your data as well as the chance to diagnose and fix the bug that caused the failure.
patch by jbellis; reviewed by Jun Rao for CASSANDRA-264
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RecoveryManager.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java?rev=789464&r1=789463&r2=789464&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java Mon Jun 29 22:07:41 2009
@@ -256,72 +256,55 @@
{
// IFileReader reader = SequenceFile.bufferedReader(file.getAbsolutePath(), DatabaseDescriptor.getLogFileSizeThreshold());
IFileReader reader = SequenceFile.reader(file.getAbsolutePath());
- try
- {
- CommitLogHeader clHeader = readCommitLogHeader(reader);
- /* seek to the lowest position */
- int lowPos = CommitLogHeader.getLowestPosition(clHeader);
- /*
- * If lowPos == 0 then we need to skip the processing of this
- * file.
- */
- if (lowPos == 0)
- break;
- else
- reader.seek(lowPos);
+ CommitLogHeader clHeader = readCommitLogHeader(reader);
+ /* seek to the lowest position */
+ int lowPos = CommitLogHeader.getLowestPosition(clHeader);
+ /*
+ * If lowPos == 0 then we need to skip the processing of this
+ * file.
+ */
+ if (lowPos == 0)
+ break;
+ else
+ reader.seek(lowPos);
- Set<Table> tablesRecovered = new HashSet<Table>();
+ Set<Table> tablesRecovered = new HashSet<Table>();
- /* read the logs populate RowMutation and apply */
- while ( !reader.isEOF() )
+ /* read the logs populate RowMutation and apply */
+ while ( !reader.isEOF() )
+ {
+ byte[] bytes = new byte[(int)reader.readLong()];
+ reader.readDirect(bytes);
+ bufIn.reset(bytes, bytes.length);
+
+ /* read the commit log entry */
+ Row row = Row.serializer().deserialize(bufIn);
+ Table table = Table.open(row.getTable());
+ tablesRecovered.add(table);
+ Collection<ColumnFamily> columnFamilies = new ArrayList<ColumnFamily>(row.getColumnFamilies());
+ /* remove column families that have already been flushed */
+ for (ColumnFamily columnFamily : columnFamilies)
{
- byte[] bytes = new byte[(int)reader.readLong()];
- reader.readDirect(bytes);
- bufIn.reset(bytes, bytes.length);
-
- /* read the commit log entry */
- try
- {
- Row row = Row.serializer().deserialize(bufIn);
- Table table = Table.open(row.getTable());
- tablesRecovered.add(table);
- Collection<ColumnFamily> columnFamilies = new ArrayList<ColumnFamily>(row.getColumnFamilies());
- /* remove column families that have already been flushed */
- for (ColumnFamily columnFamily : columnFamilies)
- {
- /* TODO: Remove this to not process Hints */
- if ( !DatabaseDescriptor.isApplicationColumnFamily(columnFamily.name()) )
- {
- row.removeColumnFamily(columnFamily);
- continue;
- }
- int id = table.getColumnFamilyId(columnFamily.name());
- if ( !clHeader.isDirty(id) || reader.getCurrentPosition() < clHeader.getPosition(id) )
- row.removeColumnFamily(columnFamily);
- }
- if ( !row.isEmpty() )
- {
- table.applyNow(row);
- }
- }
- catch ( IOException e )
+ /* TODO: Remove this to not process Hints */
+ if ( !DatabaseDescriptor.isApplicationColumnFamily(columnFamily.name()) )
{
- logger_.error("Unexpected error reading " + file.getName() + "; attempting to continue with the next entry", e);
+ row.removeColumnFamily(columnFamily);
+ continue;
}
+ int id = table.getColumnFamilyId(columnFamily.name());
+ if ( !clHeader.isDirty(id) || reader.getCurrentPosition() < clHeader.getPosition(id) )
+ row.removeColumnFamily(columnFamily);
}
- reader.close();
- /* apply the rows read -- success will result in the CL file being discarded */
- for (Table table : tablesRecovered)
+ if ( !row.isEmpty() )
{
- table.flush(true);
+ table.applyNow(row);
}
}
- catch (Throwable th)
+ reader.close();
+ /* apply the rows read -- success will result in the CL file being discarded */
+ for (Table table : tablesRecovered)
{
- logger_.error("Fatal error reading " + file.getName(), th);
- /* close the reader and delete this commit log. */
- reader.close();
- FileUtils.delete(new File[]{ file });
+ table.flush(true);
}
}
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RecoveryManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RecoveryManager.java?rev=789464&r1=789463&r2=789464&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RecoveryManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RecoveryManager.java Mon Jun 29 22:07:41 2009
@@ -24,6 +24,7 @@
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.utils.FileUtils;
import org.apache.log4j.Logger;
+import org.apache.commons.lang.StringUtils;
/**
@@ -54,7 +55,11 @@
public static void doRecovery() throws IOException
{
File[] files = getListofCommitLogs();
+ if (files.length == 0)
+ return;
+
Arrays.sort(files, new FileUtils.FileComparator());
+ logger_.info("Replaying " + StringUtils.join(files, ", "));
new CommitLog(true).recover(files);
FileUtils.delete(files);
}