You are viewing a plain-text version of this content. (The link to the canonical version was lost in the plain-text conversion.)
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/08/02 00:30:41 UTC
svn commit: r799944 - in /incubator/cassandra/trunk:
src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/db/filter/
src/java/org/apache/cassandra/io/ src/java/org/apache/cassandra/utils/
test/unit/org/apache/cassandra/db/ test/unit/org/apache/cassandra/io/
Author: jbellis
Date: Sat Aug 1 22:30:40 2009
New Revision: 799944
URL: http://svn.apache.org/viewvc?rev=799944&view=rev
Log:
Remove SequenceFile.next(DataOutputBuffer) in favor of the more efficient IteratingRow.
patch by jbellis; reviewed by Stu Hood for CASSANDRA-330
Added:
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/ICompactSerializer2.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IteratingRow.java
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnSerializer.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactSerializerInvocationHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableNamesIterator.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/BufferedRandomAccessFile.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/FileStruct.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IFileReader.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SequenceFile.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/OneCompactionTest.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/SSTableTest.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java?rev=799944&r1=799943&r2=799944&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java Sat Aug 1 22:30:40 2009
@@ -18,9 +18,7 @@
package org.apache.cassandra.db;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
+import java.io.*;
import java.lang.reflect.Proxy;
import java.util.Collection;
import java.util.HashMap;
@@ -37,6 +35,8 @@
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.io.ICompactSerializer2;
+import org.apache.cassandra.io.BufferedRandomAccessFile;
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.MarshalException;
@@ -48,7 +48,7 @@
public final class ColumnFamily
{
/* The column serializer for this Column Family. Create based on config. */
- private static ICompactSerializer<ColumnFamily> serializer_;
+ private static ColumnFamilySerializer serializer_ = new ColumnFamilySerializer();
public static final short utfPrefix_ = 2;
private static Logger logger_ = Logger.getLogger( ColumnFamily.class );
@@ -58,13 +58,12 @@
static
{
- serializer_ = new ColumnFamilySerializer();
/* TODO: These are the various column types. Hard coded for now. */
columnTypes_.put("Standard", "Standard");
columnTypes_.put("Super", "Super");
}
- public static ICompactSerializer<ColumnFamily> serializer()
+ public static ColumnFamilySerializer serializer()
{
return serializer_;
}
@@ -73,9 +72,9 @@
* This method returns the serializer whose methods are
* preprocessed by a dynamic proxy.
*/
- public static ICompactSerializer<ColumnFamily> serializerWithIndexes()
+ public static ICompactSerializer2<ColumnFamily> serializerWithIndexes()
{
- return (ICompactSerializer<ColumnFamily>)Proxy.newProxyInstance( ColumnFamily.class.getClassLoader(), new Class[]{ICompactSerializer.class}, new CompactSerializerInvocationHandler<ColumnFamily>(serializer_) );
+ return (ICompactSerializer2<ColumnFamily>)Proxy.newProxyInstance( ColumnFamily.class.getClassLoader(), new Class[]{ICompactSerializer2.class}, new CompactSerializerInvocationHandler<ColumnFamily>(serializer_) );
}
public static String getColumnType(String key)
@@ -95,7 +94,7 @@
private String name_;
- private transient ICompactSerializer<IColumn> columnSerializer_;
+ private transient ICompactSerializer2<IColumn> columnSerializer_;
private long markedForDeleteAt = Long.MIN_VALUE;
private int localDeletionTime = Integer.MIN_VALUE;
private AtomicInteger size_ = new AtomicInteger(0);
@@ -146,7 +145,7 @@
}
}
- public ICompactSerializer<IColumn> getColumnSerializer()
+ public ICompactSerializer2<IColumn> getColumnSerializer()
{
return columnSerializer_;
}
@@ -433,7 +432,7 @@
return cf;
}
- public static class ColumnFamilySerializer implements ICompactSerializer<ColumnFamily>
+ public static class ColumnFamilySerializer implements ICompactSerializer2<ColumnFamily>
{
/*
* We are going to create indexes, and write out that information as well. The format
@@ -459,7 +458,7 @@
* <total number of columns>
* <columns data>
*/
- public void serialize(ColumnFamily columnFamily, DataOutputStream dos) throws IOException
+ public void serialize(ColumnFamily columnFamily, DataOutput dos) throws IOException
{
// TODO whenever we change this we need to change the code in SequenceFile to match in two places.
// This SUCKS and is inefficient to boot. let's fix this ASAP.
@@ -480,13 +479,9 @@
}
}
- public ColumnFamily deserialize(DataInputStream dis) throws IOException
+ public ColumnFamily deserialize(DataInput dis) throws IOException
{
- ColumnFamily cf = new ColumnFamily(dis.readUTF(),
- dis.readUTF(),
- readComparator(dis),
- readComparator(dis));
- cf.delete(dis.readInt(), dis.readLong());
+ ColumnFamily cf = deserializeEmpty(dis);
int size = dis.readInt();
IColumn column;
for (int i = 0; i < size; ++i)
@@ -497,7 +492,7 @@
return cf;
}
- private AbstractType readComparator(DataInputStream dis) throws IOException
+ private AbstractType readComparator(DataInput dis) throws IOException
{
String className = dis.readUTF();
if (className.equals(""))
@@ -518,6 +513,16 @@
throw new RuntimeException(e);
}
}
+
+ public ColumnFamily deserializeEmpty(DataInput input) throws IOException
+ {
+ ColumnFamily cf = new ColumnFamily(input.readUTF(),
+ input.readUTF(),
+ readComparator(input),
+ readComparator(input));
+ cf.delete(input.readInt(), input.readLong());
+ return cf;
+ }
}
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=799944&r1=799943&r2=799944&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Sat Aug 1 22:30:40 2009
@@ -650,7 +650,7 @@
}
}
- private PriorityQueue<FileStruct> initializePriorityQueue(List<String> files, List<Range> ranges, int minBufferSize)
+ private PriorityQueue<FileStruct> initializePriorityQueue(List<String> files, List<Range> ranges, int minBufferSize) throws IOException
{
PriorityQueue<FileStruct> pq = new PriorityQueue<FileStruct>();
if (files.size() > 1 || (ranges != null && files.size() > 0))
@@ -659,31 +659,13 @@
FileStruct fs = null;
for (String file : files)
{
- try
- {
- fs = SSTableReader.get(file).getFileStruct();
- fs.advance();
- if (fs.isExhausted())
- {
- continue;
- }
- pq.add(fs);
- }
- catch (Exception ex)
+ fs = SSTableReader.get(file).getFileStruct();
+ fs.advance(true);
+ if (fs.isExhausted())
{
- logger_.warn("corrupt file? or are you just blowing away data files manually out from under me?", ex);
- try
- {
- if (fs != null)
- {
- fs.close();
- }
- }
- catch (Exception e)
- {
- logger_.error("Unable to close file :" + file);
- }
+ continue;
}
+ pq.add(fs);
}
}
return pq;
@@ -930,8 +912,8 @@
// If the compaction file path is null that means we have no space left for this compaction.
if (rangeFileLocation == null)
{
- logger_.warn("Total bytes to be written for range compaction ..."
- + expectedRangeFileSize + " is greater than the safe limit of the disk space available.");
+ logger_.error("Total bytes to be written for range compaction ..."
+ + expectedRangeFileSize + " is greater than the safe limit of the disk space available.");
return result;
}
PriorityQueue<FileStruct> pq = initializePriorityQueue(files, ranges, ColumnFamilyStore.BUFSIZE);
@@ -975,41 +957,28 @@
{
for (FileStruct filestruct : lfs)
{
- try
- {
- /* read the length although we don't need it */
- filestruct.getBufIn().readInt();
- // Skip the Index
- IndexHelper.skipBloomFilterAndIndex(filestruct.getBufIn());
- // We want to add only 2 and resolve them right there in order to save on memory footprint
- if (columnFamilies.size() > 1)
- {
- // Now merge the 2 column families
- merge(columnFamilies);
- }
- // deserialize into column families
- columnFamilies.add(ColumnFamily.serializer().deserialize(filestruct.getBufIn()));
- }
- catch (Exception ex)
+ // We want to add only 2 and resolve them right there in order to save on memory footprint
+ if (columnFamilies.size() > 1)
{
- logger_.warn(LogUtil.throwableToString(ex));
+ // Now merge the 2 column families
+ merge(columnFamilies);
}
+ // deserialize into column families
+ columnFamilies.add(filestruct.getColumnFamily());
}
// Now after merging all crap append to the sstable
columnFamily = resolveAndRemoveDeleted(columnFamilies);
columnFamilies.clear();
if (columnFamily != null)
{
- /* serialize the cf with column indexes */
ColumnFamily.serializerWithIndexes().serialize(columnFamily, bufOut);
}
}
else
{
+ // TODO deserializing only to reserialize is dumb
FileStruct filestruct = lfs.get(0);
- /* read the length although we don't need it */
- int size = filestruct.getBufIn().readInt();
- bufOut.write(filestruct.getBufIn(), size);
+ ColumnFamily.serializerWithIndexes().serialize(filestruct.getColumnFamily(), bufOut);
}
if (Range.isTokenInRanges(StorageService.getPartitioner().getInitialToken(lastkey), ranges))
{
@@ -1023,48 +992,30 @@
String fname = new File(rangeFileLocation, mergedFileName).getAbsolutePath();
rangeWriter = new SSTableWriter(fname, expectedBloomFilterSize, StorageService.getPartitioner());
}
- try
- {
- rangeWriter.append(lastkey, bufOut);
- }
- catch (Exception ex)
- {
- logger_.warn(LogUtil.throwableToString(ex));
- }
+ rangeWriter.append(lastkey, bufOut);
}
totalkeysWritten++;
for (FileStruct filestruct : lfs)
{
- try
+ filestruct.advance(true);
+ if (filestruct.isExhausted())
{
- filestruct.advance();
+ continue;
+ }
+ /* keep on looping until we find a key in the range */
+ while (!Range.isTokenInRanges(StorageService.getPartitioner().getInitialToken(filestruct.getKey()), ranges))
+ {
+ filestruct.advance(true);
if (filestruct.isExhausted())
{
- continue;
- }
- /* keep on looping until we find a key in the range */
- while (!Range.isTokenInRanges(StorageService.getPartitioner().getInitialToken(filestruct.getKey()), ranges))
- {
- filestruct.advance();
- if (filestruct.isExhausted())
- {
- break;
- }
- }
- if (!filestruct.isExhausted())
- {
- pq.add(filestruct);
+ break;
}
- totalkeysRead++;
}
- catch (Exception ex)
+ if (!filestruct.isExhausted())
{
- // Ignore the exception as it might be a corrupted file
- // in any case we have read as far as possible from it
- // and it will be deleted after compaction.
- logger_.warn("corrupt sstable?", ex);
- filestruct.close();
+ pq.add(filestruct);
}
+ totalkeysRead++;
}
lfs.clear();
lastkey = null;
@@ -1096,11 +1047,6 @@
return result;
}
- private void doFill(BloomFilter bf, String decoratedKey)
- {
- bf.add(StorageService.getPartitioner().undecorateKey(decoratedKey));
- }
-
/*
* This function does the actual compaction for files.
* It maintains a priority queue of with the first key from each file
@@ -1176,40 +1122,27 @@
{
for (FileStruct filestruct : lfs)
{
- try
+ // We want to add only 2 and resolve them right there in order to save on memory footprint
+ if (columnFamilies.size() > 1)
{
- /* read the length although we don't need it */
- filestruct.getBufIn().readInt();
- // Skip the Index
- IndexHelper.skipBloomFilterAndIndex(filestruct.getBufIn());
- // We want to add only 2 and resolve them right there in order to save on memory footprint
- if (columnFamilies.size() > 1)
- {
- merge(columnFamilies);
- }
- // deserialize into column families
- columnFamilies.add(ColumnFamily.serializer().deserialize(filestruct.getBufIn()));
- }
- catch (Exception ex)
- {
- logger_.warn("error in filecompaction", ex);
+ merge(columnFamilies);
}
+ // deserialize into column families
+ columnFamilies.add(filestruct.getColumnFamily());
}
// Now after merging all crap append to the sstable
columnFamily = resolveAndRemoveDeleted(columnFamilies);
columnFamilies.clear();
if (columnFamily != null)
{
- /* serialize the cf with column indexes */
ColumnFamily.serializerWithIndexes().serialize(columnFamily, bufOut);
}
}
else
{
+ // TODO deserializing only to reserialize is dumb
FileStruct filestruct = lfs.get(0);
- /* read the length although we don't need it */
- int size = filestruct.getBufIn().readInt();
- bufOut.write(filestruct.getBufIn(), size);
+ ColumnFamily.serializerWithIndexes().serialize(filestruct.getColumnFamily(), bufOut);
}
if (writer == null)
@@ -1222,24 +1155,13 @@
for (FileStruct filestruct : lfs)
{
- try
+ filestruct.advance(true);
+ if (filestruct.isExhausted())
{
- filestruct.advance();
- if (filestruct.isExhausted())
- {
- continue;
- }
- pq.add(filestruct);
- totalkeysRead++;
- }
- catch (Throwable ex)
- {
- // Ignore the exception as it might be a corrupted file
- // in any case we have read as far as possible from it
- // and it will be deleted after compaction.
- logger_.warn("corrupt sstable?", ex);
- filestruct.close();
+ continue;
}
+ pq.add(filestruct);
+ totalkeysRead++;
}
lfs.clear();
lastkey = null;
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnSerializer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnSerializer.java?rev=799944&r1=799943&r2=799944&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnSerializer.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnSerializer.java Sat Aug 1 22:30:40 2009
@@ -3,9 +3,10 @@
import java.io.*;
import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.io.ICompactSerializer2;
import org.apache.cassandra.utils.FBUtilities;
-public class ColumnSerializer implements ICompactSerializer<IColumn>
+public class ColumnSerializer implements ICompactSerializer2<IColumn>
{
public static void writeName(byte[] name, DataOutput out) throws IOException
{
@@ -26,7 +27,7 @@
return bytes;
}
- public void serialize(IColumn column, DataOutputStream dos) throws IOException
+ public void serialize(IColumn column, DataOutput dos) throws IOException
{
ColumnSerializer.writeName(column.name(), dos);
dos.writeBoolean(column.isMarkedForDelete());
@@ -34,7 +35,7 @@
FBUtilities.writeByteArray(column.value(), dos);
}
- public Column deserialize(DataInputStream dis) throws IOException
+ public Column deserialize(DataInput dis) throws IOException
{
byte[] name = ColumnSerializer.readName(dis);
boolean delete = dis.readBoolean();
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java?rev=799944&r1=799943&r2=799944&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java Sat Aug 1 22:30:40 2009
@@ -272,7 +272,7 @@
Set<Table> tablesRecovered = new HashSet<Table>();
/* read the logs populate RowMutation and apply */
- while (reader.getFilePointer() < reader.length())
+ while (!reader.isEOF())
{
byte[] bytes;
try
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactSerializerInvocationHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactSerializerInvocationHandler.java?rev=799944&r1=799943&r2=799944&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactSerializerInvocationHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactSerializerInvocationHandler.java Sat Aug 1 22:30:40 2009
@@ -23,6 +23,7 @@
import org.apache.cassandra.io.DataOutputBuffer;
import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.io.ICompactSerializer2;
/*
@@ -33,9 +34,9 @@
public class CompactSerializerInvocationHandler<T> implements InvocationHandler
{
- private ICompactSerializer<T> serializer_;
+ private ICompactSerializer2<T> serializer_;
- public CompactSerializerInvocationHandler(ICompactSerializer<T> serializer)
+ public CompactSerializerInvocationHandler(ICompactSerializer2<T> serializer)
{
serializer_ = serializer;
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java?rev=799944&r1=799943&r2=799944&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java Sat Aug 1 22:30:40 2009
@@ -18,10 +18,7 @@
package org.apache.cassandra.db;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.Serializable;
+import java.io.*;
import java.util.Collection;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
@@ -32,6 +29,7 @@
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.io.ICompactSerializer2;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.MarshalException;
@@ -326,7 +324,7 @@
}
}
-class SuperColumnSerializer implements ICompactSerializer<IColumn>
+class SuperColumnSerializer implements ICompactSerializer2<IColumn>
{
private AbstractType comparator;
@@ -340,7 +338,7 @@
return comparator;
}
- public void serialize(IColumn column, DataOutputStream dos) throws IOException
+ public void serialize(IColumn column, DataOutput dos) throws IOException
{
SuperColumn superColumn = (SuperColumn)column;
ColumnSerializer.writeName(column.name(), dos);
@@ -358,12 +356,11 @@
}
}
- public IColumn deserialize(DataInputStream dis) throws IOException
+ public IColumn deserialize(DataInput dis) throws IOException
{
byte[] name = ColumnSerializer.readName(dis);
SuperColumn superColumn = new SuperColumn(name, comparator);
superColumn.markForDeleteAt(dis.readInt(), dis.readLong());
- assert dis.available() > 0;
/* read the number of columns */
int size = dis.readInt();
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableNamesIterator.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableNamesIterator.java?rev=799944&r1=799943&r2=799944&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableNamesIterator.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableNamesIterator.java Sat Aug 1 22:30:40 2009
@@ -28,7 +28,7 @@
{
dataReader = SequenceFile.bufferedReader(ssTable.getFilename(), 64 * 1024);
String decoratedKey = ssTable.getPartitioner().decorateKey(key);
- long position = ssTable.getPosition(decoratedKey, ssTable.getPartitioner());
+ long position = ssTable.getPosition(decoratedKey);
long bytesRead = dataReader.next(decoratedKey, bufOut, cfName, columns, position);
if (bytesRead != -1L)
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java?rev=799944&r1=799943&r2=799944&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java Sat Aug 1 22:30:40 2009
@@ -36,7 +36,7 @@
/* Morph key into actual key based on the partition type. */
String decoratedKey = ssTable.getPartitioner().decorateKey(key);
- long position = ssTable.getPosition(decoratedKey, ssTable.getPartitioner());
+ long position = ssTable.getPosition(decoratedKey);
AbstractType comparator1 = DatabaseDescriptor.getComparator(ssTable.getTableName(), cfName);
reader = new SequenceFile.ColumnGroupReader(ssTable.getFilename(), decoratedKey, cfName, comparator1, startColumn, isAscending, position);
this.comparator = comparator;
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/BufferedRandomAccessFile.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/BufferedRandomAccessFile.java?rev=799944&r1=799943&r2=799944&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/BufferedRandomAccessFile.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/BufferedRandomAccessFile.java Sat Aug 1 22:30:40 2009
@@ -165,6 +165,12 @@
getChannel().force(true);
}
+ public boolean isEOF() throws IOException
+ {
+ assert getFilePointer() <= length();
+ return getFilePointer() == length();
+ }
+
public void close() throws IOException
{
this.flush();
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/FileStruct.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/FileStruct.java?rev=799944&r1=799943&r2=799944&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/FileStruct.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/FileStruct.java Sat Aug 1 22:30:40 2009
@@ -19,12 +19,13 @@
package org.apache.cassandra.io;
import java.io.IOException;
+import java.io.DataInput;
import java.util.Iterator;
-import org.apache.cassandra.io.DataInputBuffer;
-import org.apache.cassandra.io.DataOutputBuffer;
import org.apache.cassandra.io.IFileReader;
import org.apache.cassandra.io.SSTableReader;
+import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.ColumnFamily;
import org.apache.log4j.Logger;
import com.google.common.collect.AbstractIterator;
@@ -34,30 +35,26 @@
{
private static Logger logger = Logger.getLogger(FileStruct.class);
- private String key = null; // decorated!
+ private IteratingRow row;
private boolean exhausted = false;
- private IFileReader reader;
- private DataInputBuffer bufIn;
- private DataOutputBuffer bufOut;
+ private BufferedRandomAccessFile file;
private SSTableReader sstable;
private FileStructIterator iterator;
FileStruct(SSTableReader sstable) throws IOException
{
- this.reader = SequenceFile.bufferedReader(sstable.getFilename(), 1024 * 1024);
+ this.file = new BufferedRandomAccessFile(sstable.getFilename(), "r", 1024 * 1024);
this.sstable = sstable;
- bufIn = new DataInputBuffer();
- bufOut = new DataOutputBuffer();
}
public String getFileName()
{
- return reader.getFileName();
+ return file.getPath();
}
public void close() throws IOException
{
- reader.close();
+ file.close();
}
public boolean isExhausted()
@@ -65,20 +62,20 @@
return exhausted;
}
- public DataInputBuffer getBufIn()
+ public String getKey()
{
- return bufIn;
+ return row.getKey();
}
- public String getKey()
+ public ColumnFamily getColumnFamily()
{
- return key;
+ return row.getEmptyColumnFamily();
}
public int compareTo(FileStruct f)
{
- return sstable.getPartitioner().getDecoratedKeyComparator().compare(key, f.key);
- }
+ return sstable.getPartitioner().getDecoratedKeyComparator().compare(getKey(), f.getKey());
+ }
public void seekTo(String seekKey)
{
@@ -90,8 +87,8 @@
exhausted = true;
return;
}
- reader.seek(position);
- advance();
+ file.seek(position);
+ advance(false);
}
catch (IOException e)
{
@@ -100,37 +97,40 @@
}
/*
- * Read the next key from the data file, skipping block indexes.
+ * Read the next key from the data file.
* Caller must check isExhausted after each call to see if further
* reads are valid.
* Do not mix with calls to the iterator interface (next/hasnext).
* @deprecated -- prefer the iterator interface.
*/
- public void advance() throws IOException
+ public void advance(boolean materialize) throws IOException
{
+ // TODO r/m materialize option -- use iterableness!
if (exhausted)
{
throw new IndexOutOfBoundsException();
}
- bufOut.reset();
- if (reader.isEOF())
+ if (file.isEOF())
{
- reader.close();
+ file.close();
exhausted = true;
return;
}
- long bytesread = reader.next(bufOut);
- if (bytesread == -1)
+ row = new IteratingRow(file);
+ if (materialize)
{
- reader.close();
- exhausted = true;
- return;
+ while (row.hasNext())
+ {
+ IColumn column = row.next();
+ row.getEmptyColumnFamily().addColumn(column);
+ }
+ }
+ else
+ {
+ row.skipRemaining();
}
-
- bufIn.reset(bufOut.getData(), bufOut.getLength());
- key = bufIn.readUTF();
}
public boolean hasNext()
@@ -157,7 +157,7 @@
{
public FileStructIterator()
{
- if (key == null)
+ if (row == null)
{
if (!isExhausted())
{
@@ -170,7 +170,7 @@
{
try
{
- advance();
+ advance(false);
}
catch (IOException e)
{
@@ -184,7 +184,7 @@
{
return endOfData();
}
- String oldKey = key;
+ String oldKey = getKey();
forward();
return oldKey;
}
Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/ICompactSerializer2.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/ICompactSerializer2.java?rev=799944&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/ICompactSerializer2.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/ICompactSerializer2.java Sat Aug 1 22:30:40 2009
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io;
+
+import java.io.*;
+
+public interface ICompactSerializer2<T>
+{
+ /**
+ * Serialize the specified type into the specified DataOutput instance.
+ * @param t type that needs to be serialized
+ * @param dos DataOutput into which serialization needs to happen.
+ * @throws IOException
+ */
+ public void serialize(T t, DataOutput dos) throws IOException;
+
+ /**
+ * Deserialize from the specified DataInput instance.
+ * @param dis DataInput from which deserialization needs to happen.
+ * @throws IOException
+ * @return the type that was deserialized
+ */
+ public T deserialize(DataInput dis) throws IOException;
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IFileReader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IFileReader.java?rev=799944&r1=799943&r2=799944&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IFileReader.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IFileReader.java Sat Aug 1 22:30:40 2009
@@ -37,16 +37,6 @@
/**
* This method dumps the next key/value into the DataOuputStream
- * passed in.
- *
- * @param bufOut DataOutputStream that needs to be filled.
- * @return number of bytes read.
- * @throws IOException
- */
- public long next(DataOutputBuffer bufOut) throws IOException;
-
- /**
- * This method dumps the next key/value into the DataOuputStream
* passed in. Always use this method to query for application
* specific data as it will have indexes.
*
Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IteratingRow.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IteratingRow.java?rev=799944&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IteratingRow.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IteratingRow.java Sat Aug 1 22:30:40 2009
@@ -0,0 +1,61 @@
+package org.apache.cassandra.io;
+
+import java.io.IOException;
+
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.IColumn;
+import com.google.common.collect.AbstractIterator;
+
+public class IteratingRow extends AbstractIterator<IColumn>
+{
+ private final String key;
+ private final long finishedAt;
+ private final ColumnFamily emptyColumnFamily;
+ private final BufferedRandomAccessFile file;
+
+ public IteratingRow(BufferedRandomAccessFile file) throws IOException
+ {
+ this.file = file;
+
+ key = file.readUTF();
+ long dataSize = file.readInt();
+ long dataStart = file.getFilePointer();
+ finishedAt = dataStart + dataSize;
+ IndexHelper.skipBloomFilterAndIndex(file);
+ emptyColumnFamily = ColumnFamily.serializer().deserializeEmpty(file);
+ file.readInt(); // column count. breaking serializer encapsulation is less fugly than adding a wrapper class to allow deserializeEmpty to return both values
+ }
+
+ public String getKey()
+ {
+ return key;
+ }
+
+ public ColumnFamily getEmptyColumnFamily()
+ {
+ return emptyColumnFamily;
+ }
+
+ public void skipRemaining() throws IOException
+ {
+ file.seek(finishedAt);
+ }
+
+ protected IColumn computeNext()
+ {
+ try
+ {
+ assert file.getFilePointer() <= finishedAt;
+ if (file.getFilePointer() == finishedAt)
+ {
+ return endOfData();
+ }
+
+ return emptyColumnFamily.getColumnSerializer().deserialize(file);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java?rev=799944&r1=799943&r2=799944&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java Sat Aug 1 22:30:40 2009
@@ -198,7 +198,7 @@
/**
* returns the position in the data file to find the given key, or -1 if the key is not present
*/
- public long getPosition(String decoratedKey, IPartitioner partitioner) throws IOException
+ public long getPosition(String decoratedKey) throws IOException
{
if (!bf.isPresent(decoratedKey))
return -1;
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SequenceFile.java?rev=799944&r1=799943&r2=799944&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SequenceFile.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SequenceFile.java Sat Aug 1 22:30:40 2009
@@ -417,37 +417,6 @@
}
}
}
-
- /**
- * This method dumps the next key/value into the DataOuputStream
- * passed in.
- *
- * @param bufOut DataOutputStream that needs to be filled.
- * @return total number of bytes read/considered
- */
- public long next(DataOutputBuffer bufOut) throws IOException
- {
- long bytesRead = -1L;
- if (isEOF())
- return bytesRead;
-
- long startPosition = file_.getFilePointer();
- String key = file_.readUTF();
- if (key != null)
- {
- /* write the key into buffer */
- bufOut.writeUTF(key);
- int dataSize = file_.readInt();
- /* write data size into buffer */
- bufOut.writeInt(dataSize);
- /* write the data into buffer */
- bufOut.write(file_, dataSize);
- long endPosition = file_.getFilePointer();
- bytesRead = endPosition - startPosition;
- }
-
- return bytesRead;
- }
}
public static class Reader extends AbstractReader
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java?rev=799944&r1=799943&r2=799944&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java Sat Aug 1 22:30:40 2009
@@ -385,7 +385,10 @@
{
int length = in.readInt();
byte[] bytes = new byte[length];
- in.readFully(bytes);
+ if (length > 0)
+ {
+ in.readFully(bytes);
+ }
return bytes;
}
}
Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/OneCompactionTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/OneCompactionTest.java?rev=799944&r1=799943&r2=799944&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/OneCompactionTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/OneCompactionTest.java Sat Aug 1 22:30:40 2009
@@ -39,13 +39,13 @@
Set<String> inserted = new HashSet<String>();
for (int j = 0; j < insertsPerTable; j++) {
- String key = "0";
+ String key = String.valueOf(j);
RowMutation rm = new RowMutation("Keyspace1", key);
rm.add(new QueryPath(columnFamilyName, null, "0".getBytes()), new byte[0], j);
rm.apply();
inserted.add(key);
store.forceBlockingFlush();
- assertEquals(table.getKeyRange(columnFamilyName, "", "", 10000).keys.size(), inserted.size());
+ assertEquals(inserted.size(), table.getKeyRange(columnFamilyName, "", "", 10000).keys.size());
}
Future<Integer> ft = MinorCompactionManager.instance().submit(store, 2);
ft.get();
@@ -62,6 +62,6 @@
@Test
public void testCompaction2() throws IOException, ExecutionException, InterruptedException
{
- testCompaction("Standard2", 5);
+ testCompaction("Standard2", 2);
}
}
Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/SSTableTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/SSTableTest.java?rev=799944&r1=799943&r2=799944&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/SSTableTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/SSTableTest.java Sat Aug 1 22:30:40 2009
@@ -57,11 +57,12 @@
private void verifySingle(SSTableReader sstable, byte[] bytes, String key) throws IOException
{
- FileStruct fs = sstable.getFileStruct();
- fs.seekTo(key);
- int size = fs.getBufIn().readInt();
+ BufferedRandomAccessFile file = new BufferedRandomAccessFile(sstable.dataFile, "r");
+ file.seek(sstable.getPosition(key));
+ assert key.equals(file.readUTF());
+ int size = file.readInt();
byte[] bytes2 = new byte[size];
- fs.getBufIn().readFully(bytes2);
+ file.readFully(bytes2);
assert Arrays.equals(bytes2, bytes);
}
@@ -93,13 +94,14 @@
{
List<String> keys = new ArrayList<String>(map.keySet());
Collections.shuffle(keys);
- FileStruct fs = sstable.getFileStruct();
+ BufferedRandomAccessFile file = new BufferedRandomAccessFile(sstable.dataFile, "r");
for (String key : keys)
{
- fs.seekTo(key);
- int size = fs.getBufIn().readInt();
+ file.seek(sstable.getPosition(key));
+ assert key.equals(file.readUTF());
+ int size = file.readInt();
byte[] bytes2 = new byte[size];
- fs.getBufIn().readFully(bytes2);
+ file.readFully(bytes2);
assert Arrays.equals(bytes2, map.get(key));
}
}