You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/05/22 17:33:44 UTC

svn commit: r777578 [2/2] - in /incubator/cassandra/trunk: interface/ interface/gen-java/org/apache/cassandra/service/ lib/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/io/ src/java/org/apache/cassandra/service/ src/java/org/apache/c...

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java?rev=777578&r1=777577&r2=777578&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java Fri May 22 15:33:42 2009
@@ -42,6 +42,7 @@
 import org.apache.cassandra.utils.BloomFilter;
 import org.apache.cassandra.utils.FileUtils;
 import org.apache.cassandra.utils.LogUtil;
+import org.apache.cassandra.io.SequenceFile.ColumnGroupReader;
 
 /**
  * This class is built on top of the SequenceFile. It stores
@@ -873,4 +874,29 @@
             return hashtable.remove(cannonicalize(filename));
         }
     }
+    
+    
+    /**
+     * Builds a ColumnGroupReader over this SSTable's data file for the given key and column family, starting at startColumn. The returned reader is not closed here.
+     */
+    public ColumnGroupReader getColumnGroupReader(String key, String cfName, 
+            String startColumn, boolean isAscending) throws IOException
+    {
+        ColumnGroupReader reader = null;
+        IFileReader dataReader = SequenceFile.reader(dataFile_); // used only for the coordinate lookup below; closed in the finally block
+
+        try
+        {
+            /* Morph key into actual key based on the partition type. */
+            String decoratedKey = partitioner_.decorateKey(key);
+            Coordinate fileCoordinate = getCoordinates(decoratedKey, dataReader, partitioner_);
+            reader = new ColumnGroupReader(dataFile_, decoratedKey, cfName, startColumn, isAscending, fileCoordinate); // given the file name, not dataReader
+        }
+        finally
+        {
+            if (dataReader != null)
+                dataReader.close();
+        }
+        return reader;
+    }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SequenceFile.java?rev=777578&r1=777577&r2=777578&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SequenceFile.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SequenceFile.java Fri May 22 15:33:42 2009
@@ -28,6 +28,7 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Arrays;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.utils.BloomFilter;
@@ -532,6 +533,146 @@
         }
     }
 
+    
+    /**
+     *  Reader that locates the column block for a starting column in one row and, on each
+     *  getNextBlock call, emits the current block and steps forward (ascending) or backward
+     *  (descending). It assumes the CF is sorted by name and exploits the name index.
+     */
+    public static class ColumnGroupReader extends BufferReader
+    {
+        private String key_;
+        private String cfName_;
+        private boolean isAscending_;
+
+        private List<IndexHelper.ColumnIndexInfo> columnIndexList_; // set in init(): result of getFullColumnIndexList, or empty when the key is not found
+        private long columnStartPosition_; // file pointer captured in init() right after the CF header has been read
+        private int curRangeIndex_; // index into columnIndexList_ of the block the next getNextBlock call will read
+        private int allColumnsSize_; // computed in init(): row data size minus the header bytes read before the columns
+        private int localDeletionTime_;
+        private long markedForDeleteAt_;
+
+        ColumnGroupReader(String filename, String key, String cfName, String startColumn, boolean isAscending, Coordinate section) throws IOException
+        {
+            super(filename, 128 * 1024); // second argument is presumably a buffer size in bytes — TODO confirm against BufferReader
+            this.cfName_ = cfName;
+            this.key_ = key;
+            this.isAscending_ = isAscending;
+            init(startColumn, section);
+        }
+
+        /**
+         *   Builds the list searched by init(): a leading entry named "" at position 0, then one entry per given index entry, each carrying the count of the entry after it (the last carries the columns the index does not account for).
+         */
+        private List<IndexHelper.ColumnIndexInfo> getFullColumnIndexList(List<IndexHelper.ColumnIndexInfo> columnIndexList, int totalNumCols)
+        {
+            if (columnIndexList.size() == 0)
+            {
+                /* if there is no column index, add a single index entry that covers the full space. */
+                return Arrays.asList(new IndexHelper.ColumnIndexInfo[]{new IndexHelper.ColumnNameIndexInfo("", 0, totalNumCols)});
+            }
+
+            List<IndexHelper.ColumnIndexInfo> fullColIndexList = new ArrayList<IndexHelper.ColumnIndexInfo>();
+            int accumulatededCols = 0;
+            for (IndexHelper.ColumnIndexInfo colPosInfo : columnIndexList)
+                accumulatededCols += colPosInfo.count();
+            int remainingCols = totalNumCols - accumulatededCols; // columns not covered by any given index entry
+
+            fullColIndexList.add(new IndexHelper.ColumnNameIndexInfo("", 0, columnIndexList.get(0).count()));
+            for (int i = 0; i < columnIndexList.size() - 1; i++)
+            {
+                IndexHelper.ColumnNameIndexInfo colPosInfo = (IndexHelper.ColumnNameIndexInfo)columnIndexList.get(i);
+                fullColIndexList.add(new IndexHelper.ColumnNameIndexInfo(colPosInfo.name(),
+                                                                         colPosInfo.position(),
+                                                                         columnIndexList.get(i + 1).count()));
+            }
+            String columnName = ((IndexHelper.ColumnNameIndexInfo)columnIndexList.get(columnIndexList.size() - 1)).name();
+            fullColIndexList.add(new IndexHelper.ColumnNameIndexInfo(columnName,
+                                                                     columnIndexList.get(columnIndexList.size() - 1).position(),
+                                                                     remainingCols));
+            return fullColIndexList;
+        }
+
+        private void init(String startColumn, Coordinate section) throws IOException
+        {
+            String keyInDisk = null;
+            if (seekTo(key_, section) >= 0)
+                keyInDisk = file_.readUTF();
+
+            if ( keyInDisk != null && keyInDisk.equals(key_))
+            {
+                /* read off the size of this row */
+                int dataSize = file_.readInt();
+                /* skip the bloom filter, keeping the number of bytes skipped */
+                int totalBytesRead = IndexHelper.skipBloomFilter(file_);
+                /* read off the index flag; when it is false no index is deserialized and colIndexList stays empty */
+                boolean hasColumnIndexes = file_.readBoolean();
+                totalBytesRead += 1;
+
+                /* read the index, if present */
+                List<IndexHelper.ColumnIndexInfo> colIndexList = new ArrayList<IndexHelper.ColumnIndexInfo>();
+                if (hasColumnIndexes)
+                    totalBytesRead += IndexHelper.deserializeIndex(cfName_, file_, colIndexList);
+
+                /* Two things happen here:
+                 * 1. reading the CF header moves the file pointer to the beginning of the stored columns
+                 * 2. the byte size of all columns is derived from the row size minus the header bytes read */
+                String cfName = file_.readUTF();
+                localDeletionTime_ = file_.readInt();
+                markedForDeleteAt_ = file_.readLong();
+                int totalNumCols = file_.readInt();
+                allColumnsSize_ = dataSize - (totalBytesRead + utfPrefix_ + cfName.length() + 4 + 8 + 4); // NOTE(review): cfName.length() is a char count; if readUTF uses DataInput's modified UTF-8, non-ASCII names occupy more bytes — confirm
+
+                columnStartPosition_ = file_.getFilePointer();
+                columnIndexList_ = getFullColumnIndexList(colIndexList, totalNumCols);
+
+                int index = Collections.binarySearch(columnIndexList_, new IndexHelper.ColumnNameIndexInfo(startColumn));
+                curRangeIndex_ = index < 0 ? (++index) * (-1) - 1 : index; // on a miss binarySearch returns -(insertionPoint) - 1; this picks insertionPoint - 1, the entry just before startColumn
+            }
+            else 
+            {
+                /* key not found in this file (e.g. a false positive in the bloom filter): leave nothing to read */
+                curRangeIndex_ = -1;
+                columnIndexList_ = new ArrayList<IndexHelper.ColumnIndexInfo>();
+            }
+        }
+
+        private boolean getBlockFromCurIndex(DataOutputBuffer bufOut) throws IOException // returns false, writing nothing, when curRangeIndex_ is out of range
+        {
+            if (curRangeIndex_ < 0 || curRangeIndex_ >= columnIndexList_.size())
+                return false;
+            IndexHelper.ColumnIndexInfo curColPostion = columnIndexList_.get(curRangeIndex_);
+            long start = curColPostion.position();
+            long end = curRangeIndex_ < columnIndexList_.size() - 1
+                       ? columnIndexList_.get(curRangeIndex_+1).position()
+                       : allColumnsSize_; // the last block runs to the end of the column data
+
+            /* seek to the block's offset relative to the start of the columns, and calculate its size */
+            file_.seek(columnStartPosition_ + start);
+            long dataSize = end - start;
+
+            bufOut.reset();
+            // write the CF header: name, local deletion time, marked-for-delete timestamp
+            bufOut.writeUTF(cfName_);
+            bufOut.writeInt(localDeletionTime_);
+            bufOut.writeLong(markedForDeleteAt_);
+            // now write the column count for this block followed by the raw column bytes
+            bufOut.writeInt(curColPostion.count());
+            bufOut.write(file_, (int)dataSize);
+            return true;
+        }
+
+        public boolean getNextBlock(DataOutputBuffer outBuf) throws IOException // writes the current block into outBuf; the index moves even when false is returned
+        {
+            boolean result = getBlockFromCurIndex(outBuf);
+            if (isAscending_)
+                curRangeIndex_++;
+            else
+                curRangeIndex_--;
+            return result;
+        }
+    }
+
     public static abstract class AbstractReader implements IFileReader
     {
         private static final short utfPrefix_ = 2;
@@ -705,7 +846,7 @@
          * @param section indicates the location of the block index.
          * @throws IOException
          */
-        private long seekTo(String key, Coordinate section) throws IOException
+        protected long seekTo(String key, Coordinate section) throws IOException
         {
             seek(section.end_);
             long position = getPositionFromBlockIndex(key);

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java?rev=777578&r1=777577&r2=777578&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java Fri May 22 15:33:42 2009
@@ -35,6 +35,7 @@
 import org.apache.cassandra.db.ColumnsSinceReadCommand;
 import org.apache.cassandra.db.SliceByNamesReadCommand;
 import org.apache.cassandra.db.SliceByRangeReadCommand;
+import org.apache.cassandra.db.SliceFromReadCommand;
 import org.apache.cassandra.db.SliceReadCommand;
 import org.apache.cassandra.db.IColumn;
 import org.apache.cassandra.db.Row;
@@ -208,7 +209,26 @@
         }
         return thriftifyColumns(columns);
 	}
-    
+
+    /** Validates the request, then returns the columns read through a SliceFromReadCommand; an empty list when no column family is read back. */
+    public List<column_t> get_slice_from(String tablename, String key, String columnFamily_column, boolean isAscending, int count) throws InvalidRequestException
+    {
+        logger.debug("get_slice_from");
+        String[] values = RowMutation.getColumnAndColumnFamily(columnFamily_column);
+        if (values.length != 2 || !"Standard".equals(DatabaseDescriptor.getColumnFamilyType(values[0]))) // compare by value: != only tests reference identity
+            throw new InvalidRequestException("get_slice_from requires a standard CF name and a starting column name");
+        if (count <= 0)
+            throw new InvalidRequestException("get_slice_from requires positive count");
+        if (!"Name".equals(DatabaseDescriptor.getCFMetaData(tablename, values[0]).indexProperty_)) // equals is null-safe, compareTo(null) throws
+            throw new InvalidRequestException("get_slice_from requires CF indexed by name");
+        ColumnFamily cfamily = readColumnFamily(new SliceFromReadCommand(tablename, key, columnFamily_column, isAscending, count));
+        if (cfamily == null)
+        {
+            return EMPTY_COLUMNS;
+        }
+        return thriftifyColumns(cfamily.getAllColumns());
+    }
+
     public column_t get_column(String tablename, String key, String columnFamily_column) throws NotFoundException, InvalidRequestException
     {
         logger.debug("get_column");

Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/ReducingIterator.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/ReducingIterator.java?rev=777578&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/ReducingIterator.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/ReducingIterator.java Fri May 22 15:33:42 2009
@@ -0,0 +1,59 @@
+package org.apache.cassandra.utils;
+
+import java.util.Iterator;
+
+import com.google.common.collect.AbstractIterator;
+
+/**
+ * Collapses each run of consecutive elements with equal keys from the source iterator into a single (optionally transformed) element.
+ */
+public abstract class ReducingIterator<T> extends AbstractIterator<T> implements Iterator<T>, Iterable<T>
+{
+    protected Iterator<T> source;
+    protected T last; // most recently read source element that has not been passed to reduce yet; null when there is none
+
+    public ReducingIterator(Iterator<T> source)
+    {
+        this.source = source;
+    }
+
+    /** Combine this object with the previous ones.  Intermediate state is up to your implementation. */
+    public abstract void reduce(T current);
+
+    /** Return the object produced by the preceding reduce call(s); computeNext calls this once per element it emits. */
+    protected abstract T getReduced();
+
+    /** Override this if the keys you want to base the reduce on are not the same as the object itself (but can be generated from it). */
+    protected Object getKey(T o)
+    {
+        return o;
+    }
+
+     protected T computeNext()
+    {
+        if (last == null && !source.hasNext()) // nothing carried over and the source is exhausted
+            return endOfData();
+
+        boolean keyChanged = false;
+        while (!keyChanged)
+        {
+            if (last != null)
+                reduce(last); // fold in the element read on the previous pass, or carried over from the previous call
+            if (!source.hasNext())
+            {
+                last = null; // source exhausted: nothing to carry into the next call
+                break;
+            }
+            T current = source.next();
+            if (last != null && !getKey(current).equals(getKey(last)))
+                keyChanged = true; // current starts a new group; it stays in last and is reduced on the next call
+            last = current;
+        }
+        return getReduced();
+    }
+
+    public Iterator<T> iterator() // returns this same instance, not a fresh iterator
+    {
+        return this;
+    }
+}

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java?rev=777578&r1=777577&r2=777578&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java Fri May 22 15:33:42 2009
@@ -18,6 +18,11 @@
 
 package org.apache.cassandra.db;
 
+import java.util.SortedSet;
+import java.util.Arrays;
+import java.util.List;
+import java.util.ArrayList;
+
 import org.junit.Test;
 
 import static junit.framework.Assert.*;
@@ -202,4 +207,117 @@
         rm.add(cf);
         return rm;
     }
+
+    @Test
+    public void testGetSliceFromBasic() throws Throwable // slices row1 around a deleted column, before and after a flush
+    {
+        Table table = Table.open(TABLE_NAME);
+        String ROW = "row1";
+        RowMutation rm = new RowMutation(TABLE_NAME, ROW);
+        ColumnFamily cf = new ColumnFamily("Standard1", "Standard");
+        cf.addColumn(new Column("col1", "val1".getBytes(), 1L));
+        cf.addColumn(new Column("col3", "val3".getBytes(), 1L));
+        cf.addColumn(new Column("col4", "val4".getBytes(), 1L));
+        cf.addColumn(new Column("col5", "val5".getBytes(), 1L));
+        cf.addColumn(new Column("col7", "val7".getBytes(), 1L));
+        cf.addColumn(new Column("col9", "val9".getBytes(), 1L));
+        rm.add(cf);
+        rm.apply();
+        
+        rm = new RowMutation(TABLE_NAME, ROW);
+        rm.delete("Standard1:col4", 2L); // delete col4 at timestamp 2, after the timestamp-1 insert above
+        rm.apply();
+        validateGetSliceFromBasic(table, ROW);
+        
+        // flush to disk, then repeat the same checks
+        table.getColumnFamilyStore("Standard1").forceBlockingFlush();
+        validateGetSliceFromBasic(table, ROW);        
+    }
+
+    @Test
+    public void testGetSliceFromAdvanced() throws Throwable // slices row2 after part of it is overwritten following a flush
+    {
+        Table table = Table.open(TABLE_NAME);
+        String ROW = "row2";
+        RowMutation rm = new RowMutation(TABLE_NAME, ROW);
+        ColumnFamily cf = new ColumnFamily("Standard1", "Standard");
+        cf.addColumn(new Column("col1", "val1".getBytes(), 1L));
+        cf.addColumn(new Column("col2", "val2".getBytes(), 1L));
+        cf.addColumn(new Column("col3", "val3".getBytes(), 1L));
+        cf.addColumn(new Column("col4", "val4".getBytes(), 1L));
+        cf.addColumn(new Column("col5", "val5".getBytes(), 1L));
+        cf.addColumn(new Column("col6", "val6".getBytes(), 1L));
+        rm.add(cf);
+        rm.apply();
+        // flush the timestamp-1 columns to disk before overwriting some of them
+        table.getColumnFamilyStore("Standard1").forceBlockingFlush();
+        
+        rm = new RowMutation(TABLE_NAME, ROW);
+        cf = new ColumnFamily("Standard1", "Standard");
+        cf.addColumn(new Column("col1", "valx".getBytes(), 2L)); // col1..col3 are rewritten with timestamp 2
+        cf.addColumn(new Column("col2", "valx".getBytes(), 2L));
+        cf.addColumn(new Column("col3", "valx".getBytes(), 2L));
+        rm.add(cf);
+        rm.apply();
+        validateGetSliceFromAdvanced(table, ROW);
+        
+        // flush the overwrites to disk as well, then repeat the same checks
+        table.getColumnFamilyStore("Standard1").forceBlockingFlush();
+        validateGetSliceFromAdvanced(table, ROW);
+    }
+
+    /** Asserts that columnFamily is non-null and that its columns, in iteration order, have exactly the given names. */
+    private void assertColumns(ColumnFamily columnFamily, String... columnFamilyNames)
+    {
+        assertNotNull(columnFamily);
+        SortedSet<IColumn> columns = columnFamily.getAllColumns();
+        List<String> actualNames = new ArrayList<String>();
+        for (IColumn column : columns)
+            actualNames.add(column.name());
+        // use a JUnit assertion: the bare `assert` keyword is skipped unless the JVM runs with -ea
+        assertEquals(Arrays.asList(columnFamilyNames), actualNames);
+    }
+
+    /** Checks the ascending slice of three starting at col2: the expected names, and that each column carries its expected value. */
+    private void validateGetSliceFromAdvanced(Table table, String row) throws Throwable
+    {
+        Row result = table.getSliceFrom(row, "Standard1:col2", true, 3);
+        ColumnFamily cfres = result.getColumnFamily("Standard1");
+
+        assertColumns(cfres, "col2", "col3", "col4");
+        // JUnit's assertEquals takes (expected, actual); testGetSliceFromAdvanced rewrites col2 and col3 as "valx" and leaves col4 alone
+        assertEquals("valx", new String(cfres.getColumn("col2").value()));
+        assertEquals("valx", new String(cfres.getColumn("col3").value()));
+        assertEquals("val4", new String(cfres.getColumn("col4").value()));
+    }
+
+    private void validateGetSliceFromBasic(Table table, String row) throws Throwable // shared checks, run before and after the flush in testGetSliceFromBasic
+    {
+        Row result;
+        ColumnFamily cf;
+
+        result = table.getSliceFrom(row, "Standard1:col5", true, 2); // ascending from an existing column
+        cf = result.getColumnFamily("Standard1");
+        assertColumns(cf, "col5", "col7");
+
+        result = table.getSliceFrom(row, "Standard1:col4", true, 2); // col4 is deleted by testGetSliceFromBasic; three names are expected for count 2 — presumably the deleted column is returned but not counted, confirm
+        cf = result.getColumnFamily("Standard1");
+        assertColumns(cf, "col4", "col5", "col7");
+
+        result = table.getSliceFrom(row, "Standard1:col5", false, 2); // descending from col5; names are still asserted in ascending order
+        cf = result.getColumnFamily("Standard1");
+        assertColumns(cf, "col3", "col4", "col5");
+
+        result = table.getSliceFrom(row, "Standard1:col6", false, 2); // col6 was never written; the expected names match the descending slice from col5
+        cf = result.getColumnFamily("Standard1");
+        assertColumns(cf, "col3", "col4", "col5");
+
+        result = table.getSliceFrom(row, "Standard1:col95", true, 2); // start sorts after the last column (col9), so no columns are expected
+        cf = result.getColumnFamily("Standard1");
+        assertColumns(cf);
+
+        result = table.getSliceFrom(row, "Standard1:col0", false, 2); // start sorts before the first column (col1), so no columns are expected
+        cf = result.getColumnFamily("Standard1");
+        assertColumns(cf);
+    }
 }