You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/05/22 17:33:44 UTC
svn commit: r777578 [2/2] - in /incubator/cassandra/trunk: interface/
interface/gen-java/org/apache/cassandra/service/ lib/
src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/io/
src/java/org/apache/cassandra/service/ src/java/org/apache/c...
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java?rev=777578&r1=777577&r2=777578&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java Fri May 22 15:33:42 2009
@@ -42,6 +42,7 @@
import org.apache.cassandra.utils.BloomFilter;
import org.apache.cassandra.utils.FileUtils;
import org.apache.cassandra.utils.LogUtil;
+import org.apache.cassandra.io.SequenceFile.ColumnGroupReader;
/**
* This class is built on top of the SequenceFile. It stores
@@ -873,4 +874,29 @@
return hashtable.remove(cannonicalize(filename));
}
}
+
+
+ /**
+  * Obtain a {@link ColumnGroupReader} for the getColumnSlice call.
+  *
+  * A temporary reader over the data file is used only to look up the
+  * coordinates of the (decorated) key; it is always closed before this
+  * method returns. The returned reader is a separate reader constructed
+  * over the same data file.
+  *
+  * @param key         row key as supplied by the caller, before decoration
+  * @param cfName      column family name handed to the reader
+  * @param startColumn column name the reader positions itself on
+  * @param isAscending direction handed to the reader
+  * @return a new reader for the requested row
+  * @throws IOException if the lookup or the reader construction fails
+  */
+ public ColumnGroupReader getColumnGroupReader(String key, String cfName,
+         String startColumn, boolean isAscending) throws IOException
+ {
+     IFileReader dataReader = SequenceFile.reader(dataFile_);
+     try
+     {
+         /* the key is looked up in its partitioner-decorated form */
+         String decoratedKey = partitioner_.decorateKey(key);
+         Coordinate fileCoordinate = getCoordinates(decoratedKey, dataReader, partitioner_);
+         return new ColumnGroupReader(dataFile_, decoratedKey, cfName, startColumn, isAscending, fileCoordinate);
+     }
+     finally
+     {
+         if (dataReader != null)
+             dataReader.close();
+     }
+ }
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SequenceFile.java?rev=777578&r1=777577&r2=777578&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SequenceFile.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SequenceFile.java Fri May 22 15:33:42 2009
@@ -28,6 +28,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Arrays;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.utils.BloomFilter;
@@ -532,6 +533,146 @@
}
}
+
+ /**
+  * This is a reader that finds the block for a starting column and returns
+  * blocks before/after it for each {@link #getNextBlock} call. It assumes that
+  * the CF is sorted by name and exploits the name index.
+  */
+ public static class ColumnGroupReader extends BufferReader
+ {
+     private String key_;
+     private String cfName_;
+     private boolean isAscending_;
+
+     /* index entries covering all columns of the row; empty when the key was not found */
+     private List<IndexHelper.ColumnIndexInfo> columnIndexList_;
+     /* file offset of the first serialized column; index positions are relative to it */
+     private long columnStartPosition_;
+     /* index into columnIndexList_ of the block the next getNextBlock call will read */
+     private int curRangeIndex_;
+     /* size in bytes of the serialized columns of the row */
+     private int allColumnsSize_;
+     private int localDeletionTime_;
+     private long markedForDeleteAt_;
+
+     /**
+      * Opens the file and positions the reader on the block for startColumn.
+      *
+      * @param filename    data file to read
+      * @param key         key of the row as stored on disk
+      * @param cfName      column family name, used to deserialize the column index
+      * @param startColumn column name to start from
+      * @param isAscending true to move to later blocks on each call, false for earlier ones
+      * @param section     location of the block index used to seek to the key
+      * @throws IOException if the file cannot be read
+      */
+     ColumnGroupReader(String filename, String key, String cfName, String startColumn, boolean isAscending, Coordinate section) throws IOException
+     {
+         super(filename, 128 * 1024);
+         this.cfName_ = cfName;
+         this.key_ = key;
+         this.isAscending_ = isAscending;
+
+         boolean initialized = false;
+         try
+         {
+             init(startColumn, section);
+             initialized = true;
+         }
+         finally
+         {
+             /* the superclass constructor has already run; if positioning fails the
+              * caller never receives this object, so release the reader here */
+             if (!initialized)
+                 close();
+         }
+     }
+
+     /**
+      * Build a list of index entries ready for search.
+      *
+      * With no stored index a single entry (empty name, position 0, all columns)
+      * is returned. Otherwise the result has one more entry than the stored
+      * index: the first entry has an empty name and position 0, every later
+      * entry takes the name and position of the preceding stored entry, and the
+      * last entry's count is whatever remains of totalNumCols.
+      */
+     private List<IndexHelper.ColumnIndexInfo> getFullColumnIndexList(List<IndexHelper.ColumnIndexInfo> columnIndexList, int totalNumCols)
+     {
+         if (columnIndexList.isEmpty())
+         {
+             /* if there is no column index, add an index entry that covers the full space. */
+             return Arrays.asList(new IndexHelper.ColumnIndexInfo[]{new IndexHelper.ColumnNameIndexInfo("", 0, totalNumCols)});
+         }
+
+         List<IndexHelper.ColumnIndexInfo> fullColIndexList = new ArrayList<IndexHelper.ColumnIndexInfo>();
+         int accumulatedCols = 0;
+         for (IndexHelper.ColumnIndexInfo colPosInfo : columnIndexList)
+             accumulatedCols += colPosInfo.count();
+         int remainingCols = totalNumCols - accumulatedCols;
+
+         fullColIndexList.add(new IndexHelper.ColumnNameIndexInfo("", 0, columnIndexList.get(0).count()));
+         for (int i = 0; i < columnIndexList.size() - 1; i++)
+         {
+             IndexHelper.ColumnNameIndexInfo colPosInfo = (IndexHelper.ColumnNameIndexInfo)columnIndexList.get(i);
+             fullColIndexList.add(new IndexHelper.ColumnNameIndexInfo(colPosInfo.name(),
+                                                                      colPosInfo.position(),
+                                                                      columnIndexList.get(i + 1).count()));
+         }
+         int lastIndex = columnIndexList.size() - 1;
+         String columnName = ((IndexHelper.ColumnNameIndexInfo)columnIndexList.get(lastIndex)).name();
+         fullColIndexList.add(new IndexHelper.ColumnNameIndexInfo(columnName,
+                                                                  columnIndexList.get(lastIndex).position(),
+                                                                  remainingCols));
+         return fullColIndexList;
+     }
+
+     /**
+      * Seeks to the row for key_, reads the row header and column index, and
+      * selects the index entry to start from. When the key is not in this file
+      * the reader is left with an empty index and getNextBlock returns false.
+      */
+     private void init(String startColumn, Coordinate section) throws IOException
+     {
+         String keyInDisk = null;
+         if (seekTo(key_, section) >= 0)
+             keyInDisk = file_.readUTF();
+
+         if ( keyInDisk != null && keyInDisk.equals(key_))
+         {
+             /* read off the size of this row */
+             int dataSize = file_.readInt();
+             /* skip the bloomfilter */
+             int totalBytesRead = IndexHelper.skipBloomFilter(file_);
+             /* read off the index flag; the index is only deserialized when it is set */
+             boolean hasColumnIndexes = file_.readBoolean();
+             totalBytesRead += 1;
+
+             /* read the index */
+             List<IndexHelper.ColumnIndexInfo> colIndexList = new ArrayList<IndexHelper.ColumnIndexInfo>();
+             if (hasColumnIndexes)
+                 totalBytesRead += IndexHelper.deserializeIndex(cfName_, file_, colIndexList);
+
+             /* need to do two things here.
+              * 1. move the file pointer to the beginning of the list of stored columns
+              * 2. calculate the size of all columns */
+             String cfName = file_.readUTF();
+             localDeletionTime_ = file_.readInt();
+             markedForDeleteAt_ = file_.readLong();
+             int totalNumCols = file_.readInt();
+             /* 4 + 8 + 4 are the int, long and int read just above.
+              * NOTE(review): cfName.length() is a char count; it equals the number of
+              * bytes consumed by readUTF only for ASCII names - confirm CF names are ASCII. */
+             allColumnsSize_ = dataSize - (totalBytesRead + utfPrefix_ + cfName.length() + 4 + 8 + 4);
+
+             columnStartPosition_ = file_.getFilePointer();
+             columnIndexList_ = getFullColumnIndexList(colIndexList, totalNumCols);
+
+             int index = Collections.binarySearch(columnIndexList_, new IndexHelper.ColumnNameIndexInfo(startColumn));
+             if (index < 0)
+             {
+                 /* no exact match: binarySearch returned -(insertionPoint) - 1;
+                  * start from the entry just before the insertion point */
+                 int insertionPoint = -(index + 1);
+                 curRangeIndex_ = insertionPoint - 1;
+             }
+             else
+             {
+                 curRangeIndex_ = index;
+             }
+         }
+         else
+         {
+             /* no keys found in this file because of a false positive in BF */
+             curRangeIndex_ = -1;
+             columnIndexList_ = new ArrayList<IndexHelper.ColumnIndexInfo>();
+         }
+     }
+
+     /**
+      * Writes the block at curRangeIndex_ into bufOut: the CF name, the two
+      * deletion fields, the block's column count, then the block's raw bytes.
+      *
+      * @return false, leaving bufOut untouched, when curRangeIndex_ is outside the index
+      */
+     private boolean getBlockFromCurIndex(DataOutputBuffer bufOut) throws IOException
+     {
+         if (curRangeIndex_ < 0 || curRangeIndex_ >= columnIndexList_.size())
+             return false;
+         IndexHelper.ColumnIndexInfo curColPosition = columnIndexList_.get(curRangeIndex_);
+         long start = curColPosition.position();
+         /* a block ends where the next one starts; the last block ends with the column data */
+         long end = curRangeIndex_ < columnIndexList_.size() - 1
+                    ? columnIndexList_.get(curRangeIndex_ + 1).position()
+                    : allColumnsSize_;
+
+         /* seek to the correct offset to the data, and calculate the data size */
+         file_.seek(columnStartPosition_ + start);
+         long dataSize = end - start;
+
+         bufOut.reset();
+         // write CF info
+         bufOut.writeUTF(cfName_);
+         bufOut.writeInt(localDeletionTime_);
+         bufOut.writeLong(markedForDeleteAt_);
+         // now write the columns
+         bufOut.writeInt(curColPosition.count());
+         bufOut.write(file_, (int)dataSize);
+         return true;
+     }
+
+     /**
+      * Reads the current block into outBuf and then moves to the following
+      * block (ascending) or the preceding one (descending). The position moves
+      * even when no block was available.
+      *
+      * @return true if a block was written to outBuf, false if there was none
+      */
+     public boolean getNextBlock(DataOutputBuffer outBuf) throws IOException
+     {
+         boolean result = getBlockFromCurIndex(outBuf);
+         if (isAscending_)
+             curRangeIndex_++;
+         else
+             curRangeIndex_--;
+         return result;
+     }
+ }
+
public static abstract class AbstractReader implements IFileReader
{
private static final short utfPrefix_ = 2;
@@ -705,7 +846,7 @@
* @param section indicates the location of the block index.
* @throws IOException
*/
- private long seekTo(String key, Coordinate section) throws IOException
+ protected long seekTo(String key, Coordinate section) throws IOException
{
seek(section.end_);
long position = getPositionFromBlockIndex(key);
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java?rev=777578&r1=777577&r2=777578&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java Fri May 22 15:33:42 2009
@@ -35,6 +35,7 @@
import org.apache.cassandra.db.ColumnsSinceReadCommand;
import org.apache.cassandra.db.SliceByNamesReadCommand;
import org.apache.cassandra.db.SliceByRangeReadCommand;
+import org.apache.cassandra.db.SliceFromReadCommand;
import org.apache.cassandra.db.SliceReadCommand;
import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.db.Row;
@@ -208,7 +209,26 @@
}
return thriftifyColumns(columns);
}
-
+
+ /**
+  * Returns up to count columns of a row, starting at a given column and
+  * moving in the requested direction.
+  *
+  * @param tablename           table to read from
+  * @param key                 row key
+  * @param columnFamily_column "columnFamily:column" naming a standard CF and the starting column
+  * @param isAscending         direction to move in from the starting column
+  * @param count               number of columns requested; must be positive
+  * @return the columns read, or EMPTY_COLUMNS when the column family is not found
+  * @throws InvalidRequestException if the CF is not a name-indexed standard CF,
+  *         no starting column is given, or count is not positive
+  */
+ public List<column_t> get_slice_from(String tablename, String key, String columnFamily_column, boolean isAscending, int count) throws InvalidRequestException
+ {
+     logger.debug("get_slice_from");
+     String[] values = RowMutation.getColumnAndColumnFamily(columnFamily_column);
+     /* compare string contents: != on Strings tests object identity and only
+      * happens to work when both sides are the same interned instance */
+     if (values.length != 2 || !"Standard".equals(DatabaseDescriptor.getColumnFamilyType(values[0])))
+         throw new InvalidRequestException("get_slice_from requires a standard CF name and a starting column name");
+     if (count <= 0)
+         throw new InvalidRequestException("get_slice_from requires positive count");
+     /* equals() also rejects a missing index property instead of failing with an NPE */
+     if (!"Name".equals(DatabaseDescriptor.getCFMetaData(tablename, values[0]).indexProperty_))
+         throw new InvalidRequestException("get_slice_from requires CF indexed by name");
+     ColumnFamily cfamily = readColumnFamily(new SliceFromReadCommand(tablename, key, columnFamily_column, isAscending, count));
+     if (cfamily == null)
+     {
+         return EMPTY_COLUMNS;
+     }
+     Collection<IColumn> columns = cfamily.getAllColumns();
+     return thriftifyColumns(columns);
+ }
+
public column_t get_column(String tablename, String key, String columnFamily_column) throws NotFoundException, InvalidRequestException
{
logger.debug("get_column");
Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/ReducingIterator.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/ReducingIterator.java?rev=777578&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/ReducingIterator.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/ReducingIterator.java Fri May 22 15:33:42 2009
@@ -0,0 +1,59 @@
+package org.apache.cassandra.utils;
+
+import java.util.Iterator;
+
+import com.google.common.collect.AbstractIterator;
+
+/**
+ * Collapses each run of consecutive equal-keyed values from the source iterator
+ * into a single (optionally transformed) instance.
+ *
+ * The object is its own {@link Iterable}: iterator() returns this same
+ * instance, so it can be traversed only once.
+ */
+public abstract class ReducingIterator<T> extends AbstractIterator<T> implements Iterator<T>, Iterable<T>
+{
+    protected Iterator<T> source;
+    /** value already pulled from the source but not yet passed to reduce; null when there is none */
+    protected T last;
+
+    public ReducingIterator(Iterator<T> source)
+    {
+        this.source = source;
+    }
+
+    /** fold this object into the ones seen before it. how intermediate state is kept is up to the implementation. */
+    public abstract void reduce(T current);
+
+    /** return the result of the reduce calls made for the current run */
+    protected abstract T getReduced();
+
+    /** override this if the keys to group on are not the objects themselves (but can be derived from them) */
+    protected Object getKey(T o)
+    {
+        return o;
+    }
+
+    protected T computeNext()
+    {
+        if (last == null && !source.hasNext())
+            return endOfData();
+
+        while (true)
+        {
+            /* feed the pending value, if any, into the current run */
+            if (last != null)
+                reduce(last);
+
+            if (!source.hasNext())
+            {
+                /* source exhausted: nothing is left pending for the next call */
+                last = null;
+                break;
+            }
+
+            T current = source.next();
+            boolean startsNewRun = last != null && !getKey(current).equals(getKey(last));
+            /* keep the value pending; when it starts a new run it is reduced on the next call */
+            last = current;
+            if (startsNewRun)
+                break;
+        }
+        return getReduced();
+    }
+
+    public Iterator<T> iterator()
+    {
+        return this;
+    }
+}
Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java?rev=777578&r1=777577&r2=777578&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java Fri May 22 15:33:42 2009
@@ -18,6 +18,11 @@
package org.apache.cassandra.db;
+import java.util.SortedSet;
+import java.util.Arrays;
+import java.util.List;
+import java.util.ArrayList;
+
import org.junit.Test;
import static junit.framework.Assert.*;
@@ -202,4 +207,117 @@
rm.add(cf);
return rm;
}
+
+ /**
+  * Writes six columns to one row, deletes col4, and validates getSliceFrom
+  * both before and after the column family store is flushed.
+  */
+ @Test
+ public void testGetSliceFromBasic() throws Throwable
+ {
+     final String rowKey = "row1";
+     Table table = Table.open(TABLE_NAME);
+
+     RowMutation rm = new RowMutation(TABLE_NAME, rowKey);
+     ColumnFamily cf = new ColumnFamily("Standard1", "Standard");
+     /* col2, col6 and col8 are left out on purpose so the slices have gaps */
+     for (String suffix : new String[]{ "1", "3", "4", "5", "7", "9" })
+     {
+         cf.addColumn(new Column("col" + suffix, ("val" + suffix).getBytes(), 1L));
+     }
+     rm.add(cf);
+     rm.apply();
+
+     /* delete col4 with a timestamp later than the insert */
+     rm = new RowMutation(TABLE_NAME, rowKey);
+     rm.delete("Standard1:col4", 2L);
+     rm.apply();
+
+     /* first pass: nothing has been flushed by this test yet */
+     validateGetSliceFromBasic(table, rowKey);
+
+     /* second pass: same expectations after a flush to disk */
+     table.getColumnFamilyStore("Standard1").forceBlockingFlush();
+     validateGetSliceFromBasic(table, rowKey);
+ }
+
+ /**
+  * Writes col1..col6, flushes, then overwrites col1..col3 with a newer
+  * timestamp and validates getSliceFrom before and after a second flush.
+  */
+ @Test
+ public void testGetSliceFromAdvanced() throws Throwable
+ {
+     final String rowKey = "row2";
+     Table table = Table.open(TABLE_NAME);
+
+     RowMutation rm = new RowMutation(TABLE_NAME, rowKey);
+     ColumnFamily cf = new ColumnFamily("Standard1", "Standard");
+     for (int i = 1; i <= 6; i++)
+     {
+         cf.addColumn(new Column("col" + i, ("val" + i).getBytes(), 1L));
+     }
+     rm.add(cf);
+     rm.apply();
+     /* flush the original values to disk */
+     table.getColumnFamilyStore("Standard1").forceBlockingFlush();
+
+     /* newer values for the first three columns */
+     rm = new RowMutation(TABLE_NAME, rowKey);
+     cf = new ColumnFamily("Standard1", "Standard");
+     for (int i = 1; i <= 3; i++)
+     {
+         cf.addColumn(new Column("col" + i, "valx".getBytes(), 2L));
+     }
+     rm.add(cf);
+     rm.apply();
+     validateGetSliceFromAdvanced(table, rowKey);
+
+     /* flush the overwrites as well and check again */
+     table.getColumnFamilyStore("Standard1").forceBlockingFlush();
+     validateGetSliceFromAdvanced(table, rowKey);
+ }
+
+ /**
+  * Asserts that the column family is non-null and that its columns, in
+  * iteration order, have exactly the given names.
+  *
+  * @param columnFamily      column family to inspect
+  * @param columnFamilyNames expected column names, in order (none means no columns expected)
+  */
+ private void assertColumns(ColumnFamily columnFamily, String... columnFamilyNames)
+ {
+     assertNotNull(columnFamily);
+     SortedSet<IColumn> columns = columnFamily.getAllColumns();
+     List<String> actualNames = new ArrayList<String>();
+     for (IColumn column : columns)
+     {
+         actualNames.add(column.name());
+     }
+     /* use a JUnit assertion rather than the assert keyword, which is skipped
+      * unless the JVM runs with -ea; comparing lists also reports both sides on failure */
+     assertEquals(Arrays.asList(columnFamilyNames), actualNames);
+ }
+
+ /**
+  * Checks an ascending slice of three columns from col2: the overwritten
+  * columns must carry the newer value and col4 the original one.
+  */
+ private void validateGetSliceFromAdvanced(Table table, String row) throws Throwable
+ {
+     Row result;
+     ColumnFamily cfres;
+
+     result = table.getSliceFrom(row, "Standard1:col2", true, 3);
+     cfres = result.getColumnFamily("Standard1");
+     assertColumns(cfres, "col2", "col3", "col4");
+     /* assertEquals takes (expected, actual); this order keeps failure messages accurate */
+     assertEquals("valx", new String(cfres.getColumn("col2").value()));
+     assertEquals("valx", new String(cfres.getColumn("col3").value()));
+     assertEquals("val4", new String(cfres.getColumn("col4").value()));
+ }
+
+ /**
+  * Runs the getSliceFrom checks for the row written by testGetSliceFromBasic,
+  * each with a requested count of 2.
+  */
+ private void validateGetSliceFromBasic(Table table, String row) throws Throwable
+ {
+     ColumnFamily slice;
+
+     /* ascending from an existing column */
+     slice = table.getSliceFrom(row, "Standard1:col5", true, 2).getColumnFamily("Standard1");
+     assertColumns(slice, "col5", "col7");
+
+     /* ascending from col4: three names are expected here
+      * NOTE(review): presumably the deleted col4 is returned without counting toward the limit - confirm */
+     slice = table.getSliceFrom(row, "Standard1:col4", true, 2).getColumnFamily("Standard1");
+     assertColumns(slice, "col4", "col5", "col7");
+
+     /* descending from an existing column */
+     slice = table.getSliceFrom(row, "Standard1:col5", false, 2).getColumnFamily("Standard1");
+     assertColumns(slice, "col3", "col4", "col5");
+
+     /* descending from a name that was never written */
+     slice = table.getSliceFrom(row, "Standard1:col6", false, 2).getColumnFamily("Standard1");
+     assertColumns(slice, "col3", "col4", "col5");
+
+     /* ascending from a name that sorts after every written column */
+     slice = table.getSliceFrom(row, "Standard1:col95", true, 2).getColumnFamily("Standard1");
+     assertColumns(slice);
+
+     /* descending from a name that sorts before every written column */
+     slice = table.getSliceFrom(row, "Standard1:col0", false, 2).getColumnFamily("Standard1");
+     assertColumns(slice);
+ }
}