You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain text copy.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/06/18 22:22:19 UTC

svn commit: r786245 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra: db/CommitLog.java db/CommitLogHeader.java db/Memtable.java db/RecoveryManager.java utils/BitSetSerializer.java

Author: jbellis
Date: Thu Jun 18 20:22:18 2009
New Revision: 786245

URL: http://svn.apache.org/viewvc?rev=786245&view=rev
Log:
switch from byte[] to BitSet in CommitLogHeader.  patch by jbellis; reviewed by goffinet for CASSANDRA-237

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogHeader.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RecoveryManager.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/BitSetSerializer.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java?rev=786245&r1=786244&r2=786245&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java Thu Jun 18 20:22:18 2009
@@ -29,13 +29,11 @@
 import org.apache.cassandra.io.SequenceFile;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.FileUtils;
-import org.apache.cassandra.utils.LogUtil;
+
 import org.apache.log4j.Logger;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
-import org.apache.commons.lang.StringUtils;
-
 /*
  * Commit Log tracks every write operation into the system. The aim
  * of the commit log is to be able to successfully recover data that was
@@ -182,7 +180,6 @@
     /* header for current commit log */
     private CommitLogHeader clHeader_;
     private IFileWriter logWriter_;
-    private long commitHeaderStartPos_;
 
     /*
      * Generates a file name of the format CommitLog-<table>-<timestamp>.log in the
@@ -221,27 +218,22 @@
     CommitLog(File logFile) throws IOException
     {
         table_ = CommitLog.getTableName(logFile.getName());
-        logFile_ = logFile.getAbsolutePath();        
+        logFile_ = logFile.getAbsolutePath();
         logWriter_ = CommitLog.createWriter(logFile_);
-        commitHeaderStartPos_ = 0L;
     }
 
     String getLogFile()
     {
         return logFile_;
     }
-
-    void readCommitLogHeader(String logFile, byte[] bytes) throws IOException
+    
+    private CommitLogHeader readCommitLogHeader(IFileReader logReader) throws IOException
     {
-        IFileReader logReader = SequenceFile.reader(logFile);
-        try
-        {
-            logReader.readDirect(bytes);
-        }
-        finally
-        {
-            logReader.close();
-        }
+        int size = (int)logReader.readLong();
+        byte[] bytes = new byte[size];
+        logReader.readDirect(bytes);
+        ByteArrayInputStream byteStream = new ByteArrayInputStream(bytes);
+        return CommitLogHeader.serializer().deserialize(new DataInputStream(byteStream));
     }
 
     /*
@@ -253,7 +245,6 @@
         Table table = Table.open(table_);
         int cfSize = table.getNumberOfColumnFamilies();
         /* record the beginning of the commit header */
-        commitHeaderStartPos_ = logWriter_.getCurrentPosition();
         /* write the commit log header */
         clHeader_ = new CommitLogHeader(cfSize);
         writeCommitLogHeader(clHeader_.toByteArray(), false);
@@ -263,69 +254,28 @@
     {
         /* record the current position */
         long currentPos = logWriter_.getCurrentPosition();
-        logWriter_.seek(commitHeaderStartPos_);
+        logWriter_.seek(0);
         /* write the commit log header */
+        logWriter_.writeLong(bytes.length);
         logWriter_.writeDirect(bytes);
-        if ( reset )
+        if (reset)
         {
             /* seek back to the old position */
             logWriter_.seek(currentPos);
         }
     }
 
-    void recover(List<File> clogs) throws IOException
-    {
-        Table table = Table.open(table_);
-        int cfSize = table.getNumberOfColumnFamilies();
-        int size = CommitLogHeader.size(cfSize);
-        byte[] header = new byte[size];
-        byte[] header2 = new byte[size];
-        int index = clogs.size() - 1;
-
-        File file = clogs.get(index);
-        readCommitLogHeader(file.getAbsolutePath(), header);
-
-        Stack<File> filesNeeded = new Stack<File>();
-        filesNeeded.push(file);
-
-        /*
-         * Identify files that we need for processing. This can be done
-         * using the information in the header of each file. Simply and
-         * the byte[] (which are the headers) and stop at the file where
-         * the result is a zero.
-        */
-        for ( int i = (index - 1); i >= 0; --i )
-        {
-            file = clogs.get(i);
-            readCommitLogHeader(file.getAbsolutePath(), header2);
-            byte[] result = CommitLogHeader.and(header, header2);
-            if (CommitLogHeader.isZero(result))
-                break;
-            filesNeeded.push(file);
-        }
-
-        logger_.info("Replaying logs from " + StringUtils.join(filesNeeded, ", "));
-        doRecovery(filesNeeded, header);
-    }
-
-    private void doRecovery(Stack<File> filesNeeded, byte[] header) throws IOException
+    void recover(File[] clogs) throws IOException
     {
-        Table table = Table.open(table_);
-
         DataInputBuffer bufIn = new DataInputBuffer();
-        DataOutputBuffer bufOut = new DataOutputBuffer();        
 
-        while ( !filesNeeded.isEmpty() )
+        for (File file : clogs)
         {
-            File file = filesNeeded.pop();
             // IFileReader reader = SequenceFile.bufferedReader(file.getAbsolutePath(), DatabaseDescriptor.getLogFileSizeThreshold());
             IFileReader reader = SequenceFile.reader(file.getAbsolutePath());
             try
             {
-                reader.readDirect(header);
-                /* deserialize the commit log header */
-                bufIn.reset(header, 0, header.length);
-                CommitLogHeader clHeader = CommitLogHeader.serializer().deserialize(bufIn);
+                CommitLogHeader clHeader = readCommitLogHeader(reader);
                 /* seek to the lowest position */
                 int lowPos = CommitLogHeader.getLowestPosition(clHeader);
                 /*
@@ -337,24 +287,21 @@
                 else
                     reader.seek(lowPos);
 
+                Set<Table> tablesRecovered = new HashSet<Table>();
+
                 /* read the logs populate RowMutation and apply */
                 while ( !reader.isEOF() )
                 {
-                    bufOut.reset();
-                    long bytesRead = reader.next(bufOut);
-                    if ( bytesRead == -1 )
-                        break;
-
-                    bufIn.reset(bufOut.getData(), bufOut.getLength());
-                    /* Skip over the commit log key portion */
-                    bufIn.readUTF();
-                    /* Skip over data size */
-                    bufIn.readInt();
-                    
+                    byte[] bytes = new byte[(int)reader.readLong()];
+                    reader.readDirect(bytes);
+                    bufIn.reset(bytes, bytes.length);
+
                     /* read the commit log entry */
                     try
                     {                        
                         Row row = Row.serializer().deserialize(bufIn);
+                        Table table = Table.open(table_);
+                        tablesRecovered.add(table);
                         Collection<ColumnFamily> columnFamilies = new ArrayList<ColumnFamily>(row.getColumnFamilies());
                         /* remove column families that have already been flushed */
                         for (ColumnFamily columnFamily : columnFamilies)
@@ -366,7 +313,7 @@
                         		continue;
                         	}	
                             int id = table.getColumnFamilyId(columnFamily.name());
-                            if ( clHeader.get(id) == 0 || reader.getCurrentPosition() < clHeader.getPosition(id) )
+                            if ( !clHeader.isDirty(id) || reader.getCurrentPosition() < clHeader.getPosition(id) )
                                 row.removeColumnFamily(columnFamily);
                         }
                         if ( !row.isEmpty() )
@@ -381,7 +328,10 @@
                 }
                 reader.close();
                 /* apply the rows read -- success will result in the CL file being discarded */
-                table.flush(true);
+                for (Table table : tablesRecovered)
+                {
+                    table.flush(true);
+                }
             }
             catch (Throwable th)
             {
@@ -397,20 +347,20 @@
      * Update the header of the commit log if a new column family
      * is encountered for the first time.
     */
-    private void updateHeader(Row row) throws IOException
+    private void maybeUpdateHeader(Row row) throws IOException
     {
         Table table = Table.open(table_);
         for (ColumnFamily columnFamily : row.getColumnFamilies())
         {
-        	int id = table.getColumnFamilyId(columnFamily.name());
-        	if ( clHeader_.get(id) == 0 || ( clHeader_.get(id) == 1 && clHeader_.getPosition(id) == 0 ) )
-        	{
-            	if ( clHeader_.get(id) == 0 || ( clHeader_.get(id) == 1 && clHeader_.getPosition(id) == 0 ) )
-            	{
-	        		clHeader_.turnOn( id, logWriter_.getCurrentPosition() );
-	        		writeCommitLogHeader(clHeader_.toByteArray(), true);
-            	}
-        	}
+            int id = table.getColumnFamilyId(columnFamily.name());
+            if (!clHeader_.isDirty(id) || (clHeader_.isDirty(id) && clHeader_.getPosition(id) == 0))
+            {
+                if (!clHeader_.isDirty(id) || (clHeader_.isDirty(id) && clHeader_.getPosition(id) == 0))
+                {
+                    clHeader_.turnOn(id, logWriter_.getCurrentPosition());
+                    writeCommitLogHeader(clHeader_.toByteArray(), true);
+                }
+            }
         }
     }
     
@@ -439,8 +389,9 @@
             currentPosition = logWriter_.getCurrentPosition();
             cLogCtx = new CommitLogContext(logFile_, currentPosition);
             /* Update the header */
-            updateHeader(row);
-            logWriter_.append(table_, cfBuffer);
+            maybeUpdateHeader(row);
+            logWriter_.writeLong(cfBuffer.getLength());
+            logWriter_.append(cfBuffer);
             checkThresholdAndRollLog();
         }
         catch (IOException e)

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogHeader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogHeader.java?rev=786245&r1=786244&r2=786245&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogHeader.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogHeader.java Thu Jun 18 20:22:18 2009
@@ -19,10 +19,12 @@
 package org.apache.cassandra.db;
 
 import java.io.*;
+import java.util.BitSet;
 
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.io.DataInputBuffer;
 import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.utils.BitSetSerializer;
+import org.apache.cassandra.config.DatabaseDescriptor;
 
 /**
  * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
@@ -30,40 +32,28 @@
 
 class CommitLogHeader
 {
-    private static ICompactSerializer<CommitLogHeader> serializer_;
+    private static CommitLogHeaderSerializer serializer = new CommitLogHeaderSerializer();
 
-    static
+    static CommitLogHeaderSerializer serializer()
     {
-        serializer_ = new CommitLogHeaderSerializer();
+        return serializer;
     }
-    
-    static ICompactSerializer<CommitLogHeader> serializer()
-    {
-        return serializer_;
-    }
-    
-    static int size(int size)
+        
+    public static BitSet and(byte[] bytes1, byte[] bytes2) throws IOException
     {
-        /* 
-         * We serialize the CommitLogHeader as a byte[] and write it
-         * to disk. So we first write an "int" to specify the length 
-         * of the byte[] which is why we first have a 4 in the sum.
-         * We then have size which is the number of bits to track who
-         * has been flushed and then the rest is the position[]
-         * size = #of column families 
-         *        + 
-         *        size of the bitset 
-         *        + 
-         *        size of position array 
-         */
-        return 4 + size + (4 * size); 
+        DataInputBuffer bufIn = new DataInputBuffer();
+        bufIn.reset(bytes1, 0, bytes1.length);
+        CommitLogHeader header1 = serializer.deserialize(bufIn);
+        bufIn.reset(bytes2, 0, bytes2.length);
+        CommitLogHeader header2 = serializer.deserialize(bufIn);
+        header1.and(header2);
+        return header1.dirty;
     }
-    
+
     static int getLowestPosition(CommitLogHeader clHeader)
     {
-        int[] positions = clHeader.getPositions();
         int minPosition = Integer.MAX_VALUE;
-        for ( int position : positions )
+        for ( int position : clHeader.lastFlushedAt)
         {
             if ( position < minPosition && position > 0)
             {
@@ -75,49 +65,13 @@
             minPosition = 0;
         return minPosition;
     }
-    
-    /* 
-     * Bitwise & of each byte in the two arrays.
-     * Both arrays are of same length. In order
-     * to be memory efficient the result is in
-     * the third parameter.
-    */
-    static byte[] and(byte[] bytes, byte[] bytes2) throws IOException
-    { 
-        DataInputBuffer bufIn = new DataInputBuffer();
-        bufIn.reset(bytes, 0, bytes.length);
-        CommitLogHeader clHeader = CommitLogHeader.serializer().deserialize(bufIn);
-        byte[] clh = clHeader.getBitSet();
-        
-        bufIn.reset(bytes2, 0, bytes2.length);
-        CommitLogHeader clHeader2 = CommitLogHeader.serializer().deserialize(bufIn);
-        byte[] clh2 = clHeader2.getBitSet();
-        
-        byte[] result = new byte[clh.length];
-        for ( int i = 0; i < clh.length; ++i )
-        {            
-            result[i] = (byte)(clh[i] & clh2[i]);
-        }
-        
-        return result;
-    }
-    
-    static boolean isZero(byte[] bytes)
-    {
-        for ( byte b : bytes )
-        {
-            if ( b == 1 )
-                return false;
-        }
-        return true;
-    }
-    
-    private byte[] dirty = new byte[0]; // columnfamilies with un-flushed data in this CommitLog
-    private int[] lastFlushedAt = new int[0]; // position at which each CF was last flushed
+
+    private BitSet dirty; // columnfamilies with un-flushed data in this CommitLog
+    private int[] lastFlushedAt; // position at which each CF was last flushed
     
     CommitLogHeader(int size)
     {
-        dirty = new byte[size];
+        dirty = new BitSet(size);
         lastFlushedAt = new int[size];
     }
     
@@ -126,7 +80,7 @@
      * also builds an index of position to column family
      * Id.
     */
-    CommitLogHeader(byte[] dirty, int[] lastFlushedAt)
+    CommitLogHeader(BitSet dirty, int[] lastFlushedAt)
     {
         this.dirty = dirty;
         this.lastFlushedAt = lastFlushedAt;
@@ -134,15 +88,14 @@
     
     CommitLogHeader(CommitLogHeader clHeader)
     {
-        dirty = new byte[clHeader.dirty.length];
-        System.arraycopy(clHeader.dirty, 0, dirty, 0, dirty.length);
+        dirty = (BitSet)clHeader.dirty.clone();
         lastFlushedAt = new int[clHeader.lastFlushedAt.length];
         System.arraycopy(clHeader.lastFlushedAt, 0, lastFlushedAt, 0, lastFlushedAt.length);
     }
     
-    byte get(int index)
+    boolean isDirty(int index)
     {
-        return dirty[index];
+        return dirty.get(index);
     } 
     
     int getPosition(int index)
@@ -152,59 +105,30 @@
     
     void turnOn(int index, long position)
     {
-        turnOn(dirty, index, position);
-    }
-    
-    void turnOn(byte[] bytes, int index, long position)
-    {
-        bytes[index] = (byte)1;
-        lastFlushedAt[index] = (int)position;
+        dirty.set(index);
+        lastFlushedAt[index] = (int) position;
     }
-    
+
     void turnOff(int index)
     {
-        turnOff(dirty, index);
-    }
-    
-    void turnOff(byte[] bytes, int index)
-    {
-        bytes[index] = (byte)0;
+        dirty.set(index, false);
         lastFlushedAt[index] = 0;
     }
-    
+
     boolean isSafeToDelete() throws IOException
     {
-        for (byte b : dirty)
-        {
-            if (b == 1)
-                return false;
-        }
-        return true;
+        return dirty.isEmpty();
     }
 
-    byte[] getBitSet()
-    {
-        return dirty;
-    }
-    
-    int[] getPositions()
-    {
-        return lastFlushedAt;
-    }
-    
     void zeroPositions()
     {
         int size = lastFlushedAt.length;
         lastFlushedAt = new int[size];
     }
     
-    void and (CommitLogHeader commitLogHeader)
-    {        
-        byte[] clh2 = commitLogHeader.dirty;
-        for ( int i = 0; i < dirty.length; ++i )
-        {            
-            dirty[i] = (byte)(dirty[i] & clh2[i]);
-        }
+    void and(CommitLogHeader commitLogHeader)
+    {
+        dirty.and(commitLogHeader.dirty);
     }
     
     byte[] toByteArray() throws IOException
@@ -218,9 +142,9 @@
     public String toString()
     {
         StringBuilder sb = new StringBuilder("");        
-        for ( int i = 0; i < dirty.length; ++i )
+        for ( int i = 0; i < dirty.size(); ++i )
         {
-            sb.append(dirty[i]);
+            sb.append((dirty.get(i) ? 0 : 1));
             sb.append(":");
             Table table = Table.open( DatabaseDescriptor.getTables().get(0));
             sb.append(table.getColumnFamilyName(i));
@@ -234,36 +158,28 @@
         }        
         return sb.toString();
     }
-}
 
-class CommitLogHeaderSerializer implements ICompactSerializer<CommitLogHeader>
-{
-    public void serialize(CommitLogHeader clHeader, DataOutputStream dos) throws IOException
-    {        
-        dos.writeInt(clHeader.getBitSet().length);
-        dos.write(clHeader.getBitSet());
-        int[] positions = clHeader.getPositions();        
-        
-        for ( int position : positions )
+    static class CommitLogHeaderSerializer implements ICompactSerializer<CommitLogHeader>
+    {
+        public void serialize(CommitLogHeader clHeader, DataOutputStream dos) throws IOException
         {
-            dos.writeInt(position);
+            BitSetSerializer.serialize(clHeader.dirty, dos);
+            dos.writeInt(clHeader.lastFlushedAt.length);
+            for (int position : clHeader.lastFlushedAt)
+            {
+                dos.writeInt(position);
+            }
         }
-    }
-    
-    public CommitLogHeader deserialize(DataInputStream dis) throws IOException
-    {
-        int size = dis.readInt();
-        byte[] bitFlags = new byte[size];
-        dis.readFully(bitFlags);
-        
-        int[] position = new int[size];
-        for ( int i = 0; i < size; ++i )
+
+        public CommitLogHeader deserialize(DataInputStream dis) throws IOException
         {
-            position[i] = dis.readInt();
+            BitSet bitFlags = BitSetSerializer.deserialize(dis);
+            int[] position = new int[dis.readInt()];
+            for (int i = 0; i < position.length; ++i)
+            {
+                position[i] = dis.readInt();
+            }
+            return new CommitLogHeader(bitFlags, position);
         }
-                                                 
-        return new CommitLogHeader(bitFlags, position);
     }
 }
-
-

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=786245&r1=786244&r2=786245&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Thu Jun 18 20:22:18 2009
@@ -389,7 +389,7 @@
         };
     }
 
-    public void clearUnsafe()
+    void clearUnsafe()
     {
         columnFamilies_.clear();
     }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RecoveryManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RecoveryManager.java?rev=786245&r1=786244&r2=786245&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RecoveryManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RecoveryManager.java Thu Jun 18 20:22:18 2009
@@ -34,11 +34,13 @@
 {
     private static RecoveryManager instance_;
     private static Logger logger_ = Logger.getLogger(RecoveryManager.class);
-    
+
     synchronized static RecoveryManager instance() throws IOException
     {
-        if ( instance_ == null )
+        if (instance_ == null)
+        {
             instance_ = new RecoveryManager();
+        }
         return instance_;
     }
 
@@ -46,48 +48,14 @@
     {
         String directory = DatabaseDescriptor.getLogFileLocation();
         File file = new File(directory);
-        File[] files = file.listFiles();
-        return files;
+        return file.listFiles();
     }
-    
-    public static Map<String, List<File>> getListOFCommitLogsPerTable()
-    {
-        File[] files = getListofCommitLogs();
-        /* Maintains a mapping of table name to a list of commit log files */
-        Map<String, List<File>> tableToCommitLogs = new HashMap<String, List<File>>();
-        
-        for (File f : files)
-        {
-            String table = CommitLog.getTableName(f.getName());
-            List<File> clogs = tableToCommitLogs.get(table);
-            if ( clogs == null )
-            {
-                clogs = new ArrayList<File>();
-                tableToCommitLogs.put(table, clogs);
-            }
-            clogs.add(f);
-        }
-        return tableToCommitLogs;
-    }
-    
+
     public static void doRecovery() throws IOException
     {
         File[] files = getListofCommitLogs();
-        Map<String, List<File>> tableToCommitLogs = getListOFCommitLogsPerTable();
-        recoverEachTable(tableToCommitLogs);
+        Arrays.sort(files, new FileUtils.FileComparator());
+        new CommitLog(DatabaseDescriptor.getTables().get(0), true).recover(files);
         FileUtils.delete(files);
     }
-    
-    private static void recoverEachTable(Map<String, List<File>> tableToCommitLogs) throws IOException
-    {
-        Comparator<File> fCmp = new FileUtils.FileComparator();
-        Set<String> tables = tableToCommitLogs.keySet();
-        for ( String table : tables )
-        {
-            List<File> clogs = tableToCommitLogs.get(table);
-            Collections.sort(clogs, fCmp);
-            CommitLog clog = new CommitLog(table, true);
-            clog.recover(clogs);
-        }
-    }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/BitSetSerializer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/BitSetSerializer.java?rev=786245&r1=786244&r2=786245&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/BitSetSerializer.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/BitSetSerializer.java Thu Jun 18 20:22:18 2009
@@ -27,7 +27,7 @@
 
 import org.apache.cassandra.io.ICompactSerializer;
 
-class BitSetSerializer
+public class BitSetSerializer
 {
     public static void serialize(BitSet bs, DataOutputStream dos) throws IOException
     {