You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2012/08/09 19:07:04 UTC

[1/2] git commit: Composite secondary indexes (for CQL3)

Updated Branches:
  refs/heads/trunk 8a290c1b7 -> 80ea03f5e


Composite secondary indexes (for CQL3)

patch by slebresne; reviewed by yukim for CASSANDRA-3680


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/80ea03f5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/80ea03f5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/80ea03f5

Branch: refs/heads/trunk
Commit: 80ea03f5e928fec184f33682038df528652e97ba
Parents: 8a290c1
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Wed Jun 20 19:46:09 2012 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Thu Aug 9 19:05:44 2012 +0200

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 interface/cassandra.thrift                         |    3 +-
 .../org/apache/cassandra/thrift/IndexType.java     |    5 +-
 .../org/apache/cassandra/config/CFMetaData.java    |    2 +-
 .../org/apache/cassandra/cql/QueryProcessor.java   |    6 +-
 .../org/apache/cassandra/cql3/CFDefinition.java    |    2 +-
 .../cql3/statements/CreateIndexStatement.java      |   20 +-
 .../cassandra/cql3/statements/SelectStatement.java |   15 +-
 .../org/apache/cassandra/db/ColumnFamilyStore.java |   47 ++-
 src/java/org/apache/cassandra/db/DeletionInfo.java |   38 ++
 .../org/apache/cassandra/db/MeteredFlusher.java    |    2 +-
 .../org/apache/cassandra/db/SliceQueryPager.java   |   88 ++++
 src/java/org/apache/cassandra/db/Table.java        |   91 ++++-
 .../cassandra/db/compaction/CompactionManager.java |    9 +-
 .../apache/cassandra/db/filter/ColumnSlice.java    |    5 +
 .../apache/cassandra/db/filter/ExtendedFilter.java |   46 ++-
 .../cassandra/db/filter/SliceQueryFilter.java      |    8 +
 .../AbstractSimplePerColumnSecondaryIndex.java     |  166 ++++++++
 .../apache/cassandra/db/index/SecondaryIndex.java  |  120 +++++--
 .../cassandra/db/index/SecondaryIndexBuilder.java  |   10 +-
 .../cassandra/db/index/SecondaryIndexManager.java  |  109 +++--
 .../db/index/composites/CompositesIndex.java       |  111 +++++
 .../db/index/composites/CompositesSearcher.java    |  318 +++++++++++++++
 .../apache/cassandra/db/index/keys/KeysIndex.java  |  133 +------
 .../cassandra/db/index/keys/KeysSearcher.java      |    2 +-
 .../apache/cassandra/db/marshal/CompositeType.java |   41 ++-
 .../apache/cassandra/io/sstable/SSTableReader.java |    3 +-
 .../cassandra/streaming/StreamInSession.java       |    2 +-
 .../apache/cassandra/thrift/ThriftValidation.java  |    5 +-
 .../unit/org/apache/cassandra/config/DefsTest.java |    4 +-
 test/unit/org/apache/cassandra/db/CleanupTest.java |    1 -
 31 files changed, 1119 insertions(+), 294 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/80ea03f5/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3de82e2..f823937 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -34,6 +34,7 @@
  * (cql3) Allow definitions with only a PK (CASSANDRA-4361)
  * (cql3) Add support for row key composites (CASSANDRA-4179)
  * improve DynamicEndpointSnitch by using reservoir sampling (CASSANDRA-4038)
+ * (cql3) Add support for 2ndary indexes (CASSANDRA-3680)
 
 
 1.1.4

http://git-wip-us.apache.org/repos/asf/cassandra/blob/80ea03f5/interface/cassandra.thrift
----------------------------------------------------------------------
diff --git a/interface/cassandra.thrift b/interface/cassandra.thrift
index 1041661..5e933d7 100644
--- a/interface/cassandra.thrift
+++ b/interface/cassandra.thrift
@@ -393,7 +393,8 @@ struct AuthenticationRequest {
 
 enum IndexType {
     KEYS,
-    CUSTOM
+    CUSTOM,
+    COMPOSITES
 }
 
 /* describes a column in a column family. */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/80ea03f5/interface/thrift/gen-java/org/apache/cassandra/thrift/IndexType.java
----------------------------------------------------------------------
diff --git a/interface/thrift/gen-java/org/apache/cassandra/thrift/IndexType.java b/interface/thrift/gen-java/org/apache/cassandra/thrift/IndexType.java
index a351bd7..283fa71 100644
--- a/interface/thrift/gen-java/org/apache/cassandra/thrift/IndexType.java
+++ b/interface/thrift/gen-java/org/apache/cassandra/thrift/IndexType.java
@@ -33,7 +33,8 @@ import org.apache.thrift.TEnum;
 
 public enum IndexType implements org.apache.thrift.TEnum {
   KEYS(0),
-  CUSTOM(1);
+  CUSTOM(1),
+  COMPOSITES(2);
 
   private final int value;
 
@@ -58,6 +59,8 @@ public enum IndexType implements org.apache.thrift.TEnum {
         return KEYS;
       case 1:
         return CUSTOM;
+      case 2:
+        return COMPOSITES;
       default:
         return null;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/80ea03f5/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index 203b7ec..b124463 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -918,7 +918,7 @@ public final class CFMetaData
         {
             if (column.getIndexType() != null && column.getIndexName() == null)
             {
-                String baseName = getDefaultIndexName(cfName, comparator, column.name);
+                String baseName = getDefaultIndexName(cfName, getColumnDefinitionComparator(column), column.name);
                 String indexName = baseName;
                 int i = 0;
                 while (existingNames.contains(indexName))

http://git-wip-us.apache.org/repos/asf/cassandra/blob/80ea03f5/src/java/org/apache/cassandra/cql/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql/QueryProcessor.java b/src/java/org/apache/cassandra/cql/QueryProcessor.java
index 502979e..b23cf44 100644
--- a/src/java/org/apache/cassandra/cql/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql/QueryProcessor.java
@@ -30,6 +30,7 @@ import org.apache.cassandra.db.CounterColumn;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.context.CounterContext;
 import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.index.SecondaryIndexManager;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.AsciiType;
 import org.apache.cassandra.db.marshal.MarshalException;
@@ -301,10 +302,11 @@ public class QueryProcessor
         if (select.getColumnRelations().size() > 0)
         {
             AbstractType<?> comparator = select.getComparator(keyspace);
-            Set<ByteBuffer> indexed = Table.open(keyspace).getColumnFamilyStore(select.getColumnFamily()).indexManager.getIndexedColumns();
+            SecondaryIndexManager idxManager = Table.open(keyspace).getColumnFamilyStore(select.getColumnFamily()).indexManager;
             for (Relation relation : select.getColumnRelations())
             {
-                if ((relation.operator() == RelationType.EQ) && indexed.contains(relation.getEntity().getByteBuffer(comparator, variables)))
+                ByteBuffer name = relation.getEntity().getByteBuffer(comparator, variables);
+                if ((relation.operator() == RelationType.EQ) && idxManager.indexes(name))
                     return;
             }
             throw new InvalidRequestException("No indexed columns present in by-columns clause with \"equals\" operator");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/80ea03f5/src/java/org/apache/cassandra/cql3/CFDefinition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/CFDefinition.java b/src/java/org/apache/cassandra/cql3/CFDefinition.java
index 56b68eb..51f7ceb7 100644
--- a/src/java/org/apache/cassandra/cql3/CFDefinition.java
+++ b/src/java/org/apache/cassandra/cql3/CFDefinition.java
@@ -345,7 +345,7 @@ public class CFDefinition implements Iterable<CFDefinition.Name>
 
         public ByteBuffer buildAsEndOfRange()
         {
-            throw new IllegalStateException();
+            return build();
         }
 
         public NonCompositeBuilder copy()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/80ea03f5/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
index 50eb8f9..6dc3a9f 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
@@ -18,6 +18,8 @@
 package org.apache.cassandra.cql3.statements;
 
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -26,6 +28,8 @@ import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.config.ConfigurationException;
 import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.db.index.composites.CompositesIndex;
+import org.apache.cassandra.db.marshal.CompositeType;
 import org.apache.cassandra.service.MigrationManager;
 import org.apache.cassandra.thrift.IndexType;
 import org.apache.cassandra.thrift.InvalidRequestException;
@@ -54,9 +58,6 @@ public class CreateIndexStatement extends SchemaAlteringStatement
         CFMetaData cfm = oldCfm.clone();
         CFDefinition cfDef = oldCfm.getCfDef();
 
-        if (cfDef.isComposite)
-            throw new InvalidRequestException("Secondary indexes are not (yet) supported on tables with composite PRIMARY KEY");
-
         for (ColumnDefinition cd : cfm.getColumn_metadata().values())
         {
             if (cd.name.equals(columnName.key))
@@ -65,7 +66,18 @@ public class CreateIndexStatement extends SchemaAlteringStatement
                     throw new InvalidRequestException("Index already exists");
                 if (logger.isDebugEnabled())
                     logger.debug("Updating column {} definition for index {}", columnName, indexName);
-                cd.setIndexType(IndexType.KEYS, Collections.<String, String>emptyMap());
+
+                if (cfDef.isComposite)
+                {
+                    CompositeType composite = (CompositeType)cfm.comparator;
+                    Map<String, String> opts = new HashMap<String, String>();
+                    opts.put(CompositesIndex.PREFIX_SIZE_OPTION, String.valueOf(composite.types.size() - 1));
+                    cd.setIndexType(IndexType.COMPOSITES, opts);
+                }
+                else
+                {
+                    cd.setIndexType(IndexType.KEYS, Collections.<String, String>emptyMap());
+                }
                 cd.setIndexName(indexName);
                 columnExists = true;
                 break;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/80ea03f5/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 1649a8c..95a1fee 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -30,10 +30,13 @@ import org.apache.cassandra.auth.Permission;
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.transport.messages.ResultMessage;
 import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.config.ConfigurationException;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.context.CounterContext;
 import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.index.SecondaryIndex;
+import org.apache.cassandra.db.index.SecondaryIndexManager;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.dht.*;
 import org.apache.cassandra.service.ClientState;
@@ -1060,11 +1063,19 @@ public class SelectStatement implements CQLStatement
             {
                 stmt.isKeyRange = true;
                 boolean hasEq = false;
-                Set<ByteBuffer> indexed = Table.open(keyspace()).getColumnFamilyStore(columnFamily()).indexManager.getIndexedColumns();
+                SecondaryIndexManager idxManager = Table.open(keyspace()).getColumnFamilyStore(columnFamily()).indexManager;
+                Set<ByteBuffer> indexedNames = new HashSet<ByteBuffer>();
+                for (SecondaryIndex index : idxManager.getIndexes())
+                {
+                    for (ColumnDefinition cdef : index.getColumnDefs())
+                        indexedNames.add(cdef.name);
+                }
 
+                // Note: we cannot use idxManager.indexes() methods because we don't have a complete column name at this point, we only
+                // have the indexed component.
                 for (Map.Entry<CFDefinition.Name, Restriction> entry : stmt.metadataRestrictions.entrySet())
                 {
-                    if (entry.getValue().isEquality() && indexed.contains(entry.getKey().name.key))
+                    if (entry.getValue().isEquality() && indexedNames.contains(entry.getKey().name.key))
                     {
                         hasEq = true;
                         break;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/80ea03f5/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 03497ef..20b037e 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -499,7 +499,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         data.addSSTables(newSSTables);
         try
         {
-            indexManager.maybeBuildSecondaryIndexes(newSSTables, indexManager.getIndexedColumns());
+            indexManager.maybeBuildSecondaryIndexes(newSSTables, indexManager.allIndexesNames());
         }
         finally
         {
@@ -513,11 +513,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     {
         ColumnFamilyStore cfs = Table.open(ksName).getColumnFamilyStore(cfName);
 
-        SortedSet<ByteBuffer> indexes = new TreeSet<ByteBuffer>(cfs.metadata.comparator);
-        if (idxNames.length == 0)
-            indexes.addAll(cfs.indexManager.getIndexedColumns());
-        for (String idxName : idxNames)
-            indexes.add(cfs.indexManager.getColumnByIdxName(idxName));
+        Set<String> indexes = new HashSet<String>(Arrays.asList(idxNames));
 
         Collection<SSTableReader> sstables = cfs.getSSTables();
         try
@@ -1313,7 +1309,13 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         return columns;
     }
 
-    public static abstract class AbstractScanIterator extends AbstractIterator<Row> implements CloseableIterator<Row> {}
+    public static abstract class AbstractScanIterator extends AbstractIterator<Row> implements CloseableIterator<Row>
+    {
+        public boolean needsFiltering()
+        {
+            return true;
+        }
+    }
 
     /**
       * Iterate over a range of rows and columns from memtables/sstables.
@@ -1415,23 +1417,28 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
                 Row rawRow = rowIterator.next();
                 ColumnFamily data = rawRow.cf;
 
-                // roughtly
-                IFilter extraFilter = filter.getExtraFilter(data);
-                if (extraFilter != null)
+                if (rowIterator.needsFiltering())
                 {
-                    QueryPath path = new QueryPath(columnFamily);
-                    ColumnFamily cf = filter.cfs.getColumnFamily(new QueryFilter(rawRow.key, path, extraFilter));
-                    if (cf != null)
-                        data.addAll(cf, HeapAllocator.instance);
-                }
+                    // roughly
+                    IFilter extraFilter = filter.getExtraFilter(data);
+                    if (extraFilter != null)
+                    {
+                        QueryPath path = new QueryPath(columnFamily);
+                        ColumnFamily cf = filter.cfs.getColumnFamily(new QueryFilter(rawRow.key, path, extraFilter));
+                        if (cf != null)
+                            data.addAll(cf, HeapAllocator.instance);
+                    }
 
-                if (!filter.isSatisfiedBy(data))
-                    continue;
+                    if (!filter.isSatisfiedBy(data, null))
+                        continue;
+
+                    logger.debug("{} satisfies all filter expressions", data);
+                    // cut the resultset back to what was requested, if necessary
+                    data = filter.prune(data);
+                }
 
-                logger.debug("{} satisfies all filter expressions", data);
-                // cut the resultset back to what was requested, if necessary
-                data = filter.prune(data);
                 rows.add(new Row(rawRow.key, data));
+
                 if (data != null)
                     columnsCount += filter.lastCounted(data);
                 // Update the underlying filter to avoid querying more columns per slice than necessary and to handle paging

http://git-wip-us.apache.org/repos/asf/cassandra/blob/80ea03f5/src/java/org/apache/cassandra/db/DeletionInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DeletionInfo.java b/src/java/org/apache/cassandra/db/DeletionInfo.java
index b63686f..480d50a 100644
--- a/src/java/org/apache/cassandra/db/DeletionInfo.java
+++ b/src/java/org/apache/cassandra/db/DeletionInfo.java
@@ -27,6 +27,7 @@ import java.util.*;
 import com.google.common.base.Objects;
 import com.google.common.collect.Iterables;
 
+import org.apache.cassandra.db.filter.ColumnSlice;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.io.ISerializer;
 import org.apache.cassandra.io.ISSTableSerializer;
@@ -115,6 +116,43 @@ public class DeletionInfo
     }
 
     /**
+     * Return the slice ranges covered by this deletion info, or null if nothing is deleted.
+     */
+    public ColumnSlice[] coveredSlices()
+    {
+        if (isLive())
+            return null;
+
+        if (!topLevel.equals(DeletionTime.LIVE))
+            return ColumnSlice.ALL_COLUMNS_ARRAY;
+
+        List<ColumnSlice> slices = new ArrayList<ColumnSlice>();
+        ColumnSlice current = null;
+        for (RangeTombstone tombstone : ranges)
+        {
+            if (current == null)
+            {
+                current = new ColumnSlice(tombstone.min, tombstone.max);
+            }
+            else if (ranges.comparator().compare(current.finish, tombstone.min) < 0)
+            {
+                // If next is strictly after current, we've finished the current slice
+                slices.add(current);
+                current = new ColumnSlice(tombstone.min, tombstone.max);
+            }
+            else if (ranges.comparator().compare(current.finish, tombstone.max) < 0)
+            {
+                // if tombstone end is after current end, extend current
+                current = new ColumnSlice(current.start, tombstone.max);
+            }
+            // otherwise, tombstone is fully included in current already, skip it
+        }
+        if (current != null)
+            slices.add(current);
+        return slices.isEmpty() ? null : slices.toArray(new ColumnSlice[slices.size()]);
+    }
+
+    /**
      * Return a new DeletionInfo correspond to purging every tombstones that
      * are older than {@code gcbefore}.
      *

http://git-wip-us.apache.org/repos/asf/cassandra/blob/80ea03f5/src/java/org/apache/cassandra/db/MeteredFlusher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/MeteredFlusher.java b/src/java/org/apache/cassandra/db/MeteredFlusher.java
index f8d249d..7984944 100644
--- a/src/java/org/apache/cassandra/db/MeteredFlusher.java
+++ b/src/java/org/apache/cassandra/db/MeteredFlusher.java
@@ -52,7 +52,7 @@ class MeteredFlusher implements Runnable
                                                             + 1 // potentially a flushed memtable being counted by jamm
                                                             + DatabaseDescriptor.getFlushWriters()
                                                             + DatabaseDescriptor.getFlushQueueSize())
-                                                  / (1 + cfs.indexManager.getIndexedColumns().size()));
+                                                  / (1 + cfs.indexManager.getIndexesBackedByCfs().size()));
                 if (size > (DatabaseDescriptor.getTotalMemtableSpaceInMB() * 1048576L - flushingBytes) / maxInFlight)
                 {
                     logger.info("flushing high-traffic column family {} (estimated {} bytes)", cfs, size);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/80ea03f5/src/java/org/apache/cassandra/db/SliceQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SliceQueryPager.java b/src/java/org/apache/cassandra/db/SliceQueryPager.java
new file mode 100644
index 0000000..405c7c4
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/SliceQueryPager.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.util.Arrays;
+import java.util.Iterator;
+
+import org.apache.cassandra.db.filter.*;
+
+public class SliceQueryPager implements Iterator<ColumnFamily>
+{
+    public static final int DEFAULT_PAGE_SIZE = 10000;
+
+    public final ColumnFamilyStore cfs;
+    public final DecoratedKey key;
+
+    private ColumnSlice[] slices;
+    private ColumnFamily current;
+    private boolean exhausted;
+
+    public SliceQueryPager(ColumnFamilyStore cfs, DecoratedKey key, ColumnSlice[] slices)
+    {
+        this.cfs = cfs;
+        this.key = key;
+        this.slices = slices;
+    }
+
+    // This will *not* do a query
+    public boolean hasNext()
+    {
+        return !exhausted;
+    }
+
+    // This might return an empty column family; it returns null only if called after the pager is exhausted
+    public ColumnFamily next()
+    {
+        if (exhausted)
+            return null;
+
+        QueryPath path = new QueryPath(cfs.getColumnFamilyName());
+        QueryFilter filter = new QueryFilter(key, path, new SliceQueryFilter(slices, false, DEFAULT_PAGE_SIZE));
+        ColumnFamily cf = cfs.getColumnFamily(filter);
+        if (cf == null || cf.getLiveColumnCount() < DEFAULT_PAGE_SIZE)
+        {
+            exhausted = true;
+        }
+        else
+        {
+            Iterator<IColumn> iter = cf.getReverseSortedColumns().iterator();
+            IColumn lastColumn = iter.next();
+            while (lastColumn.isMarkedForDelete())
+                lastColumn = iter.next();
+
+            int i = 0;
+            for (; i < slices.length; ++i)
+            {
+                ColumnSlice current = slices[i];
+                if (cfs.getComparator().compare(lastColumn.name(), current.finish) <= 0)
+                    break;
+            }
+            if (i >= slices.length)
+                exhausted = true;
+            else
+                slices = Arrays.copyOfRange(slices, i, slices.length);
+        }
+        return cf == null ? ColumnFamily.create(cfs.metadata) : cf;
+    }
+
+    public void remove()
+    {
+        throw new UnsupportedOperationException();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/80ea03f5/src/java/org/apache/cassandra/db/Table.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Table.java b/src/java/org/apache/cassandra/db/Table.java
index b6ddcad..0732534 100644
--- a/src/java/org/apache/cassandra/db/Table.java
+++ b/src/java/org/apache/cassandra/db/Table.java
@@ -32,8 +32,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.*;
 import org.apache.cassandra.db.commitlog.CommitLog;
-import org.apache.cassandra.db.filter.QueryFilter;
-import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.db.filter.*;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.service.StorageService;
@@ -356,24 +355,27 @@ public class Table
                     continue;
                 }
 
+                ColumnSlice[] deletionSlices = null;
                 SortedSet<ByteBuffer> mutatedIndexedColumns = null;
                 if (updateIndexes)
                 {
-                    for (ByteBuffer column : cfs.indexManager.getIndexedColumns())
+                    // If cf has some range deletion, we need to fetch those ranges to know if something indexed was updated
+                    // Note: we could "optimize" that for Keys index, because we know that the columnDef name is directly
+                    // the indexed column name.
+                    deletionSlices = cf.deletionInfo().coveredSlices();
+
+                    for (IColumn updated : cf)
                     {
-                        if (cf.getColumnNames().contains(column) || cf.isMarkedForDelete())
+                        if (cfs.indexManager.indexes(updated))
                         {
                             if (mutatedIndexedColumns == null)
                                 mutatedIndexedColumns = new TreeSet<ByteBuffer>(cf.getComparator());
-                            mutatedIndexedColumns.add(column);
+                            mutatedIndexedColumns.add(updated.name());
                             if (logger.isDebugEnabled())
                             {
-                                // can't actually use validator to print value here, because we overload value
-                                // for deletion timestamp as well (which may not be a well-formed value for the column type)
-                                ByteBuffer value = cf.getColumn(column) == null ? null : cf.getColumn(column).value(); // may be null on row-level deletion
-                                logger.debug(String.format("mutating indexed column %s value %s",
-                                                           cf.getComparator().getString(column),
-                                                           value == null ? "null" : ByteBufferUtil.bytesToHex(value)));
+                                logger.debug(String.format("Mutated indexed column %s value %s",
+                                                           cf.getComparator().getString(updated.name()),
+                                                           ByteBufferUtil.bytesToHex(updated.value())));
                             }
                         }
                     }
@@ -382,7 +384,7 @@ public class Table
                 // Sharding the lock is insufficient to avoid contention when there is a "hot" row, e.g., for
                 // hint writes when a node is down (keyed by target IP).  So it is worth special-casing the
                 // no-index case to avoid the synchronization.
-                if (mutatedIndexedColumns == null)
+                if (mutatedIndexedColumns == null && deletionSlices == null)
                 {
                     cfs.apply(key, cf);
                     continue;
@@ -390,11 +392,23 @@ public class Table
                 // else mutatedIndexedColumns != null
                 synchronized (indexLockFor(mutation.key()))
                 {
+                    if (mutatedIndexedColumns == null)
+                        mutatedIndexedColumns = new TreeSet<ByteBuffer>(cf.getComparator());
+
                     // with the raw data CF, we can just apply every update in any order and let
                     // read-time resolution throw out obsolete versions, thus avoiding read-before-write.
                     // but for indexed data we need to make sure that we're not creating index entries
                     // for obsolete writes.
-                    ColumnFamily oldIndexedColumns = readCurrentIndexedColumns(key, cfs, mutatedIndexedColumns);
+                    ColumnFamily oldIndexedColumns = readCurrentIndexedColumns(key, cfs, mutatedIndexedColumns, deletionSlices);
+
+                    // We might still have no mutated columns in case it is a deletion but the row had
+                    // no indexed columns
+                    if (mutatedIndexedColumns.isEmpty())
+                    {
+                        cfs.apply(key, cf);
+                        continue;
+                    }
+
                     logger.debug("Pre-mutation index row is {}", oldIndexedColumns);
                     ignoreObsoleteMutations(cf, mutatedIndexedColumns, oldIndexedColumns);
 
@@ -454,10 +468,33 @@ public class Table
         }
     }
 
-    private static ColumnFamily readCurrentIndexedColumns(DecoratedKey key, ColumnFamilyStore cfs, SortedSet<ByteBuffer> mutatedIndexedColumns)
+    private static ColumnFamily readCurrentIndexedColumns(DecoratedKey key, ColumnFamilyStore cfs, SortedSet<ByteBuffer> mutatedIndexedColumns, ColumnSlice[] deletionSlices)
     {
-        QueryFilter filter = QueryFilter.getNamesFilter(key, new QueryPath(cfs.getColumnFamilyName()), mutatedIndexedColumns);
-        return cfs.getColumnFamily(filter);
+        // Note: as an optimization, we could query only the names not covered by the slices
+        QueryPath path = new QueryPath(cfs.getColumnFamilyName());
+        ColumnFamily cf = ColumnFamily.create(cfs.metadata);
+
+        if (mutatedIndexedColumns != null)
+            cf.resolve(cfs.getColumnFamily(QueryFilter.getNamesFilter(key, path, mutatedIndexedColumns)));
+
+        if (deletionSlices != null)
+        {
+            SliceQueryPager pager = new SliceQueryPager(cfs, key, deletionSlices);
+            while (pager.hasNext())
+            {
+                ColumnFamily cf2 = pager.next();
+                cf.delete(cf2);
+                for (IColumn column : cf2)
+                {
+                    if (cfs.indexManager.indexes(column))
+                    {
+                        cf.addColumn(column);
+                        mutatedIndexedColumns.add(column.name());
+                    }
+                }
+            }
+        }
+        return cf;
     }
 
     public AbstractReplicationStrategy getReplicationStrategy()
@@ -470,7 +507,7 @@ public class Table
      * @param cfs ColumnFamily to index row in
      * @param indexedColumns columns to index, in comparator order
      */
-    public static void indexRow(DecoratedKey key, ColumnFamilyStore cfs, SortedSet<ByteBuffer> indexedColumns)
+    public static void indexRow(DecoratedKey key, ColumnFamilyStore cfs, Set<String> idxNames)
     {
         if (logger.isDebugEnabled())
             logger.debug("Indexing row {} ", cfs.metadata.getKeyValidator().getString(key.key));
@@ -478,11 +515,23 @@ public class Table
         switchLock.readLock().lock();
         try
         {
-            synchronized (cfs.table.indexLockFor(key.key))
+            // Our index lock is per-row, but we don't want to hold writes for too long, so for large rows
+            // we release the lock between pages
+            SliceQueryPager pager = new SliceQueryPager(cfs, key, ColumnSlice.ALL_COLUMNS_ARRAY);
+
+            while (pager.hasNext())
             {
-                ColumnFamily cf = readCurrentIndexedColumns(key, cfs, indexedColumns);
-                if (cf != null)
-                    cfs.indexManager.applyIndexUpdates(key.key, cf, cf.getColumnNames(), null);
+                synchronized (cfs.table.indexLockFor(key.key))
+                {
+                    ColumnFamily cf = pager.next();
+                    ColumnFamily cf2 = cf.cloneMeShallow();
+                    for (IColumn column : cf)
+                    {
+                        if (cfs.indexManager.indexes(column.name(), idxNames))
+                            cf2.addColumn(column);
+                    }
+                    cfs.indexManager.applyIndexUpdates(key.key, cf2, cf2.getColumnNames(), null);
+                }
             }
         }
         finally

http://git-wip-us.apache.org/repos/asf/cassandra/blob/80ea03f5/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index b9bdfa7..b888ffd 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -526,12 +526,11 @@ public class CompactionManager implements CompactionManagerMBean
         }
 
         boolean isCommutative = cfs.metadata.getDefaultValidator().isCommutative();
-        Collection<ByteBuffer> indexedColumns = cfs.indexManager.getIndexedColumns();
+        boolean hasIndexes = !cfs.indexManager.getIndexes().isEmpty();
 
         for (SSTableReader sstable : sstables)
         {
-            if (indexedColumns.isEmpty()
-                && !new Bounds<Token>(sstable.first.token, sstable.last.token).intersects(ranges))
+            if (!hasIndexes && !new Bounds<Token>(sstable.first.token, sstable.last.token).intersects(ranges))
             {
                 cfs.replaceCompactedSSTables(Arrays.asList(sstable), Collections.<SSTableReader>emptyList(), OperationType.CLEANUP);
                 continue;
@@ -583,7 +582,7 @@ public class CompactionManager implements CompactionManagerMBean
                     {
                         cfs.invalidateCachedRow(row.getKey());
 
-                        if (!indexedColumns.isEmpty() || isCommutative)
+                        if (hasIndexes || isCommutative)
                         {
                             if (indexedColumnsInRow != null)
                                 indexedColumnsInRow.clear();
@@ -593,7 +592,7 @@ public class CompactionManager implements CompactionManagerMBean
                                 OnDiskAtom column = row.next();
                                 if (column instanceof CounterColumn)
                                     renewer.maybeRenew((CounterColumn) column);
-                                if (column instanceof IColumn && indexedColumns.contains(column.name()))
+                                if (column instanceof IColumn && cfs.indexManager.indexes((IColumn)column))
                                 {
                                     if (indexedColumnsInRow == null)
                                         indexedColumnsInRow = new ArrayList<IColumn>();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/80ea03f5/src/java/org/apache/cassandra/db/filter/ColumnSlice.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ColumnSlice.java b/src/java/org/apache/cassandra/db/filter/ColumnSlice.java
index 2a295eb..cdcdc17 100644
--- a/src/java/org/apache/cassandra/db/filter/ColumnSlice.java
+++ b/src/java/org/apache/cassandra/db/filter/ColumnSlice.java
@@ -84,6 +84,11 @@ public class ColumnSlice
             throw new IllegalArgumentException("Slice finish must come after start in traversal order");
     }
 
+    public boolean includes(Comparator<ByteBuffer> cmp, ByteBuffer name)
+    {
+        return cmp.compare(start, name) <= 0 && (finish.equals(ByteBufferUtil.EMPTY_BYTE_BUFFER) || cmp.compare(finish, name) >= 0);
+    }
+
     @Override
     public final int hashCode()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/80ea03f5/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
index ec6f6ad..9e49237 100644
--- a/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
@@ -26,7 +26,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.ColumnNameBuilder;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.marshal.CompositeType;
 import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 import org.apache.cassandra.thrift.IndexExpression;
 import org.apache.cassandra.thrift.IndexOperator;
@@ -55,7 +57,9 @@ public abstract class ExtendedFilter
         {
             if (isPaging)
                 throw new IllegalArgumentException("Cross-row paging is not supported along with index clauses");
-            return new FilterWithClauses(cfs, filter, clause, maxResults, maxIsColumns);
+            return cfs.getComparator() instanceof CompositeType
+                 ? new FilterWithCompositeClauses(cfs, filter, clause, maxResults, maxIsColumns)
+                 : new FilterWithClauses(cfs, filter, clause, maxResults, maxIsColumns);
         }
     }
 
@@ -112,6 +116,11 @@ public abstract class ExtendedFilter
     /** The initial filter we'll do our first slice with (either the original or a superset of it) */
     public abstract IFilter initialFilter();
 
+    public IFilter originalFilter()
+    {
+        return originalFilter;
+    }
+
     public abstract List<IndexExpression> getClause();
 
     /**
@@ -130,7 +139,7 @@ public abstract class ExtendedFilter
      * @return true if the provided data satisfies all the expressions from
      * the clause of this filter.
      */
-    public abstract boolean isSatisfiedBy(ColumnFamily data);
+    public abstract boolean isSatisfiedBy(ColumnFamily data, ColumnNameBuilder builder);
 
     public static boolean satisfies(int comparison, IndexOperator op)
     {
@@ -165,7 +174,7 @@ public abstract class ExtendedFilter
         }
 
         /** Sets up the initial filter. */
-        private IFilter computeInitialFilter()
+        protected IFilter computeInitialFilter()
         {
             if (originalFilter instanceof SliceQueryFilter)
             {
@@ -264,14 +273,15 @@ public abstract class ExtendedFilter
             return pruned;
         }
 
-        public boolean isSatisfiedBy(ColumnFamily data)
+        public boolean isSatisfiedBy(ColumnFamily data, ColumnNameBuilder builder)
         {
             // We enforces even the primary clause because reads are not synchronized with writes and it is thus possible to have a race
             // where the index returned a row which doesn't have the primary column when we actually read it
             for (IndexExpression expression : clause)
             {
                 // check column data vs expression
-                IColumn column = data.getColumn(expression.column_name);
+                ByteBuffer colName = builder == null ? expression.column_name : builder.copy().add(expression.column_name).build();
+                IColumn column = data.getColumn(colName);
                 if (column == null)
                     return false;
                 int v = data.metadata().getValueValidator(expression.column_name).compare(column.value(), expression.value);
@@ -282,6 +292,30 @@ public abstract class ExtendedFilter
         }
     }
 
+    private static class FilterWithCompositeClauses extends FilterWithClauses
+    {
+        public FilterWithCompositeClauses(ColumnFamilyStore cfs, IFilter filter, List<IndexExpression> clause, int maxResults, boolean maxIsColumns)
+        {
+            super(cfs, filter, clause, maxResults, maxIsColumns);
+        }
+
+        /*
+         * For composites, the index name is not a valid column name (it's only
+         * one of the components), which means we should not do the
+         * NamesQueryFilter part of FilterWithClauses in particular.
+         * Besides, CompositesSearcher doesn't really use the initial filter
+         * except to know the limit set by the user, so create a fake filter
+         * with only the count information.
+         */
+        protected IFilter computeInitialFilter()
+        {
+            int limit = originalFilter instanceof SliceQueryFilter
+                      ? ((SliceQueryFilter)originalFilter).count
+                      : Integer.MAX_VALUE;
+            return new SliceQueryFilter(ColumnSlice.ALL_COLUMNS_ARRAY, false, limit);
+        }
+    }
+
     private static class EmptyClauseFilter extends ExtendedFilter
     {
         public EmptyClauseFilter(ColumnFamilyStore cfs, IFilter filter, int maxResults, boolean maxIsColumns, boolean isPaging)
@@ -309,7 +343,7 @@ public abstract class ExtendedFilter
             return data;
         }
 
-        public boolean isSatisfiedBy(ColumnFamily data)
+        public boolean isSatisfiedBy(ColumnFamily data, ColumnNameBuilder builder)
         {
             return true;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/80ea03f5/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
index 0b78581..f868539 100644
--- a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
@@ -201,6 +201,14 @@ public class SliceQueryFilter implements IFilter
         count = newLimit;
     }
 
+    public boolean includes(Comparator<ByteBuffer> cmp, ByteBuffer name)
+    {
+        for (ColumnSlice slice : slices)
+            if (slice.includes(cmp, name))
+                return true;
+        return false;
+    }
+
     public static class Serializer implements IVersionedSerializer<SliceQueryFilter>
     {
         public void serialize(SliceQueryFilter f, DataOutput dos, int version) throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/80ea03f5/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java b/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
new file mode 100644
index 0000000..24f09ab
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.index;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.config.ConfigurationException;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.index.PerColumnSecondaryIndex;
+import org.apache.cassandra.db.index.SecondaryIndexSearcher;
+import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.dht.*;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+/**
+ * Implements a secondary index for a column family using a second column family
+ * in which the row keys are indexed values, and column names are base row keys.
+ */
+public abstract class AbstractSimplePerColumnSecondaryIndex extends PerColumnSecondaryIndex
+{
+    private ColumnFamilyStore indexCfs;
+
+    public void init()
+    {
+        assert baseCfs != null && columnDefs != null && columnDefs.size() == 1;
+
+        ColumnDefinition columnDef = columnDefs.iterator().next();
+        init(columnDef);
+
+        AbstractType indexComparator = SecondaryIndex.getIndexComparator(baseCfs.metadata, columnDef);
+        CFMetaData indexedCfMetadata = CFMetaData.newIndexMetadata(baseCfs.metadata, columnDef, indexComparator);
+        indexCfs = ColumnFamilyStore.createColumnFamilyStore(baseCfs.table,
+                                                             indexedCfMetadata.cfName,
+                                                             new LocalPartitioner(columnDef.getValidator()),
+                                                             indexedCfMetadata);
+
+        // enable and initialize row cache based on parent's setting and indexed column's cardinality
+        CFMetaData.Caching baseCaching = baseCfs.metadata.getCaching();
+        if (baseCaching == CFMetaData.Caching.ALL || baseCaching == CFMetaData.Caching.ROWS_ONLY)
+        {
+            /*
+             * The number of keys in the index CF equals the cardinality of the indexed column.
+             * If the index CF holds more keys than its average column count (i.e. it is a tall table),
+             * then consider the indexed column to be high cardinality.
+             */
+            double estimatedKeys = indexCfs.estimateKeys();
+            double averageColumnCount = indexCfs.getMeanColumns();
+            if (averageColumnCount > 0 && estimatedKeys / averageColumnCount > 1)
+            {
+                logger.debug("turning row cache on for " + indexCfs.getColumnFamilyName());
+                indexCfs.metadata.caching(baseCaching);
+                indexCfs.initRowCache();
+            }
+        }
+    }
+
+    protected abstract void init(ColumnDefinition columnDef);
+
+    protected abstract ByteBuffer makeIndexColumnName(ByteBuffer rowKey, IColumn column);
+
+    public void deleteColumn(DecoratedKey valueKey, ByteBuffer rowKey, IColumn column)
+    {
+        if (column.isMarkedForDelete())
+            return;
+
+        int localDeletionTime = (int) (System.currentTimeMillis() / 1000);
+        ColumnFamily cfi = ColumnFamily.create(indexCfs.metadata);
+        cfi.addTombstone(makeIndexColumnName(rowKey, column), localDeletionTime, column.timestamp());
+        indexCfs.apply(valueKey, cfi);
+        if (logger.isDebugEnabled())
+            logger.debug("removed index entry for cleaned-up value {}:{}", valueKey, cfi);
+    }
+
+    public void insertColumn(DecoratedKey valueKey, ByteBuffer rowKey, IColumn column)
+    {
+        ColumnFamily cfi = ColumnFamily.create(indexCfs.metadata);
+        ByteBuffer name = makeIndexColumnName(rowKey, column);
+        if (column instanceof ExpiringColumn)
+        {
+            ExpiringColumn ec = (ExpiringColumn)column;
+            cfi.addColumn(new ExpiringColumn(name, ByteBufferUtil.EMPTY_BYTE_BUFFER, ec.timestamp(), ec.getTimeToLive(), ec.getLocalDeletionTime()));
+        }
+        else
+        {
+            cfi.addColumn(new Column(name, ByteBufferUtil.EMPTY_BYTE_BUFFER, column.timestamp()));
+        }
+        if (logger.isDebugEnabled())
+            logger.debug("applying index row {} in {}", indexCfs.metadata.getKeyValidator().getString(valueKey.key), cfi);
+
+        indexCfs.apply(valueKey, cfi);
+    }
+
+    public void updateColumn(DecoratedKey valueKey, ByteBuffer rowKey, IColumn col)
+    {
+        insertColumn(valueKey, rowKey, col);
+    }
+
+    public void removeIndex(ByteBuffer columnName)
+    {
+        indexCfs.invalidate();
+    }
+
+    public void forceBlockingFlush()
+    {
+        try
+        {
+            indexCfs.forceBlockingFlush();
+        }
+        catch (ExecutionException e)
+        {
+            throw new RuntimeException(e);
+        }
+        catch (InterruptedException e)
+        {
+            throw new AssertionError(e);
+        }
+    }
+
+    public void invalidate()
+    {
+        indexCfs.invalidate();
+    }
+
+    public void truncate(long truncatedAt)
+    {
+        indexCfs.discardSSTables(truncatedAt);
+    }
+
+    public ColumnFamilyStore getIndexCfs()
+    {
+       return indexCfs;
+    }
+
+    public String getIndexName()
+    {
+        return indexCfs.columnFamily;
+    }
+
+    public long getLiveSize()
+    {
+        return indexCfs.getMemtableDataSize();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/80ea03f5/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndex.java b/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
index 9141cf8..82b6697 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
@@ -26,15 +26,25 @@ import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.config.ConfigurationException;
 import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.IColumn;
 import org.apache.cassandra.db.SystemTable;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.index.keys.KeysIndex;
+import org.apache.cassandra.db.index.composites.CompositesIndex;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.db.marshal.LocalByPartionerType;
+import org.apache.cassandra.dht.*;
 import org.apache.cassandra.io.sstable.ReducingKeyIterator;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.thrift.Column;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 /**
@@ -44,8 +54,7 @@ import org.apache.cassandra.utils.ByteBufferUtil;
  */
 public abstract class SecondaryIndex
 {
-
-    private static final Logger logger = LoggerFactory.getLogger(SecondaryIndex.class);
+    protected static final Logger logger = LoggerFactory.getLogger(SecondaryIndex.class);
 
     public static final String CUSTOM_INDEX_OPTION_NAME = "class_name";
 
@@ -97,14 +106,16 @@ public abstract class SecondaryIndex
         return SystemTable.isIndexBuilt(baseCfs.table.name, getNameForSystemTable(columnName));
     }
 
-    public void setIndexBuilt(ByteBuffer columnName)
+    public void setIndexBuilt()
     {
-        SystemTable.setIndexBuilt(baseCfs.table.name, getNameForSystemTable(columnName));
+        for (ColumnDefinition columnDef : columnDefs)
+            SystemTable.setIndexBuilt(baseCfs.table.name, getNameForSystemTable(columnDef.name));
     }
 
-    public void setIndexRemoved(ByteBuffer columnName)
+    public void setIndexRemoved()
     {
-        SystemTable.setIndexRemoved(baseCfs.table.name, getNameForSystemTable(columnName));
+        for (ColumnDefinition columnDef : columnDefs)
+            SystemTable.setIndexRemoved(baseCfs.table.name, getNameForSystemTable(columnDef.name));
     }
 
     /**
@@ -159,14 +170,9 @@ public abstract class SecondaryIndex
         logger.info(String.format("Submitting index build of %s for data in %s",
                 getIndexName(), StringUtils.join(baseCfs.getSSTables(), ", ")));
 
-        SortedSet<ByteBuffer> columnNames = new TreeSet<ByteBuffer>();
-
-        for (ColumnDefinition cdef : columnDefs)
-            columnNames.add(cdef.name);
-
         Collection<SSTableReader> sstables = baseCfs.markCurrentSSTablesReferenced();
         SecondaryIndexBuilder builder = new SecondaryIndexBuilder(baseCfs,
-                                                                  columnNames,
+                                                                  Collections.singleton(getIndexName()),
                                                                   new ReducingKeyIterator(sstables));
         Future<?> future = CompactionManager.instance.submitIndexBuild(builder);
         try
@@ -174,16 +180,7 @@ public abstract class SecondaryIndex
             future.get();
             forceBlockingFlush();
 
-            // Mark all indexed columns as built
-            if (this instanceof PerRowSecondaryIndex)
-            {
-                for (ByteBuffer columnName : columnNames)
-                    SystemTable.setIndexBuilt(baseCfs.table.name, getIndexName()+ByteBufferUtil.string(columnName));
-            }
-            else
-            {
-                SystemTable.setIndexBuilt(baseCfs.table.name, getIndexName());
-            }
+            setIndexBuilt();
         }
         catch (InterruptedException e)
         {
@@ -193,10 +190,6 @@ public abstract class SecondaryIndex
         {
             throw new RuntimeException(e);
         }
-        catch (CharacterCodingException e)
-        {
-            throw new RuntimeException(e);
-        }
         finally
         {
             SSTableReader.releaseReferences(sstables);
@@ -264,7 +257,7 @@ public abstract class SecondaryIndex
         this.baseCfs = baseCfs;
     }
 
-    Set<ColumnDefinition> getColumnDefs()
+    public Set<ColumnDefinition> getColumnDefs()
     {
         return columnDefs;
     }
@@ -285,6 +278,34 @@ public abstract class SecondaryIndex
     }
 
     /**
+     * Returns the decoratedKey for a column value
+     * @param value column value
+     * @return decorated key
+     */
+    public DecoratedKey getIndexKeyFor(ByteBuffer value)
+    {
+        // FIXME: this assumes one column definition per index
+        ByteBuffer name = columnDefs.iterator().next().name;
+        return new DecoratedKey(new LocalToken(baseCfs.metadata.getColumnDefinition(name).getValidator(), value), value);
+    }
+
+    /**
+     * Returns true if the provided column name is indexed by this secondary index.
+     *
+     * The default implementation checks whether the name is one of the columnDef names,
+     * but this should be overridden by subclasses if needed.
+     */
+    public boolean indexes(ByteBuffer name)
+    {
+        for (ColumnDefinition columnDef : columnDefs)
+        {
+            if (baseCfs.getComparator().compare(columnDef.name, name) == 0)
+                return true;
+        }
+        return false;
+    }
+
+    /**
      * This is the primary way to create a secondary index instance for a CF column.
      * It will validate the index_options before initializing.
      *
@@ -303,6 +324,9 @@ public abstract class SecondaryIndex
         case KEYS:
             index = new KeysIndex();
             break;
+        case COMPOSITES:
+            index = new CompositesIndex();
+            break;
         case CUSTOM:
             assert cdef.getIndexOptions() != null;
             String class_name = cdef.getIndexOptions().get(CUSTOM_INDEX_OPTION_NAME);
@@ -326,6 +350,46 @@ public abstract class SecondaryIndex
 
         return index;
     }
-    
+
     public abstract boolean validate(Column column);
+
+    /**
+     * Returns the index comparator for indexes backed by a CFS, or null (for CUSTOM indexes).
+     *
+     * Note: it would be cleaner to have this be a member method. However, we need this when opening index
+     * sstables, and by then the CFS won't be fully initialized, so the SecondaryIndex object won't be accessible.
+     */
+    public static AbstractType<?> getIndexComparator(CFMetaData baseMetadata, ColumnDefinition cdef)
+    {
+        IPartitioner rowPartitioner = StorageService.getPartitioner();
+        AbstractType<?> keyComparator = (rowPartitioner instanceof OrderPreservingPartitioner || rowPartitioner instanceof ByteOrderedPartitioner)
+                                      ? BytesType.instance
+                                      : new LocalByPartionerType(rowPartitioner);
+
+        switch (cdef.getIndexType())
+        {
+            case KEYS:
+                return keyComparator;
+            case COMPOSITES:
+                assert baseMetadata.comparator instanceof CompositeType;
+                int prefixSize;
+                try
+                {
+                    prefixSize = Integer.parseInt(cdef.getIndexOptions().get(CompositesIndex.PREFIX_SIZE_OPTION));
+                }
+                catch (NumberFormatException e)
+                {
+                    // This shouldn't happen if validation has been done correctly
+                    throw new RuntimeException(e);
+                }
+                List<AbstractType<?>> types = new ArrayList<AbstractType<?>>(prefixSize + 1);
+                types.add(keyComparator);
+                for (int i = 0; i < prefixSize; i++)
+                    types.add(((CompositeType)baseMetadata.comparator).types.get(i));
+                return CompositeType.getInstance(types);
+            case CUSTOM:
+                return null;
+        }
+        throw new AssertionError();
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/80ea03f5/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java
index 59a5ec5..8c7aab8 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java
@@ -19,7 +19,7 @@ package org.apache.cassandra.db.index;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.SortedSet;
+import java.util.Set;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
@@ -35,13 +35,13 @@ import org.apache.cassandra.io.sstable.ReducingKeyIterator;
 public class SecondaryIndexBuilder extends CompactionInfo.Holder
 {
     private final ColumnFamilyStore cfs;
-    private final SortedSet<ByteBuffer> columns;
+    private final Set<String> idxNames;
     private final ReducingKeyIterator iter;
 
-    public SecondaryIndexBuilder(ColumnFamilyStore cfs, SortedSet<ByteBuffer> columns, ReducingKeyIterator iter)
+    public SecondaryIndexBuilder(ColumnFamilyStore cfs, Set<String> idxNames, ReducingKeyIterator iter)
     {
         this.cfs = cfs;
-        this.columns = columns;
+        this.idxNames = idxNames;
         this.iter = iter;
     }
 
@@ -60,7 +60,7 @@ public class SecondaryIndexBuilder extends CompactionInfo.Holder
             if (isStopRequested())
                 throw new CompactionInterruptedException(getCompactionInfo());
             DecoratedKey key = iter.next();
-            Table.indexRow(key, cfs, columns);
+            Table.indexRow(key, cfs, idxNames);
         }
 
         try

http://git-wip-us.apache.org/repos/asf/cassandra/blob/80ea03f5/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
index 554aa8d..1d62746 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
@@ -71,7 +71,7 @@ public class SecondaryIndexManager
 
     public SecondaryIndexManager(ColumnFamilyStore baseCfs)
     {
-        indexesByColumn = new ConcurrentSkipListMap<ByteBuffer, SecondaryIndex>(baseCfs.getComparator());
+        indexesByColumn = new ConcurrentSkipListMap<ByteBuffer, SecondaryIndex>();
         rowLevelIndexMap = new HashMap<Class<? extends SecondaryIndex>, SecondaryIndex>();
 
         this.baseCfs = baseCfs;
@@ -85,7 +85,7 @@ public class SecondaryIndexManager
         // figure out what needs to be added and dropped.
         // future: if/when we have modifiable settings for secondary indexes,
         // they'll need to be handled here.
-        Collection<ByteBuffer> indexedColumnNames = getIndexedColumns();
+        Collection<ByteBuffer> indexedColumnNames = indexesByColumn.keySet();
         for (ByteBuffer indexedColumn : indexedColumnNames)
         {
             ColumnDefinition def = baseCfs.metadata.getColumn_metadata().get(indexedColumn);
@@ -104,6 +104,13 @@ public class SecondaryIndexManager
         }
     }
 
+    public Set<String> allIndexesNames()
+    {
+        Set<String> names = new HashSet<String>();
+        for (SecondaryIndex index : indexesByColumn.values())
+            names.add(index.getIndexName());
+        return names;
+    }
 
     /**
      * Does a full, blocking rebuild of the indexes specified by columns from the sstables.
@@ -114,15 +121,15 @@ public class SecondaryIndexManager
      * @param sstables the data to build from
      * @param columns the list of columns to index, ordered by comparator
      */
-    public void maybeBuildSecondaryIndexes(Collection<SSTableReader> sstables, SortedSet<ByteBuffer> columns)
+    public void maybeBuildSecondaryIndexes(Collection<SSTableReader> sstables, Set<String> idxNames)
     {
-        if (columns.isEmpty())
+        if (idxNames.isEmpty())
             return;
 
         logger.info(String.format("Submitting index build of %s for data in %s",
-                                  baseCfs.metadata.comparator.getString(columns), StringUtils.join(sstables, ", ")));
+                                  idxNames, StringUtils.join(sstables, ", ")));
 
-        SecondaryIndexBuilder builder = new SecondaryIndexBuilder(baseCfs, columns, new ReducingKeyIterator(sstables));
+        SecondaryIndexBuilder builder = new SecondaryIndexBuilder(baseCfs, idxNames, new ReducingKeyIterator(sstables));
         Future<?> future = CompactionManager.instance.submitIndexBuild(builder);
         try
         {
@@ -139,15 +146,47 @@ public class SecondaryIndexManager
 
         flushIndexesBlocking();
 
-        logger.info("Index build of " + baseCfs.metadata.comparator.getString(columns) + " complete");
+        logger.info("Index build of " + idxNames + " complete");
     }
 
-    /**
-     * @return the list of indexed columns
-     */
-    public SortedSet<ByteBuffer> getIndexedColumns()
+    /**
+     * Tests whether at least one of the named indexes covers the given column name.
+     *
+     * @param name the column name to test
+     * @param idxNames the names of the indexes to consider; any other index is ignored
+     * @return true if an index whose name is in idxNames reports that it indexes name
+     */
+    public boolean indexes(ByteBuffer name, Set<String> idxNames)
+    {
+        // Scan the indexes directly instead of first copying the matching ones into a list
+        for (SecondaryIndex index : indexesByColumn.values())
+        {
+            if (idxNames.contains(index.getIndexName()) && index.indexes(name))
+                return true;
+        }
+        return false;
+    }
+
+    /**
+     * @return true if any index held by this manager indexes the name of the given column
+     */
+    public boolean indexes(IColumn column)
     {
-        return indexesByColumn.keySet();
+        return indexes(column.name());
+    }
+
+    /**
+     * @return true if any index held by this manager indexes the given column name
+     */
+    public boolean indexes(ByteBuffer name)
+    {
+        // All indexes are candidates, so skip building the set of every index name
+        for (SecondaryIndex index : indexesByColumn.values())
+        {
+            if (index.indexes(name))
+                return true;
+        }
+        return false;
     }
 
     /**
@@ -212,7 +231,8 @@ public class SecondaryIndexManager
         try
         {
             index = SecondaryIndex.createInstance(baseCfs, cdef);
-        } catch (ConfigurationException e)
+        }
+        catch (ConfigurationException e)
         {
             throw new RuntimeException(e);
         }
@@ -265,6 +285,27 @@ public class SecondaryIndexManager
         return indexesByColumn.get(column);
     }
 
+    /**
+     * Finds the index covering the given column name by asking each index in turn,
+     * instead of looking the name up as a key of indexesByColumn.
+     *
+     * @param column the column name to look for
+     * @return the first index encountered that indexes the name, or null if none does
+     */
+    private SecondaryIndex getIndexForFullColumnName(ByteBuffer column)
+    {
+        SecondaryIndex match = null;
+        for (SecondaryIndex candidate : indexesByColumn.values())
+        {
+            if (candidate.indexes(column))
+            {
+                match = candidate;
+                break;
+            }
+        }
+        return match;
+    }
+
     /**
      * Remove the index
      */
@@ -284,18 +312,6 @@ public class SecondaryIndexManager
     }
 
     /**
-     * Returns the decoratedKey for a column value
-     * @param name column name
-     * @param value column value
-     * @return decorated key
-     */
-    public DecoratedKey getIndexKeyFor(ByteBuffer name, ByteBuffer value)
-    {
-        return new DecoratedKey(new LocalToken(baseCfs.metadata.getColumnDefinition(name).getValidator(), value), value);
-    }
-
-
-    /**
      * @return all built indexes (ready to use)
      */
     public List<String> getBuiltIndexes()
@@ -424,19 +440,13 @@ public class SecondaryIndexManager
         // remove the old index entries
         if (oldIndexedColumns != null)
         {
-            for (ByteBuffer columnName : oldIndexedColumns.getColumnNames())
+            for (IColumn column : oldIndexedColumns)
             {
-
-                IColumn column = oldIndexedColumns.getColumn(columnName);
-
-                if (column == null)
-                    continue;
-
                 //this was previously deleted so should not be in index
                 if (column.isMarkedForDelete())
                     continue;
 
-                SecondaryIndex index = getIndexForColumn(columnName);
+                SecondaryIndex index = getIndexForFullColumnName(column.name());
                 if (index == null)
                 {
                     logger.debug("Looks like index got dropped mid-update.  Skipping");
@@ -454,8 +464,7 @@ public class SecondaryIndexManager
                 }
                 else
                 {
-                    DecoratedKey valueKey = getIndexKeyFor(columnName, column.value());
-
+                    DecoratedKey valueKey = index.getIndexKeyFor(column.value());
                     ((PerColumnSecondaryIndex)index).deleteColumn(valueKey, rowKey, column);
                 }
             }
@@ -468,7 +477,7 @@ public class SecondaryIndexManager
             if (column == null || column.isMarkedForDelete())
                 continue; // null column == row deletion
 
-            SecondaryIndex index = getIndexForColumn(columnName);
+            SecondaryIndex index = getIndexForFullColumnName(columnName);
             if (index == null)
             {
                 logger.debug("index on {} removed; skipping remove-old for {}", columnName, ByteBufferUtil.bytesToHex(rowKey));
@@ -486,7 +495,7 @@ public class SecondaryIndexManager
             }
             else
             {
-                DecoratedKey valueKey = getIndexKeyFor(columnName, column.value());
+                DecoratedKey valueKey = index.getIndexKeyFor(column.value());
 
                 ((PerColumnSecondaryIndex)index).insertColumn(valueKey, rowKey, column);
             }
@@ -523,7 +532,7 @@ public class SecondaryIndexManager
             }
             else
             {
-                DecoratedKey valueKey = getIndexKeyFor(column.name(), column.value());
+                DecoratedKey valueKey = index.getIndexKeyFor(column.value());
                 ((PerColumnSecondaryIndex) index).deleteColumn(valueKey, key.key, column);
             }
         }
@@ -539,7 +548,6 @@ public class SecondaryIndexManager
     {
         Map<String, Set<ByteBuffer>> groupByIndexType = new HashMap<String, Set<ByteBuffer>>();
 
-
         //Group columns by type
         for (IndexExpression ix : clause)
         {
@@ -594,16 +602,41 @@ public class SecondaryIndexManager
         return indexSearchers.get(0).search(clause, range, maxResults, dataFilter, maxIsColumns);
     }
 
-    public void setIndexBuilt(Collection<ByteBuffer> indexes)
+    /**
+     * Selects the indexes held in indexesByColumn whose name is one of the given names.
+     *
+     * @param idxNames the names of the wanted indexes
+     * @return a new collection with one entry per value of indexesByColumn whose
+     * getIndexName() is contained in idxNames
+     */
+    private Collection<SecondaryIndex> getIndexesByNames(Set<String> idxNames)
+    {
+        List<SecondaryIndex> selected = new ArrayList<SecondaryIndex>();
+        for (SecondaryIndex candidate : indexesByColumn.values())
+        {
+            if (!idxNames.contains(candidate.getIndexName()))
+                continue;
+            selected.add(candidate);
+        }
+        return selected;
+    }
+
+    /**
+     * Calls setIndexBuilt() on every index whose name is in idxNames.
+     */
+    public void setIndexBuilt(Set<String> idxNames)
     {
-        for (ByteBuffer colName : indexes)
-            indexesByColumn.get(colName).setIndexBuilt(colName);
+        for (SecondaryIndex built : getIndexesByNames(idxNames))
+            built.setIndexBuilt();
     }
 
-    public void setIndexRemoved(Collection<ByteBuffer> indexes)
+    /**
+     * Calls setIndexRemoved() on every index whose name is in idxNames.
+     */
+    public void setIndexRemoved(Set<String> idxNames)
     {
-        for (ByteBuffer colName : indexes)
-            indexesByColumn.get(colName).setIndexRemoved(colName);
+        for (SecondaryIndex removed : getIndexesByNames(idxNames))
+            removed.setIndexRemoved();
     }
 
     public boolean validate(Column column)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/80ea03f5/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
new file mode 100644
index 0000000..e53a3bf
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.index.composites;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.config.ConfigurationException;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.index.AbstractSimplePerColumnSecondaryIndex;
+import org.apache.cassandra.db.index.SecondaryIndex;
+import org.apache.cassandra.db.index.SecondaryIndexSearcher;
+import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.dht.*;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+/**
+ * Implements a secondary index for a column family whose column names are composites,
+ * using a second column family in which the row keys are indexed values, and column
+ * names are composites made of the base row key followed by the first
+ * {@code prefix_size} components of the indexed column's name.
+ */
+public class CompositesIndex extends AbstractSimplePerColumnSecondaryIndex
+{
+    /** Name of the index option holding the number of leading name components to keep. */
+    public static final String PREFIX_SIZE_OPTION = "prefix_size";
+
+    private CompositeType indexComparator;
+    private int prefixSize;
+
+    /**
+     * Reads the prefix size from the options of the given column definition and sets up
+     * the comparator used for the column names of the index.
+     *
+     * @param columnDef the definition of the indexed column, carrying the index options
+     */
+    public void init(ColumnDefinition columnDef)
+    {
+        assert baseCfs.getComparator() instanceof CompositeType;
+        try
+        {
+            prefixSize = Integer.parseInt(columnDef.getIndexOptions().get(PREFIX_SIZE_OPTION));
+        }
+        catch (NumberFormatException e)
+        {
+            // Shouldn't happen since validateOptions must have been called
+            throw new AssertionError(e);
+        }
+
+        indexComparator = (CompositeType)SecondaryIndex.getIndexComparator(baseCfs.metadata, columnDef);
+    }
+
+    /**
+     * Builds the name of the index column for a base column: the base row key followed by
+     * at most {@code prefixSize} leading components of the base column's name.
+     *
+     * @param rowKey the key of the base row containing the column
+     * @param column the base column being indexed
+     * @return the composite name to use in the index
+     */
+    protected ByteBuffer makeIndexColumnName(ByteBuffer rowKey, IColumn column)
+    {
+        CompositeType baseComparator = (CompositeType)baseCfs.getComparator();
+        ByteBuffer[] components = baseComparator.split(column.name());
+        // A name may have fewer components than the configured prefix size
+        int prefixLength = Math.min(prefixSize, components.length);
+        CompositeType.Builder builder = new CompositeType.Builder(indexComparator);
+        builder.add(rowKey);
+        for (int i = 0; i < prefixLength; i++)
+            builder.add(components[i]);
+        return builder.build();
+    }
+
+    /**
+     * Tests whether a full (composite) column name belongs to the indexed column, i.e.
+     * whether its component at the definition's {@code componentIndex} equals the
+     * definition's name.
+     *
+     * @param name the full column name to test
+     * @return true if the name has enough components and the relevant one matches
+     */
+    @Override
+    public boolean indexes(ByteBuffer name)
+    {
+        ColumnDefinition columnDef = columnDefs.iterator().next();
+        CompositeType baseComparator = (CompositeType)baseCfs.getComparator();
+        ByteBuffer[] components = baseComparator.split(name);
+        AbstractType<?> comp = baseCfs.metadata.getColumnDefinitionComparator(columnDef);
+        return components.length > columnDef.componentIndex
+            && comp.compare(components[columnDef.componentIndex], columnDef.name) == 0;
+    }
+
+    /**
+     * @param columns the column names handed to the searcher
+     * @return a new CompositesSearcher created with the base column family's index manager
+     */
+    public SecondaryIndexSearcher createSecondaryIndexSearcher(Set<ByteBuffer> columns)
+    {
+        return new CompositesSearcher(baseCfs.indexManager, columns, prefixSize);
+    }
+
+    /**
+     * Checks that the prefix size option is present and is a non-negative integer.
+     *
+     * @throws ConfigurationException if the option is missing, is not an integer or is
+     * negative
+     */
+    public void validateOptions() throws ConfigurationException
+    {
+        ColumnDefinition columnDef = columnDefs.iterator().next();
+        String option = columnDef.getIndexOptions().get(PREFIX_SIZE_OPTION);
+
+        if (option == null)
+            throw new ConfigurationException("Missing option " + PREFIX_SIZE_OPTION);
+
+        int size;
+        try
+        {
+            size = Integer.parseInt(option);
+        }
+        catch (NumberFormatException e)
+        {
+            throw new ConfigurationException(String.format("Invalid non integer value for option %s (got '%s')", PREFIX_SIZE_OPTION, option));
+        }
+
+        // A negative number of name components makes no sense for building index names
+        if (size < 0)
+            throw new ConfigurationException(String.format("Invalid negative value for option %s (got '%s')", PREFIX_SIZE_OPTION, option));
+    }
+}