You are viewing a plain-text version of this content. (The link to the canonical version is not preserved in this plain-text copy.)
Posted to commits@cassandra.apache.org by al...@apache.org on 2009/03/18 07:10:49 UTC

svn commit: r755485 - in /incubator/cassandra/trunk/src/org/apache/cassandra/io: ChecksumManager.java ChecksumRandomAccessFile.java SSTable.java SequenceFile.java

Author: alakshman
Date: Wed Mar 18 06:10:48 2009
New Revision: 755485

URL: http://svn.apache.org/viewvc?rev=755485&view=rev
Log:
These changes were needed to fix an issue with reads without column indexes.

Modified:
    incubator/cassandra/trunk/src/org/apache/cassandra/io/ChecksumManager.java
    incubator/cassandra/trunk/src/org/apache/cassandra/io/ChecksumRandomAccessFile.java
    incubator/cassandra/trunk/src/org/apache/cassandra/io/SSTable.java
    incubator/cassandra/trunk/src/org/apache/cassandra/io/SequenceFile.java

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/io/ChecksumManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/io/ChecksumManager.java?rev=755485&r1=755484&r2=755485&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/io/ChecksumManager.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/io/ChecksumManager.java Wed Mar 18 06:10:48 2009
@@ -53,7 +53,7 @@
  * @author alakshman
  *
  */
-class ChecksumManager
+public class ChecksumManager
 {    
     private static Logger logger_ = Logger.getLogger(ChecksumManager.class);
     /* Keeps a mapping of checksum manager instances to data file */
@@ -111,6 +111,17 @@
         return chksumMgr;
     }
     
+    /**
+     * This method returns true if the file specified is a 
+     * checksum file and false otherwise.
+     * 
+     * @param file we are interested in.
+     * @return true if checksum file false otherwise.
+     */
+    public static boolean isChecksumFile(String file)
+    {
+        return file.contains(ChecksumManager.checksumPrefix_);
+    }
     
     /**
      * On start read all the check sum files on disk and
@@ -136,11 +147,12 @@
         
         for ( File file : allFiles )
         {                           
-            int fId = SequenceFile.getFileId(file.getName());
-            ChecksumReader chksumRdr = new ChecksumReader(file.getAbsolutePath(), 0L, file.length());
-                        
+            int fId = ChecksumManager.getChecksumFileId(file.getName());
+            RandomAccessFile chksumRdr = new RandomAccessFile(file, "r");            
+            long size = chksumRdr.length();
             int chunk = 0;
-            while ( !chksumRdr.isEOF() )
+            
+            while ( chksumRdr.getFilePointer() != size )
             {
                 long value = chksumRdr.readLong();
                 long key = ChecksumManager.key(fId, ++chunk);
@@ -160,7 +172,7 @@
     {
         File f = new File(dataFile);
         long size = f.length();
-        int fileId = SequenceFile.getFileId(f.getName());
+        int fileId = ChecksumManager.getFileId(f.getName());
         int chunks = (int)(size >> 16L);
         
         for ( int i = 0; i < chunks; ++i )
@@ -171,7 +183,7 @@
         
         /* remove the check sum manager instance */
         chksumMgrs_.remove(dataFile);
-        String chksumFile = f.getParent() + System.getProperty("file.separator") + checksumPrefix_ + fileId + ".db";
+        String chksumFile = ChecksumManager.constructChksumFileNameFromDataFileName(f);
         FileUtils.delete(chksumFile);
     }
     
@@ -184,17 +196,53 @@
         return key;
     }
     
+    public static int getFileId(String file)
+    {
+        String filename = new File(file).getName();
+        /*
+         * File name is of the form <table>-<column family>-<index>-Data.db.
+         * Always split and then use the value which is at index length - 2.
+         */
+        String[] peices = filename.split("-");
+        return Integer.parseInt( peices[peices.length - 2] );
+    }
+    
+    static void close(String dataFile) throws IOException
+    {
+        ChecksumManager.chksumMgrs_.get(dataFile).close();
+    }
+    
+    private static int getChecksumFileId(String file)
+    {
+        String filename = new File(file).getName();
+        /*
+         * File name is of the form <table>-<column family>-Checksum-<index>.db.
+         * This tokenizer will strip the .db portion.
+         */
+        String[] peices = filename.split("-");
+        return Integer.parseInt( peices[3] );
+    }
+    
+    private static String constructChksumFileNameFromDataFileName(File file)
+    {
+        String directory = file.getParent();
+        String f = file.getName();
+        /* we need the table and the column family name. */
+        String[] peices = f.split("-");
+        /* we need the index part of the file name */
+        int fId = ChecksumManager.getFileId(f);
+        String chkSumFile = directory + System.getProperty("file.separator") + peices[0] + "-" + peices[1] + "-" + checksumPrefix_ + fId + "-" + "Data" + ".db";
+        return chkSumFile;
+    }
+    
     private RandomAccessFile raf_;
     private Adler32 adler_ = new Adler32();
     
     ChecksumManager(String dataFile) throws IOException
     {
         File file = new File(dataFile);
-        String directory = file.getParent();
-        String f = file.getName();
-        short fId = SequenceFile.getFileId(f);
-        String chkSumFile = directory + System.getProperty("file.separator") + checksumPrefix_ + fId + ".db";
-        raf_ = new RandomAccessFile(chkSumFile, "rw");
+        String chkSumFile = ChecksumManager.constructChksumFileNameFromDataFileName(file);
+        raf_ = new BufferedRandomAccessFile(chkSumFile, "rw");
     }
     
     /* TODO: Remove later. */
@@ -203,8 +251,8 @@
         File file = new File(dataFile);
         String directory = file.getParent();
         String f = file.getName();
-        short fId = SequenceFile.getFileId(f);        
-        raf_ = new RandomAccessFile(chkSumFile, "rw");
+        int fId = ChecksumManager.getFileId(f);        
+        raf_ = new BufferedRandomAccessFile(chkSumFile, "rw");
         
         file = new File(chkSumFile);        
         ChecksumReader chksumRdr = new ChecksumReader(file.getAbsolutePath(), 0L, file.length());
@@ -288,7 +336,7 @@
      */
     void validateChecksum(String file, int chunkId, byte[] buffer, int startOffset, int length) throws IOException
     {            
-        int fId = SequenceFile.getFileId(file);
+        int fId = ChecksumManager.getFileId(file);
         long key = ChecksumManager.key(fId, chunkId);
         adler_.update(buffer, startOffset, length);
         long currentChksum = adler_.getValue();
@@ -313,6 +361,16 @@
         return chksums_.get(key);
     }
     
+    /**
+     * Close the file handler.
+     * 
+     * @throws IOException
+     */
+    void close() throws IOException
+    {
+        raf_.close();
+    }
+    
     public static void main(String[] args) throws Throwable
     {
         ChecksumReader rdr = new ChecksumReader("C:\\Engagements\\Cassandra\\Checksum-1.db");

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/io/ChecksumRandomAccessFile.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/io/ChecksumRandomAccessFile.java?rev=755485&r1=755484&r2=755485&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/io/ChecksumRandomAccessFile.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/io/ChecksumRandomAccessFile.java Wed Mar 18 06:10:48 2009
@@ -197,7 +197,7 @@
         int chksumChunkId = (int)(chksumChunks*(chunk - 1) + 1);
         do
         {            
-            int fId = SequenceFile.getFileId(filename_);               
+            int fId = ChecksumManager.getFileId(filename_);               
             switch( chksumOps )
             {
                 case LOG:                    

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/io/SSTable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/io/SSTable.java?rev=755485&r1=755484&r2=755485&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/io/SSTable.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/io/SSTable.java Wed Mar 18 06:10:48 2009
@@ -245,6 +245,15 @@
     {        
         /* remove the cached index table from memory */
         indexMetadataMap_.remove(dataFile);
+        /* Delete the checksum file associated with this data file */
+        try
+        {
+            ChecksumManager.onFileDelete(dataFile);
+        }
+        catch ( IOException ex )
+        {
+            logger_.info( LogUtil.throwableToString(ex) );
+        }
         
         File file = new File(dataFile);
         if ( file.exists() )
@@ -252,7 +261,7 @@
             /* delete the data file */
 			if (file.delete())
 			{			    
-			    logger_.info("** Deleted " + file.getName() + " **");
+			    logger_.info("** Deleted " + file.getName() + " **");                
 			}
 			else
 			{			  
@@ -394,7 +403,7 @@
         blockIndex_ = new TreeMap<String, BlockMetadata>(Collections.reverseOrder());
         blockIndexes_ = new ArrayList<SortedMap<String, BlockMetadata>>();        
         // dataWriter_ = SequenceFile.writer(dataFile_);
-        dataWriter_ = SequenceFile.bufferedWriter(dataFile_, 4*1024*1024);
+        dataWriter_ = SequenceFile.bufferedWriter(dataFile_, 4*1024*1024);        
         SSTable.positionAfterFirstBlockIndex_ = dataWriter_.getCurrentPosition(); 
     } 
     
@@ -424,7 +433,8 @@
     public SSTable(String directory, String filename, PartitionerType pType) throws IOException
     {        
         dataFile_ = directory + System.getProperty("file.separator") + filename + "-Data.db";  
-        dataWriter_ = SequenceFile.bufferedWriter(dataFile_, 4*1024*1024);        
+        dataWriter_ = SequenceFile.bufferedWriter(dataFile_, 4*1024*1024);    
+        // dataWriter_ = SequenceFile.chksumWriter(dataFile_, 4*1024*1024);
         SSTable.positionAfterFirstBlockIndex_ = dataWriter_.getCurrentPosition(); 
         /* set up the block index based on partition type */
         initBlockIndex(pType);
@@ -900,6 +910,7 @@
         try
         {
             dataReader = SequenceFile.reader(dataFile_);
+            // dataReader = SequenceFile.chksumReader(dataFile_, 4*1024*1024);
             /* Morph key into actual key based on the partition type. */ 
             key = morphKey(key);
             Coordinate fileCoordinate = getCoordinates(key, dataReader);

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/io/SequenceFile.java?rev=755485&r1=755484&r2=755485&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/io/SequenceFile.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/io/SequenceFile.java Wed Mar 18 06:10:48 2009
@@ -33,6 +33,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.SortedMap;
+import java.util.StringTokenizer;
 import java.util.TreeMap;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -241,6 +242,37 @@
         }
     }
     
+    public static class ChecksumWriter extends Writer
+    {
+        private int size_;
+
+        ChecksumWriter(String filename, int size) throws IOException
+        {
+            super(filename, size);
+            size_ = size;
+        }
+        
+        @Override
+        protected void init(String filename) throws IOException
+        {
+            init(filename, 0);
+        }
+        
+        @Override
+        protected void init(String filename, int size) throws IOException
+        {
+            File file = new File(filename);
+            file_ = new ChecksumRandomAccessFile(file, "rw", size);
+        }
+        
+        @Override
+        public void close() throws IOException
+        {
+            super.close();
+            ChecksumManager.close(filename_);
+        }
+    }
+    
     public static class AIOWriter extends Writer
     {        
         private int size_;
@@ -911,10 +943,13 @@
                     else
                     {
                         /* Read the bloom filter for the column summarization */
+                        long preBfPos = file_.getFilePointer();
                         BloomFilter bf = defreezeBloomFilter();
                         /* column does not exist in this file */
                         if ( !bf.isPresent(columnName) ) 
                             return bytesRead;
+                        long postBfPos = file_.getFilePointer();
+                        dataSize -= (postBfPos - preBfPos);
                         
                         List<IndexHelper.ColumnIndexInfo> columnIndexList = new ArrayList<IndexHelper.ColumnIndexInfo>();
                         /* Read the name indexes if present */
@@ -1149,8 +1184,11 @@
                     }
                     else
                     {
-                        /* Read the bloom filter summarizing the columns */                         
-                        BloomFilter bf = defreezeBloomFilter();  
+                        /* Read the bloom filter summarizing the columns */ 
+                        long preBfPos = file_.getFilePointer();
+                        BloomFilter bf = defreezeBloomFilter();
+                        long postBfPos = file_.getFilePointer();
+                        dataSize -= (postBfPos - preBfPos);
                         /*
                         // remove the columns that the bloom filter says do not exist.
                         for ( String cName : columnNames )
@@ -1406,6 +1444,22 @@
         }
     }
     
+    public static class ChecksumReader extends Reader
+    {        
+        private int size_;
+
+        ChecksumReader(String filename, int size) throws IOException
+        {
+            super(filename);
+            size_ = size;
+        }
+        
+        protected void init(String filename) throws IOException
+        {
+            file_ = new ChecksumRandomAccessFile(filename, "r", size_);
+        }
+    }
+    
     public static class AIOReader extends Reader
     {                  
         private int size_;
@@ -1432,7 +1486,7 @@
         
     private static Logger logger_ = Logger.getLogger( SequenceFile.class ) ;
     public static final short utfPrefix_ = 2;
-    static final String marker_ = "Bloom-Filter";
+    public static final String marker_ = "Bloom-Filter";
 
     public static IFileWriter writer(String filename) throws IOException
     {
@@ -1444,6 +1498,11 @@
         return new BufferWriter(filename, size);
     }
     
+    public static IFileWriter chksumWriter(String filename, int size) throws IOException
+    {
+        return new ChecksumWriter(filename, size);
+    }
+    
     public static IFileWriter aioWriter(String filename, int size) throws IOException
     {
         return new AIOWriter(filename, size);
@@ -1469,6 +1528,11 @@
         return new BufferReader(filename, size);
     }
     
+    public static IFileReader chksumReader(String filename, int size) throws IOException
+    {
+        return new ChecksumReader(filename, size);
+    }
+    
     public static IFileReader aioReader(String filename, int size) throws IOException
     {
         return new AIOReader(filename, size);
@@ -1631,10 +1695,4 @@
         // The number of chars produced may be less than utflen
         return new String(chararr, 0, chararr_count);
     }
-    
-    public static short getFileId(String file)
-    {
-        String[] peices = file.split("-");
-        return Short.parseShort( peices[2] );
-    }
 }