You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/06/18 22:22:19 UTC
svn commit: r786245 - in
/incubator/cassandra/trunk/src/java/org/apache/cassandra: db/CommitLog.java
db/CommitLogHeader.java db/Memtable.java db/RecoveryManager.java
utils/BitSetSerializer.java
Author: jbellis
Date: Thu Jun 18 20:22:18 2009
New Revision: 786245
URL: http://svn.apache.org/viewvc?rev=786245&view=rev
Log:
switch from byte[] to BitSet in CommitLogHeader. patch by jbellis; reviewed by goffinet for CASSANDRA-237
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogHeader.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RecoveryManager.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/BitSetSerializer.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java?rev=786245&r1=786244&r2=786245&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java Thu Jun 18 20:22:18 2009
@@ -29,13 +29,11 @@
import org.apache.cassandra.io.SequenceFile;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.FileUtils;
-import org.apache.cassandra.utils.LogUtil;
+
import org.apache.log4j.Logger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import org.apache.commons.lang.StringUtils;
-
/*
* Commit Log tracks every write operation into the system. The aim
* of the commit log is to be able to successfully recover data that was
@@ -182,7 +180,6 @@
/* header for current commit log */
private CommitLogHeader clHeader_;
private IFileWriter logWriter_;
- private long commitHeaderStartPos_;
/*
* Generates a file name of the format CommitLog-<table>-<timestamp>.log in the
@@ -221,27 +218,22 @@
CommitLog(File logFile) throws IOException
{
table_ = CommitLog.getTableName(logFile.getName());
- logFile_ = logFile.getAbsolutePath();
+ logFile_ = logFile.getAbsolutePath();
logWriter_ = CommitLog.createWriter(logFile_);
- commitHeaderStartPos_ = 0L;
}
String getLogFile()
{
return logFile_;
}
-
- void readCommitLogHeader(String logFile, byte[] bytes) throws IOException
+
+ private CommitLogHeader readCommitLogHeader(IFileReader logReader) throws IOException
{
- IFileReader logReader = SequenceFile.reader(logFile);
- try
- {
- logReader.readDirect(bytes);
- }
- finally
- {
- logReader.close();
- }
+ int size = (int)logReader.readLong();
+ byte[] bytes = new byte[size];
+ logReader.readDirect(bytes);
+ ByteArrayInputStream byteStream = new ByteArrayInputStream(bytes);
+ return CommitLogHeader.serializer().deserialize(new DataInputStream(byteStream));
}
/*
@@ -253,7 +245,6 @@
Table table = Table.open(table_);
int cfSize = table.getNumberOfColumnFamilies();
/* record the beginning of the commit header */
- commitHeaderStartPos_ = logWriter_.getCurrentPosition();
/* write the commit log header */
clHeader_ = new CommitLogHeader(cfSize);
writeCommitLogHeader(clHeader_.toByteArray(), false);
@@ -263,69 +254,28 @@
{
/* record the current position */
long currentPos = logWriter_.getCurrentPosition();
- logWriter_.seek(commitHeaderStartPos_);
+ logWriter_.seek(0);
/* write the commit log header */
+ logWriter_.writeLong(bytes.length);
logWriter_.writeDirect(bytes);
- if ( reset )
+ if (reset)
{
/* seek back to the old position */
logWriter_.seek(currentPos);
}
}
- void recover(List<File> clogs) throws IOException
- {
- Table table = Table.open(table_);
- int cfSize = table.getNumberOfColumnFamilies();
- int size = CommitLogHeader.size(cfSize);
- byte[] header = new byte[size];
- byte[] header2 = new byte[size];
- int index = clogs.size() - 1;
-
- File file = clogs.get(index);
- readCommitLogHeader(file.getAbsolutePath(), header);
-
- Stack<File> filesNeeded = new Stack<File>();
- filesNeeded.push(file);
-
- /*
- * Identify files that we need for processing. This can be done
- * using the information in the header of each file. Simply and
- * the byte[] (which are the headers) and stop at the file where
- * the result is a zero.
- */
- for ( int i = (index - 1); i >= 0; --i )
- {
- file = clogs.get(i);
- readCommitLogHeader(file.getAbsolutePath(), header2);
- byte[] result = CommitLogHeader.and(header, header2);
- if (CommitLogHeader.isZero(result))
- break;
- filesNeeded.push(file);
- }
-
- logger_.info("Replaying logs from " + StringUtils.join(filesNeeded, ", "));
- doRecovery(filesNeeded, header);
- }
-
- private void doRecovery(Stack<File> filesNeeded, byte[] header) throws IOException
+ void recover(File[] clogs) throws IOException
{
- Table table = Table.open(table_);
-
DataInputBuffer bufIn = new DataInputBuffer();
- DataOutputBuffer bufOut = new DataOutputBuffer();
- while ( !filesNeeded.isEmpty() )
+ for (File file : clogs)
{
- File file = filesNeeded.pop();
// IFileReader reader = SequenceFile.bufferedReader(file.getAbsolutePath(), DatabaseDescriptor.getLogFileSizeThreshold());
IFileReader reader = SequenceFile.reader(file.getAbsolutePath());
try
{
- reader.readDirect(header);
- /* deserialize the commit log header */
- bufIn.reset(header, 0, header.length);
- CommitLogHeader clHeader = CommitLogHeader.serializer().deserialize(bufIn);
+ CommitLogHeader clHeader = readCommitLogHeader(reader);
/* seek to the lowest position */
int lowPos = CommitLogHeader.getLowestPosition(clHeader);
/*
@@ -337,24 +287,21 @@
else
reader.seek(lowPos);
+ Set<Table> tablesRecovered = new HashSet<Table>();
+
/* read the logs populate RowMutation and apply */
while ( !reader.isEOF() )
{
- bufOut.reset();
- long bytesRead = reader.next(bufOut);
- if ( bytesRead == -1 )
- break;
-
- bufIn.reset(bufOut.getData(), bufOut.getLength());
- /* Skip over the commit log key portion */
- bufIn.readUTF();
- /* Skip over data size */
- bufIn.readInt();
-
+ byte[] bytes = new byte[(int)reader.readLong()];
+ reader.readDirect(bytes);
+ bufIn.reset(bytes, bytes.length);
+
/* read the commit log entry */
try
{
Row row = Row.serializer().deserialize(bufIn);
+ Table table = Table.open(table_);
+ tablesRecovered.add(table);
Collection<ColumnFamily> columnFamilies = new ArrayList<ColumnFamily>(row.getColumnFamilies());
/* remove column families that have already been flushed */
for (ColumnFamily columnFamily : columnFamilies)
@@ -366,7 +313,7 @@
continue;
}
int id = table.getColumnFamilyId(columnFamily.name());
- if ( clHeader.get(id) == 0 || reader.getCurrentPosition() < clHeader.getPosition(id) )
+ if ( !clHeader.isDirty(id) || reader.getCurrentPosition() < clHeader.getPosition(id) )
row.removeColumnFamily(columnFamily);
}
if ( !row.isEmpty() )
@@ -381,7 +328,10 @@
}
reader.close();
/* apply the rows read -- success will result in the CL file being discarded */
- table.flush(true);
+ for (Table table : tablesRecovered)
+ {
+ table.flush(true);
+ }
}
catch (Throwable th)
{
@@ -397,20 +347,20 @@
* Update the header of the commit log if a new column family
* is encountered for the first time.
*/
- private void updateHeader(Row row) throws IOException
+ private void maybeUpdateHeader(Row row) throws IOException
{
Table table = Table.open(table_);
for (ColumnFamily columnFamily : row.getColumnFamilies())
{
- int id = table.getColumnFamilyId(columnFamily.name());
- if ( clHeader_.get(id) == 0 || ( clHeader_.get(id) == 1 && clHeader_.getPosition(id) == 0 ) )
- {
- if ( clHeader_.get(id) == 0 || ( clHeader_.get(id) == 1 && clHeader_.getPosition(id) == 0 ) )
- {
- clHeader_.turnOn( id, logWriter_.getCurrentPosition() );
- writeCommitLogHeader(clHeader_.toByteArray(), true);
- }
- }
+ int id = table.getColumnFamilyId(columnFamily.name());
+ if (!clHeader_.isDirty(id) || (clHeader_.isDirty(id) && clHeader_.getPosition(id) == 0))
+ {
+ if (!clHeader_.isDirty(id) || (clHeader_.isDirty(id) && clHeader_.getPosition(id) == 0))
+ {
+ clHeader_.turnOn(id, logWriter_.getCurrentPosition());
+ writeCommitLogHeader(clHeader_.toByteArray(), true);
+ }
+ }
}
}
@@ -439,8 +389,9 @@
currentPosition = logWriter_.getCurrentPosition();
cLogCtx = new CommitLogContext(logFile_, currentPosition);
/* Update the header */
- updateHeader(row);
- logWriter_.append(table_, cfBuffer);
+ maybeUpdateHeader(row);
+ logWriter_.writeLong(cfBuffer.getLength());
+ logWriter_.append(cfBuffer);
checkThresholdAndRollLog();
}
catch (IOException e)
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogHeader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogHeader.java?rev=786245&r1=786244&r2=786245&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogHeader.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogHeader.java Thu Jun 18 20:22:18 2009
@@ -19,10 +19,12 @@
package org.apache.cassandra.db;
import java.io.*;
+import java.util.BitSet;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.io.DataInputBuffer;
import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.utils.BitSetSerializer;
+import org.apache.cassandra.config.DatabaseDescriptor;
/**
* Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
@@ -30,40 +32,28 @@
class CommitLogHeader
{
- private static ICompactSerializer<CommitLogHeader> serializer_;
+ private static CommitLogHeaderSerializer serializer = new CommitLogHeaderSerializer();
- static
+ static CommitLogHeaderSerializer serializer()
{
- serializer_ = new CommitLogHeaderSerializer();
+ return serializer;
}
-
- static ICompactSerializer<CommitLogHeader> serializer()
- {
- return serializer_;
- }
-
- static int size(int size)
+
+ public static BitSet and(byte[] bytes1, byte[] bytes2) throws IOException
{
- /*
- * We serialize the CommitLogHeader as a byte[] and write it
- * to disk. So we first write an "int" to specify the length
- * of the byte[] which is why we first have a 4 in the sum.
- * We then have size which is the number of bits to track who
- * has been flushed and then the rest is the position[]
- * size = #of column families
- * +
- * size of the bitset
- * +
- * size of position array
- */
- return 4 + size + (4 * size);
+ DataInputBuffer bufIn = new DataInputBuffer();
+ bufIn.reset(bytes1, 0, bytes1.length);
+ CommitLogHeader header1 = serializer.deserialize(bufIn);
+ bufIn.reset(bytes2, 0, bytes2.length);
+ CommitLogHeader header2 = serializer.deserialize(bufIn);
+ header1.and(header2);
+ return header1.dirty;
}
-
+
static int getLowestPosition(CommitLogHeader clHeader)
{
- int[] positions = clHeader.getPositions();
int minPosition = Integer.MAX_VALUE;
- for ( int position : positions )
+ for ( int position : clHeader.lastFlushedAt)
{
if ( position < minPosition && position > 0)
{
@@ -75,49 +65,13 @@
minPosition = 0;
return minPosition;
}
-
- /*
- * Bitwise & of each byte in the two arrays.
- * Both arrays are of same length. In order
- * to be memory efficient the result is in
- * the third parameter.
- */
- static byte[] and(byte[] bytes, byte[] bytes2) throws IOException
- {
- DataInputBuffer bufIn = new DataInputBuffer();
- bufIn.reset(bytes, 0, bytes.length);
- CommitLogHeader clHeader = CommitLogHeader.serializer().deserialize(bufIn);
- byte[] clh = clHeader.getBitSet();
-
- bufIn.reset(bytes2, 0, bytes2.length);
- CommitLogHeader clHeader2 = CommitLogHeader.serializer().deserialize(bufIn);
- byte[] clh2 = clHeader2.getBitSet();
-
- byte[] result = new byte[clh.length];
- for ( int i = 0; i < clh.length; ++i )
- {
- result[i] = (byte)(clh[i] & clh2[i]);
- }
-
- return result;
- }
-
- static boolean isZero(byte[] bytes)
- {
- for ( byte b : bytes )
- {
- if ( b == 1 )
- return false;
- }
- return true;
- }
-
- private byte[] dirty = new byte[0]; // columnfamilies with un-flushed data in this CommitLog
- private int[] lastFlushedAt = new int[0]; // position at which each CF was last flushed
+
+ private BitSet dirty; // columnfamilies with un-flushed data in this CommitLog
+ private int[] lastFlushedAt; // position at which each CF was last flushed
CommitLogHeader(int size)
{
- dirty = new byte[size];
+ dirty = new BitSet(size);
lastFlushedAt = new int[size];
}
@@ -126,7 +80,7 @@
* also builds an index of position to column family
* Id.
*/
- CommitLogHeader(byte[] dirty, int[] lastFlushedAt)
+ CommitLogHeader(BitSet dirty, int[] lastFlushedAt)
{
this.dirty = dirty;
this.lastFlushedAt = lastFlushedAt;
@@ -134,15 +88,14 @@
CommitLogHeader(CommitLogHeader clHeader)
{
- dirty = new byte[clHeader.dirty.length];
- System.arraycopy(clHeader.dirty, 0, dirty, 0, dirty.length);
+ dirty = (BitSet)clHeader.dirty.clone();
lastFlushedAt = new int[clHeader.lastFlushedAt.length];
System.arraycopy(clHeader.lastFlushedAt, 0, lastFlushedAt, 0, lastFlushedAt.length);
}
- byte get(int index)
+ boolean isDirty(int index)
{
- return dirty[index];
+ return dirty.get(index);
}
int getPosition(int index)
@@ -152,59 +105,30 @@
void turnOn(int index, long position)
{
- turnOn(dirty, index, position);
- }
-
- void turnOn(byte[] bytes, int index, long position)
- {
- bytes[index] = (byte)1;
- lastFlushedAt[index] = (int)position;
+ dirty.set(index);
+ lastFlushedAt[index] = (int) position;
}
-
+
void turnOff(int index)
{
- turnOff(dirty, index);
- }
-
- void turnOff(byte[] bytes, int index)
- {
- bytes[index] = (byte)0;
+ dirty.set(index, false);
lastFlushedAt[index] = 0;
}
-
+
boolean isSafeToDelete() throws IOException
{
- for (byte b : dirty)
- {
- if (b == 1)
- return false;
- }
- return true;
+ return dirty.isEmpty();
}
- byte[] getBitSet()
- {
- return dirty;
- }
-
- int[] getPositions()
- {
- return lastFlushedAt;
- }
-
void zeroPositions()
{
int size = lastFlushedAt.length;
lastFlushedAt = new int[size];
}
- void and (CommitLogHeader commitLogHeader)
- {
- byte[] clh2 = commitLogHeader.dirty;
- for ( int i = 0; i < dirty.length; ++i )
- {
- dirty[i] = (byte)(dirty[i] & clh2[i]);
- }
+ void and(CommitLogHeader commitLogHeader)
+ {
+ dirty.and(commitLogHeader.dirty);
}
byte[] toByteArray() throws IOException
@@ -218,9 +142,9 @@
public String toString()
{
StringBuilder sb = new StringBuilder("");
- for ( int i = 0; i < dirty.length; ++i )
+ for ( int i = 0; i < dirty.size(); ++i )
{
- sb.append(dirty[i]);
+ sb.append((dirty.get(i) ? 0 : 1));
sb.append(":");
Table table = Table.open( DatabaseDescriptor.getTables().get(0));
sb.append(table.getColumnFamilyName(i));
@@ -234,36 +158,28 @@
}
return sb.toString();
}
-}
-class CommitLogHeaderSerializer implements ICompactSerializer<CommitLogHeader>
-{
- public void serialize(CommitLogHeader clHeader, DataOutputStream dos) throws IOException
- {
- dos.writeInt(clHeader.getBitSet().length);
- dos.write(clHeader.getBitSet());
- int[] positions = clHeader.getPositions();
-
- for ( int position : positions )
+ static class CommitLogHeaderSerializer implements ICompactSerializer<CommitLogHeader>
+ {
+ public void serialize(CommitLogHeader clHeader, DataOutputStream dos) throws IOException
{
- dos.writeInt(position);
+ BitSetSerializer.serialize(clHeader.dirty, dos);
+ dos.writeInt(clHeader.lastFlushedAt.length);
+ for (int position : clHeader.lastFlushedAt)
+ {
+ dos.writeInt(position);
+ }
}
- }
-
- public CommitLogHeader deserialize(DataInputStream dis) throws IOException
- {
- int size = dis.readInt();
- byte[] bitFlags = new byte[size];
- dis.readFully(bitFlags);
-
- int[] position = new int[size];
- for ( int i = 0; i < size; ++i )
+
+ public CommitLogHeader deserialize(DataInputStream dis) throws IOException
{
- position[i] = dis.readInt();
+ BitSet bitFlags = BitSetSerializer.deserialize(dis);
+ int[] position = new int[dis.readInt()];
+ for (int i = 0; i < position.length; ++i)
+ {
+ position[i] = dis.readInt();
+ }
+ return new CommitLogHeader(bitFlags, position);
}
-
- return new CommitLogHeader(bitFlags, position);
}
}
-
-
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=786245&r1=786244&r2=786245&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Thu Jun 18 20:22:18 2009
@@ -389,7 +389,7 @@
};
}
- public void clearUnsafe()
+ void clearUnsafe()
{
columnFamilies_.clear();
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RecoveryManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RecoveryManager.java?rev=786245&r1=786244&r2=786245&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RecoveryManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RecoveryManager.java Thu Jun 18 20:22:18 2009
@@ -34,11 +34,13 @@
{
private static RecoveryManager instance_;
private static Logger logger_ = Logger.getLogger(RecoveryManager.class);
-
+
synchronized static RecoveryManager instance() throws IOException
{
- if ( instance_ == null )
+ if (instance_ == null)
+ {
instance_ = new RecoveryManager();
+ }
return instance_;
}
@@ -46,48 +48,14 @@
{
String directory = DatabaseDescriptor.getLogFileLocation();
File file = new File(directory);
- File[] files = file.listFiles();
- return files;
+ return file.listFiles();
}
-
- public static Map<String, List<File>> getListOFCommitLogsPerTable()
- {
- File[] files = getListofCommitLogs();
- /* Maintains a mapping of table name to a list of commit log files */
- Map<String, List<File>> tableToCommitLogs = new HashMap<String, List<File>>();
-
- for (File f : files)
- {
- String table = CommitLog.getTableName(f.getName());
- List<File> clogs = tableToCommitLogs.get(table);
- if ( clogs == null )
- {
- clogs = new ArrayList<File>();
- tableToCommitLogs.put(table, clogs);
- }
- clogs.add(f);
- }
- return tableToCommitLogs;
- }
-
+
public static void doRecovery() throws IOException
{
File[] files = getListofCommitLogs();
- Map<String, List<File>> tableToCommitLogs = getListOFCommitLogsPerTable();
- recoverEachTable(tableToCommitLogs);
+ Arrays.sort(files, new FileUtils.FileComparator());
+ new CommitLog(DatabaseDescriptor.getTables().get(0), true).recover(files);
FileUtils.delete(files);
}
-
- private static void recoverEachTable(Map<String, List<File>> tableToCommitLogs) throws IOException
- {
- Comparator<File> fCmp = new FileUtils.FileComparator();
- Set<String> tables = tableToCommitLogs.keySet();
- for ( String table : tables )
- {
- List<File> clogs = tableToCommitLogs.get(table);
- Collections.sort(clogs, fCmp);
- CommitLog clog = new CommitLog(table, true);
- clog.recover(clogs);
- }
- }
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/BitSetSerializer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/BitSetSerializer.java?rev=786245&r1=786244&r2=786245&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/BitSetSerializer.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/BitSetSerializer.java Thu Jun 18 20:22:18 2009
@@ -27,7 +27,7 @@
import org.apache.cassandra.io.ICompactSerializer;
-class BitSetSerializer
+public class BitSetSerializer
{
public static void serialize(BitSet bs, DataOutputStream dos) throws IOException
{