You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/07/08 17:17:21 UTC

svn commit: r961795 [4/4] - in /cassandra/trunk: interface/ interface/thrift/gen-java/org/apache/cassandra/thrift/ src/java/org/apache/cassandra/cli/ src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/...

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=961795&r1=961794&r2=961795&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Thu Jul  8 15:17:20 2010
@@ -22,7 +22,6 @@ import java.io.File;
 import java.io.FilenameFilter;
 import java.io.IOError;
 import java.io.IOException;
-import java.lang.management.ManagementFactory;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -30,34 +29,40 @@ import java.util.concurrent.atomic.Atomi
 import java.util.concurrent.locks.Condition;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
 
-import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
-import com.google.common.collect.Iterators;
 import org.apache.commons.collections.IteratorUtils;
+import org.apache.commons.lang.ArrayUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
 import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.IClock.ClockRelationship;
+import org.apache.cassandra.db.clock.TimestampReconciler;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.commitlog.CommitLogSegment;
 import org.apache.cassandra.db.filter.*;
 import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.dht.AbstractBounds;
-import org.apache.cassandra.dht.Bounds;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.marshal.LocalByPartionerType;
+import org.apache.cassandra.dht.*;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.sstable.SSTableTracker;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.cassandra.thrift.IndexClause;
+import org.apache.cassandra.thrift.IndexExpression;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.LatencyTracker;
+import org.apache.cassandra.utils.SimpleCondition;
+import org.apache.cassandra.utils.WrappedRunnable;
 
 public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 {
@@ -106,6 +111,7 @@ public class ColumnFamilyStore implement
 
     private final String table_;
     public final String columnFamily_;
+    private final IPartitioner partitioner_;
 
     private volatile int memtableSwitchCount = 0;
 
@@ -115,6 +121,8 @@ public class ColumnFamilyStore implement
     /* active memtable associated with this ColumnFamilyStore. */
     private Memtable memtable_;
 
+    private final Map<byte[], ColumnFamilyStore> indexedColumns_;
+
     // TODO binarymemtable ops are not threadsafe (do they need to be?)
     private AtomicReference<BinaryMemtable> binaryMemtable_;
 
@@ -128,13 +136,17 @@ public class ColumnFamilyStore implement
     private long maxRowCompactedSize = 0L;
     private long rowsCompactedTotalSize = 0L;
     private long rowsCompactedCount = 0L;
-    
-    ColumnFamilyStore(String table, String columnFamilyName, int indexValue)
+    final CFMetaData metadata;
+
+    ColumnFamilyStore(String table, String columnFamilyName, IPartitioner partitioner, int generation, CFMetaData metadata)
     {
+        assert metadata != null : "null metadata for " + table + ":" + columnFamilyName;
         table_ = table;
         columnFamily_ = columnFamilyName;
-        fileIndexGenerator_.set(indexValue);
-        memtable_ = new Memtable(this);
+        this.metadata = metadata;
+        this.partitioner_ = partitioner;
+        fileIndexGenerator_.set(generation);
+        memtable_ = new Memtable(this, partitioner_);
         binaryMemtable_ = new AtomicReference<BinaryMemtable>(new BinaryMemtable(this));
 
         if (logger_.isDebugEnabled())
@@ -197,7 +209,7 @@ public class ColumnFamilyStore implement
             SSTableReader sstable;
             try
             {
-                sstable = SSTableReader.open(filename);
+                sstable = SSTableReader.open(filename, partitioner_);
             }
             catch (IOException ex)
             {
@@ -208,6 +220,39 @@ public class ColumnFamilyStore implement
         }
         ssTables_ = new SSTableTracker(table, columnFamilyName);
         ssTables_.add(sstables);
+
+        indexedColumns_ = new TreeMap<byte[], ColumnFamilyStore>(BytesType.instance);
+        for (Map.Entry<byte[], ColumnDefinition> entry : metadata.column_metadata.entrySet())
+        {
+            byte[] column = entry.getKey();
+            ColumnDefinition info = entry.getValue();
+            if (info.index_type == null)
+                continue;
+
+            String indexedCfName = columnFamily_ + "." + (info.index_name == null ? FBUtilities.bytesToHex(column) : info.index_name);
+            IPartitioner rowPartitioner = StorageService.getPartitioner();
+            AbstractType columnComparator = (rowPartitioner instanceof OrderPreservingPartitioner || rowPartitioner instanceof ByteOrderedPartitioner)
+                                            ? BytesType.instance
+                                            : new LocalByPartionerType(StorageService.getPartitioner());
+            CFMetaData indexedCfMetadata = new CFMetaData(table,
+                                                          indexedCfName,
+                                                          ColumnFamilyType.Standard,
+                                                          ClockType.Timestamp,
+                                                          columnComparator,
+                                                          null,
+                                                          new TimestampReconciler(),
+                                                          "",
+                                                          0,
+                                                          false,
+                                                          0,
+                                                          0,
+                                                          Collections.<byte[], ColumnDefinition>emptyMap());
+            ColumnFamilyStore indexedCfs = ColumnFamilyStore.createColumnFamilyStore(table, 
+                                                                                     indexedCfName,
+                                                                                     new LocalPartitioner(metadata.column_metadata.get(column).validator),
+                                                                                     indexedCfMetadata);
+            indexedColumns_.put(column, indexedCfs);
+        }
     }
 
     public void addToCompactedRowStats(long rowsize)
@@ -240,6 +285,11 @@ public class ColumnFamilyStore implement
 
     public static ColumnFamilyStore createColumnFamilyStore(String table, String columnFamily)
     {
+        return createColumnFamilyStore(table, columnFamily, StorageService.getPartitioner(), DatabaseDescriptor.getCFMetaData(table, columnFamily));
+    }
+
+    public static ColumnFamilyStore createColumnFamilyStore(String table, String columnFamily, IPartitioner partitioner, CFMetaData metadata)
+    {
         /*
          * Get all data files associated with old Memtables for this table.
          * These files are named as follows <Table>-1.db, ..., <Table>-n.db. Get
@@ -269,22 +319,7 @@ public class ColumnFamilyStore implement
         Collections.sort(generations);
         int value = (generations.size() > 0) ? (generations.get(generations.size() - 1)) : 0;
 
-        ColumnFamilyStore cfs = new ColumnFamilyStore(table, columnFamily, value);
-
-        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-        try
-        {
-            ObjectName mbeanName = new ObjectName("org.apache.cassandra.db:type=ColumnFamilyStores,keyspace=" + table + ",columnfamily=" + columnFamily);
-            if (mbs.isRegistered(mbeanName))
-                mbs.unregisterMBean(mbeanName);
-            mbs.registerMBean(cfs, mbeanName);
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException(e);
-        }
-
-        return cfs;
+        return new ColumnFamilyStore(table, columnFamily, partitioner, value, metadata);
     }
 
     private Set<File> files()
@@ -367,7 +402,7 @@ public class ColumnFamilyStore implement
             final CommitLogSegment.CommitLogContext ctx = writeCommitLog ? CommitLog.instance().getContext() : null;
             logger_.info(columnFamily_ + " has reached its threshold; switching in a fresh Memtable at " + ctx);
             final Condition condition = submitFlush(oldMemtable);
-            memtable_ = new Memtable(this);
+            memtable_ = new Memtable(this, partitioner_);
             // a second executor that makes sure the onMemtableFlushes get called in the right order,
             // while keeping the wait-for-flush (future.get) out of anything latency-sensitive.
             return commitLogUpdater_.submit(new WrappedRunnable()
@@ -379,9 +414,8 @@ public class ColumnFamilyStore implement
                     {
                         // if we're not writing to the commit log, we are replaying the log, so marking
                         // the log header with "you can discard anything written before the context" is not valid
-                        final Integer cfId = DatabaseDescriptor.getTableMetaData(table_).get(columnFamily_).cfId;
-                        logger_.debug("Discarding {}", cfId);
-                        CommitLog.instance().discardCompletedSegments(cfId, ctx);
+                        logger_.debug("Discarding {}", metadata.cfId);
+                        CommitLog.instance().discardCompletedSegments(metadata.cfId, ctx);
                     }
                 }
             });
@@ -800,7 +834,7 @@ public class ColumnFamilyStore implement
     {
         // we are querying top-level columns, do a merging fetch with indexes.
         List<IColumnIterator> iterators = new ArrayList<IColumnIterator>();
-        final ColumnFamily returnCF = ColumnFamily.create(table_, columnFamily_);
+        final ColumnFamily returnCF = ColumnFamily.create(metadata);
         try
         {
             IColumnIterator iter;
@@ -947,7 +981,7 @@ public class ColumnFamilyStore implement
         else
         {
             // wrapped range
-            Token min = StorageService.getPartitioner().getMinimumToken();
+            Token min = partitioner_.getMinimumToken();
             Range first = new Range(range.left, min);
             completed = getRangeRows(rows, super_column, first, keyMax, columnFilter);
             if (!completed && min.compareTo(range.right) < 0)
@@ -960,9 +994,42 @@ public class ColumnFamilyStore implement
         return rows;
     }
 
+    public List<Row> scan(IndexClause indexClause, IFilter dataFilter)
+    {
+        // TODO: use statistics to pick clause w/ highest selectivity
+        // TODO even later: allow merge join instead of just one index + loop
+        IndexExpression first = indexClause.expressions.get(0);
+        ColumnFamilyStore indexCFS = getIndexedColumnFamilyStore(first.column_name);
+        assert indexCFS != null;
+        DecoratedKey indexKey = indexCFS.partitioner_.decorateKey(first.value);
+        QueryFilter indexFilter = QueryFilter.getSliceFilter(indexKey,
+                                                             new QueryPath(indexCFS.getColumnFamilyName()),
+                                                             ArrayUtils.EMPTY_BYTE_ARRAY,
+                                                             ArrayUtils.EMPTY_BYTE_ARRAY,
+                                                             null,
+                                                             false,
+                                                             indexClause.count);
+
+        List<Row> rows = new ArrayList<Row>();
+        ColumnFamily indexRow = indexCFS.getColumnFamily(indexFilter);
+        if (indexRow == null)
+            return rows;
+
+        for (byte[] dataKey : indexRow.getColumnNames())
+        {
+            DecoratedKey dk = partitioner_.decorateKey(dataKey);
+            ColumnFamily data = getColumnFamily(new QueryFilter(dk, new QueryPath(columnFamily_), dataFilter));
+            rows.add(new Row(dk, data));
+        }
+
+        // TODO apply remaining expressions
+
+        return rows;
+    }
+
     public AbstractType getComparator()
     {
-        return DatabaseDescriptor.getComparator(table_, columnFamily_);
+        return metadata.comparator;
     }
 
     /**
@@ -1020,13 +1087,11 @@ public class ColumnFamilyStore implement
 
     public void loadRowCache()
     {
-        CFMetaData metadata = DatabaseDescriptor.getTableMetaData(table_).get(columnFamily_);
-        assert metadata != null;
         if (metadata.preloadRowCache)
         {
             logger_.debug(String.format("Loading cache for keyspace/columnfamily %s/%s", table_, columnFamily_));
             int ROWS = 4096;
-            Token min = StorageService.getPartitioner().getMinimumToken();
+            Token min = partitioner_.getMinimumToken();
             Token start = min;
             long i = 0;
             while (i < ssTables_.getRowCache().getCapacity())
@@ -1047,7 +1112,7 @@ public class ColumnFamilyStore implement
                 if (result.size() < ROWS)
                     break;
 
-                start = DatabaseDescriptor.getPartitioner().getToken(result.get(ROWS - 1).key.key);
+                start = partitioner_.getToken(result.get(ROWS - 1).key.key);
             }
             logger_.info(String.format("Loaded %s rows into the %s cache", i, columnFamily_));
         }
@@ -1228,4 +1293,33 @@ public class ColumnFamilyStore implement
             return 0d;
         return (double) falseCount / (trueCount + falseCount);
     }
+
+    public Set<byte[]> getIndexedColumns()
+    {
+        return indexedColumns_.keySet();
+    }
+
+    public ColumnFamilyStore getIndexedColumnFamilyStore(byte[] column)
+    {
+        return indexedColumns_.get(column);
+    }
+
+    public ColumnFamily newIndexedColumnFamily(byte[] column)
+    {
+        return ColumnFamily.create(indexedColumns_.get(column).metadata);
+    }
+
+    public DecoratedKey getIndexKeyFor(byte[] name, byte[] value)
+    {
+        return indexedColumns_.get(name).partitioner_.decorateKey(value);
+    }
+
+    @Override
+    public String toString()
+    {
+        return "ColumnFamilyStore(" +
+               "table='" + table_ + '\'' +
+               ", columnFamily='" + columnFamily_ + '\'' +
+               ')';
+    }
 }

Added: cassandra/trunk/src/java/org/apache/cassandra/db/IndexScanCommand.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/IndexScanCommand.java?rev=961795&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/IndexScanCommand.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/IndexScanCommand.java Thu Jul  8 15:17:20 2010
@@ -0,0 +1,85 @@
+package org.apache.cassandra.db;
+
+import java.io.*;
+import java.util.Arrays;
+
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.io.ICompactSerializer2;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.thrift.IndexClause;
+import org.apache.cassandra.thrift.SlicePredicate;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.thrift.TDeserializer;
+import org.apache.thrift.TSerializer;
+import org.apache.thrift.protocol.TBinaryProtocol;
+
+public class IndexScanCommand
+{
+    private static final IndexScanCommandSerializer serializer = new IndexScanCommandSerializer();
+
+    public final String keyspace;
+    public final String column_family;
+    public final IndexClause index_clause;
+    public final SlicePredicate predicate;
+
+    public IndexScanCommand(String keyspace, String column_family, IndexClause index_clause, SlicePredicate predicate)
+    {
+
+        this.keyspace = keyspace;
+        this.column_family = column_family;
+        this.index_clause = index_clause;
+        this.predicate = predicate;
+    }
+
+    public Message getMessage()
+    {
+        DataOutputBuffer dob = new DataOutputBuffer();
+        try
+        {
+            serializer.serialize(this, dob);
+        }
+        catch (IOException e)
+        {
+            throw new IOError(e);
+        }
+        return new Message(FBUtilities.getLocalAddress(),
+                           StageManager.READ_STAGE,
+                           StorageService.Verb.INDEX_SCAN,
+                           Arrays.copyOf(dob.getData(), dob.getLength()));
+    }
+
+    public static IndexScanCommand read(Message message) throws IOException
+    {
+        byte[] bytes = message.getMessageBody();
+        ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
+        return serializer.deserialize(new DataInputStream(bis));
+    }
+
+    private static class IndexScanCommandSerializer implements ICompactSerializer2<IndexScanCommand>
+    {
+        public void serialize(IndexScanCommand o, DataOutput out) throws IOException
+        {
+            out.writeUTF(o.keyspace);
+            out.writeUTF(o.column_family);
+            TSerializer ser = new TSerializer(new TBinaryProtocol.Factory());
+            FBUtilities.serialize(ser, o.index_clause, out);
+            FBUtilities.serialize(ser, o.predicate, out);
+        }
+
+        public IndexScanCommand deserialize(DataInput in) throws IOException
+        {
+            String keyspace = in.readUTF();
+            String columnFamily = in.readUTF();
+
+            TDeserializer dser = new TDeserializer(new TBinaryProtocol.Factory());
+            IndexClause indexClause = new IndexClause();
+            FBUtilities.deserialize(dser, indexClause, in);
+            SlicePredicate predicate = new SlicePredicate();
+            FBUtilities.deserialize(dser, predicate, in);
+
+            return new IndexScanCommand(keyspace, columnFamily, indexClause, predicate);
+        }
+    }
+}

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=961795&r1=961794&r2=961795&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Thu Jul  8 15:17:20 2010
@@ -55,13 +55,14 @@ public class Memtable implements Compara
 
     private final long creationTime;
     private final ConcurrentNavigableMap<DecoratedKey, ColumnFamily> columnFamilies = new ConcurrentSkipListMap<DecoratedKey, ColumnFamily>();
-    private final IPartitioner partitioner = StorageService.getPartitioner();
+    private final IPartitioner partitioner;
     private final ColumnFamilyStore cfs;
 
-    public Memtable(ColumnFamilyStore cfs)
+    public Memtable(ColumnFamilyStore cfs, IPartitioner partitioner)
     {
 
         this.cfs = cfs;
+        this.partitioner = partitioner;
         creationTime = System.currentTimeMillis();
     }
 
@@ -147,7 +148,7 @@ public class Memtable implements Compara
     private SSTableReader writeSortedContents() throws IOException
     {
         logger.info("Writing " + this);
-        SSTableWriter writer = new SSTableWriter(cfs.getFlushPath(), columnFamilies.size(), StorageService.getPartitioner());
+        SSTableWriter writer = new SSTableWriter(cfs.getFlushPath(), columnFamilies.size(), partitioner);
 
         DataOutputBuffer buffer = new DataOutputBuffer();
         for (Map.Entry<DecoratedKey, ColumnFamily> entry : columnFamilies.entrySet())

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=961795&r1=961794&r2=961795&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Thu Jul  8 15:17:20 2010
@@ -19,6 +19,7 @@
 package org.apache.cassandra.db;
 
 import java.io.IOError;
+import java.lang.management.ManagementFactory;
 import java.util.*;
 import java.io.IOException;
 import java.io.File;
@@ -28,17 +29,17 @@ import java.util.concurrent.Future;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.Config;
+
+import org.apache.cassandra.config.*;
 import org.apache.cassandra.db.commitlog.CommitLog;
-import org.apache.cassandra.db.commitlog.CommitLogSegment;
-import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.io.sstable.SSTableDeletingReference;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.util.FileUtils;
 
-import java.net.InetAddress;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.commons.lang.ArrayUtils;
 
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.db.filter.*;
@@ -48,7 +49,7 @@ import org.cliffc.high_scale_lib.NonBloc
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class Table 
+public class Table
 {
     public static final String SYSTEM_TABLE = "system";
 
@@ -84,6 +85,7 @@ public class Table 
     // cache application CFs since Range queries ask for them a _lot_
     private SortedSet<String> applicationColumnFamilies;
     private final TimerTask flushTask;
+    private final Object[] indexLocks;
     
     public static Table open(String table)
     {
@@ -215,6 +217,9 @@ public class Table 
     private Table(String table)
     {
         name = table;
+        indexLocks = new Object[DatabaseDescriptor.getConcurrentWriters() * 8];
+        for (int i = 0; i < indexLocks.length; i++)
+            indexLocks[i] = new Object();
         // create data directories.
         for (String dataDir : DatabaseDescriptor.getAllDataFileLocations())
         {
@@ -233,11 +238,25 @@ public class Table 
                 throw new IOError(ex);
             }
         }
-      
-        for (CFMetaData cfm : DatabaseDescriptor.getTableDefinition(table).cfMetaData().values())
+
+        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+        for (CFMetaData cfm : new ArrayList<CFMetaData>(DatabaseDescriptor.getTableDefinition(table).cfMetaData().values()))
         {
-            columnFamilyStores.put(cfm.cfId, ColumnFamilyStore.createColumnFamilyStore(table, cfm.cfName));
-         }
+            ColumnFamilyStore cfs = ColumnFamilyStore.createColumnFamilyStore(table, cfm.cfName);
+            columnFamilyStores.put(cfm.cfId, cfs);
+            try
+            {
+                ObjectName mbeanName = new ObjectName("org.apache.cassandra.db:type=ColumnFamilyStores,keyspace=" + table + ",columnfamily=" + cfm.cfName);
+                if (mbs.isRegistered(mbeanName))
+                    mbs.unregisterMBean(mbeanName);
+                mbs.registerMBean(cfs, mbeanName);
+            }
+            catch (Exception e)
+            {
+                throw new RuntimeException(e);
+            }
+
+        }
 
         // check 10x as often as the lifetime, so we can exceed lifetime by 10% at most
         int checkMs = DatabaseDescriptor.getMemtableLifetimeMS() / 10;
@@ -312,28 +331,76 @@ public class Table 
         try
         {
             if (writeCommitLog)
-            {
                 CommitLog.instance().add(mutation, serializedMutation);
-            }
         
             DecoratedKey key = StorageService.getPartitioner().decorateKey(mutation.key());
             for (ColumnFamily columnFamily : mutation.getColumnFamilies())
             {
-                Memtable memtableToFlush;
                 ColumnFamilyStore cfs = columnFamilyStores.get(columnFamily.id());
                 if (cfs == null)
                 {
                     logger.error("Attempting to mutate non-existant column family " + columnFamily.id());
+                    continue;
+                }
+
+                ColumnFamily oldIndexedColumns;
+                SortedSet<byte[]> mutatedIndexedColumns = null;
+                for (byte[] column : cfs.getIndexedColumns())
+                {
+                    if (columnFamily.getColumnNames().contains(column))
+                    {
+                        if (mutatedIndexedColumns == null)
+                            mutatedIndexedColumns = new TreeSet<byte[]>(FBUtilities.byteArrayComparator);
+                        mutatedIndexedColumns.add(column);
+                    }
+                }
+
+                if (mutatedIndexedColumns == null)
+                {
+                    // just update the actual value, no extra synchronization
+                    applyCF(cfs, key, columnFamily, memtablesToFlush);
                 }
                 else
                 {
-                    if ((memtableToFlush=cfs.apply(key, columnFamily)) != null)
-                        memtablesToFlush.put(cfs, memtableToFlush);
-    
-                    ColumnFamily cachedRow = cfs.getRawCachedRow(key);
-                    if (cachedRow != null)
-                        cachedRow.addAll(columnFamily);
+                    synchronized (indexLocks[Arrays.hashCode(mutation.key()) % indexLocks.length])
+                    {
+                        // read old indexed values
+                        QueryFilter filter = QueryFilter.getNamesFilter(key, new QueryPath(cfs.getColumnFamilyName()), mutatedIndexedColumns);
+                        oldIndexedColumns = cfs.getColumnFamily(filter);
+
+                        // apply the mutation
+                        applyCF(cfs, key, columnFamily, memtablesToFlush);
+
+                        // add new index entries
+                        for (byte[] columnName : mutatedIndexedColumns)
+                        {
+                            IColumn column = columnFamily.getColumn(columnName);
+                            DecoratedKey valueKey = cfs.getIndexKeyFor(columnName, column.value());
+                            ColumnFamily cf = cfs.newIndexedColumnFamily(columnName);
+                            cf.addColumn(new Column(mutation.key(), ArrayUtils.EMPTY_BYTE_ARRAY, column.clock()));
+                            applyCF(cfs.getIndexedColumnFamilyStore(columnName), valueKey, cf, memtablesToFlush);
+                        }
+
+                        // remove the old index entries
+                        if (oldIndexedColumns != null)
+                        {
+                            int localDeletionTime = (int)(System.currentTimeMillis() / 1000);
+                            for (Map.Entry<byte[], IColumn> entry : oldIndexedColumns.getColumnsMap().entrySet())
+                            {
+                                byte[] columnName = entry.getKey();
+                                IColumn column = entry.getValue();
+                                DecoratedKey valueKey = cfs.getIndexKeyFor(columnName, column.value());
+                                ColumnFamily cf = cfs.newIndexedColumnFamily(columnName);
+                                cf.deleteColumn(mutation.key(), localDeletionTime, column.clock());
+                                applyCF(cfs, valueKey, cf, memtablesToFlush);
+                            }
+                        }
+                    }
                 }
+
+                ColumnFamily cachedRow = cfs.getRawCachedRow(key);
+                if (cachedRow != null)
+                    cachedRow.addAll(columnFamily);
             }
         }
         finally
@@ -346,6 +413,13 @@ public class Table 
             entry.getKey().maybeSwitchMemtable(entry.getValue(), writeCommitLog);
     }
 
+    private static void applyCF(ColumnFamilyStore cfs, DecoratedKey key, ColumnFamily columnFamily, HashMap<ColumnFamilyStore, Memtable> memtablesToFlush)
+    {
+        Memtable memtableToFlush = cfs.apply(key, columnFamily);
+        if (memtableToFlush != null)
+            memtablesToFlush.put(cfs, memtableToFlush);
+    }
+
     public List<Future<?>> flush() throws IOException
     {
         List<Future<?>> futures = new ArrayList<Future<?>>();

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryFilter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryFilter.java?rev=961795&r1=961794&r2=961795&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryFilter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryFilter.java Thu Jul  8 15:17:20 2010
@@ -31,9 +31,13 @@ import org.apache.cassandra.utils.Reduci
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.IClock.ClockRelationship;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class QueryFilter
 {
+    private static Logger logger = LoggerFactory.getLogger(QueryFilter.class);
+
     public final DecoratedKey key;
     public final QueryPath path;
     private final IFilter filter;

Added: cassandra/trunk/src/java/org/apache/cassandra/db/marshal/LocalByPartionerType.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/marshal/LocalByPartionerType.java?rev=961795&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/marshal/LocalByPartionerType.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/marshal/LocalByPartionerType.java Thu Jul  8 15:17:20 2010
@@ -0,0 +1,26 @@
+package org.apache.cassandra.db.marshal;
+
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Token;
+
+/** for sorting columns representing row keys in the row ordering as determined by a partitioner.
+ * Not intended for user-defined CFs, and will in fact error out if used with such. */
+public class LocalByPartionerType<T extends Token> extends AbstractType
+{
+    private final IPartitioner<T> partitioner;
+
+    public LocalByPartionerType(IPartitioner<T> partitioner)
+    {
+        this.partitioner = partitioner;
+    }
+
+    public String getString(byte[] bytes)
+    {
+        return null;
+    }
+
+    public int compare(byte[] o1, byte[] o2)
+    {
+        return partitioner.decorateKey(o1).compareTo(partitioner.decorateKey(o2));
+    }
+}

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/migration/AddColumnFamily.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/migration/AddColumnFamily.java?rev=961795&r1=961794&r2=961795&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/migration/AddColumnFamily.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/migration/AddColumnFamily.java Thu Jul  8 15:17:20 2010
@@ -92,10 +92,11 @@ public class AddColumnFamily extends Mig
         {
             throw new IOException(ex);
         }
+        Table.open(cfm.tableName); // make sure it's init-ed w/ the old definitions first, since we're going to call initCf on the new one manually
+        DatabaseDescriptor.setTableDefinition(ksm, newVersion);
         if (!clientMode)
             Table.open(ksm.name).initCf(cfm.cfId, cfm.cfName);
-        DatabaseDescriptor.setTableDefinition(ksm, newVersion);
-        
+
         if (!clientMode)
             // force creation of a new commit log segment.
             CommitLog.instance().forceNewSegment();

Added: cassandra/trunk/src/java/org/apache/cassandra/dht/LocalPartitioner.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/dht/LocalPartitioner.java?rev=961795&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/dht/LocalPartitioner.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/dht/LocalPartitioner.java Thu Jul  8 15:17:20 2010
@@ -0,0 +1,61 @@
+package org.apache.cassandra.dht;
+
+import org.apache.commons.lang.ArrayUtils;
+
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.marshal.AbstractType;
+
+public class LocalPartitioner implements IPartitioner<LocalToken>
+{
+    private final AbstractType comparator;
+
+    public LocalPartitioner(AbstractType comparator)
+    {
+        this.comparator = comparator;
+    }
+
+    public DecoratedKey<LocalToken> convertFromDiskFormat(byte[] key)
+    {
+        return decorateKey(key);
+    }
+
+    public byte[] convertToDiskFormat(DecoratedKey<LocalToken> key)
+    {
+        return key.token.token;
+    }
+
+    public DecoratedKey<LocalToken> decorateKey(byte[] key)
+    {
+        return new DecoratedKey<LocalToken>(getToken(key), key);
+    }
+
+    public LocalToken midpoint(LocalToken left, LocalToken right)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public LocalToken getMinimumToken()
+    {
+        return new LocalToken(comparator, ArrayUtils.EMPTY_BYTE_ARRAY);
+    }
+
+    public LocalToken getToken(byte[] key)
+    {
+        return new LocalToken(comparator, key);
+    }
+
+    public LocalToken getRandomToken()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public Token.TokenFactory getTokenFactory()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public boolean preservesOrder()
+    {
+        return true;
+    }
+}

Added: cassandra/trunk/src/java/org/apache/cassandra/dht/LocalToken.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/dht/LocalToken.java?rev=961795&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/dht/LocalToken.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/dht/LocalToken.java Thu Jul  8 15:17:20 2010
@@ -0,0 +1,66 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.dht;
+
+import java.util.Arrays;
+
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class LocalToken extends Token<byte[]>
+{
+    private final AbstractType comparator;
+
+    public LocalToken(AbstractType comparator, byte... token)
+    {
+        super(token);
+        this.comparator = comparator;
+    }
+
+    @Override
+    public String toString()
+    {
+        return comparator.getString(token);
+    }
+
+    @Override
+    public int compareTo(Token<byte[]> o)
+    {
+        return comparator.compare(token, o.token);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        final int prime = 31;
+        return prime + Arrays.hashCode(token);
+    }
+
+    @Override
+    public boolean equals(Object obj)
+    {
+        if (this == obj)
+            return true;
+        if (!(obj instanceof LocalToken))
+            return false;
+        LocalToken other = (LocalToken) obj;
+        return Arrays.equals(token, other.token);
+    }
+
+}

Copied: cassandra/trunk/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java (from r961780, cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java)
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java?p2=cassandra/trunk/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java&p1=cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java&r1=961780&r2=961795&rev=961795&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java Thu Jul  8 15:17:20 2010
@@ -18,10 +18,7 @@
 
 package org.apache.cassandra.service;
 
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.RangeSliceCommand;
-import org.apache.cassandra.db.RangeSliceReply;
-import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
@@ -29,21 +26,17 @@ import org.apache.cassandra.net.Messagin
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class RangeSliceVerbHandler implements IVerbHandler
+public class IndexScanVerbHandler implements IVerbHandler
 {
-
-    private static final Logger logger = LoggerFactory.getLogger(RangeSliceVerbHandler.class);
+    private static final Logger logger = LoggerFactory.getLogger(IndexScanVerbHandler.class);
 
     public void doVerb(Message message)
     {
         try
         {
-            RangeSliceCommand command = RangeSliceCommand.read(message);
+            IndexScanCommand command = IndexScanCommand.read(message);
             ColumnFamilyStore cfs = Table.open(command.keyspace).getColumnFamilyStore(command.column_family);
-            RangeSliceReply reply = new RangeSliceReply(cfs.getRangeSlice(command.super_column,
-                                                                          command.range,
-                                                                          command.max_keys,
-                                                                          QueryFilter.getFilter(command.predicate, cfs.getComparator())));
+            RangeSliceReply reply = new RangeSliceReply(cfs.scan(command.index_clause, QueryFilter.getFilter(command.predicate, cfs.getComparator())));
             Message response = reply.getReply(message);
             if (logger.isDebugEnabled())
                 logger.debug("Sending " + reply+ " to " + message.getMessageId() + "@" + message.getFrom());

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java?rev=961795&r1=961794&r2=961795&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java Thu Jul  8 15:17:20 2010
@@ -32,7 +32,7 @@ import org.slf4j.LoggerFactory;
 public class RangeSliceVerbHandler implements IVerbHandler
 {
 
-    private static final Logger logger = LoggerFactory.getLogger(RangeSliceVerbHandler.class);
+    private static final Logger logger = LoggerFactory.getLogger(IndexScanVerbHandler.class);
 
     public void doVerb(Message message)
     {

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=961795&r1=961794&r2=961795&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Thu Jul  8 15:17:20 2010
@@ -28,19 +28,19 @@ import java.util.concurrent.*;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
+import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.Multimap;
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.lang.StringUtils;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.AbstractIterator;
-import com.google.common.collect.Multimap;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.Gossiper;
@@ -54,7 +54,6 @@ import org.apache.cassandra.thrift.Consi
 import org.apache.cassandra.thrift.UnavailableException;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.LatencyTracker;
-import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.WrappedRunnable;
 
 public class StorageProxy implements StorageProxyMBean
@@ -757,6 +756,28 @@ public class StorageProxy implements Sto
         return writeStats.getRecentLatencyMicros();
     }
 
+    public static List<Row> scan(IndexScanCommand command, ConsistencyLevel consistency_level)
+    throws IOException, TimeoutException
+    {
+        IPartitioner p = StorageService.getPartitioner();
+        Token startToken = command.index_clause.start_key == null ? p.getMinimumToken() : p.getToken(command.index_clause.start_key);
+        List<InetAddress> endpoints = StorageService.instance.getLiveNaturalEndpoints(command.keyspace, startToken);
+        // TODO iterate through endpoints in token order like getRangeSlice
+        Message message = command.getMessage();
+        RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(command.keyspace, endpoints);
+        AbstractReplicationStrategy rs = StorageService.instance.getReplicationStrategy(command.keyspace);
+        QuorumResponseHandler<List<Row>> handler = rs.getQuorumResponseHandler(resolver, consistency_level, command.keyspace);
+        MessagingService.instance.sendRR(message, endpoints.get(0), handler);
+        try
+        {
+            return handler.get();
+        }
+        catch (DigestMismatchException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
     static class weakReadLocalCallable implements Callable<Object>
     {
         private ReadCommand command;

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=961795&r1=961794&r2=961795&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Thu Jul  8 15:17:20 2010
@@ -115,7 +115,7 @@ public class StorageService implements I
         DEFINITIONS_UPDATE_RESPONSE,
         TRUNCATE,
         SCHEMA_CHECK,
-        ;
+        INDEX_SCAN;
         // remember to add new verbs at the end, since we serialize by ordinal
     }
     public static final Verb[] VERBS = Verb.values();
@@ -223,6 +223,7 @@ public class StorageService implements I
         MessagingService.instance.registerVerbHandlers(Verb.READ_REPAIR, new ReadRepairVerbHandler());
         MessagingService.instance.registerVerbHandlers(Verb.READ, new ReadVerbHandler());
         MessagingService.instance.registerVerbHandlers(Verb.RANGE_SLICE, new RangeSliceVerbHandler());
+        MessagingService.instance.registerVerbHandlers(Verb.INDEX_SCAN, new IndexScanVerbHandler());
         // see BootStrapper for a summary of how the bootstrap verbs interact
         MessagingService.instance.registerVerbHandlers(Verb.BOOTSTRAP_TOKEN, new BootStrapper.BootstrapTokenVerbHandler());
         MessagingService.instance.registerVerbHandlers(Verb.STREAM_REQUEST, new StreamRequestVerbHandler() );

Modified: cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java?rev=961795&r1=961794&r2=961795&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java Thu Jul  8 15:17:20 2010
@@ -490,6 +490,12 @@ public class CassandraServer implements 
         String keyspace = keySpace.get();
         checkKeyspaceAndLoginAuthorized(AccessLevel.READONLY);
 
+        return getRangeSlicesInternal(keyspace, column_parent, range, predicate, consistency_level);
+    }
+
+    private List<KeySlice> getRangeSlicesInternal(String keyspace, ColumnParent column_parent, KeyRange range, SlicePredicate predicate, ConsistencyLevel consistency_level)
+    throws InvalidRequestException, UnavailableException, TimedOutException
+    {
         ThriftValidation.validateColumnParent(keyspace, column_parent);
         ThriftValidation.validatePredicate(keyspace, column_parent, predicate);
         ThriftValidation.validateKeyRange(range);
@@ -522,6 +528,11 @@ public class CassandraServer implements 
             throw new RuntimeException(e);
         }
 
+        return thriftifyKeySlices(rows, column_parent, predicate);
+    }
+
+    private List<KeySlice> thriftifyKeySlices(List<Row> rows, ColumnParent column_parent, SlicePredicate predicate)
+    {
         List<KeySlice> keySlices = new ArrayList<KeySlice>(rows.size());
         boolean reversed = predicate.slice_range != null && predicate.slice_range.reversed;
         for (Row row : rows)
@@ -533,6 +544,71 @@ public class CassandraServer implements 
         return keySlices;
     }
 
+    public List<KeySlice> scan(ColumnParent column_parent, RowPredicate row_predicate, SlicePredicate column_predicate, ConsistencyLevel consistency_level) throws InvalidRequestException, UnavailableException, TimedOutException, TException
+    {
+        if (logger.isDebugEnabled())
+            logger.debug("scan");
+
+        checkKeyspaceAndLoginAuthorized(AccessLevel.READONLY);
+
+        if (row_predicate.keys != null)
+        {
+            Map<byte[], List<ColumnOrSuperColumn>> rowMap = multigetSliceInternal(keySpace.get(), row_predicate.keys, column_parent, column_predicate, consistency_level);
+            List<KeySlice> rows = new ArrayList<KeySlice>(rowMap.size());
+            for (Map.Entry<byte[], List<ColumnOrSuperColumn>> entry : rowMap.entrySet())
+            {
+                rows.add(new KeySlice(entry.getKey(), entry.getValue()));
+            }
+            return rows;
+        }
+
+        if (row_predicate.key_range != null)
+        {
+            return getRangeSlicesInternal(keySpace.get(), column_parent, row_predicate.key_range, column_predicate, consistency_level);
+        }
+
+        if (row_predicate.index_clause != null)
+        {
+            return scanIndexInternal(keySpace.get(), column_parent, row_predicate.index_clause, column_predicate, consistency_level);
+        }
+
+        throw new InvalidRequestException("row predicate must specify keys, key_range, or index_clause");
+    }
+
+    private List<KeySlice> scanIndexInternal(String keyspace, ColumnParent column_parent, IndexClause index_clause, SlicePredicate predicate, ConsistencyLevel consistency_level)
+    throws InvalidRequestException, TimedOutException
+    {
+        ThriftValidation.validateColumnParent(keyspace, column_parent);
+        ThriftValidation.validatePredicate(keyspace, column_parent, predicate);
+        ThriftValidation.validateIndexClauses(keyspace, column_parent.column_family, index_clause);
+
+        List<Row> rows = null;
+        try
+        {
+            rows = StorageProxy.scan(new IndexScanCommand(keyspace, column_parent.column_family, index_clause, predicate), consistency_level);
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+        catch (TimeoutException e)
+        {
+            throw new TimedOutException();
+        }
+        return thriftifyKeySlices(rows, column_parent, predicate);
+    }
+
+    public List<KeyCount> scan_count(ColumnParent column_parent, RowPredicate row_predicate, SlicePredicate column_predicate, ConsistencyLevel consistency_level) throws InvalidRequestException, UnavailableException, TimedOutException, TException
+    {
+        List<KeySlice> rows = scan(column_parent, row_predicate, column_predicate, consistency_level);
+        List<KeyCount> rowCounts = new ArrayList<KeyCount>(rows.size());
+        for (KeySlice slice : rows)
+        {
+            rowCounts.add(new KeyCount(slice.key, slice.columns.size()));
+        }
+        return rowCounts;
+    }
+
     public Set<String> describe_keyspaces() throws TException
     {
         return DatabaseDescriptor.getTables();

Modified: cassandra/trunk/src/java/org/apache/cassandra/thrift/ThriftValidation.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/ThriftValidation.java?rev=961795&r1=961794&r2=961795&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/thrift/ThriftValidation.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/thrift/ThriftValidation.java Thu Jul  8 15:17:20 2010
@@ -22,6 +22,7 @@ package org.apache.cassandra.thrift;
 
 import java.util.Arrays;
 import java.util.Comparator;
+import java.util.Set;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -370,4 +371,17 @@ public class ThriftValidation
             throw new InvalidRequestException("maxRows must be positive");
         }
     }
+
+    public static void validateIndexClauses(String keyspace, String columnFamily, IndexClause index_clause)
+    throws InvalidRequestException
+    {
+        if (index_clause.expressions.isEmpty())
+            throw new InvalidRequestException("index clause list may not be empty");
+        Set<byte[]> indexedColumns = Table.open(keyspace).getColumnFamilyStore(columnFamily).getIndexedColumns();
+        for (IndexExpression expression : index_clause.expressions)
+        {
+            if (!indexedColumns.contains(expression.column_name))
+                throw new InvalidRequestException("Unable to scan unindexed column");
+        }
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java?rev=961795&r1=961794&r2=961795&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java Thu Jul  8 15:17:20 2010
@@ -511,4 +511,11 @@ public class FBUtilities
 
         return scpurl.getFile();
     }
+
+    public static long timestampMicros()
+    {
+        // we use microsecond resolution for compatibility with other client libraries, even though
+        // we can't actually get microsecond precision.
+        return System.currentTimeMillis() * 1000;
+    }
 }

Modified: cassandra/trunk/test/system/__init__.py
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/system/__init__.py?rev=961795&r1=961794&r2=961795&view=diff
==============================================================================
--- cassandra/trunk/test/system/__init__.py (original)
+++ cassandra/trunk/test/system/__init__.py Thu Jul  8 15:17:20 2010
@@ -151,7 +151,8 @@ class ThriftTester(BaseTester):
             Cassandra.CfDef('Keyspace1', 'Super1', column_type='Super', subcomparator_type='LongType', row_cache_size=1000, key_cache_size=0), 
             Cassandra.CfDef('Keyspace1', 'Super2', column_type='Super', subcomparator_type='LongType'), 
             Cassandra.CfDef('Keyspace1', 'Super3', column_type='Super', subcomparator_type='LongType'), 
-            Cassandra.CfDef('Keyspace1', 'Super4', column_type='Super', subcomparator_type='UTF8Type')
+            Cassandra.CfDef('Keyspace1', 'Super4', column_type='Super', subcomparator_type='UTF8Type'),
+            Cassandra.CfDef('Keyspace1', 'Indexed1', column_metadata=[Cassandra.ColumnDef('birthdate', 'LongType', Cassandra.IndexType.KEYS, 'birthdate')]),
         ])
 
         keyspace2 = Cassandra.KsDef('Keyspace2', 'org.apache.cassandra.locator.RackUnawareStrategy', 1,
@@ -174,10 +175,8 @@ class ThriftTester(BaseTester):
             Cassandra.CfDef('Keyspace4', 'Super3', column_type='Super', subcomparator_type='BytesType'),
             Cassandra.CfDef('Keyspace4', 'Super4', column_type='Super', subcomparator_type='TimeUUIDType')
         ])
-        self.client.system_add_keyspace(keyspace1)
-        self.client.system_add_keyspace(keyspace2)
-        self.client.system_add_keyspace(keyspace3)
-        self.client.system_add_keyspace(keyspace4)
+        for ks in [keyspace1, keyspace2, keyspace3, keyspace4]:
+            self.client.system_add_keyspace(ks)
 
 class AvroTester(BaseTester):
     client = None

Modified: cassandra/trunk/test/system/test_thrift_server.py
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/system/test_thrift_server.py?rev=961795&r1=961794&r2=961795&view=diff
==============================================================================
--- cassandra/trunk/test/system/test_thrift_server.py (original)
+++ cassandra/trunk/test/system/test_thrift_server.py Thu Jul  8 15:17:20 2010
@@ -1054,7 +1054,7 @@ class TestMutations(ThriftTester):
         kspaces = client.describe_keyspaces()
         assert len(kspaces) == 5, kspaces # ['system', 'Keyspace2', 'Keyspace3', 'Keyspace1', 'Keyspace4']
         ks1 = client.describe_keyspace("Keyspace1")
-        assert set(ks1.keys()) == set(['Super1', 'Standard1', 'Standard2', 'StandardLong1', 'StandardLong2', 'Super3', 'Super2', 'Super4'])
+        assert set(ks1.keys()) == set(['Super1', 'Standard1', 'Standard2', 'StandardLong1', 'StandardLong2', 'Super3', 'Super2', 'Super4', 'Indexed1'])
         sysks = client.describe_keyspace("system")
 
     def test_describe(self):
@@ -1205,7 +1205,26 @@ class TestMutations(ThriftTester):
         def req():
             client.describe_ring('system')
         _expect_exception(req, InvalidRequestException)
+
+    def test_index_scan(self):
+        _set_keyspace('Keyspace1')
+        client.insert('key1', ColumnParent('Indexed1'), Column('birthdate', _i64(1), Clock(0)), ConsistencyLevel.ONE)
+        client.insert('key2', ColumnParent('Indexed1'), Column('birthdate', _i64(2), Clock(0)), ConsistencyLevel.ONE)
+        client.insert('key3', ColumnParent('Indexed1'), Column('b', _i64(3), Clock(0)), ConsistencyLevel.ONE)
+
+        cp = ColumnParent('Indexed1')
+        expr = IndexExpression('birthdate', IndexOperator.EQ, _i64(1))
+        rp = RowPredicate(index_clause=IndexClause([expr]))
+        sp = SlicePredicate(slice_range=SliceRange('', ''))
+        result = client.scan(cp, rp, sp, ConsistencyLevel.ONE)
+        assert len(result) == 1, result
+        assert result[0].key == 'key1'
+        assert len(result[0].columns) == 1, result[0].columns
+
+        expr.column_name = 'b'
+        _expect_exception(lambda: client.scan(cp, rp, sp, ConsistencyLevel.ONE), InvalidRequestException)
         
+
 class TestTruncate(ThriftTester):
     def test_truncate(self):
         _set_keyspace('Keyspace1')

Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java?rev=961795&r1=961794&r2=961795&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java Thu Jul  8 15:17:20 2010
@@ -19,6 +19,7 @@
 package org.apache.cassandra.db;
 
 import java.io.IOException;
+import java.io.UnsupportedEncodingException;
 import java.util.*;
 import java.util.concurrent.ExecutionException;
 
@@ -31,6 +32,10 @@ import org.apache.cassandra.CleanupHelpe
 import org.apache.cassandra.Util;
 import org.apache.cassandra.db.filter.*;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.thrift.IndexClause;
+import org.apache.cassandra.thrift.IndexExpression;
+import org.apache.cassandra.thrift.IndexOperator;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.WrappedRunnable;
 
 import java.net.InetAddress;
@@ -159,6 +164,21 @@ public class ColumnFamilyStoreTest exten
         assert Arrays.equals(result.get(0).key.key, "key2".getBytes());
     }
 
+    @Test
+    public void testIndexScan() throws IOException
+    {
+        RowMutation rm;
+        rm = new RowMutation("Keyspace1", "k".getBytes());
+        rm.add(new QueryPath("Indexed1", null, "birthdate".getBytes("UTF8")), FBUtilities.toByteArray(1L), new TimestampClock(0));
+        rm.apply();
+
+        IndexExpression expr = new IndexExpression("birthdate".getBytes("UTF8"), IndexOperator.EQ, FBUtilities.toByteArray(1L));
+        IndexClause clause = new IndexClause(Arrays.asList(expr), 100);
+        IFilter filter = new IdentityQueryFilter();
+        List<Row> rows = Table.open("Keyspace1").getColumnFamilyStore("Indexed1").scan(clause, filter);
+        assert rows != null && rows.size() > 0;
+    }
+
     private ColumnFamilyStore insertKey1Key2() throws IOException, ExecutionException, InterruptedException
     {
         List<RowMutation> rms = new LinkedList<RowMutation>();

Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/DefsTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/DefsTest.java?rev=961795&r1=961794&r2=961795&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/DefsTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/DefsTest.java Thu Jul  8 15:17:20 2010
@@ -195,17 +195,18 @@ public class DefsTest extends CleanupHel
         
         // any write should fail.
         rm = new RowMutation(ks.name, dk.key);
+        boolean success = true;
         try
         {
             rm.add(new QueryPath("Standard1", null, "col0".getBytes()), "value0".getBytes(), new TimestampClock(1L));
             rm.apply();
-            assert false : "This mutation should have failed since the CF no longer exists.";
         }
         catch (Throwable th)
         {
-            assert th instanceof IllegalArgumentException;
+            success = false;
         }
-        
+        assert !success : "This mutation should have failed since the CF no longer exists.";
+
         // verify that the files are gone.
         assert DefsTable.getFiles(cfm.tableName, cfm.cfName).size() == 0;
     }    
@@ -308,17 +309,18 @@ public class DefsTest extends CleanupHel
         
         // write should fail.
         rm = new RowMutation(ks.name, dk.key);
+        boolean success = true;
         try
         {
             rm.add(new QueryPath("Standard1", null, "col0".getBytes()), "value0".getBytes(), new TimestampClock(1L));
             rm.apply();
-            throw new AssertionError("This mutation should have failed since the CF no longer exists.");
         }
         catch (Throwable th)
         {
-            assert th instanceof IllegalArgumentException;
+            success = false;
         }
-        
+        assert !success : "This mutation should have failed since the CF no longer exists.";
+
         // reads should fail too.
         try
         {
@@ -375,16 +377,17 @@ public class DefsTest extends CleanupHel
         
         // write on old should fail.
         rm = new RowMutation(oldKs.name, "any key will do".getBytes());
+        boolean success = true;
         try
         {
             rm.add(new QueryPath(cfName, null, "col0".getBytes()), "value0".getBytes(), new TimestampClock(1L));
             rm.apply();
-            throw new AssertionError("This mutation should have failed since the CF/Table no longer exists.");
         }
         catch (Throwable th)
         {
-            assert th instanceof IllegalArgumentException;
+            success = false;
         }
+        assert !success : "This mutation should have failed since the CF/Table no longer exists.";
         
         // write on new should work.
         rm = new RowMutation(newKsName, dk.key);