You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/08/02 00:30:15 UTC

svn commit: r799942 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra: db/CommitLog.java io/AbstractWriter.java io/BufferedRandomAccessFile.java io/IFileReader.java io/SSTableReader.java io/SequenceFile.java

Author: jbellis
Date: Sat Aug  1 22:30:14 2009
New Revision: 799942

URL: http://svn.apache.org/viewvc?rev=799942&view=rev
Log:
Make CommitLog use a raw BufferedRandomAccessFile instead of unnecessarily overloading AbstractWriter/IFileReader.  The set of AbstractWriter (AW) features used by both CommitLog and SSTable was empty.
patch by jbellis; reviewed by Stu Hood for CASSANDRA-330

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/AbstractWriter.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/BufferedRandomAccessFile.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IFileReader.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SequenceFile.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java?rev=799942&r1=799941&r2=799942&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java Sat Aug  1 22:30:14 2009
@@ -126,9 +126,9 @@
         return Long.parseLong(entries[entries.length - 2]);
     }
 
-    private static AbstractWriter createWriter(String file) throws IOException
+    private static BufferedRandomAccessFile createWriter(String file) throws IOException
     {        
-        return SequenceFile.writer(file);
+        return new BufferedRandomAccessFile(file, "rw");
     }
 
     static CommitLog open() throws IOException
@@ -156,7 +156,7 @@
     private String logFile_;
     /* header for current commit log */
     private CommitLogHeader clHeader_;
-    private AbstractWriter logWriter_;
+    private BufferedRandomAccessFile logWriter_;
 
     /*
      * Generates a file name of the format CommitLog-<table>-<timestamp>.log in the
@@ -203,11 +203,11 @@
         return logFile_;
     }
     
-    private CommitLogHeader readCommitLogHeader(IFileReader logReader) throws IOException
+    private CommitLogHeader readCommitLogHeader(BufferedRandomAccessFile logReader) throws IOException
     {
         int size = (int)logReader.readLong();
         byte[] bytes = new byte[size];
-        logReader.readDirect(bytes);
+        logReader.readFully(bytes); // read(byte[]) may return short; the replaced readDirect() delegated to readFully()
         ByteArrayInputStream byteStream = new ByteArrayInputStream(bytes);
         return CommitLogHeader.serializer().deserialize(new DataInputStream(byteStream));
     }
@@ -217,7 +217,7 @@
     */
     private static void writeCommitLogHeader(String commitLogFileName, byte[] bytes) throws IOException
     {
-        AbstractWriter logWriter = CommitLog.createWriter(commitLogFileName);
+        RandomAccessFile logWriter = CommitLog.createWriter(commitLogFileName);
         writeCommitLogHeader(logWriter, bytes);
         logWriter.close();
     }
@@ -236,7 +236,7 @@
     /** writes header at the beginning of the file, then seeks back to current position */
     private void seekAndWriteCommitLogHeader(byte[] bytes) throws IOException
     {
-        long currentPos = logWriter_.getCurrentPosition();
+        long currentPos = logWriter_.getFilePointer();
         logWriter_.seek(0);
 
         writeCommitLogHeader(logWriter_, bytes);
@@ -244,10 +244,10 @@
         logWriter_.seek(currentPos);
     }
 
-    private static void writeCommitLogHeader(AbstractWriter logWriter, byte[] bytes) throws IOException
+    private static void writeCommitLogHeader(RandomAccessFile logWriter, byte[] bytes) throws IOException
     {
         logWriter.writeLong(bytes.length);
-        logWriter.writeDirect(bytes);
+        logWriter.write(bytes);
     }
 
     void recover(File[] clogs) throws IOException
@@ -256,7 +256,7 @@
 
         for (File file : clogs)
         {
-            IFileReader reader = SequenceFile.reader(file.getAbsolutePath());
+            BufferedRandomAccessFile reader = new BufferedRandomAccessFile(file.getAbsolutePath(), "r");
             CommitLogHeader clHeader = readCommitLogHeader(reader);
             /* seek to the lowest position */
             int lowPos = CommitLogHeader.getLowestPosition(clHeader);
@@ -272,13 +272,13 @@
             Set<Table> tablesRecovered = new HashSet<Table>();
 
             /* read the logs populate RowMutation and apply */
-            while ( !reader.isEOF() )
+            while (reader.getFilePointer() < reader.length())
             {
                 byte[] bytes;
                 try
                 {
-                    bytes = new byte[(int)reader.readLong()];
-                    reader.readDirect(bytes);
+                    bytes = new byte[(int) reader.readLong()];
+                    reader.readFully(bytes); // read() can return short and never throws EOFException on a truncated entry
                 }
                 catch (EOFException e)
                 {
@@ -296,16 +296,18 @@
                 for (ColumnFamily columnFamily : columnFamilies)
                 {
                     /* TODO: Remove this to not process Hints */
-                    if ( !DatabaseDescriptor.isApplicationColumnFamily(columnFamily.name()) )
+                    if (!DatabaseDescriptor.isApplicationColumnFamily(columnFamily.name()))
                     {
                         row.removeColumnFamily(columnFamily);
                         continue;
                     }
                     int id = table.getColumnFamilyId(columnFamily.name());
-                    if ( !clHeader.isDirty(id) || reader.getCurrentPosition() < clHeader.getPosition(id) )
+                    if (!clHeader.isDirty(id) || reader.getFilePointer() < clHeader.getPosition(id))
+                    {
                         row.removeColumnFamily(columnFamily);
+                    }
                 }
-                if ( !row.isEmpty() )
+                if (!row.isEmpty())
                 {
                     table.applyNow(row);
                 }
@@ -331,7 +333,7 @@
             int id = table.getColumnFamilyId(columnFamily.name());
             if (!clHeader_.isDirty(id) || (clHeader_.isDirty(id) && clHeader_.getPosition(id) == 0))
             {
-                clHeader_.turnOn(id, logWriter_.getCurrentPosition());
+                clHeader_.turnOn(id, logWriter_.getFilePointer());
                 seekAndWriteCommitLogHeader(clHeader_.toByteArray());
             }
         }
@@ -343,7 +345,7 @@
         {
             public CommitLogContext call() throws Exception
             {
-                return new CommitLogContext(logFile_, logWriter_.getCurrentPosition());
+                return new CommitLogContext(logFile_, logWriter_.getFilePointer());
             }
         };
         try
@@ -499,11 +501,11 @@
 
     private boolean maybeRollLog() throws IOException
     {
-        if (logWriter_.getFileSize() >= SEGMENT_SIZE)
+        if (logWriter_.length() >= SEGMENT_SIZE)
         {
             /* Rolls the current log file over to a new one. */
             setNextFileName();
-            String oldLogFile = logWriter_.getFileName();
+            String oldLogFile = logWriter_.getPath();
             logWriter_.close();
 
             /* point reader/writer to a new commit log file. */
@@ -542,12 +544,12 @@
             {
                 /* serialize the row */
                 Row.serializer().serialize(row, cfBuffer);
-                currentPosition = logWriter_.getCurrentPosition();
+                currentPosition = logWriter_.getFilePointer();
                 CommitLogContext cLogCtx = new CommitLogContext(logFile_, currentPosition);
                 /* Update the header */
                 maybeUpdateHeader(row);
                 logWriter_.writeLong(cfBuffer.getLength());
-                logWriter_.append(cfBuffer);
+                logWriter_.write(cfBuffer.getData(), 0, cfBuffer.getLength());
                 maybeRollLog();
                 return cLogCtx;
             }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/AbstractWriter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/AbstractWriter.java?rev=799942&r1=799941&r2=799942&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/AbstractWriter.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/AbstractWriter.java Sat Aug  1 22:30:14 2009
@@ -39,21 +39,6 @@
     public abstract void seek(long position) throws IOException;
 
     /**
-     * Appends the buffer to the the underlying SequenceFile.
-     * @param buffer buffer which contains the serialized data.
-     * @throws java.io.IOException
-     */
-    public abstract void append(DataOutputBuffer buffer) throws IOException;
-
-    /**
-     * Appends the key and the value to the the underlying SequenceFile.
-     * @param keyBuffer buffer which contains the serialized key.
-     * @param buffer buffer which contains the serialized data.
-     * @throws java.io.IOException
-     */
-    public abstract void append(DataOutputBuffer keyBuffer, DataOutputBuffer buffer) throws IOException;
-
-    /**
      * Appends the key and the value to the the underlying SequenceFile.
      * @param key key associated with this peice of data.
      * @param buffer buffer containing the serialized data.
@@ -70,48 +55,12 @@
     public abstract void append(String key, byte[] value) throws IOException;
 
     /**
-     * Appends the key and the long value to the the underlying SequenceFile.
-     * This is used in the contruction of the index file associated with a
-     * SSTable.
-     * @param key key associated with this peice of data.
-     * @param value value associated with this key.
-     * @throws java.io.IOException
-     */
-    public abstract void append(String key, long value) throws IOException;
-
-    /**
-     * Be extremely careful while using this API. This currently
-     * used to write the commit log header in the commit logs.
-     * If not used carefully it could completely screw up reads
-     * of other key/value pairs that are written.
-     *
-     * @param bytes serialized version of the commit log header.
-     * @throws java.io.IOException
-    */
-    public abstract long writeDirect(byte[] bytes) throws IOException;
-
-    /**
-     * Write a long into the underlying sub system.
-     * @param value long to be written
-     * @throws java.io.IOException
-     */
-    public abstract void writeLong(long value) throws IOException;
-
-    /**
      * Close the file which is being used for the write.
      * @throws java.io.IOException
      */
     public abstract void close() throws IOException;
 
     /**
-     * Close the file after appending the passed in footer information.
-     * @param footer footer information.
-     * @param size size of the footer.
-     * @throws java.io.IOException
-     */
-    public abstract void close(byte[] footer, int size) throws IOException;
-
-    /**
      * @return the size of the file.
      * @throws java.io.IOException
      */
@@ -162,25 +111,6 @@
             file_.seek(position);
         }
 
-        public void append(DataOutputBuffer buffer) throws IOException
-        {
-            file_.write(buffer.getData(), 0, buffer.getLength());
-        }
-
-        public void append(DataOutputBuffer keyBuffer, DataOutputBuffer buffer) throws IOException
-        {
-            int keyBufLength = keyBuffer.getLength();
-            if (keyBuffer == null || keyBufLength == 0)
-                throw new IllegalArgumentException("Key cannot be NULL or of zero length.");
-
-            file_.writeInt(keyBufLength);
-            file_.write(keyBuffer.getData(), 0, keyBufLength);
-
-            int length = buffer.getLength();
-            file_.writeInt(length);
-            file_.write(buffer.getData(), 0, length);
-        }
-
         public void append(String key, DataOutputBuffer buffer) throws IOException
         {
             if (key == null)
@@ -202,46 +132,12 @@
             file_.write(value);
         }
 
-        public void append(String key, long value) throws IOException
-        {
-            if (key == null)
-                throw new IllegalArgumentException("Key cannot be NULL.");
-
-            file_.writeUTF(key);
-            file_.writeLong(value);
-        }
-
-        /**
-         * Be extremely careful while using this API. This currently
-         * used to write the commit log header in the commit logs.
-         * If not used carefully it could completely screw up reads
-         * of other key/value pairs that are written.
-         *
-         * @param bytes the bytes to write
-         */
-        public long writeDirect(byte[] bytes) throws IOException
-        {
-            file_.write(bytes);
-            return file_.getFilePointer();
-        }
-
-        public void writeLong(long value) throws IOException
-        {
-            file_.writeLong(value);
-        }
-
         public void close() throws IOException
         {
             sync();
             file_.close();
         }
 
-        public void close(byte[] footer, int size) throws IOException
-        {
-            file_.writeInt(size);
-            file_.write(footer, 0, size);
-        }
-
         public String getFileName()
         {
             return filename_;
@@ -282,5 +178,11 @@
                 file.createNewFile();
             }
         }
+
+        @Override
+        public void sync() throws IOException
+        {
+            ((BufferedRandomAccessFile)file_).sync();
+        }
     }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/BufferedRandomAccessFile.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/BufferedRandomAccessFile.java?rev=799942&r1=799941&r2=799942&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/BufferedRandomAccessFile.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/BufferedRandomAccessFile.java Sat Aug  1 22:30:14 2009
@@ -42,6 +42,8 @@
     static final int LogBuffSz_ = 16; // 64K buffer
     public static final int BuffSz_ = (1 << LogBuffSz_);
     static final long BuffMask_ = ~(((long) BuffSz_) - 1L);
+
+    private String path_;
     
     /*
      * This implementation is based on the buffer implementation in Modula-3's
@@ -54,59 +56,59 @@
     private long maxHi_; // this.lo + this.buff.length
     private boolean hitEOF_; // buffer contains last file block?
     private long diskPos_; // disk position
-    
+
     /*
-     * To describe the above fields, we introduce the following abstractions for
-     * the file "f":
-     * 
-     * len(f) the length of the file curr(f) the current position in the file
-     * c(f) the abstract contents of the file disk(f) the contents of f's
-     * backing disk file closed(f) true iff the file is closed
-     * 
-     * "curr(f)" is an index in the closed interval [0, len(f)]. "c(f)" is a
-     * character sequence of length "len(f)". "c(f)" and "disk(f)" may differ if
-     * "c(f)" contains unflushed writes not reflected in "disk(f)". The flush
-     * operation has the effect of making "disk(f)" identical to "c(f)".
-     * 
-     * A file is said to be *valid* if the following conditions hold:
-     * 
-     * V1. The "closed" and "curr" fields are correct:
-     * 
-     * f.closed == closed(f) f.curr == curr(f)
-     * 
-     * V2. The current position is either contained in the buffer, or just past
-     * the buffer:
-     * 
-     * f.lo <= f.curr <= f.hi
-     * 
-     * V3. Any (possibly) unflushed characters are stored in "f.buff":
-     * 
-     * (forall i in [f.lo, f.curr): c(f)[i] == f.buff[i - f.lo])
-     * 
-     * V4. For all characters not covered by V3, c(f) and disk(f) agree:
-     * 
-     * (forall i in [f.lo, len(f)): i not in [f.lo, f.curr) => c(f)[i] ==
-     * disk(f)[i])
-     * 
-     * V5. "f.dirty" is true iff the buffer contains bytes that should be
-     * flushed to the file; by V3 and V4, only part of the buffer can be dirty.
-     * 
-     * f.dirty == (exists i in [f.lo, f.curr): c(f)[i] != f.buff[i - f.lo])
-     * 
-     * V6. this.maxHi == this.lo + this.buff.length
-     * 
-     * Note that "f.buff" can be "null" in a valid file, since the range of
-     * characters in V3 is empty when "f.lo == f.curr".
-     * 
-     * A file is said to be *ready* if the buffer contains the current position,
-     * i.e., when:
-     * 
-     * R1. !f.closed && f.buff != null && f.lo <= f.curr && f.curr < f.hi
-     * 
-     * When a file is ready, reading or writing a single byte can be performed
-     * by reading or writing the in-memory buffer without performing a disk
-     * operation.
-     */
+    * To describe the above fields, we introduce the following abstractions for
+    * the file "f":
+    *
+    * len(f): the length of the file; curr(f): the current position in the
+    * file; c(f): the abstract contents of the file; disk(f): the contents of
+    * f's backing disk file; closed(f): true iff the file is closed.
+    *
+    * "curr(f)" is an index in the closed interval [0, len(f)]. "c(f)" is a
+    * character sequence of length "len(f)". "c(f)" and "disk(f)" may differ if
+    * "c(f)" contains unflushed writes not reflected in "disk(f)". The flush
+    * operation has the effect of making "disk(f)" identical to "c(f)".
+    *
+    * A file is said to be *valid* if the following conditions hold:
+    *
+    * V1. The "closed" and "curr" fields are correct:
+    *
+    * f.closed == closed(f) and f.curr == curr(f)
+    *
+    * V2. The current position is either contained in the buffer, or just past
+    * the buffer:
+    *
+    * f.lo <= f.curr <= f.hi
+    *
+    * V3. Any (possibly) unflushed characters are stored in "f.buff":
+    *
+    * (forall i in [f.lo, f.curr): c(f)[i] == f.buff[i - f.lo])
+    *
+    * V4. For all characters not covered by V3, c(f) and disk(f) agree:
+    *
+    * (forall i in [f.lo, len(f)): i not in [f.lo, f.curr) => c(f)[i] ==
+    * disk(f)[i])
+    *
+    * V5. "f.dirty" is true iff the buffer contains bytes that should be
+    * flushed to the file; by V3 and V4, only part of the buffer can be dirty.
+    *
+    * f.dirty == (exists i in [f.lo, f.curr): c(f)[i] != f.buff[i - f.lo])
+    *
+    * V6. this.maxHi == this.lo + this.buff.length
+    *
+    * Note that "f.buff" can be "null" in a valid file, since the range of
+    * characters in V3 is empty when "f.lo == f.curr".
+    *
+    * A file is said to be *ready* if the buffer contains the current position,
+    * i.e., when:
+    *
+    * R1. !f.closed && f.buff != null && f.lo <= f.curr && f.curr < f.hi
+    *
+    * When a file is ready, reading or writing a single byte can be performed
+    * by reading or writing the in-memory buffer without performing a disk
+    * operation.
+    */
     
     /**
      * Open a new <code>BufferedRandomAccessFile</code> on <code>file</code>
@@ -115,13 +117,13 @@
      */
     public BufferedRandomAccessFile(File file, String mode) throws IOException
     {
-        super(file, mode);
-        this.init(0);
+        this(file, mode, 0);
     }
     
     public BufferedRandomAccessFile(File file, String mode, int size) throws IOException
     {
         super(file, mode);
+        path_ = file.getAbsolutePath();
         this.init(size);
     }
     
@@ -132,13 +134,13 @@
      */
     public BufferedRandomAccessFile(String name, String mode) throws IOException
     {
-        super(name, mode);
-        this.init(0);
+        this(name, mode, 0);
     }
     
     public BufferedRandomAccessFile(String name, String mode, int size) throws FileNotFoundException
     {
         super(name, mode);
+        path_ = name;
         this.init(size);
     }
     
@@ -151,7 +153,18 @@
         this.hitEOF_ = false;
         this.diskPos_ = 0L;
     }
-    
+
+    public String getPath()
+    {
+        return path_;
+    }
+
+    public void sync() throws IOException
+    {
+        flush();
+        getChannel().force(true);
+    }
+
     public void close() throws IOException
     {
         this.flush();

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IFileReader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IFileReader.java?rev=799942&r1=799941&r2=799942&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IFileReader.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IFileReader.java Sat Aug  1 22:30:14 2009
@@ -32,27 +32,10 @@
     public String getFileName();
     public long getEOF() throws IOException;
     public long getCurrentPosition() throws IOException;
-    public boolean isHealthyFileDescriptor() throws IOException;
     public void seek(long position) throws IOException;
     public boolean isEOF() throws IOException;
 
     /**
-     * Be extremely careful while using this API. This currently
-     * used to read the commit log header from the commit logs.
-     * Treat this as an internal API.
-     * 
-     * @param bytes read into this byte array.
-    */
-    public void readDirect(byte[] bytes) throws IOException;
-    
-    /**
-     * Read a long value from the underlying sub system.
-     * @return value read
-     * @throws IOException
-     */
-    public long readLong() throws IOException;
-        
-    /**
      * This method dumps the next key/value into the DataOuputStream
      * passed in.
      *

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java?rev=799942&r1=799941&r2=799942&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java Sat Aug  1 22:30:14 2009
@@ -292,7 +292,7 @@
         IFileReader dataReader = null;
         try
         {
-            dataReader = SequenceFile.reader(dataFile);
+            dataReader = SequenceFile.bufferedReader(dataFile, 64 * 1024);
             String decoratedKey = partitioner.decorateKey(clientKey);
             long position = getPosition(decoratedKey, partitioner);
 
@@ -325,7 +325,7 @@
      */
     public ColumnGroupReader getColumnGroupReader(String key, String cfName, byte[] startColumn, boolean isAscending) throws IOException
     {
-        IFileReader dataReader = SequenceFile.reader(dataFile);
+        IFileReader dataReader = SequenceFile.bufferedReader(dataFile, 64 * 1024);
 
         try
         {

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SequenceFile.java?rev=799942&r1=799941&r2=799942&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SequenceFile.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SequenceFile.java Sat Aug  1 22:30:14 2009
@@ -473,11 +473,6 @@
             return file_.getFilePointer();
         }
 
-        public boolean isHealthyFileDescriptor() throws IOException
-        {
-            return file_.getFD().valid();
-        }
-
         public void seek(long position) throws IOException
         {
             file_.seek(position);
@@ -488,23 +483,6 @@
             return (getCurrentPosition() == getEOF());
         }
 
-        /**
-         * Be extremely careful while using this API. This currently
-         * used to read the commit log header from the commit logs.
-         * Treat this as an internal API.
-         *
-         * @param bytes read from the buffer into the this array
-         */
-        public void readDirect(byte[] bytes) throws IOException
-        {
-            file_.readFully(bytes);
-        }
-
-        public long readLong() throws IOException
-        {
-            return file_.readLong();
-        }
-
         public void close() throws IOException
         {
             file_.close();
@@ -531,21 +509,11 @@
     public static final short utfPrefix_ = 2;
     public static final String marker_ = "Bloom-Filter";
 
-    public static AbstractWriter writer(String filename) throws IOException
-    {
-        return new AbstractWriter.Writer(filename);
-    }
-
     public static AbstractWriter bufferedWriter(String filename, int size) throws IOException
     {
         return new AbstractWriter.BufferWriter(filename, size);
     }
 
-    public static IFileReader reader(String filename) throws IOException
-    {
-        return new Reader(filename);
-    }
-
     public static IFileReader bufferedReader(String filename, int size) throws IOException
     {
         return new BufferReader(filename, size);