You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/09/17 23:09:58 UTC
svn commit: r816380 - in
/incubator/cassandra/trunk/src/java/org/apache/cassandra: db/Column.java
db/ColumnFamilySerializer.java db/ColumnFamilyStore.java
db/SuperColumn.java io/IteratingRow.java io/SSTableReader.java
io/SSTableScanner.java
Author: jbellis
Date: Thu Sep 17 21:09:58 2009
New Revision: 816380
URL: http://svn.apache.org/viewvc?rev=816380&view=rev
Log:
copy FileStruct to SSTableScanner and remove cruft. Migrate getKeyRange to new scanner class.
patch by jbellis; reviewed by goffinet for CASSANDRA-436
Added:
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableScanner.java
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Column.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IteratingRow.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Column.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Column.java?rev=816380&r1=816379&r2=816380&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Column.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Column.java Thu Sep 17 21:09:58 2009
@@ -36,7 +36,7 @@
{
private static ColumnSerializer serializer_ = new ColumnSerializer();
- static ColumnSerializer serializer()
+ public static ColumnSerializer serializer()
{
return serializer_;
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java?rev=816380&r1=816379&r2=816380&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java Thu Sep 17 21:09:58 2009
@@ -27,6 +27,7 @@
import java.util.Collection;
import org.apache.cassandra.io.ICompactSerializer2;
+import org.apache.cassandra.io.SSTableReader;
import org.apache.cassandra.db.marshal.AbstractType;
public class ColumnFamilySerializer implements ICompactSerializer2<ColumnFamily>
@@ -81,14 +82,18 @@
public ColumnFamily deserialize(DataInput dis) throws IOException
{
ColumnFamily cf = deserializeFromSSTableNoColumns(dis.readUTF(), dis.readUTF(), readComparator(dis), readComparator(dis), dis);
+ deserializeColumns(dis, cf);
+ return cf;
+ }
+
+ private void deserializeColumns(DataInput dis, ColumnFamily cf) throws IOException
+ {
int size = dis.readInt();
- IColumn column;
for (int i = 0; i < size; ++i)
{
- column = cf.getColumnSerializer().deserialize(dis);
+ IColumn column = cf.getColumnSerializer().deserialize(dis);
cf.addColumn(column);
}
- return cf;
}
private AbstractType readComparator(DataInput dis) throws IOException
@@ -124,4 +129,12 @@
cf.delete(input.readInt(), input.readLong());
return cf;
}
+
+    /**
+     * Reads one complete column family (header fields and then all columns) for the
+     * given sstable from the supplied input.
+     *
+     * @param sstable reader used to create the empty ColumnFamily to populate
+     * @param file    input positioned at the start of the serialized column family
+     * @return the populated column family
+     * @throws IOException if reading from the input fails
+     */
+    public ColumnFamily deserializeFromSSTable(SSTableReader sstable, DataInput file) throws IOException
+    {
+        ColumnFamily result = sstable.makeColumnFamily();
+        // header first, then the columns that follow it in the stream
+        deserializeFromSSTableNoColumns(result, file);
+        deserializeColumns(file, result);
+        return result;
+    }
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=816380&r1=816379&r2=816380&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Thu Sep 17 21:09:58 2009
@@ -20,6 +20,7 @@
import java.io.File;
import java.io.IOException;
+import java.io.Closeable;
import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;
@@ -1340,9 +1341,23 @@
// sstables
for (SSTableReader sstable : ssTables_)
{
- FileStruct fs = sstable.getFileStruct();
+ final SSTableScanner fs = sstable.getScanner();
fs.seekTo(startWith);
- iterators.add(fs);
+            // A named local class rather than an anonymous Iterator, so the wrapper can also
+            // be Closeable: the cleanup loop only closes iterators that are instanceof
+            // Closeable, so a bare Iterator<String> would leave the scanner's file open.
+            class ScannerKeyIterator implements Iterator<String>, Closeable
+            {
+                public boolean hasNext()
+                {
+                    return fs.hasNext();
+                }
+                public String next()
+                {
+                    // this iterator yields keys, not rows
+                    return fs.next().getKey();
+                }
+                public void remove()
+                {
+                    throw new UnsupportedOperationException();
+                }
+                public void close() throws IOException
+                {
+                    fs.close();
+                }
+            }
+            iterators.add(new ScannerKeyIterator());
}
Iterator<String> collated = IteratorUtils.collatedIterator(comparator, iterators);
@@ -1392,9 +1407,9 @@
{
for (Iterator iter : iterators)
{
- if (iter instanceof FileStruct)
+ if (iter instanceof Closeable)
{
- ((FileStruct)iter).close();
+ ((Closeable)iter).close();
}
}
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java?rev=816380&r1=816379&r2=816380&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java Thu Sep 17 21:09:58 2009
@@ -38,7 +38,7 @@
{
private static Logger logger_ = Logger.getLogger(SuperColumn.class);
- static SuperColumnSerializer serializer(AbstractType comparator)
+ public static SuperColumnSerializer serializer(AbstractType comparator)
{
return new SuperColumnSerializer(comparator);
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IteratingRow.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IteratingRow.java?rev=816380&r1=816379&r2=816380&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IteratingRow.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IteratingRow.java Thu Sep 17 21:09:58 2009
@@ -22,31 +22,39 @@
import java.io.IOException;
+import java.io.OutputStream;
+import java.io.DataOutputStream;
+import java.io.DataOutput;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.service.StorageService;
import com.google.common.collect.AbstractIterator;
-public class IteratingRow extends AbstractIterator<IColumn>
+public class IteratingRow extends AbstractIterator<IColumn> implements Comparable<IteratingRow>
{
private final String key;
private final long finishedAt;
private final ColumnFamily emptyColumnFamily;
private final BufferedRandomAccessFile file;
+ private SSTableReader sstable;
+ private long dataStart;
public IteratingRow(BufferedRandomAccessFile file, SSTableReader sstable) throws IOException
{
this.file = file;
+ this.sstable = sstable;
key = file.readUTF();
- long dataSize = file.readInt();
- long dataStart = file.getFilePointer();
+ int dataSize = file.readInt();
+ dataStart = file.getFilePointer();
finishedAt = dataStart + dataSize;
+ // legacy stuff to support FileStruct:
IndexHelper.skipBloomFilterAndIndex(file);
emptyColumnFamily = ColumnFamily.serializer().deserializeFromSSTableNoColumns(sstable.makeColumnFamily(), file);
- file.readInt(); // column count. breaking serializer encapsulation is less fugly than adding a wrapper class to allow deserializeEmpty to return both values
+ file.readInt();
}
public String getKey()
@@ -59,11 +67,33 @@
return emptyColumnFamily;
}
+    /**
+     * Copies the raw bytes of this row, from dataStart up to finishedAt, to the given output.
+     *
+     * @param out destination for the row's bytes
+     * @throws IOException if reading or writing fails, or (as an EOFException) if the file
+     *         ends before finishedAt is reached
+     */
+    public void echoData(DataOutput out) throws IOException
+    {
+        file.seek(dataStart);
+        while (file.getFilePointer() < finishedAt)
+        {
+            int b = file.read();
+            // NOTE(review): assumes read() follows the java.io.RandomAccessFile contract and
+            // returns -1 at end of file. Without this check a truncated file would have that
+            // -1 written out as a 0xFF byte on every pass while the file pointer never advances.
+            if (b < 0)
+                throw new java.io.EOFException("unexpected end of file while echoing row " + key);
+            out.write(b);
+        }
+    }
+
+    /**
+     * Re-reads this row from the start of its data and deserializes it, columns included.
+     * Note that this repositions the underlying file.
+     *
+     * @throws IOException if seeking or reading fails
+     */
+    // TODO r/m this and make compaction merge columns iteratively for CASSANDRA-16
+    public ColumnFamily getColumnFamily() throws IOException
+    {
+        file.seek(dataStart);
+        // the bloom filter and index are skipped before the column family is read
+        IndexHelper.skipBloomFilterAndIndex(file);
+        ColumnFamily cf = ColumnFamily.serializer().deserializeFromSSTable(sstable, file);
+        return cf;
+    }
+
public void skipRemaining() throws IOException
{
file.seek(finishedAt);
}
+    /**
+     * @return the file offset at which this row's data ends (data start plus data size)
+     */
+    public long getEndPosition()
+    {
+        return this.finishedAt;
+    }
+
protected IColumn computeNext()
{
try
@@ -74,11 +104,16 @@
return endOfData();
}
- return emptyColumnFamily.getColumnSerializer().deserialize(file);
+ return sstable.getColumnSerializer().deserialize(file);
}
catch (IOException e)
{
throw new RuntimeException(e);
}
}
+
+    /**
+     * Orders rows by key, using the decorated-key comparator of the partitioner
+     * returned by StorageService.
+     */
+    public int compareTo(IteratingRow o)
+    {
+        String otherKey = o.key;
+        return StorageService.getPartitioner()
+                             .getDecoratedKeyComparator()
+                             .compare(this.key, otherKey);
+    }
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java?rev=816380&r1=816379&r2=816380&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java Thu Sep 17 21:09:58 2009
@@ -32,8 +32,7 @@
import org.apache.cassandra.utils.BloomFilter;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.*;
import org.apache.cassandra.db.marshal.AbstractType;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
@@ -339,6 +338,11 @@
return new FileStruct(this);
}
+    /**
+     * Creates a new SSTableScanner over this reader. Every call returns a fresh scanner
+     * with its own open file, which the caller is expected to close.
+     *
+     * @throws IOException if the scanner cannot be created
+     */
+    public SSTableScanner getScanner() throws IOException
+    {
+        SSTableScanner scanner = new SSTableScanner(this);
+        return scanner;
+    }
+
public String getTableName()
{
return parseTableName(path);
@@ -353,6 +357,13 @@
{
return ColumnFamily.create(getTableName(), getColumnFamilyName());
}
+
+    /**
+     * Picks the column serializer for this sstable's column family: the shared Column
+     * serializer when the configured column family type is "Standard", otherwise a
+     * SuperColumn serializer built on getColumnComparator().
+     */
+    public ICompactSerializer2<IColumn> getColumnSerializer()
+    {
+        if (DatabaseDescriptor.getColumnFamilyType(getTableName(), getColumnFamilyName()).equals("Standard"))
+        {
+            return Column.serializer();
+        }
+        return SuperColumn.serializer(getColumnComparator());
+    }
}
class FileSSTableMap
Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableScanner.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableScanner.java?rev=816380&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableScanner.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableScanner.java Thu Sep 17 21:09:58 2009
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io;
+
+import java.io.IOException;
+import java.io.Closeable;
+import java.util.Iterator;
+import java.util.Arrays;
+
+import org.apache.log4j.Logger;
+import com.google.common.collect.AbstractIterator;
+
+
+/**
+ * Iterates over the rows of one sstable file in file order, optionally starting
+ * from the position nearest to a given key (see {@link #seekTo}).
+ * The scanner holds an open file; call {@link #close()} when done with it.
+ */
+public class SSTableScanner implements Iterator<IteratingRow>, Closeable
+{
+    private static Logger logger = Logger.getLogger(SSTableScanner.class);
+
+    private final BufferedRandomAccessFile file;
+    private final SSTableReader sstable;
+    private IteratingRow row;                // row most recently returned by next()
+    private boolean exhausted = false;       // set when seekTo finds no position to seek to
+    private Iterator<IteratingRow> iterator; // delegate, created lazily on first hasNext()/next()
+
+    SSTableScanner(SSTableReader sstable) throws IOException
+    {
+        // TODO this is used for both compactions and key ranges. the buffer sizes we want
+        // to use for these ops are very different. here we are leaning towards the key-range
+        // use case since that is more common. What we really want is to split those
+        // two uses of this class up.
+        this.file = new BufferedRandomAccessFile(sstable.getFilename(), "r", 256 * 1024);
+        this.sstable = sstable;
+    }
+
+    public void close() throws IOException
+    {
+        file.close();
+    }
+
+    /**
+     * Positions the file at the location the sstable reports as nearest to seekKey.
+     * A negative position marks the scanner as exhausted, so it will yield no rows.
+     *
+     * @throws RuntimeException wrapping the IOException if the lookup or seek fails
+     */
+    public void seekTo(String seekKey)
+    {
+        try
+        {
+            long position = sstable.getNearestPosition(seekKey);
+            if (position < 0)
+            {
+                exhausted = true;
+                return;
+            }
+            file.seek(position);
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException("corrupt sstable", e);
+        }
+    }
+
+    public boolean hasNext()
+    {
+        return rows().hasNext();
+    }
+
+    public IteratingRow next()
+    {
+        return rows().next();
+    }
+
+    public void remove()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Returns the delegate iterator, creating it on first use: an empty iterator if
+     * seekTo marked the scanner exhausted, otherwise a scan of the file.
+     */
+    private Iterator<IteratingRow> rows()
+    {
+        if (iterator == null)
+            iterator = exhausted ? Arrays.asList(new IteratingRow[0]).iterator() : new KeyScanningIterator();
+        return iterator;
+    }
+
+    /** Reads rows one after another from the current file position. */
+    private class KeyScanningIterator implements Iterator<IteratingRow>
+    {
+        public boolean hasNext()
+        {
+            try
+            {
+                // row stays null until the first next(); test it explicitly so that a scan
+                // positioned at EOF before any row was read (e.g. an empty file) answers false
+                // instead of falling through to row.getEndPosition() on a null row.
+                return row == null ? !file.isEOF() : row.getEndPosition() < file.length();
+            }
+            catch (IOException e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+
+        public IteratingRow next()
+        {
+            try
+            {
+                // move past whatever the caller left unread of the previous row
+                if (row != null)
+                    row.skipRemaining();
+                assert !file.isEOF();
+                return row = new IteratingRow(file, sstable);
+            }
+            catch (IOException e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+
+        public void remove()
+        {
+            throw new UnsupportedOperationException();
+        }
+    }
+}