You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/08/03 17:47:29 UTC
svn commit: r981938 -
/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
Author: jbellis
Date: Tue Aug 3 15:47:29 2010
New Revision: 981938
URL: http://svn.apache.org/viewvc?rev=981938&view=rev
Log:
close commitlog reader before deleting it
patch by jbellis; reviewed by gdusbabek for CASSANDRA-1348
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java?rev=981938&r1=981937&r2=981938&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java Tue Aug 3 15:47:29 2010
@@ -187,115 +187,122 @@ public class CommitLog
for (File file : clogs)
{
- CommitLogHeader clHeader = null;
- int bufferSize = (int)Math.min(file.length(), 32 * 1024 * 1024);
- BufferedRandomAccessFile reader = new BufferedRandomAccessFile(file.getAbsolutePath(), "r", bufferSize);
-
- int replayPosition = 0;
- String headerPath = CommitLogHeader.getHeaderPathFromSegmentPath(file.getAbsolutePath());
+ BufferedRandomAccessFile reader = null;
try
{
- clHeader = CommitLogHeader.readCommitLogHeader(headerPath);
- replayPosition = clHeader.getReplayPosition();
- }
- catch (IOException ioe)
- {
- logger.info(headerPath + " incomplete, missing or corrupt. Everything is ok, don't panic. CommitLog will be replayed from the beginning");
- logger.debug("exception was", ioe);
- }
- if (replayPosition < 0)
- {
- logger.debug("skipping replay of fully-flushed {}", file);
- continue;
- }
- reader.seek(replayPosition);
+ CommitLogHeader clHeader = null;
+ int bufferSize = (int)Math.min(file.length(), 32 * 1024 * 1024);
+ reader = new BufferedRandomAccessFile(file.getAbsolutePath(), "r", bufferSize);
- if (logger.isDebugEnabled())
- logger.debug("Replaying " + file + " starting at " + reader.getFilePointer());
-
- /* read the logs populate RowMutation and apply */
- while (!reader.isEOF())
- {
- if (logger.isDebugEnabled())
- logger.debug("Reading mutation at " + reader.getFilePointer());
-
- long claimedCRC32;
-
- Checksum checksum = new CRC32();
- int serializedSize;
+ int replayPosition = 0;
+ String headerPath = CommitLogHeader.getHeaderPathFromSegmentPath(file.getAbsolutePath());
try
{
- // any of the reads may hit EOF
- serializedSize = reader.readInt();
- long claimedSizeChecksum = reader.readLong();
- checksum.update(serializedSize);
- if (checksum.getValue() != claimedSizeChecksum || serializedSize <= 0)
- break; // entry wasn't synced correctly/fully. that's ok.
-
- if (serializedSize > bytes.length)
- bytes = new byte[(int) (1.2 * serializedSize)];
- reader.readFully(bytes, 0, serializedSize);
- claimedCRC32 = reader.readLong();
+ clHeader = CommitLogHeader.readCommitLogHeader(headerPath);
+ replayPosition = clHeader.getReplayPosition();
}
- catch(EOFException eof)
+ catch (IOException ioe)
{
- break; // last CL entry didn't get completely written. that's ok.
+ logger.info(headerPath + " incomplete, missing or corrupt. Everything is ok, don't panic. CommitLog will be replayed from the beginning");
+ logger.debug("exception was", ioe);
}
-
- checksum.update(bytes, 0, serializedSize);
- if (claimedCRC32 != checksum.getValue())
+ if (replayPosition < 0)
{
- // this entry must not have been fsynced. probably the rest is bad too,
- // but just in case there is no harm in trying them (since we still read on an entry boundary)
+ logger.debug("skipping replay of fully-flushed {}", file);
continue;
}
+ reader.seek(replayPosition);
- /* deserialize the commit log entry */
- ByteArrayInputStream bufIn = new ByteArrayInputStream(bytes, 0, serializedSize);
- final RowMutation rm = RowMutation.serializer().deserialize(new DataInputStream(bufIn));
if (logger.isDebugEnabled())
- logger.debug(String.format("replaying mutation for %s.%s: %s",
- rm.getTable(),
- rm.key(),
- "{" + StringUtils.join(rm.getColumnFamilies(), ", ") + "}"));
- final Table table = Table.open(rm.getTable());
- tablesRecovered.add(table);
- final Collection<ColumnFamily> columnFamilies = new ArrayList<ColumnFamily>(rm.getColumnFamilies());
- final long entryLocation = reader.getFilePointer();
- final CommitLogHeader finalHeader = clHeader;
- Runnable runnable = new WrappedRunnable()
+ logger.debug("Replaying " + file + " starting at " + reader.getFilePointer());
+
+ /* read the logs populate RowMutation and apply */
+ while (!reader.isEOF())
{
- public void runMayThrow() throws IOException
+ if (logger.isDebugEnabled())
+ logger.debug("Reading mutation at " + reader.getFilePointer());
+
+ long claimedCRC32;
+
+ Checksum checksum = new CRC32();
+ int serializedSize;
+ try
{
- RowMutation newRm = new RowMutation(rm.getTable(), rm.key());
-
- // Rebuild the row mutation, omitting column families that a) have already been flushed,
- // b) are part of a cf that was dropped. Keep in mind that the cf.name() is suspect. do every
- // thing based on the cfid instead.
- for (ColumnFamily columnFamily : columnFamilies)
- {
- if (CFMetaData.getCF(columnFamily.id()) == null)
- // null means the cf has been dropped
- continue;
-
- if (finalHeader == null || (finalHeader.isDirty(columnFamily.id()) && entryLocation >= finalHeader.getPosition(columnFamily.id())))
- newRm.add(columnFamily);
- }
- if (!newRm.isEmpty())
+ // any of the reads may hit EOF
+ serializedSize = reader.readInt();
+ long claimedSizeChecksum = reader.readLong();
+ checksum.update(serializedSize);
+ if (checksum.getValue() != claimedSizeChecksum || serializedSize <= 0)
+ break; // entry wasn't synced correctly/fully. that's ok.
+
+ if (serializedSize > bytes.length)
+ bytes = new byte[(int) (1.2 * serializedSize)];
+ reader.readFully(bytes, 0, serializedSize);
+ claimedCRC32 = reader.readLong();
+ }
+ catch(EOFException eof)
+ {
+ break; // last CL entry didn't get completely written. that's ok.
+ }
+
+ checksum.update(bytes, 0, serializedSize);
+ if (claimedCRC32 != checksum.getValue())
+ {
+ // this entry must not have been fsynced. probably the rest is bad too,
+ // but just in case there is no harm in trying them (since we still read on an entry boundary)
+ continue;
+ }
+
+ /* deserialize the commit log entry */
+ ByteArrayInputStream bufIn = new ByteArrayInputStream(bytes, 0, serializedSize);
+ final RowMutation rm = RowMutation.serializer().deserialize(new DataInputStream(bufIn));
+ if (logger.isDebugEnabled())
+ logger.debug(String.format("replaying mutation for %s.%s: %s",
+ rm.getTable(),
+ rm.key(),
+ "{" + StringUtils.join(rm.getColumnFamilies(), ", ") + "}"));
+ final Table table = Table.open(rm.getTable());
+ tablesRecovered.add(table);
+ final Collection<ColumnFamily> columnFamilies = new ArrayList<ColumnFamily>(rm.getColumnFamilies());
+ final long entryLocation = reader.getFilePointer();
+ final CommitLogHeader finalHeader = clHeader;
+ Runnable runnable = new WrappedRunnable()
+ {
+ public void runMayThrow() throws IOException
{
- Table.open(newRm.getTable()).apply(newRm, null, false);
+ RowMutation newRm = new RowMutation(rm.getTable(), rm.key());
+
+ // Rebuild the row mutation, omitting column families that a) have already been flushed,
+ // b) are part of a cf that was dropped. Keep in mind that the cf.name() is suspect. do every
+ // thing based on the cfid instead.
+ for (ColumnFamily columnFamily : columnFamilies)
+ {
+ if (CFMetaData.getCF(columnFamily.id()) == null)
+ // null means the cf has been dropped
+ continue;
+
+ if (finalHeader == null || (finalHeader.isDirty(columnFamily.id()) && entryLocation >= finalHeader.getPosition(columnFamily.id())))
+ newRm.add(columnFamily);
+ }
+ if (!newRm.isEmpty())
+ {
+ Table.open(newRm.getTable()).apply(newRm, null, false);
+ }
}
+ };
+ futures.add(StageManager.getStage(StageManager.MUTATION_STAGE).submit(runnable));
+ if (futures.size() > MAX_OUTSTANDING_REPLAY_COUNT)
+ {
+ FBUtilities.waitOnFutures(futures);
+ futures.clear();
}
- };
- futures.add(StageManager.getStage(StageManager.MUTATION_STAGE).submit(runnable));
- if (futures.size() > MAX_OUTSTANDING_REPLAY_COUNT)
- {
- FBUtilities.waitOnFutures(futures);
- futures.clear();
}
}
- reader.close();
- logger.info("Finished reading " + file);
+ finally
+ {
+ reader.close();
+ logger.info("Finished reading " + file);
+ }
}
// wait for all the writes to finish on the mutation stage