You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/08/02 00:30:15 UTC
svn commit: r799942 - in
/incubator/cassandra/trunk/src/java/org/apache/cassandra: db/CommitLog.java
io/AbstractWriter.java io/BufferedRandomAccessFile.java io/IFileReader.java
io/SSTableReader.java io/SequenceFile.java
Author: jbellis
Date: Sat Aug 1 22:30:14 2009
New Revision: 799942
URL: http://svn.apache.org/viewvc?rev=799942&view=rev
Log:
make CommitLog use a raw BufferedRandomAccessFile instead of overloading AbstractWriter/IFileReader unnecessarily. the set of AW features used by both CommitLog and SSTable was nil.
patch by jbellis; reviewed by Stu Hood for CASSANDRA-330
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/AbstractWriter.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/BufferedRandomAccessFile.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IFileReader.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SequenceFile.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java?rev=799942&r1=799941&r2=799942&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java Sat Aug 1 22:30:14 2009
@@ -126,9 +126,9 @@
return Long.parseLong(entries[entries.length - 2]);
}
- private static AbstractWriter createWriter(String file) throws IOException
+ private static BufferedRandomAccessFile createWriter(String file) throws IOException
{
- return SequenceFile.writer(file);
+ return new BufferedRandomAccessFile(file, "rw");
}
static CommitLog open() throws IOException
@@ -156,7 +156,7 @@
private String logFile_;
/* header for current commit log */
private CommitLogHeader clHeader_;
- private AbstractWriter logWriter_;
+ private BufferedRandomAccessFile logWriter_;
/*
* Generates a file name of the format CommitLog-<table>-<timestamp>.log in the
@@ -203,11 +203,11 @@
return logFile_;
}
- private CommitLogHeader readCommitLogHeader(IFileReader logReader) throws IOException
+ private CommitLogHeader readCommitLogHeader(BufferedRandomAccessFile logReader) throws IOException
{
int size = (int)logReader.readLong();
byte[] bytes = new byte[size];
- logReader.readDirect(bytes);
+ logReader.read(bytes);
ByteArrayInputStream byteStream = new ByteArrayInputStream(bytes);
return CommitLogHeader.serializer().deserialize(new DataInputStream(byteStream));
}
@@ -217,7 +217,7 @@
*/
private static void writeCommitLogHeader(String commitLogFileName, byte[] bytes) throws IOException
{
- AbstractWriter logWriter = CommitLog.createWriter(commitLogFileName);
+ RandomAccessFile logWriter = CommitLog.createWriter(commitLogFileName);
writeCommitLogHeader(logWriter, bytes);
logWriter.close();
}
@@ -236,7 +236,7 @@
/** writes header at the beginning of the file, then seeks back to current position */
private void seekAndWriteCommitLogHeader(byte[] bytes) throws IOException
{
- long currentPos = logWriter_.getCurrentPosition();
+ long currentPos = logWriter_.getFilePointer();
logWriter_.seek(0);
writeCommitLogHeader(logWriter_, bytes);
@@ -244,10 +244,10 @@
logWriter_.seek(currentPos);
}
- private static void writeCommitLogHeader(AbstractWriter logWriter, byte[] bytes) throws IOException
+ private static void writeCommitLogHeader(RandomAccessFile logWriter, byte[] bytes) throws IOException
{
logWriter.writeLong(bytes.length);
- logWriter.writeDirect(bytes);
+ logWriter.write(bytes);
}
void recover(File[] clogs) throws IOException
@@ -256,7 +256,7 @@
for (File file : clogs)
{
- IFileReader reader = SequenceFile.reader(file.getAbsolutePath());
+ BufferedRandomAccessFile reader = new BufferedRandomAccessFile(file.getAbsolutePath(), "r");
CommitLogHeader clHeader = readCommitLogHeader(reader);
/* seek to the lowest position */
int lowPos = CommitLogHeader.getLowestPosition(clHeader);
@@ -272,13 +272,13 @@
Set<Table> tablesRecovered = new HashSet<Table>();
/* read the logs populate RowMutation and apply */
- while ( !reader.isEOF() )
+ while (reader.getFilePointer() < reader.length())
{
byte[] bytes;
try
{
- bytes = new byte[(int)reader.readLong()];
- reader.readDirect(bytes);
+ bytes = new byte[(int) reader.readLong()];
+ reader.read(bytes);
}
catch (EOFException e)
{
@@ -296,16 +296,18 @@
for (ColumnFamily columnFamily : columnFamilies)
{
/* TODO: Remove this to not process Hints */
- if ( !DatabaseDescriptor.isApplicationColumnFamily(columnFamily.name()) )
+ if (!DatabaseDescriptor.isApplicationColumnFamily(columnFamily.name()))
{
row.removeColumnFamily(columnFamily);
continue;
}
int id = table.getColumnFamilyId(columnFamily.name());
- if ( !clHeader.isDirty(id) || reader.getCurrentPosition() < clHeader.getPosition(id) )
+ if (!clHeader.isDirty(id) || reader.getFilePointer() < clHeader.getPosition(id))
+ {
row.removeColumnFamily(columnFamily);
+ }
}
- if ( !row.isEmpty() )
+ if (!row.isEmpty())
{
table.applyNow(row);
}
@@ -331,7 +333,7 @@
int id = table.getColumnFamilyId(columnFamily.name());
if (!clHeader_.isDirty(id) || (clHeader_.isDirty(id) && clHeader_.getPosition(id) == 0))
{
- clHeader_.turnOn(id, logWriter_.getCurrentPosition());
+ clHeader_.turnOn(id, logWriter_.getFilePointer());
seekAndWriteCommitLogHeader(clHeader_.toByteArray());
}
}
@@ -343,7 +345,7 @@
{
public CommitLogContext call() throws Exception
{
- return new CommitLogContext(logFile_, logWriter_.getCurrentPosition());
+ return new CommitLogContext(logFile_, logWriter_.getFilePointer());
}
};
try
@@ -499,11 +501,11 @@
private boolean maybeRollLog() throws IOException
{
- if (logWriter_.getFileSize() >= SEGMENT_SIZE)
+ if (logWriter_.length() >= SEGMENT_SIZE)
{
/* Rolls the current log file over to a new one. */
setNextFileName();
- String oldLogFile = logWriter_.getFileName();
+ String oldLogFile = logWriter_.getPath();
logWriter_.close();
/* point reader/writer to a new commit log file. */
@@ -542,12 +544,12 @@
{
/* serialize the row */
Row.serializer().serialize(row, cfBuffer);
- currentPosition = logWriter_.getCurrentPosition();
+ currentPosition = logWriter_.getFilePointer();
CommitLogContext cLogCtx = new CommitLogContext(logFile_, currentPosition);
/* Update the header */
maybeUpdateHeader(row);
logWriter_.writeLong(cfBuffer.getLength());
- logWriter_.append(cfBuffer);
+ logWriter_.write(cfBuffer.getData(), 0, cfBuffer.getLength());
maybeRollLog();
return cLogCtx;
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/AbstractWriter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/AbstractWriter.java?rev=799942&r1=799941&r2=799942&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/AbstractWriter.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/AbstractWriter.java Sat Aug 1 22:30:14 2009
@@ -39,21 +39,6 @@
public abstract void seek(long position) throws IOException;
/**
- * Appends the buffer to the the underlying SequenceFile.
- * @param buffer buffer which contains the serialized data.
- * @throws java.io.IOException
- */
- public abstract void append(DataOutputBuffer buffer) throws IOException;
-
- /**
- * Appends the key and the value to the the underlying SequenceFile.
- * @param keyBuffer buffer which contains the serialized key.
- * @param buffer buffer which contains the serialized data.
- * @throws java.io.IOException
- */
- public abstract void append(DataOutputBuffer keyBuffer, DataOutputBuffer buffer) throws IOException;
-
- /**
* Appends the key and the value to the the underlying SequenceFile.
* @param key key associated with this peice of data.
* @param buffer buffer containing the serialized data.
@@ -70,48 +55,12 @@
public abstract void append(String key, byte[] value) throws IOException;
/**
- * Appends the key and the long value to the the underlying SequenceFile.
- * This is used in the contruction of the index file associated with a
- * SSTable.
- * @param key key associated with this peice of data.
- * @param value value associated with this key.
- * @throws java.io.IOException
- */
- public abstract void append(String key, long value) throws IOException;
-
- /**
- * Be extremely careful while using this API. This currently
- * used to write the commit log header in the commit logs.
- * If not used carefully it could completely screw up reads
- * of other key/value pairs that are written.
- *
- * @param bytes serialized version of the commit log header.
- * @throws java.io.IOException
- */
- public abstract long writeDirect(byte[] bytes) throws IOException;
-
- /**
- * Write a long into the underlying sub system.
- * @param value long to be written
- * @throws java.io.IOException
- */
- public abstract void writeLong(long value) throws IOException;
-
- /**
* Close the file which is being used for the write.
* @throws java.io.IOException
*/
public abstract void close() throws IOException;
/**
- * Close the file after appending the passed in footer information.
- * @param footer footer information.
- * @param size size of the footer.
- * @throws java.io.IOException
- */
- public abstract void close(byte[] footer, int size) throws IOException;
-
- /**
* @return the size of the file.
* @throws java.io.IOException
*/
@@ -162,25 +111,6 @@
file_.seek(position);
}
- public void append(DataOutputBuffer buffer) throws IOException
- {
- file_.write(buffer.getData(), 0, buffer.getLength());
- }
-
- public void append(DataOutputBuffer keyBuffer, DataOutputBuffer buffer) throws IOException
- {
- int keyBufLength = keyBuffer.getLength();
- if (keyBuffer == null || keyBufLength == 0)
- throw new IllegalArgumentException("Key cannot be NULL or of zero length.");
-
- file_.writeInt(keyBufLength);
- file_.write(keyBuffer.getData(), 0, keyBufLength);
-
- int length = buffer.getLength();
- file_.writeInt(length);
- file_.write(buffer.getData(), 0, length);
- }
-
public void append(String key, DataOutputBuffer buffer) throws IOException
{
if (key == null)
@@ -202,46 +132,12 @@
file_.write(value);
}
- public void append(String key, long value) throws IOException
- {
- if (key == null)
- throw new IllegalArgumentException("Key cannot be NULL.");
-
- file_.writeUTF(key);
- file_.writeLong(value);
- }
-
- /**
- * Be extremely careful while using this API. This currently
- * used to write the commit log header in the commit logs.
- * If not used carefully it could completely screw up reads
- * of other key/value pairs that are written.
- *
- * @param bytes the bytes to write
- */
- public long writeDirect(byte[] bytes) throws IOException
- {
- file_.write(bytes);
- return file_.getFilePointer();
- }
-
- public void writeLong(long value) throws IOException
- {
- file_.writeLong(value);
- }
-
public void close() throws IOException
{
sync();
file_.close();
}
- public void close(byte[] footer, int size) throws IOException
- {
- file_.writeInt(size);
- file_.write(footer, 0, size);
- }
-
public String getFileName()
{
return filename_;
@@ -282,5 +178,11 @@
file.createNewFile();
}
}
+
+ @Override
+ public void sync() throws IOException
+ {
+ ((BufferedRandomAccessFile)file_).sync();
+ }
}
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/BufferedRandomAccessFile.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/BufferedRandomAccessFile.java?rev=799942&r1=799941&r2=799942&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/BufferedRandomAccessFile.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/BufferedRandomAccessFile.java Sat Aug 1 22:30:14 2009
@@ -42,6 +42,8 @@
static final int LogBuffSz_ = 16; // 64K buffer
public static final int BuffSz_ = (1 << LogBuffSz_);
static final long BuffMask_ = ~(((long) BuffSz_) - 1L);
+
+ private String path_;
/*
* This implementation is based on the buffer implementation in Modula-3's
@@ -54,59 +56,59 @@
private long maxHi_; // this.lo + this.buff.length
private boolean hitEOF_; // buffer contains last file block?
private long diskPos_; // disk position
-
+
/*
- * To describe the above fields, we introduce the following abstractions for
- * the file "f":
- *
- * len(f) the length of the file curr(f) the current position in the file
- * c(f) the abstract contents of the file disk(f) the contents of f's
- * backing disk file closed(f) true iff the file is closed
- *
- * "curr(f)" is an index in the closed interval [0, len(f)]. "c(f)" is a
- * character sequence of length "len(f)". "c(f)" and "disk(f)" may differ if
- * "c(f)" contains unflushed writes not reflected in "disk(f)". The flush
- * operation has the effect of making "disk(f)" identical to "c(f)".
- *
- * A file is said to be *valid* if the following conditions hold:
- *
- * V1. The "closed" and "curr" fields are correct:
- *
- * f.closed == closed(f) f.curr == curr(f)
- *
- * V2. The current position is either contained in the buffer, or just past
- * the buffer:
- *
- * f.lo <= f.curr <= f.hi
- *
- * V3. Any (possibly) unflushed characters are stored in "f.buff":
- *
- * (forall i in [f.lo, f.curr): c(f)[i] == f.buff[i - f.lo])
- *
- * V4. For all characters not covered by V3, c(f) and disk(f) agree:
- *
- * (forall i in [f.lo, len(f)): i not in [f.lo, f.curr) => c(f)[i] ==
- * disk(f)[i])
- *
- * V5. "f.dirty" is true iff the buffer contains bytes that should be
- * flushed to the file; by V3 and V4, only part of the buffer can be dirty.
- *
- * f.dirty == (exists i in [f.lo, f.curr): c(f)[i] != f.buff[i - f.lo])
- *
- * V6. this.maxHi == this.lo + this.buff.length
- *
- * Note that "f.buff" can be "null" in a valid file, since the range of
- * characters in V3 is empty when "f.lo == f.curr".
- *
- * A file is said to be *ready* if the buffer contains the current position,
- * i.e., when:
- *
- * R1. !f.closed && f.buff != null && f.lo <= f.curr && f.curr < f.hi
- *
- * When a file is ready, reading or writing a single byte can be performed
- * by reading or writing the in-memory buffer without performing a disk
- * operation.
- */
+ * To describe the above fields, we introduce the following abstractions for
+ * the file "f":
+ *
+ * len(f) the length of the file curr(f) the current position in the file
+ * c(f) the abstract contents of the file disk(f) the contents of f's
+ * backing disk file closed(f) true iff the file is closed
+ *
+ * "curr(f)" is an index in the closed interval [0, len(f)]. "c(f)" is a
+ * character sequence of length "len(f)". "c(f)" and "disk(f)" may differ if
+ * "c(f)" contains unflushed writes not reflected in "disk(f)". The flush
+ * operation has the effect of making "disk(f)" identical to "c(f)".
+ *
+ * A file is said to be *valid* if the following conditions hold:
+ *
+ * V1. The "closed" and "curr" fields are correct:
+ *
+ * f.closed == closed(f) f.curr == curr(f)
+ *
+ * V2. The current position is either contained in the buffer, or just past
+ * the buffer:
+ *
+ * f.lo <= f.curr <= f.hi
+ *
+ * V3. Any (possibly) unflushed characters are stored in "f.buff":
+ *
+ * (forall i in [f.lo, f.curr): c(f)[i] == f.buff[i - f.lo])
+ *
+ * V4. For all characters not covered by V3, c(f) and disk(f) agree:
+ *
+ * (forall i in [f.lo, len(f)): i not in [f.lo, f.curr) => c(f)[i] ==
+ * disk(f)[i])
+ *
+ * V5. "f.dirty" is true iff the buffer contains bytes that should be
+ * flushed to the file; by V3 and V4, only part of the buffer can be dirty.
+ *
+ * f.dirty == (exists i in [f.lo, f.curr): c(f)[i] != f.buff[i - f.lo])
+ *
+ * V6. this.maxHi == this.lo + this.buff.length
+ *
+ * Note that "f.buff" can be "null" in a valid file, since the range of
+ * characters in V3 is empty when "f.lo == f.curr".
+ *
+ * A file is said to be *ready* if the buffer contains the current position,
+ * i.e., when:
+ *
+ * R1. !f.closed && f.buff != null && f.lo <= f.curr && f.curr < f.hi
+ *
+ * When a file is ready, reading or writing a single byte can be performed
+ * by reading or writing the in-memory buffer without performing a disk
+ * operation.
+ */
/**
* Open a new <code>BufferedRandomAccessFile</code> on <code>file</code>
@@ -115,13 +117,13 @@
*/
public BufferedRandomAccessFile(File file, String mode) throws IOException
{
- super(file, mode);
- this.init(0);
+ this(file, mode, 0);
}
public BufferedRandomAccessFile(File file, String mode, int size) throws IOException
{
super(file, mode);
+ path_ = file.getAbsolutePath();
this.init(size);
}
@@ -132,13 +134,13 @@
*/
public BufferedRandomAccessFile(String name, String mode) throws IOException
{
- super(name, mode);
- this.init(0);
+ this(name, mode, 0);
}
public BufferedRandomAccessFile(String name, String mode, int size) throws FileNotFoundException
{
super(name, mode);
+ path_ = name;
this.init(size);
}
@@ -151,7 +153,18 @@
this.hitEOF_ = false;
this.diskPos_ = 0L;
}
-
+
+ public String getPath()
+ {
+ return path_;
+ }
+
+ public void sync() throws IOException
+ {
+ flush();
+ getChannel().force(true);
+ }
+
public void close() throws IOException
{
this.flush();
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IFileReader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IFileReader.java?rev=799942&r1=799941&r2=799942&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IFileReader.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IFileReader.java Sat Aug 1 22:30:14 2009
@@ -32,27 +32,10 @@
public String getFileName();
public long getEOF() throws IOException;
public long getCurrentPosition() throws IOException;
- public boolean isHealthyFileDescriptor() throws IOException;
public void seek(long position) throws IOException;
public boolean isEOF() throws IOException;
/**
- * Be extremely careful while using this API. This currently
- * used to read the commit log header from the commit logs.
- * Treat this as an internal API.
- *
- * @param bytes read into this byte array.
- */
- public void readDirect(byte[] bytes) throws IOException;
-
- /**
- * Read a long value from the underlying sub system.
- * @return value read
- * @throws IOException
- */
- public long readLong() throws IOException;
-
- /**
* This method dumps the next key/value into the DataOuputStream
* passed in.
*
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java?rev=799942&r1=799941&r2=799942&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java Sat Aug 1 22:30:14 2009
@@ -292,7 +292,7 @@
IFileReader dataReader = null;
try
{
- dataReader = SequenceFile.reader(dataFile);
+ dataReader = SequenceFile.bufferedReader(dataFile, 64 * 1024);
String decoratedKey = partitioner.decorateKey(clientKey);
long position = getPosition(decoratedKey, partitioner);
@@ -325,7 +325,7 @@
*/
public ColumnGroupReader getColumnGroupReader(String key, String cfName, byte[] startColumn, boolean isAscending) throws IOException
{
- IFileReader dataReader = SequenceFile.reader(dataFile);
+ IFileReader dataReader = SequenceFile.bufferedReader(dataFile, 64 * 1024);
try
{
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SequenceFile.java?rev=799942&r1=799941&r2=799942&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SequenceFile.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SequenceFile.java Sat Aug 1 22:30:14 2009
@@ -473,11 +473,6 @@
return file_.getFilePointer();
}
- public boolean isHealthyFileDescriptor() throws IOException
- {
- return file_.getFD().valid();
- }
-
public void seek(long position) throws IOException
{
file_.seek(position);
@@ -488,23 +483,6 @@
return (getCurrentPosition() == getEOF());
}
- /**
- * Be extremely careful while using this API. This currently
- * used to read the commit log header from the commit logs.
- * Treat this as an internal API.
- *
- * @param bytes read from the buffer into the this array
- */
- public void readDirect(byte[] bytes) throws IOException
- {
- file_.readFully(bytes);
- }
-
- public long readLong() throws IOException
- {
- return file_.readLong();
- }
-
public void close() throws IOException
{
file_.close();
@@ -531,21 +509,11 @@
public static final short utfPrefix_ = 2;
public static final String marker_ = "Bloom-Filter";
- public static AbstractWriter writer(String filename) throws IOException
- {
- return new AbstractWriter.Writer(filename);
- }
-
public static AbstractWriter bufferedWriter(String filename, int size) throws IOException
{
return new AbstractWriter.BufferWriter(filename, size);
}
- public static IFileReader reader(String filename) throws IOException
- {
- return new Reader(filename);
- }
-
public static IFileReader bufferedReader(String filename, int size) throws IOException
{
return new BufferReader(filename, size);