You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/06/30 00:07:49 UTC
svn commit: r789465 - in /incubator/cassandra/trunk:
src/java/org/apache/cassandra/db/CommitLog.java
test/unit/org/apache/cassandra/db/RecoveryManager2Test.java
test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
Author: jbellis
Date: Mon Jun 29 22:07:49 2009
New Revision: 789465
URL: http://svn.apache.org/viewvc?rev=789465&view=rev
Log:
add test catching buggy update of header on flush; refactor so there is only one version of code doing those writes (the correct one).
patch by jbellis; reviewed by Jun Rao for CASSANDRA-264
Added:
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManager2Test.java
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java?rev=789465&r1=789464&r2=789465&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java Mon Jun 29 22:07:49 2009
@@ -123,18 +123,6 @@
return Long.parseLong(entries[entries.length - 2]);
}
- /*
- * Write the serialized commit log header into the specified commit log.
- */
- private static void writeCommitLogHeader(String commitLogFileName, byte[] bytes) throws IOException
- {
- IFileWriter logWriter = CommitLog.createWriter(commitLogFileName);
- logWriter.seek(0L);
- /* write the commit log header */
- logWriter.writeDirect(bytes);
- logWriter.close();
- }
-
private static IFileWriter createWriter(String file) throws IOException
{
return SequenceFile.writer(file);
@@ -221,31 +209,40 @@
}
/*
+ * Write the serialized commit log header into the specified commit log.
+ */
+ private static void writeCommitLogHeader(String commitLogFileName, byte[] bytes) throws IOException
+ {
+ IFileWriter logWriter = CommitLog.createWriter(commitLogFileName);
+ writeCommitLogHeader(logWriter, bytes);
+ }
+
+ /*
* This is invoked on startup via the ctor. It basically
* writes a header with all bits set to zero.
*/
private void writeCommitLogHeader() throws IOException
{
int cfSize = Table.TableMetadata.getColumnFamilyCount();
- /* record the beginning of the commit header */
- /* write the commit log header */
clHeader_ = new CommitLogHeader(cfSize);
- writeCommitLogHeader(clHeader_.toByteArray(), false);
+ writeCommitLogHeader(logWriter_, clHeader_.toByteArray());
}
- private void writeCommitLogHeader(byte[] bytes, boolean reset) throws IOException
+ /** writes header at the beginning of the file, then seeks back to current position */
+ private void seekAndWriteCommitLogHeader(byte[] bytes) throws IOException
{
- /* record the current position */
long currentPos = logWriter_.getCurrentPosition();
logWriter_.seek(0);
- /* write the commit log header */
- logWriter_.writeLong(bytes.length);
- logWriter_.writeDirect(bytes);
- if (reset)
- {
- /* seek back to the old position */
- logWriter_.seek(currentPos);
- }
+
+ writeCommitLogHeader(logWriter_, bytes);
+
+ logWriter_.seek(currentPos);
+ }
+
+ private static void writeCommitLogHeader(IFileWriter logWriter, byte[] bytes) throws IOException
+ {
+ logWriter.writeLong(bytes.length);
+ logWriter.writeDirect(bytes);
}
void recover(File[] clogs) throws IOException
@@ -254,7 +251,6 @@
for (File file : clogs)
{
- // IFileReader reader = SequenceFile.bufferedReader(file.getAbsolutePath(), DatabaseDescriptor.getLogFileSizeThreshold());
IFileReader reader = SequenceFile.reader(file.getAbsolutePath());
CommitLogHeader clHeader = readCommitLogHeader(reader);
/* seek to the lowest position */
@@ -322,7 +318,7 @@
if (!clHeader_.isDirty(id) || (clHeader_.isDirty(id) && clHeader_.getPosition(id) == 0))
{
clHeader_.turnOn(id, logWriter_.getCurrentPosition());
- writeCommitLogHeader(clHeader_.toByteArray(), true);
+ seekAndWriteCommitLogHeader(clHeader_.toByteArray());
}
}
}
@@ -380,18 +376,17 @@
{
Table table = Table.open(tableName);
int id = table.getColumnFamilyId(cf);
- /* trying discarding old commit log files */
- discard(cLogCtx, id);
+ discardCompletedSegments(cLogCtx, id);
}
/*
- * Check if old commit logs can be deleted.
+ * Delete log segments whose contents have been turned into SSTables.
*
* param @ cLogCtx The commitLog context .
* param @ id id of the columnFamily being flushed to disk.
*
*/
- private void discard(CommitLog.CommitLogContext cLogCtx, int id) throws IOException
+ private void discardCompletedSegments(CommitLog.CommitLogContext cLogCtx, int id) throws IOException
{
/* retrieve the commit log header associated with the file in the context */
CommitLogHeader commitLogHeader = clHeaders_.get(cLogCtx.file);
@@ -482,7 +477,7 @@
// whether it's safe to remove a given log segment by and-ing its dirty
// with the current one.
clHeader_.zeroPositions();
- writeCommitLogHeader(clHeader_.toByteArray(), false);
+ writeCommitLogHeader(logWriter_, clHeader_.toByteArray());
}
}
}
\ No newline at end of file
Added: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManager2Test.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManager2Test.java?rev=789465&view=auto
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManager2Test.java (added)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManager2Test.java Mon Jun 29 22:07:49 2009
@@ -0,0 +1,39 @@
+package org.apache.cassandra.db;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.Arrays;
+
+import org.junit.Test;
+
+import org.apache.cassandra.CleanupHelper;
+
+public class RecoveryManager2Test extends CleanupHelper
+{
+ @Test
+ public void testWithFlush() throws IOException, ExecutionException, InterruptedException
+ {
+ Table table1 = Table.open("Table1");
+ Set<String> keys = new HashSet<String>();
+
+ for (int i = 0; i < 100; i++)
+ {
+ String key = "key" + i;
+ RowMutation rm = new RowMutation("Table1", key);
+ ColumnFamily cf = ColumnFamily.create("Table1", "Standard1");
+ cf.addColumn(new Column("col1", "val1".getBytes(), 1L));
+ rm.add(cf);
+ rm.apply();
+ keys.add(key);
+ }
+ table1.getColumnFamilyStore("Standard1").forceBlockingFlush();
+
+ table1.getColumnFamilyStore("Standard1").clearUnsafe();
+ RecoveryManager.doRecovery();
+
+ Set<String> foundKeys = new HashSet<String>(table1.getKeyRange(Arrays.asList("Standard1"), "", "", 1000));
+ assert keys.equals(foundKeys);
+ }
+}
Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java?rev=789465&r1=789464&r2=789465&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java Mon Jun 29 22:07:49 2009
@@ -20,6 +20,9 @@
import java.io.IOException;
import java.util.concurrent.ExecutionException;
+import java.util.Arrays;
+import java.util.Set;
+import java.util.HashSet;
import org.junit.Test;
@@ -36,7 +39,7 @@
}
@Test
- public void testSomething() throws IOException, ExecutionException, InterruptedException
+ public void testOne() throws IOException, ExecutionException, InterruptedException
{
Table table1 = Table.open("Table1");
Table table2 = Table.open("Table2");