You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/08/03 17:47:29 UTC

svn commit: r981938 - /cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java

Author: jbellis
Date: Tue Aug  3 15:47:29 2010
New Revision: 981938

URL: http://svn.apache.org/viewvc?rev=981938&view=rev
Log:
close commitlog reader before deleting it
patch by jbellis; reviewed by gdusbabek for CASSANDRA-1348

Modified:
    cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java?rev=981938&r1=981937&r2=981938&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java Tue Aug  3 15:47:29 2010
@@ -187,115 +187,122 @@ public class CommitLog
 
         for (File file : clogs)
         {
-            CommitLogHeader clHeader = null;
-            int bufferSize = (int)Math.min(file.length(), 32 * 1024 * 1024);
-            BufferedRandomAccessFile reader = new BufferedRandomAccessFile(file.getAbsolutePath(), "r", bufferSize);
-
-            int replayPosition = 0;
-            String headerPath = CommitLogHeader.getHeaderPathFromSegmentPath(file.getAbsolutePath());
+            BufferedRandomAccessFile reader = null;
             try
             {
-                clHeader = CommitLogHeader.readCommitLogHeader(headerPath);
-                replayPosition = clHeader.getReplayPosition();
-            }
-            catch (IOException ioe)
-            {
-                logger.info(headerPath + " incomplete, missing or corrupt.  Everything is ok, don't panic.  CommitLog will be replayed from the beginning");
-                logger.debug("exception was", ioe);
-            }
-            if (replayPosition < 0)
-            {
-                logger.debug("skipping replay of fully-flushed {}", file);
-                continue;
-            }
-            reader.seek(replayPosition);
+                CommitLogHeader clHeader = null;
+                int bufferSize = (int)Math.min(file.length(), 32 * 1024 * 1024);
+                reader = new BufferedRandomAccessFile(file.getAbsolutePath(), "r", bufferSize);
 
-            if (logger.isDebugEnabled())
-                logger.debug("Replaying " + file + " starting at " + reader.getFilePointer());
-
-            /* read the logs populate RowMutation and apply */
-            while (!reader.isEOF())
-            {
-                if (logger.isDebugEnabled())
-                    logger.debug("Reading mutation at " + reader.getFilePointer());
-
-                long claimedCRC32;
-
-                Checksum checksum = new CRC32();
-                int serializedSize;
+                int replayPosition = 0;
+                String headerPath = CommitLogHeader.getHeaderPathFromSegmentPath(file.getAbsolutePath());
                 try
                 {
-                    // any of the reads may hit EOF
-                    serializedSize = reader.readInt();
-                    long claimedSizeChecksum = reader.readLong();
-                    checksum.update(serializedSize);
-                    if (checksum.getValue() != claimedSizeChecksum || serializedSize <= 0)
-                        break; // entry wasn't synced correctly/fully.  that's ok.
-
-                    if (serializedSize > bytes.length)
-                        bytes = new byte[(int) (1.2 * serializedSize)];
-                    reader.readFully(bytes, 0, serializedSize);
-                    claimedCRC32 = reader.readLong();
+                    clHeader = CommitLogHeader.readCommitLogHeader(headerPath);
+                    replayPosition = clHeader.getReplayPosition();
                 }
-                catch(EOFException eof)
+                catch (IOException ioe)
                 {
-                    break; // last CL entry didn't get completely written.  that's ok.
+                    logger.info(headerPath + " incomplete, missing or corrupt.  Everything is ok, don't panic.  CommitLog will be replayed from the beginning");
+                    logger.debug("exception was", ioe);
                 }
-
-                checksum.update(bytes, 0, serializedSize);
-                if (claimedCRC32 != checksum.getValue())
+                if (replayPosition < 0)
                 {
-                    // this entry must not have been fsynced.  probably the rest is bad too,
-                    // but just in case there is no harm in trying them (since we still read on an entry boundary)
+                    logger.debug("skipping replay of fully-flushed {}", file);
                     continue;
                 }
+                reader.seek(replayPosition);
 
-                /* deserialize the commit log entry */
-                ByteArrayInputStream bufIn = new ByteArrayInputStream(bytes, 0, serializedSize);
-                final RowMutation rm = RowMutation.serializer().deserialize(new DataInputStream(bufIn));
                 if (logger.isDebugEnabled())
-                    logger.debug(String.format("replaying mutation for %s.%s: %s",
-                                                rm.getTable(),
-                                                rm.key(),
-                                                "{" + StringUtils.join(rm.getColumnFamilies(), ", ") + "}"));
-                final Table table = Table.open(rm.getTable());
-                tablesRecovered.add(table);
-                final Collection<ColumnFamily> columnFamilies = new ArrayList<ColumnFamily>(rm.getColumnFamilies());
-                final long entryLocation = reader.getFilePointer();
-                final CommitLogHeader finalHeader = clHeader;
-                Runnable runnable = new WrappedRunnable()
+                    logger.debug("Replaying " + file + " starting at " + reader.getFilePointer());
+
+                /* read the logs populate RowMutation and apply */
+                while (!reader.isEOF())
                 {
-                    public void runMayThrow() throws IOException
+                    if (logger.isDebugEnabled())
+                        logger.debug("Reading mutation at " + reader.getFilePointer());
+
+                    long claimedCRC32;
+
+                    Checksum checksum = new CRC32();
+                    int serializedSize;
+                    try
                     {
-                        RowMutation newRm = new RowMutation(rm.getTable(), rm.key());
-                        
-                        // Rebuild the row mutation, omitting column families that a) have already been flushed,
-                        // b) are part of a cf that was dropped. Keep in mind that the cf.name() is suspect. do every
-                        // thing based on the cfid instead.
-                        for (ColumnFamily columnFamily : columnFamilies)
-                        {
-                            if (CFMetaData.getCF(columnFamily.id()) == null)
-                                // null means the cf has been dropped
-                                continue;
-                            
-                            if (finalHeader == null || (finalHeader.isDirty(columnFamily.id()) && entryLocation >= finalHeader.getPosition(columnFamily.id())))
-                                newRm.add(columnFamily);
-                        }
-                        if (!newRm.isEmpty())
+                        // any of the reads may hit EOF
+                        serializedSize = reader.readInt();
+                        long claimedSizeChecksum = reader.readLong();
+                        checksum.update(serializedSize);
+                        if (checksum.getValue() != claimedSizeChecksum || serializedSize <= 0)
+                            break; // entry wasn't synced correctly/fully.  that's ok.
+
+                        if (serializedSize > bytes.length)
+                            bytes = new byte[(int) (1.2 * serializedSize)];
+                        reader.readFully(bytes, 0, serializedSize);
+                        claimedCRC32 = reader.readLong();
+                    }
+                    catch(EOFException eof)
+                    {
+                        break; // last CL entry didn't get completely written.  that's ok.
+                    }
+
+                    checksum.update(bytes, 0, serializedSize);
+                    if (claimedCRC32 != checksum.getValue())
+                    {
+                        // this entry must not have been fsynced.  probably the rest is bad too,
+                        // but just in case there is no harm in trying them (since we still read on an entry boundary)
+                        continue;
+                    }
+
+                    /* deserialize the commit log entry */
+                    ByteArrayInputStream bufIn = new ByteArrayInputStream(bytes, 0, serializedSize);
+                    final RowMutation rm = RowMutation.serializer().deserialize(new DataInputStream(bufIn));
+                    if (logger.isDebugEnabled())
+                        logger.debug(String.format("replaying mutation for %s.%s: %s",
+                                                    rm.getTable(),
+                                                    rm.key(),
+                                                    "{" + StringUtils.join(rm.getColumnFamilies(), ", ") + "}"));
+                    final Table table = Table.open(rm.getTable());
+                    tablesRecovered.add(table);
+                    final Collection<ColumnFamily> columnFamilies = new ArrayList<ColumnFamily>(rm.getColumnFamilies());
+                    final long entryLocation = reader.getFilePointer();
+                    final CommitLogHeader finalHeader = clHeader;
+                    Runnable runnable = new WrappedRunnable()
+                    {
+                        public void runMayThrow() throws IOException
                         {
-                            Table.open(newRm.getTable()).apply(newRm, null, false);
+                            RowMutation newRm = new RowMutation(rm.getTable(), rm.key());
+
+                            // Rebuild the row mutation, omitting column families that a) have already been flushed,
+                            // b) are part of a cf that was dropped. Keep in mind that the cf.name() is suspect. do every
+                            // thing based on the cfid instead.
+                            for (ColumnFamily columnFamily : columnFamilies)
+                            {
+                                if (CFMetaData.getCF(columnFamily.id()) == null)
+                                    // null means the cf has been dropped
+                                    continue;
+
+                                if (finalHeader == null || (finalHeader.isDirty(columnFamily.id()) && entryLocation >= finalHeader.getPosition(columnFamily.id())))
+                                    newRm.add(columnFamily);
+                            }
+                            if (!newRm.isEmpty())
+                            {
+                                Table.open(newRm.getTable()).apply(newRm, null, false);
+                            }
                         }
+                    };
+                    futures.add(StageManager.getStage(StageManager.MUTATION_STAGE).submit(runnable));
+                    if (futures.size() > MAX_OUTSTANDING_REPLAY_COUNT)
+                    {
+                        FBUtilities.waitOnFutures(futures);
+                        futures.clear();
                     }
-                };
-                futures.add(StageManager.getStage(StageManager.MUTATION_STAGE).submit(runnable));
-                if (futures.size() > MAX_OUTSTANDING_REPLAY_COUNT)
-                {
-                    FBUtilities.waitOnFutures(futures);
-                    futures.clear();
                 }
             }
-            reader.close();
-            logger.info("Finished reading " + file);
+            finally
+            {
+                reader.close();
+                logger.info("Finished reading " + file);
+            }
         }
 
         // wait for all the writes to finish on the mutation stage