You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/06/18 22:22:10 UTC
svn commit: r786243 - in /incubator/cassandra/trunk:
src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/
test/unit/org/apache/cassandra/db/
Author: jbellis
Date: Thu Jun 18 20:22:10 2009
New Revision: 786243
URL: http://svn.apache.org/viewvc?rev=786243&view=rev
Log:
add CommitLog and RecoveryManager tests. patch by jbellis; reviewed by goffinet for CASSANDRA-237
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RecoveryManager.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CommitLogTest.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=786243&r1=786242&r2=786243&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Thu Jun 18 20:22:10 2009
@@ -24,10 +24,7 @@
import org.apache.log4j.Logger;
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.Table;
-import org.apache.cassandra.db.TypeInfo;
-import org.apache.cassandra.db.SystemTable;
+import org.apache.cassandra.db.*;
import org.apache.cassandra.utils.FileUtils;
import org.apache.cassandra.utils.XMLUtils;
import org.w3c.dom.Node;
@@ -63,7 +60,6 @@
private static int currentIndex_ = 0;
private static String logFileDirectory_;
private static String bootstrapFileDirectory_;
- private static int logRotationThreshold_ = 128*1024*1024;
private static boolean fastSync_ = false;
private static boolean rackAware_ = false;
private static int threadsPerPool_ = 4;
@@ -293,7 +289,7 @@
/* threshold after which commit log should be rotated. */
String value = xmlUtils.getNodeValue("/Storage/CommitLogRotationThresholdInMB");
if ( value != null)
- logRotationThreshold_ = Integer.parseInt(value) * 1024 * 1024;
+ CommitLog.setSegmentSize(Integer.parseInt(value) * 1024 * 1024);
/* fast sync option */
value = xmlUtils.getNodeValue("/Storage/CommitLogFastSync");
@@ -743,11 +739,6 @@
bootstrapFileDirectory_ = bfLocation;
}
- public static int getLogFileSizeThreshold()
- {
- return logRotationThreshold_;
- }
-
public static String getLogFileLocation()
{
return logFileDirectory_;
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=786243&r1=786242&r2=786243&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Thu Jun 18 20:22:10 2009
@@ -1661,4 +1661,17 @@
lock_.readLock().unlock();
}
}
+
+ void clearUnsafe()
+ {
+ lock_.writeLock().lock();
+ try
+ {
+ memtable_.clearUnsafe();
+ }
+ finally
+ {
+ lock_.writeLock().unlock();
+ }
+ }
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java?rev=786243&r1=786242&r2=786243&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java Thu Jun 18 20:22:10 2009
@@ -61,7 +61,7 @@
*/
public class CommitLog
{
- private static final int bufSize_ = 128*1024*1024;
+ private static volatile int SEGMENT_SIZE = 128*1024*1024; // roll after log gets this big
private static Map<String, CommitLog> instances_ = new HashMap<String, CommitLog>();
private static Lock lock_ = new ReentrantLock();
private static Logger logger_ = Logger.getLogger(CommitLog.class);
@@ -112,6 +112,16 @@
}
}
+ public static void setSegmentSize(int size)
+ {
+ SEGMENT_SIZE = size;
+ }
+
+ static int getSegmentCount()
+ {
+ return clHeaders_.size();
+ }
+
static long getCreationTime(String file)
{
String[] entries = FBUtilities.strip(file, "-.");
@@ -134,9 +144,7 @@
{
if ( DatabaseDescriptor.isFastSync() )
{
- /* Add this to the threshold */
- int bufSize = 4*1024*1024;
- return SequenceFile.fastWriter(file, CommitLog.bufSize_ + bufSize);
+ return SequenceFile.fastWriter(file, 4*1024*1024);
}
else
return SequenceFile.writer(file);
@@ -178,9 +186,6 @@
private CommitLogHeader clHeader_;
private IFileWriter logWriter_;
private long commitHeaderStartPos_;
- /* Force rollover the commit log on the next insert */
- private boolean forcedRollOver_ = false;
-
/*
* Generates a file name of the format CommitLog-<table>-<timestamp>.log in the
@@ -456,8 +461,7 @@
/* Update the header */
updateHeader(row);
logWriter_.append(table_, cfBuffer);
- fileSize = logWriter_.getFileSize();
- checkThresholdAndRollLog(fileSize);
+ checkThresholdAndRollLog();
}
catch (IOException e)
{
@@ -573,50 +577,38 @@
}
}
- private void checkThresholdAndRollLog( long fileSize )
+ private void checkThresholdAndRollLog()
{
try
{
- if ( fileSize >= DatabaseDescriptor.getLogFileSizeThreshold() || forcedRollOver_ )
+ if (logWriter_.getFileSize() >= SEGMENT_SIZE)
{
- if ( logWriter_.getFileSize() >= DatabaseDescriptor.getLogFileSizeThreshold() || forcedRollOver_ )
- {
- /* Rolls the current log file over to a new one. */
- setNextFileName();
- String oldLogFile = logWriter_.getFileName();
- //history_.add(oldLogFile);
- logWriter_.close();
-
- /* point reader/writer to a new commit log file. */
- // logWriter_ = SequenceFile.writer(logFile_);
- logWriter_ = CommitLog.createWriter(logFile_);
- /* squirrel away the old commit log header */
- clHeaders_.put(oldLogFile, new CommitLogHeader( clHeader_ ));
- /*
- * We need to zero out positions because the positions in
- * the old file do not make sense in the new one.
- */
- clHeader_.zeroPositions();
- writeCommitLogHeader(clHeader_.toByteArray(), false);
- // Get the list of files in commit log directory if it is greater than a certain number
- // Force flush all the column families that way we ensure that a slowly populated column family is not screwing up
- // by accumulating the commit logs .
- }
+ /* Rolls the current log file over to a new one. */
+ setNextFileName();
+ String oldLogFile = logWriter_.getFileName();
+ //history_.add(oldLogFile);
+ logWriter_.close();
+
+ /* point reader/writer to a new commit log file. */
+ // logWriter_ = SequenceFile.writer(logFile_);
+ logWriter_ = CommitLog.createWriter(logFile_);
+ /* squirrel away the old commit log header */
+ clHeaders_.put(oldLogFile, new CommitLogHeader(clHeader_));
+ /*
+ * We need to zero out positions because the positions in
+ * the old file do not make sense in the new one.
+ */
+ clHeader_.zeroPositions();
+ writeCommitLogHeader(clHeader_.toByteArray(), false);
+ // Get the list of files in commit log directory if it is greater than a certain number
+ // Force flush all the column families that way we ensure that a slowly populated column family is not screwing up
+ // by accumulating the commit logs .
}
}
- catch ( IOException e )
+ catch (IOException e)
{
logger_.info(LogUtil.throwableToString(e));
}
- finally
- {
- forcedRollOver_ = false;
- }
- }
-
- public void setForcedRollOver()
- {
- forcedRollOver_ = true;
}
public static void main(String[] args) throws Throwable
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=786243&r1=786242&r2=786243&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Thu Jun 18 20:22:10 2009
@@ -388,4 +388,9 @@
}
};
}
+
+ public void clearUnsafe()
+ {
+ columnFamilies_.clear();
+ }
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RecoveryManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RecoveryManager.java?rev=786243&r1=786242&r2=786243&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RecoveryManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RecoveryManager.java Thu Jun 18 20:22:10 2009
@@ -70,7 +70,7 @@
return tableToCommitLogs;
}
- public void doRecovery() throws IOException
+ public static void doRecovery() throws IOException
{
File[] files = getListofCommitLogs();
Map<String, List<File>> tableToCommitLogs = getListOFCommitLogsPerTable();
@@ -78,7 +78,7 @@
FileUtils.delete(files);
}
- private void recoverEachTable(Map<String, List<File>> tableToCommitLogs) throws IOException
+ private static void recoverEachTable(Map<String, List<File>> tableToCommitLogs) throws IOException
{
Comparator<File> fCmp = new FileUtils.FileComparator();
Set<String> tables = tableToCommitLogs.keySet();
@@ -90,12 +90,4 @@
clog.recover(clogs);
}
}
-
- public static void main(String[] args) throws Throwable
- {
- long start = System.currentTimeMillis();
- RecoveryManager rm = RecoveryManager.instance();
- rm.doRecovery();
- logger_.debug( "Time taken : " + (System.currentTimeMillis() - start) + " ms.");
- }
}
Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CommitLogTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CommitLogTest.java?rev=786243&r1=786242&r2=786243&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CommitLogTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CommitLogTest.java Thu Jun 18 20:22:10 2009
@@ -19,6 +19,7 @@
package org.apache.cassandra.db;
import java.io.IOException;
+import java.util.concurrent.ExecutionException;
import org.junit.Test;
@@ -27,32 +28,33 @@
public class CommitLogTest extends CleanupHelper
{
@Test
- public void testMain() throws IOException {
- // TODO this is useless, since it assumes we have a working set of commit logs to parse
- /*
- File logDir = new File(DatabaseDescriptor.getLogFileLocation());
- File[] files = logDir.listFiles();
- Arrays.sort( files, new FileUtils.FileComparator() );
+ public void testCleanup() throws IOException, ExecutionException, InterruptedException
+ {
+ assert CommitLog.getSegmentCount() == 0;
+ CommitLog.setSegmentSize(1000);
+
+ Table table = Table.open("Table1");
+ ColumnFamilyStore store1 = table.getColumnFamilyStore("Standard1");
+ ColumnFamilyStore store2 = table.getColumnFamilyStore("Standard2");
+ RowMutation rm;
+ byte[] value = new byte[501];
- byte[] bytes = new byte[CommitLogHeader.size(Integer.parseInt(args[0]))];
- for ( File file : files )
+ // add data. use relatively large values to force quick segment creation since we have a low flush threshold in the test config.
+ for (int i = 0; i < 10; i++)
{
- CommitLog clog = new CommitLog( file );
- clog.readCommitLogHeader(file.getAbsolutePath(), bytes);
- DataInputBuffer bufIn = new DataInputBuffer();
- bufIn.reset(bytes, 0, bytes.length);
- CommitLogHeader clHeader = CommitLogHeader.serializer().deserialize(bufIn);
-
- StringBuilder sb = new StringBuilder("");
- for ( byte b : bytes )
- {
- sb.append(b);
- sb.append(" ");
- }
-
- System.out.println("FILE:" + file);
- System.out.println(clHeader.toString());
+ rm = new RowMutation("Table1", "key1");
+ rm.add("Standard1:Column1", value, 0);
+ rm.add("Standard2:Column1", value, 0);
+ rm.apply();
}
- */
+ assert CommitLog.getSegmentCount() > 1;
+
+ // nothing should get removed after flushing just Standard1
+ store1.forceBlockingFlush();
+ assert CommitLog.getSegmentCount() > 1;
+
+ // after flushing Standard2 we should be able to clean out all segments
+ store2.forceBlockingFlush();
+ assert CommitLog.getSegmentCount() == 1;
}
}
Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java?rev=786243&r1=786242&r2=786243&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java Thu Jun 18 20:22:10 2009
@@ -19,17 +19,40 @@
package org.apache.cassandra.db;
import java.io.IOException;
+import java.util.concurrent.ExecutionException;
import org.junit.Test;
import org.apache.cassandra.CleanupHelper;
+import static org.apache.cassandra.db.TableTest.assertColumns;
public class RecoveryManagerTest extends CleanupHelper
{
@Test
- public void testDoRecovery() throws IOException {
+ public void testNothing() throws IOException {
// TODO nothing to recover
RecoveryManager rm = RecoveryManager.instance();
rm.doRecovery();
}
+
+ @Test
+ public void testSomething() throws IOException, ExecutionException, InterruptedException
+ {
+ Table table1 = Table.open("Table1");
+
+ RowMutation rm;
+ ColumnFamily cf;
+
+ rm = new RowMutation("Table1", "keymulti");
+ cf = new ColumnFamily("Standard1", "Standard");
+ cf.addColumn(new Column("col1", "val1".getBytes(), 1L));
+ rm.add(cf);
+ rm.apply();
+
+ table1.getColumnFamilyStore("Standard1").clearUnsafe();
+
+ RecoveryManager.doRecovery();
+
+ assertColumns(table1.get("keymulti", "Standard1"), "col1");
+ }
}
Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java?rev=786243&r1=786242&r2=786243&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java Thu Jun 18 20:22:10 2009
@@ -349,7 +349,7 @@
assertEquals(new String(cfres.getColumn("col1992").value()), "vvvvvvvvvvvvvvvv1992");
}
- private void assertColumns(ColumnFamily columnFamily, String... columnNames)
+ public static void assertColumns(ColumnFamily columnFamily, String... columnNames)
{
assertNotNull(columnFamily);
SortedSet<IColumn> columns = columnFamily.getAllColumns();