You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2012/08/09 19:07:04 UTC
[1/2] git commit: Composite secondary indexes (for CQL3)
Updated Branches:
refs/heads/trunk 8a290c1b7 -> 80ea03f5e
Composite secondary indexes (for CQL3)
patch by slebresne; reviewed by yukim for CASSANDRA-3680
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/80ea03f5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/80ea03f5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/80ea03f5
Branch: refs/heads/trunk
Commit: 80ea03f5e928fec184f33682038df528652e97ba
Parents: 8a290c1
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Wed Jun 20 19:46:09 2012 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Thu Aug 9 19:05:44 2012 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
interface/cassandra.thrift | 3 +-
.../org/apache/cassandra/thrift/IndexType.java | 5 +-
.../org/apache/cassandra/config/CFMetaData.java | 2 +-
.../org/apache/cassandra/cql/QueryProcessor.java | 6 +-
.../org/apache/cassandra/cql3/CFDefinition.java | 2 +-
.../cql3/statements/CreateIndexStatement.java | 20 +-
.../cassandra/cql3/statements/SelectStatement.java | 15 +-
.../org/apache/cassandra/db/ColumnFamilyStore.java | 47 ++-
src/java/org/apache/cassandra/db/DeletionInfo.java | 38 ++
.../org/apache/cassandra/db/MeteredFlusher.java | 2 +-
.../org/apache/cassandra/db/SliceQueryPager.java | 88 ++++
src/java/org/apache/cassandra/db/Table.java | 91 ++++-
.../cassandra/db/compaction/CompactionManager.java | 9 +-
.../apache/cassandra/db/filter/ColumnSlice.java | 5 +
.../apache/cassandra/db/filter/ExtendedFilter.java | 46 ++-
.../cassandra/db/filter/SliceQueryFilter.java | 8 +
.../AbstractSimplePerColumnSecondaryIndex.java | 166 ++++++++
.../apache/cassandra/db/index/SecondaryIndex.java | 120 +++++--
.../cassandra/db/index/SecondaryIndexBuilder.java | 10 +-
.../cassandra/db/index/SecondaryIndexManager.java | 109 +++--
.../db/index/composites/CompositesIndex.java | 111 +++++
.../db/index/composites/CompositesSearcher.java | 318 +++++++++++++++
.../apache/cassandra/db/index/keys/KeysIndex.java | 133 +------
.../cassandra/db/index/keys/KeysSearcher.java | 2 +-
.../apache/cassandra/db/marshal/CompositeType.java | 41 ++-
.../apache/cassandra/io/sstable/SSTableReader.java | 3 +-
.../cassandra/streaming/StreamInSession.java | 2 +-
.../apache/cassandra/thrift/ThriftValidation.java | 5 +-
.../unit/org/apache/cassandra/config/DefsTest.java | 4 +-
test/unit/org/apache/cassandra/db/CleanupTest.java | 1 -
31 files changed, 1119 insertions(+), 294 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/80ea03f5/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3de82e2..f823937 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -34,6 +34,7 @@
* (cql3) Allow definitions with only a PK (CASSANDRA-4361)
* (cql3) Add support for row key composites (CASSANDRA-4179)
* improve DynamicEndpointSnitch by using reservoir sampling (CASSANDRA-4038)
+ * (cql3) Add support for 2ndary indexes (CASSANDRA-3680)
1.1.4
http://git-wip-us.apache.org/repos/asf/cassandra/blob/80ea03f5/interface/cassandra.thrift
----------------------------------------------------------------------
diff --git a/interface/cassandra.thrift b/interface/cassandra.thrift
index 1041661..5e933d7 100644
--- a/interface/cassandra.thrift
+++ b/interface/cassandra.thrift
@@ -393,7 +393,8 @@ struct AuthenticationRequest {
enum IndexType {
KEYS,
- CUSTOM
+ CUSTOM,
+ COMPOSITES
}
/* describes a column in a column family. */
http://git-wip-us.apache.org/repos/asf/cassandra/blob/80ea03f5/interface/thrift/gen-java/org/apache/cassandra/thrift/IndexType.java
----------------------------------------------------------------------
diff --git a/interface/thrift/gen-java/org/apache/cassandra/thrift/IndexType.java b/interface/thrift/gen-java/org/apache/cassandra/thrift/IndexType.java
index a351bd7..283fa71 100644
--- a/interface/thrift/gen-java/org/apache/cassandra/thrift/IndexType.java
+++ b/interface/thrift/gen-java/org/apache/cassandra/thrift/IndexType.java
@@ -33,7 +33,8 @@ import org.apache.thrift.TEnum;
public enum IndexType implements org.apache.thrift.TEnum {
KEYS(0),
- CUSTOM(1);
+ CUSTOM(1),
+ COMPOSITES(2);
private final int value;
@@ -58,6 +59,8 @@ public enum IndexType implements org.apache.thrift.TEnum {
return KEYS;
case 1:
return CUSTOM;
+ case 2:
+ return COMPOSITES;
default:
return null;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/80ea03f5/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index 203b7ec..b124463 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -918,7 +918,7 @@ public final class CFMetaData
{
if (column.getIndexType() != null && column.getIndexName() == null)
{
- String baseName = getDefaultIndexName(cfName, comparator, column.name);
+ String baseName = getDefaultIndexName(cfName, getColumnDefinitionComparator(column), column.name);
String indexName = baseName;
int i = 0;
while (existingNames.contains(indexName))
http://git-wip-us.apache.org/repos/asf/cassandra/blob/80ea03f5/src/java/org/apache/cassandra/cql/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql/QueryProcessor.java b/src/java/org/apache/cassandra/cql/QueryProcessor.java
index 502979e..b23cf44 100644
--- a/src/java/org/apache/cassandra/cql/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql/QueryProcessor.java
@@ -30,6 +30,7 @@ import org.apache.cassandra.db.CounterColumn;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.context.CounterContext;
import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.index.SecondaryIndexManager;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.AsciiType;
import org.apache.cassandra.db.marshal.MarshalException;
@@ -301,10 +302,11 @@ public class QueryProcessor
if (select.getColumnRelations().size() > 0)
{
AbstractType<?> comparator = select.getComparator(keyspace);
- Set<ByteBuffer> indexed = Table.open(keyspace).getColumnFamilyStore(select.getColumnFamily()).indexManager.getIndexedColumns();
+ SecondaryIndexManager idxManager = Table.open(keyspace).getColumnFamilyStore(select.getColumnFamily()).indexManager;
for (Relation relation : select.getColumnRelations())
{
- if ((relation.operator() == RelationType.EQ) && indexed.contains(relation.getEntity().getByteBuffer(comparator, variables)))
+ ByteBuffer name = relation.getEntity().getByteBuffer(comparator, variables);
+ if ((relation.operator() == RelationType.EQ) && idxManager.indexes(name))
return;
}
throw new InvalidRequestException("No indexed columns present in by-columns clause with \"equals\" operator");
http://git-wip-us.apache.org/repos/asf/cassandra/blob/80ea03f5/src/java/org/apache/cassandra/cql3/CFDefinition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/CFDefinition.java b/src/java/org/apache/cassandra/cql3/CFDefinition.java
index 56b68eb..51f7ceb7 100644
--- a/src/java/org/apache/cassandra/cql3/CFDefinition.java
+++ b/src/java/org/apache/cassandra/cql3/CFDefinition.java
@@ -345,7 +345,7 @@ public class CFDefinition implements Iterable<CFDefinition.Name>
public ByteBuffer buildAsEndOfRange()
{
- throw new IllegalStateException();
+ return build();
}
public NonCompositeBuilder copy()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/80ea03f5/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
index 50eb8f9..6dc3a9f 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
@@ -18,6 +18,8 @@
package org.apache.cassandra.cql3.statements;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -26,6 +28,8 @@ import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.config.ConfigurationException;
import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.db.index.composites.CompositesIndex;
+import org.apache.cassandra.db.marshal.CompositeType;
import org.apache.cassandra.service.MigrationManager;
import org.apache.cassandra.thrift.IndexType;
import org.apache.cassandra.thrift.InvalidRequestException;
@@ -54,9 +58,6 @@ public class CreateIndexStatement extends SchemaAlteringStatement
CFMetaData cfm = oldCfm.clone();
CFDefinition cfDef = oldCfm.getCfDef();
- if (cfDef.isComposite)
- throw new InvalidRequestException("Secondary indexes are not (yet) supported on tables with composite PRIMARY KEY");
-
for (ColumnDefinition cd : cfm.getColumn_metadata().values())
{
if (cd.name.equals(columnName.key))
@@ -65,7 +66,18 @@ public class CreateIndexStatement extends SchemaAlteringStatement
throw new InvalidRequestException("Index already exists");
if (logger.isDebugEnabled())
logger.debug("Updating column {} definition for index {}", columnName, indexName);
- cd.setIndexType(IndexType.KEYS, Collections.<String, String>emptyMap());
+
+ if (cfDef.isComposite)
+ {
+ CompositeType composite = (CompositeType)cfm.comparator;
+ Map<String, String> opts = new HashMap<String, String>();
+ opts.put(CompositesIndex.PREFIX_SIZE_OPTION, String.valueOf(composite.types.size() - 1));
+ cd.setIndexType(IndexType.COMPOSITES, opts);
+ }
+ else
+ {
+ cd.setIndexType(IndexType.KEYS, Collections.<String, String>emptyMap());
+ }
cd.setIndexName(indexName);
columnExists = true;
break;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/80ea03f5/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 1649a8c..95a1fee 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -30,10 +30,13 @@ import org.apache.cassandra.auth.Permission;
import org.apache.cassandra.cql3.*;
import org.apache.cassandra.transport.messages.ResultMessage;
import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.config.ConfigurationException;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.context.CounterContext;
import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.index.SecondaryIndex;
+import org.apache.cassandra.db.index.SecondaryIndexManager;
import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.dht.*;
import org.apache.cassandra.service.ClientState;
@@ -1060,11 +1063,19 @@ public class SelectStatement implements CQLStatement
{
stmt.isKeyRange = true;
boolean hasEq = false;
- Set<ByteBuffer> indexed = Table.open(keyspace()).getColumnFamilyStore(columnFamily()).indexManager.getIndexedColumns();
+ SecondaryIndexManager idxManager = Table.open(keyspace()).getColumnFamilyStore(columnFamily()).indexManager;
+ Set<ByteBuffer> indexedNames = new HashSet<ByteBuffer>();
+ for (SecondaryIndex index : idxManager.getIndexes())
+ {
+ for (ColumnDefinition cdef : index.getColumnDefs())
+ indexedNames.add(cdef.name);
+ }
+ // Note: we cannot use idxManager.indexes() methods because we don't have a complete column name at this point, we only
+ // have the indexed component.
for (Map.Entry<CFDefinition.Name, Restriction> entry : stmt.metadataRestrictions.entrySet())
{
- if (entry.getValue().isEquality() && indexed.contains(entry.getKey().name.key))
+ if (entry.getValue().isEquality() && indexedNames.contains(entry.getKey().name.key))
{
hasEq = true;
break;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/80ea03f5/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 03497ef..20b037e 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -499,7 +499,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
data.addSSTables(newSSTables);
try
{
- indexManager.maybeBuildSecondaryIndexes(newSSTables, indexManager.getIndexedColumns());
+ indexManager.maybeBuildSecondaryIndexes(newSSTables, indexManager.allIndexesNames());
}
finally
{
@@ -513,11 +513,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
{
ColumnFamilyStore cfs = Table.open(ksName).getColumnFamilyStore(cfName);
- SortedSet<ByteBuffer> indexes = new TreeSet<ByteBuffer>(cfs.metadata.comparator);
- if (idxNames.length == 0)
- indexes.addAll(cfs.indexManager.getIndexedColumns());
- for (String idxName : idxNames)
- indexes.add(cfs.indexManager.getColumnByIdxName(idxName));
+ Set<String> indexes = new HashSet<String>(Arrays.asList(idxNames));
Collection<SSTableReader> sstables = cfs.getSSTables();
try
@@ -1313,7 +1309,13 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
return columns;
}
- public static abstract class AbstractScanIterator extends AbstractIterator<Row> implements CloseableIterator<Row> {}
+ public static abstract class AbstractScanIterator extends AbstractIterator<Row> implements CloseableIterator<Row>
+ {
+ public boolean needsFiltering()
+ {
+ return true;
+ }
+ }
/**
* Iterate over a range of rows and columns from memtables/sstables.
@@ -1415,23 +1417,28 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
Row rawRow = rowIterator.next();
ColumnFamily data = rawRow.cf;
- // roughtly
- IFilter extraFilter = filter.getExtraFilter(data);
- if (extraFilter != null)
+ if (rowIterator.needsFiltering())
{
- QueryPath path = new QueryPath(columnFamily);
- ColumnFamily cf = filter.cfs.getColumnFamily(new QueryFilter(rawRow.key, path, extraFilter));
- if (cf != null)
- data.addAll(cf, HeapAllocator.instance);
- }
+ // roughtly
+ IFilter extraFilter = filter.getExtraFilter(data);
+ if (extraFilter != null)
+ {
+ QueryPath path = new QueryPath(columnFamily);
+ ColumnFamily cf = filter.cfs.getColumnFamily(new QueryFilter(rawRow.key, path, extraFilter));
+ if (cf != null)
+ data.addAll(cf, HeapAllocator.instance);
+ }
- if (!filter.isSatisfiedBy(data))
- continue;
+ if (!filter.isSatisfiedBy(data, null))
+ continue;
+
+ logger.debug("{} satisfies all filter expressions", data);
+ // cut the resultset back to what was requested, if necessary
+ data = filter.prune(data);
+ }
- logger.debug("{} satisfies all filter expressions", data);
- // cut the resultset back to what was requested, if necessary
- data = filter.prune(data);
rows.add(new Row(rawRow.key, data));
+
if (data != null)
columnsCount += filter.lastCounted(data);
// Update the underlying filter to avoid querying more columns per slice than necessary and to handle paging
http://git-wip-us.apache.org/repos/asf/cassandra/blob/80ea03f5/src/java/org/apache/cassandra/db/DeletionInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DeletionInfo.java b/src/java/org/apache/cassandra/db/DeletionInfo.java
index b63686f..480d50a 100644
--- a/src/java/org/apache/cassandra/db/DeletionInfo.java
+++ b/src/java/org/apache/cassandra/db/DeletionInfo.java
@@ -27,6 +27,7 @@ import java.util.*;
import com.google.common.base.Objects;
import com.google.common.collect.Iterables;
+import org.apache.cassandra.db.filter.ColumnSlice;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.io.ISerializer;
import org.apache.cassandra.io.ISSTableSerializer;
@@ -115,6 +116,43 @@ public class DeletionInfo
}
/**
+ * Return the slice ranges covered by this deletion info, or null if nothing is deleted.
+ */
+ public ColumnSlice[] coveredSlices()
+ {
+ if (isLive())
+ return null;
+
+ if (!topLevel.equals(DeletionTime.LIVE))
+ return ColumnSlice.ALL_COLUMNS_ARRAY;
+
+ List<ColumnSlice> slices = new ArrayList<ColumnSlice>();
+ ColumnSlice current = null;
+ for (RangeTombstone tombstone : ranges)
+ {
+ if (current == null)
+ {
+ current = new ColumnSlice(tombstone.min, tombstone.max);
+ }
+ else if (ranges.comparator().compare(current.finish, tombstone.min) < 0)
+ {
+ // If next is strictly after current, we've finished the current slice
+ slices.add(current);
+ current = new ColumnSlice(tombstone.min, tombstone.max);
+ }
+ else if (ranges.comparator().compare(current.finish, tombstone.max) < 0)
+ {
+ // if tombstone end is after current end, extend current
+ current = new ColumnSlice(current.start, tombstone.max);
+ }
+ // otherwise, tombstone is fully included in current already, skip it
+ }
+ if (current != null)
+ slices.add(current);
+ return slices.isEmpty() ? null : slices.toArray(new ColumnSlice[slices.size()]);
+ }
+
+ /**
* Return a new DeletionInfo correspond to purging every tombstones that
* are older than {@code gcbefore}.
*
http://git-wip-us.apache.org/repos/asf/cassandra/blob/80ea03f5/src/java/org/apache/cassandra/db/MeteredFlusher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/MeteredFlusher.java b/src/java/org/apache/cassandra/db/MeteredFlusher.java
index f8d249d..7984944 100644
--- a/src/java/org/apache/cassandra/db/MeteredFlusher.java
+++ b/src/java/org/apache/cassandra/db/MeteredFlusher.java
@@ -52,7 +52,7 @@ class MeteredFlusher implements Runnable
+ 1 // potentially a flushed memtable being counted by jamm
+ DatabaseDescriptor.getFlushWriters()
+ DatabaseDescriptor.getFlushQueueSize())
- / (1 + cfs.indexManager.getIndexedColumns().size()));
+ / (1 + cfs.indexManager.getIndexesBackedByCfs().size()));
if (size > (DatabaseDescriptor.getTotalMemtableSpaceInMB() * 1048576L - flushingBytes) / maxInFlight)
{
logger.info("flushing high-traffic column family {} (estimated {} bytes)", cfs, size);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/80ea03f5/src/java/org/apache/cassandra/db/SliceQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SliceQueryPager.java b/src/java/org/apache/cassandra/db/SliceQueryPager.java
new file mode 100644
index 0000000..405c7c4
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/SliceQueryPager.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.util.Arrays;
+import java.util.Iterator;
+
+import org.apache.cassandra.db.filter.*;
+
+public class SliceQueryPager implements Iterator<ColumnFamily>
+{
+ public static final int DEFAULT_PAGE_SIZE = 10000;
+
+ public final ColumnFamilyStore cfs;
+ public final DecoratedKey key;
+
+ private ColumnSlice[] slices;
+ private ColumnFamily current;
+ private boolean exhausted;
+
+ public SliceQueryPager(ColumnFamilyStore cfs, DecoratedKey key, ColumnSlice[] slices)
+ {
+ this.cfs = cfs;
+ this.key = key;
+ this.slices = slices;
+ }
+
+ // This will *not* do a query
+ public boolean hasNext()
+ {
+ return !exhausted;
+ }
+
+ // This might return an empty column family; it only returns null when called after the pager is exhausted
+ public ColumnFamily next()
+ {
+ if (exhausted)
+ return null;
+
+ QueryPath path = new QueryPath(cfs.getColumnFamilyName());
+ QueryFilter filter = new QueryFilter(key, path, new SliceQueryFilter(slices, false, DEFAULT_PAGE_SIZE));
+ ColumnFamily cf = cfs.getColumnFamily(filter);
+ if (cf == null || cf.getLiveColumnCount() < DEFAULT_PAGE_SIZE)
+ {
+ exhausted = true;
+ }
+ else
+ {
+ Iterator<IColumn> iter = cf.getReverseSortedColumns().iterator();
+ IColumn lastColumn = iter.next();
+ while (lastColumn.isMarkedForDelete())
+ lastColumn = iter.next();
+
+ int i = 0;
+ for (; i < slices.length; ++i)
+ {
+ ColumnSlice current = slices[i];
+ if (cfs.getComparator().compare(lastColumn.name(), current.finish) <= 0)
+ break;
+ }
+ if (i >= slices.length)
+ exhausted = true;
+ else
+ slices = Arrays.copyOfRange(slices, i, slices.length);
+ }
+ return cf == null ? ColumnFamily.create(cfs.metadata) : cf;
+ }
+
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/80ea03f5/src/java/org/apache/cassandra/db/Table.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Table.java b/src/java/org/apache/cassandra/db/Table.java
index b6ddcad..0732534 100644
--- a/src/java/org/apache/cassandra/db/Table.java
+++ b/src/java/org/apache/cassandra/db/Table.java
@@ -32,8 +32,7 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.*;
import org.apache.cassandra.db.commitlog.CommitLog;
-import org.apache.cassandra.db.filter.QueryFilter;
-import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.db.filter.*;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.service.StorageService;
@@ -356,24 +355,27 @@ public class Table
continue;
}
+ ColumnSlice[] deletionSlices = null;
SortedSet<ByteBuffer> mutatedIndexedColumns = null;
if (updateIndexes)
{
- for (ByteBuffer column : cfs.indexManager.getIndexedColumns())
+ // If cf has some range deletion, we need to fetch those ranges to know if something indexed was updated
+ // Note: we could "optimize" that for Keys index, because we know that the columnDef name is directly
+ // the indexed column name.
+ deletionSlices = cf.deletionInfo().coveredSlices();
+
+ for (IColumn updated : cf)
{
- if (cf.getColumnNames().contains(column) || cf.isMarkedForDelete())
+ if (cfs.indexManager.indexes(updated))
{
if (mutatedIndexedColumns == null)
mutatedIndexedColumns = new TreeSet<ByteBuffer>(cf.getComparator());
- mutatedIndexedColumns.add(column);
+ mutatedIndexedColumns.add(updated.name());
if (logger.isDebugEnabled())
{
- // can't actually use validator to print value here, because we overload value
- // for deletion timestamp as well (which may not be a well-formed value for the column type)
- ByteBuffer value = cf.getColumn(column) == null ? null : cf.getColumn(column).value(); // may be null on row-level deletion
- logger.debug(String.format("mutating indexed column %s value %s",
- cf.getComparator().getString(column),
- value == null ? "null" : ByteBufferUtil.bytesToHex(value)));
+ logger.debug(String.format("Mutated indexed column %s value %s",
+ cf.getComparator().getString(updated.name()),
+ ByteBufferUtil.bytesToHex(updated.value())));
}
}
}
@@ -382,7 +384,7 @@ public class Table
// Sharding the lock is insufficient to avoid contention when there is a "hot" row, e.g., for
// hint writes when a node is down (keyed by target IP). So it is worth special-casing the
// no-index case to avoid the synchronization.
- if (mutatedIndexedColumns == null)
+ if (mutatedIndexedColumns == null && deletionSlices == null)
{
cfs.apply(key, cf);
continue;
@@ -390,11 +392,23 @@ public class Table
// else mutatedIndexedColumns != null
synchronized (indexLockFor(mutation.key()))
{
+ if (mutatedIndexedColumns == null)
+ mutatedIndexedColumns = new TreeSet<ByteBuffer>(cf.getComparator());
+
// with the raw data CF, we can just apply every update in any order and let
// read-time resolution throw out obsolete versions, thus avoiding read-before-write.
// but for indexed data we need to make sure that we're not creating index entries
// for obsolete writes.
- ColumnFamily oldIndexedColumns = readCurrentIndexedColumns(key, cfs, mutatedIndexedColumns);
+ ColumnFamily oldIndexedColumns = readCurrentIndexedColumns(key, cfs, mutatedIndexedColumns, deletionSlices);
+
+ // We might still have no mutated columns in case it is a deletion but the row had
+ // no indexed columns
+ if (mutatedIndexedColumns.isEmpty())
+ {
+ cfs.apply(key, cf);
+ continue;
+ }
+
logger.debug("Pre-mutation index row is {}", oldIndexedColumns);
ignoreObsoleteMutations(cf, mutatedIndexedColumns, oldIndexedColumns);
@@ -454,10 +468,33 @@ public class Table
}
}
- private static ColumnFamily readCurrentIndexedColumns(DecoratedKey key, ColumnFamilyStore cfs, SortedSet<ByteBuffer> mutatedIndexedColumns)
+ private static ColumnFamily readCurrentIndexedColumns(DecoratedKey key, ColumnFamilyStore cfs, SortedSet<ByteBuffer> mutatedIndexedColumns, ColumnSlice[] deletionSlices)
{
- QueryFilter filter = QueryFilter.getNamesFilter(key, new QueryPath(cfs.getColumnFamilyName()), mutatedIndexedColumns);
- return cfs.getColumnFamily(filter);
+ // Note: we could only query names not covered by the slices
+ QueryPath path = new QueryPath(cfs.getColumnFamilyName());
+ ColumnFamily cf = ColumnFamily.create(cfs.metadata);
+
+ if (mutatedIndexedColumns != null)
+ cf.resolve(cfs.getColumnFamily(QueryFilter.getNamesFilter(key, path, mutatedIndexedColumns)));
+
+ if (deletionSlices != null)
+ {
+ SliceQueryPager pager = new SliceQueryPager(cfs, key, deletionSlices);
+ while (pager.hasNext())
+ {
+ ColumnFamily cf2 = pager.next();
+ cf.delete(cf2);
+ for (IColumn column : cf2)
+ {
+ if (cfs.indexManager.indexes(column))
+ {
+ cf.addColumn(column);
+ mutatedIndexedColumns.add(column.name());
+ }
+ }
+ }
+ }
+ return cf;
}
public AbstractReplicationStrategy getReplicationStrategy()
@@ -470,7 +507,7 @@ public class Table
* @param cfs ColumnFamily to index row in
* @param indexedColumns columns to index, in comparator order
*/
- public static void indexRow(DecoratedKey key, ColumnFamilyStore cfs, SortedSet<ByteBuffer> indexedColumns)
+ public static void indexRow(DecoratedKey key, ColumnFamilyStore cfs, Set<String> idxNames)
{
if (logger.isDebugEnabled())
logger.debug("Indexing row {} ", cfs.metadata.getKeyValidator().getString(key.key));
@@ -478,11 +515,23 @@ public class Table
switchLock.readLock().lock();
try
{
- synchronized (cfs.table.indexLockFor(key.key))
+ // Our index lock is per-row, but we don't want to hold writes for too long, so for large rows
+ // we release the lock between pages
+ SliceQueryPager pager = new SliceQueryPager(cfs, key, ColumnSlice.ALL_COLUMNS_ARRAY);
+
+ while (pager.hasNext())
{
- ColumnFamily cf = readCurrentIndexedColumns(key, cfs, indexedColumns);
- if (cf != null)
- cfs.indexManager.applyIndexUpdates(key.key, cf, cf.getColumnNames(), null);
+ synchronized (cfs.table.indexLockFor(key.key))
+ {
+ ColumnFamily cf = pager.next();
+ ColumnFamily cf2 = cf.cloneMeShallow();
+ for (IColumn column : cf)
+ {
+ if (cfs.indexManager.indexes(column.name(), idxNames))
+ cf2.addColumn(column);
+ }
+ cfs.indexManager.applyIndexUpdates(key.key, cf2, cf2.getColumnNames(), null);
+ }
}
}
finally
http://git-wip-us.apache.org/repos/asf/cassandra/blob/80ea03f5/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index b9bdfa7..b888ffd 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -526,12 +526,11 @@ public class CompactionManager implements CompactionManagerMBean
}
boolean isCommutative = cfs.metadata.getDefaultValidator().isCommutative();
- Collection<ByteBuffer> indexedColumns = cfs.indexManager.getIndexedColumns();
+ boolean hasIndexes = !cfs.indexManager.getIndexes().isEmpty();
for (SSTableReader sstable : sstables)
{
- if (indexedColumns.isEmpty()
- && !new Bounds<Token>(sstable.first.token, sstable.last.token).intersects(ranges))
+ if (!hasIndexes && !new Bounds<Token>(sstable.first.token, sstable.last.token).intersects(ranges))
{
cfs.replaceCompactedSSTables(Arrays.asList(sstable), Collections.<SSTableReader>emptyList(), OperationType.CLEANUP);
continue;
@@ -583,7 +582,7 @@ public class CompactionManager implements CompactionManagerMBean
{
cfs.invalidateCachedRow(row.getKey());
- if (!indexedColumns.isEmpty() || isCommutative)
+ if (hasIndexes || isCommutative)
{
if (indexedColumnsInRow != null)
indexedColumnsInRow.clear();
@@ -593,7 +592,7 @@ public class CompactionManager implements CompactionManagerMBean
OnDiskAtom column = row.next();
if (column instanceof CounterColumn)
renewer.maybeRenew((CounterColumn) column);
- if (column instanceof IColumn && indexedColumns.contains(column.name()))
+ if (column instanceof IColumn && cfs.indexManager.indexes((IColumn)column))
{
if (indexedColumnsInRow == null)
indexedColumnsInRow = new ArrayList<IColumn>();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/80ea03f5/src/java/org/apache/cassandra/db/filter/ColumnSlice.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ColumnSlice.java b/src/java/org/apache/cassandra/db/filter/ColumnSlice.java
index 2a295eb..cdcdc17 100644
--- a/src/java/org/apache/cassandra/db/filter/ColumnSlice.java
+++ b/src/java/org/apache/cassandra/db/filter/ColumnSlice.java
@@ -84,6 +84,11 @@ public class ColumnSlice
throw new IllegalArgumentException("Slice finish must come after start in traversal order");
}
+ public boolean includes(Comparator<ByteBuffer> cmp, ByteBuffer name)
+ {
+ return cmp.compare(start, name) <= 0 && (finish.equals(ByteBufferUtil.EMPTY_BYTE_BUFFER) || cmp.compare(finish, name) >= 0);
+ }
+
@Override
public final int hashCode()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/80ea03f5/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
index ec6f6ad..9e49237 100644
--- a/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
@@ -26,7 +26,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.ColumnNameBuilder;
import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.marshal.CompositeType;
import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
import org.apache.cassandra.thrift.IndexExpression;
import org.apache.cassandra.thrift.IndexOperator;
@@ -55,7 +57,9 @@ public abstract class ExtendedFilter
{
if (isPaging)
throw new IllegalArgumentException("Cross-row paging is not supported along with index clauses");
- return new FilterWithClauses(cfs, filter, clause, maxResults, maxIsColumns);
+ return cfs.getComparator() instanceof CompositeType
+ ? new FilterWithCompositeClauses(cfs, filter, clause, maxResults, maxIsColumns)
+ : new FilterWithClauses(cfs, filter, clause, maxResults, maxIsColumns);
}
}
@@ -112,6 +116,11 @@ public abstract class ExtendedFilter
/** The initial filter we'll do our first slice with (either the original or a superset of it) */
public abstract IFilter initialFilter();
+ public IFilter originalFilter()
+ {
+ return originalFilter;
+ }
+
public abstract List<IndexExpression> getClause();
/**
@@ -130,7 +139,7 @@ public abstract class ExtendedFilter
* @return true if the provided data satisfies all the expressions from
* the clause of this filter.
*/
- public abstract boolean isSatisfiedBy(ColumnFamily data);
+ public abstract boolean isSatisfiedBy(ColumnFamily data, ColumnNameBuilder builder);
public static boolean satisfies(int comparison, IndexOperator op)
{
@@ -165,7 +174,7 @@ public abstract class ExtendedFilter
}
/** Sets up the initial filter. */
- private IFilter computeInitialFilter()
+ protected IFilter computeInitialFilter()
{
if (originalFilter instanceof SliceQueryFilter)
{
@@ -264,14 +273,15 @@ public abstract class ExtendedFilter
return pruned;
}
- public boolean isSatisfiedBy(ColumnFamily data)
+ public boolean isSatisfiedBy(ColumnFamily data, ColumnNameBuilder builder)
{
// We enforces even the primary clause because reads are not synchronized with writes and it is thus possible to have a race
// where the index returned a row which doesn't have the primary column when we actually read it
for (IndexExpression expression : clause)
{
// check column data vs expression
- IColumn column = data.getColumn(expression.column_name);
+ ByteBuffer colName = builder == null ? expression.column_name : builder.copy().add(expression.column_name).build();
+ IColumn column = data.getColumn(colName);
if (column == null)
return false;
int v = data.metadata().getValueValidator(expression.column_name).compare(column.value(), expression.value);
@@ -282,6 +292,30 @@ public abstract class ExtendedFilter
}
}
+ private static class FilterWithCompositeClauses extends FilterWithClauses
+ {
+ public FilterWithCompositeClauses(ColumnFamilyStore cfs, IFilter filter, List<IndexExpression> clause, int maxResults, boolean maxIsColumns)
+ {
+ super(cfs, filter, clause, maxResults, maxIsColumns);
+ }
+
+ /*
+ * For composites, the index name is not a valid column name (it's only
+ * one of the components), which means we should not do the
+ * NamesQueryFilter part of FilterWithClauses in particular.
+ * Besides, CompositesSearcher doesn't really use the initial filter
+ * except to know the limit set by the user, so create a fake filter
+ * with only the count information.
+ */
+ protected IFilter computeInitialFilter()
+ {
+ int limit = originalFilter instanceof SliceQueryFilter
+ ? ((SliceQueryFilter)originalFilter).count
+ : Integer.MAX_VALUE;
+ return new SliceQueryFilter(ColumnSlice.ALL_COLUMNS_ARRAY, false, limit);
+ }
+ }
+
private static class EmptyClauseFilter extends ExtendedFilter
{
public EmptyClauseFilter(ColumnFamilyStore cfs, IFilter filter, int maxResults, boolean maxIsColumns, boolean isPaging)
@@ -309,7 +343,7 @@ public abstract class ExtendedFilter
return data;
}
- public boolean isSatisfiedBy(ColumnFamily data)
+ public boolean isSatisfiedBy(ColumnFamily data, ColumnNameBuilder builder)
{
return true;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/80ea03f5/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
index 0b78581..f868539 100644
--- a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
@@ -201,6 +201,14 @@ public class SliceQueryFilter implements IFilter
count = newLimit;
}
+ public boolean includes(Comparator<ByteBuffer> cmp, ByteBuffer name)
+ {
+ for (ColumnSlice slice : slices)
+ if (slice.includes(cmp, name))
+ return true;
+ return false;
+ }
+
public static class Serializer implements IVersionedSerializer<SliceQueryFilter>
{
public void serialize(SliceQueryFilter f, DataOutput dos, int version) throws IOException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/80ea03f5/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java b/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
new file mode 100644
index 0000000..24f09ab
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.index;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.config.ConfigurationException;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.index.PerColumnSecondaryIndex;
+import org.apache.cassandra.db.index.SecondaryIndexSearcher;
+import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.dht.*;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+/**
+ * Implements a secondary index for a column family using a second column family
+ * in which the row keys are indexed values, and column names are base row keys.
+ */
+public abstract class AbstractSimplePerColumnSecondaryIndex extends PerColumnSecondaryIndex
+{
+ private ColumnFamilyStore indexCfs;
+
+ public void init()
+ {
+ assert baseCfs != null && columnDefs != null && columnDefs.size() == 1;
+
+ ColumnDefinition columnDef = columnDefs.iterator().next();
+ init(columnDef);
+
+ AbstractType indexComparator = SecondaryIndex.getIndexComparator(baseCfs.metadata, columnDef);
+ CFMetaData indexedCfMetadata = CFMetaData.newIndexMetadata(baseCfs.metadata, columnDef, indexComparator);
+ indexCfs = ColumnFamilyStore.createColumnFamilyStore(baseCfs.table,
+ indexedCfMetadata.cfName,
+ new LocalPartitioner(columnDef.getValidator()),
+ indexedCfMetadata);
+
+ // enable and initialize row cache based on parent's setting and indexed column's cardinality
+ CFMetaData.Caching baseCaching = baseCfs.metadata.getCaching();
+ if (baseCaching == CFMetaData.Caching.ALL || baseCaching == CFMetaData.Caching.ROWS_ONLY)
+ {
+ /*
+ * # of index CF's key = cardinality of indexed column.
+ * if # of keys stored in index CF is more than average column counts (means tall table),
+ * then consider it as high cardinality.
+ */
+ double estimatedKeys = indexCfs.estimateKeys();
+ double averageColumnCount = indexCfs.getMeanColumns();
+ if (averageColumnCount > 0 && estimatedKeys / averageColumnCount > 1)
+ {
+ logger.debug("turning row cache on for " + indexCfs.getColumnFamilyName());
+ indexCfs.metadata.caching(baseCaching);
+ indexCfs.initRowCache();
+ }
+ }
+ }
+
+ protected abstract void init(ColumnDefinition columnDef);
+
+ protected abstract ByteBuffer makeIndexColumnName(ByteBuffer rowKey, IColumn column);
+
+ public void deleteColumn(DecoratedKey valueKey, ByteBuffer rowKey, IColumn column)
+ {
+ if (column.isMarkedForDelete())
+ return;
+
+ int localDeletionTime = (int) (System.currentTimeMillis() / 1000);
+ ColumnFamily cfi = ColumnFamily.create(indexCfs.metadata);
+ cfi.addTombstone(makeIndexColumnName(rowKey, column), localDeletionTime, column.timestamp());
+ indexCfs.apply(valueKey, cfi);
+ if (logger.isDebugEnabled())
+ logger.debug("removed index entry for cleaned-up value {}:{}", valueKey, cfi);
+ }
+
+ public void insertColumn(DecoratedKey valueKey, ByteBuffer rowKey, IColumn column)
+ {
+ ColumnFamily cfi = ColumnFamily.create(indexCfs.metadata);
+ ByteBuffer name = makeIndexColumnName(rowKey, column);
+ if (column instanceof ExpiringColumn)
+ {
+ ExpiringColumn ec = (ExpiringColumn)column;
+ cfi.addColumn(new ExpiringColumn(name, ByteBufferUtil.EMPTY_BYTE_BUFFER, ec.timestamp(), ec.getTimeToLive(), ec.getLocalDeletionTime()));
+ }
+ else
+ {
+ cfi.addColumn(new Column(name, ByteBufferUtil.EMPTY_BYTE_BUFFER, column.timestamp()));
+ }
+ if (logger.isDebugEnabled())
+ logger.debug("applying index row {} in {}", indexCfs.metadata.getKeyValidator().getString(valueKey.key), cfi);
+
+ indexCfs.apply(valueKey, cfi);
+ }
+
+ public void updateColumn(DecoratedKey valueKey, ByteBuffer rowKey, IColumn col)
+ {
+ insertColumn(valueKey, rowKey, col);
+ }
+
+ public void removeIndex(ByteBuffer columnName)
+ {
+ indexCfs.invalidate();
+ }
+
+ public void forceBlockingFlush()
+ {
+ try
+ {
+ indexCfs.forceBlockingFlush();
+ }
+ catch (ExecutionException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
+ }
+
+ public void invalidate()
+ {
+ indexCfs.invalidate();
+ }
+
+ public void truncate(long truncatedAt)
+ {
+ indexCfs.discardSSTables(truncatedAt);
+ }
+
+ public ColumnFamilyStore getIndexCfs()
+ {
+ return indexCfs;
+ }
+
+ public String getIndexName()
+ {
+ return indexCfs.columnFamily;
+ }
+
+ public long getLiveSize()
+ {
+ return indexCfs.getMemtableDataSize();
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/80ea03f5/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndex.java b/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
index 9141cf8..82b6697 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
@@ -26,15 +26,25 @@ import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.config.ConfigurationException;
import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.db.SystemTable;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.index.keys.KeysIndex;
+import org.apache.cassandra.db.index.composites.CompositesIndex;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.db.marshal.LocalByPartionerType;
+import org.apache.cassandra.dht.*;
import org.apache.cassandra.io.sstable.ReducingKeyIterator;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.thrift.Column;
+import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
/**
@@ -44,8 +54,7 @@ import org.apache.cassandra.utils.ByteBufferUtil;
*/
public abstract class SecondaryIndex
{
-
- private static final Logger logger = LoggerFactory.getLogger(SecondaryIndex.class);
+ protected static final Logger logger = LoggerFactory.getLogger(SecondaryIndex.class);
public static final String CUSTOM_INDEX_OPTION_NAME = "class_name";
@@ -97,14 +106,16 @@ public abstract class SecondaryIndex
return SystemTable.isIndexBuilt(baseCfs.table.name, getNameForSystemTable(columnName));
}
- public void setIndexBuilt(ByteBuffer columnName)
+ public void setIndexBuilt()
{
- SystemTable.setIndexBuilt(baseCfs.table.name, getNameForSystemTable(columnName));
+ for (ColumnDefinition columnDef : columnDefs)
+ SystemTable.setIndexBuilt(baseCfs.table.name, getNameForSystemTable(columnDef.name));
}
- public void setIndexRemoved(ByteBuffer columnName)
+ public void setIndexRemoved()
{
- SystemTable.setIndexRemoved(baseCfs.table.name, getNameForSystemTable(columnName));
+ for (ColumnDefinition columnDef : columnDefs)
+ SystemTable.setIndexRemoved(baseCfs.table.name, getNameForSystemTable(columnDef.name));
}
/**
@@ -159,14 +170,9 @@ public abstract class SecondaryIndex
logger.info(String.format("Submitting index build of %s for data in %s",
getIndexName(), StringUtils.join(baseCfs.getSSTables(), ", ")));
- SortedSet<ByteBuffer> columnNames = new TreeSet<ByteBuffer>();
-
- for (ColumnDefinition cdef : columnDefs)
- columnNames.add(cdef.name);
-
Collection<SSTableReader> sstables = baseCfs.markCurrentSSTablesReferenced();
SecondaryIndexBuilder builder = new SecondaryIndexBuilder(baseCfs,
- columnNames,
+ Collections.singleton(getIndexName()),
new ReducingKeyIterator(sstables));
Future<?> future = CompactionManager.instance.submitIndexBuild(builder);
try
@@ -174,16 +180,7 @@ public abstract class SecondaryIndex
future.get();
forceBlockingFlush();
- // Mark all indexed columns as built
- if (this instanceof PerRowSecondaryIndex)
- {
- for (ByteBuffer columnName : columnNames)
- SystemTable.setIndexBuilt(baseCfs.table.name, getIndexName()+ByteBufferUtil.string(columnName));
- }
- else
- {
- SystemTable.setIndexBuilt(baseCfs.table.name, getIndexName());
- }
+ setIndexBuilt();
}
catch (InterruptedException e)
{
@@ -193,10 +190,6 @@ public abstract class SecondaryIndex
{
throw new RuntimeException(e);
}
- catch (CharacterCodingException e)
- {
- throw new RuntimeException(e);
- }
finally
{
SSTableReader.releaseReferences(sstables);
@@ -264,7 +257,7 @@ public abstract class SecondaryIndex
this.baseCfs = baseCfs;
}
- Set<ColumnDefinition> getColumnDefs()
+ public Set<ColumnDefinition> getColumnDefs()
{
return columnDefs;
}
@@ -285,6 +278,34 @@ public abstract class SecondaryIndex
}
/**
+ * Returns the decoratedKey for a column value
+ * @param value column value
+ * @return decorated key
+ */
+ public DecoratedKey getIndexKeyFor(ByteBuffer value)
+ {
+ // FIXME: this implies one column definition per index
+ ByteBuffer name = columnDefs.iterator().next().name;
+ return new DecoratedKey(new LocalToken(baseCfs.metadata.getColumnDefinition(name).getValidator(), value), value);
+ }
+
+ /**
+ * Returns true if the provided column name is indexed by this secondary index.
+ *
+ * The default implementation checks whether the name is one of the columnDef names,
+ * but this should be overridden by subclasses if needed.
+ */
+ public boolean indexes(ByteBuffer name)
+ {
+ for (ColumnDefinition columnDef : columnDefs)
+ {
+ if (baseCfs.getComparator().compare(columnDef.name, name) == 0)
+ return true;
+ }
+ return false;
+ }
+
+ /**
* This is the primary way to create a secondary index instance for a CF column.
* It will validate the index_options before initializing.
*
@@ -303,6 +324,9 @@ public abstract class SecondaryIndex
case KEYS:
index = new KeysIndex();
break;
+ case COMPOSITES:
+ index = new CompositesIndex();
+ break;
case CUSTOM:
assert cdef.getIndexOptions() != null;
String class_name = cdef.getIndexOptions().get(CUSTOM_INDEX_OPTION_NAME);
@@ -326,6 +350,46 @@ public abstract class SecondaryIndex
return index;
}
-
+
public abstract boolean validate(Column column);
+
+ /**
+ * Returns the index comparator for index backed by CFS, or null.
+ *
+ * Note: it would be cleaner to have this be a member method. However we need this when opening index
+ * sstables, but by then the CFS won't be fully initialized, so the SecondaryIndex object won't be accessible.
+ */
+ public static AbstractType<?> getIndexComparator(CFMetaData baseMetadata, ColumnDefinition cdef)
+ {
+ IPartitioner rowPartitioner = StorageService.getPartitioner();
+ AbstractType<?> keyComparator = (rowPartitioner instanceof OrderPreservingPartitioner || rowPartitioner instanceof ByteOrderedPartitioner)
+ ? BytesType.instance
+ : new LocalByPartionerType(rowPartitioner);
+
+ switch (cdef.getIndexType())
+ {
+ case KEYS:
+ return keyComparator;
+ case COMPOSITES:
+ assert baseMetadata.comparator instanceof CompositeType;
+ int prefixSize;
+ try
+ {
+ prefixSize = Integer.parseInt(cdef.getIndexOptions().get(CompositesIndex.PREFIX_SIZE_OPTION));
+ }
+ catch (NumberFormatException e)
+ {
+ // This shouldn't happen if validation has been done correctly
+ throw new RuntimeException(e);
+ }
+ List<AbstractType<?>> types = new ArrayList<AbstractType<?>>(prefixSize + 1);
+ types.add(keyComparator);
+ for (int i = 0; i < prefixSize; i++)
+ types.add(((CompositeType)baseMetadata.comparator).types.get(i));
+ return CompositeType.getInstance(types);
+ case CUSTOM:
+ return null;
+ }
+ throw new AssertionError();
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/80ea03f5/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java
index 59a5ec5..8c7aab8 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java
@@ -19,7 +19,7 @@ package org.apache.cassandra.db.index;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.SortedSet;
+import java.util.Set;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
@@ -35,13 +35,13 @@ import org.apache.cassandra.io.sstable.ReducingKeyIterator;
public class SecondaryIndexBuilder extends CompactionInfo.Holder
{
private final ColumnFamilyStore cfs;
- private final SortedSet<ByteBuffer> columns;
+ private final Set<String> idxNames;
private final ReducingKeyIterator iter;
- public SecondaryIndexBuilder(ColumnFamilyStore cfs, SortedSet<ByteBuffer> columns, ReducingKeyIterator iter)
+ public SecondaryIndexBuilder(ColumnFamilyStore cfs, Set<String> idxNames, ReducingKeyIterator iter)
{
this.cfs = cfs;
- this.columns = columns;
+ this.idxNames = idxNames;
this.iter = iter;
}
@@ -60,7 +60,7 @@ public class SecondaryIndexBuilder extends CompactionInfo.Holder
if (isStopRequested())
throw new CompactionInterruptedException(getCompactionInfo());
DecoratedKey key = iter.next();
- Table.indexRow(key, cfs, columns);
+ Table.indexRow(key, cfs, idxNames);
}
try
http://git-wip-us.apache.org/repos/asf/cassandra/blob/80ea03f5/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
index 554aa8d..1d62746 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
@@ -71,7 +71,7 @@ public class SecondaryIndexManager
public SecondaryIndexManager(ColumnFamilyStore baseCfs)
{
- indexesByColumn = new ConcurrentSkipListMap<ByteBuffer, SecondaryIndex>(baseCfs.getComparator());
+ indexesByColumn = new ConcurrentSkipListMap<ByteBuffer, SecondaryIndex>();
rowLevelIndexMap = new HashMap<Class<? extends SecondaryIndex>, SecondaryIndex>();
this.baseCfs = baseCfs;
@@ -85,7 +85,7 @@ public class SecondaryIndexManager
// figure out what needs to be added and dropped.
// future: if/when we have modifiable settings for secondary indexes,
// they'll need to be handled here.
- Collection<ByteBuffer> indexedColumnNames = getIndexedColumns();
+ Collection<ByteBuffer> indexedColumnNames = indexesByColumn.keySet();
for (ByteBuffer indexedColumn : indexedColumnNames)
{
ColumnDefinition def = baseCfs.metadata.getColumn_metadata().get(indexedColumn);
@@ -104,6 +104,13 @@ public class SecondaryIndexManager
}
}
+ public Set<String> allIndexesNames()
+ {
+ Set<String> names = new HashSet<String>();
+ for (SecondaryIndex index : indexesByColumn.values())
+ names.add(index.getIndexName());
+ return names;
+ }
/**
* Does a full, blocking rebuild of the indexes specified by columns from the sstables.
@@ -114,15 +121,15 @@ public class SecondaryIndexManager
* @param sstables the data to build from
* @param columns the list of columns to index, ordered by comparator
*/
- public void maybeBuildSecondaryIndexes(Collection<SSTableReader> sstables, SortedSet<ByteBuffer> columns)
+ public void maybeBuildSecondaryIndexes(Collection<SSTableReader> sstables, Set<String> idxNames)
{
- if (columns.isEmpty())
+ if (idxNames.isEmpty())
return;
logger.info(String.format("Submitting index build of %s for data in %s",
- baseCfs.metadata.comparator.getString(columns), StringUtils.join(sstables, ", ")));
+ idxNames, StringUtils.join(sstables, ", ")));
- SecondaryIndexBuilder builder = new SecondaryIndexBuilder(baseCfs, columns, new ReducingKeyIterator(sstables));
+ SecondaryIndexBuilder builder = new SecondaryIndexBuilder(baseCfs, idxNames, new ReducingKeyIterator(sstables));
Future<?> future = CompactionManager.instance.submitIndexBuild(builder);
try
{
@@ -139,15 +146,27 @@ public class SecondaryIndexManager
flushIndexesBlocking();
- logger.info("Index build of " + baseCfs.metadata.comparator.getString(columns) + " complete");
+ logger.info("Index build of " + idxNames + " complete");
}
- /**
- * @return the list of indexed columns
- */
- public SortedSet<ByteBuffer> getIndexedColumns()
+ public boolean indexes(ByteBuffer name, Set<String> idxNames)
+ {
+ for (SecondaryIndex index : getIndexesByNames(idxNames))
+ {
+ if (index.indexes(name))
+ return true;
+ }
+ return false;
+ }
+
+ public boolean indexes(IColumn column)
{
- return indexesByColumn.keySet();
+ return indexes(column.name());
+ }
+
+ public boolean indexes(ByteBuffer name)
+ {
+ return indexes(name, allIndexesNames());
}
/**
@@ -212,7 +231,8 @@ public class SecondaryIndexManager
try
{
index = SecondaryIndex.createInstance(baseCfs, cdef);
- } catch (ConfigurationException e)
+ }
+ catch (ConfigurationException e)
{
throw new RuntimeException(e);
}
@@ -265,6 +285,14 @@ public class SecondaryIndexManager
return indexesByColumn.get(column);
}
+ private SecondaryIndex getIndexForFullColumnName(ByteBuffer column)
+ {
+ for (SecondaryIndex index : indexesByColumn.values())
+ if (index.indexes(column))
+ return index;
+ return null;
+ }
+
/**
* Remove the index
*/
@@ -284,18 +312,6 @@ public class SecondaryIndexManager
}
/**
- * Returns the decoratedKey for a column value
- * @param name column name
- * @param value column value
- * @return decorated key
- */
- public DecoratedKey getIndexKeyFor(ByteBuffer name, ByteBuffer value)
- {
- return new DecoratedKey(new LocalToken(baseCfs.metadata.getColumnDefinition(name).getValidator(), value), value);
- }
-
-
- /**
* @return all built indexes (ready to use)
*/
public List<String> getBuiltIndexes()
@@ -424,19 +440,13 @@ public class SecondaryIndexManager
// remove the old index entries
if (oldIndexedColumns != null)
{
- for (ByteBuffer columnName : oldIndexedColumns.getColumnNames())
+ for (IColumn column : oldIndexedColumns)
{
-
- IColumn column = oldIndexedColumns.getColumn(columnName);
-
- if (column == null)
- continue;
-
//this was previously deleted so should not be in index
if (column.isMarkedForDelete())
continue;
- SecondaryIndex index = getIndexForColumn(columnName);
+ SecondaryIndex index = getIndexForFullColumnName(column.name());
if (index == null)
{
logger.debug("Looks like index got dropped mid-update. Skipping");
@@ -454,8 +464,7 @@ public class SecondaryIndexManager
}
else
{
- DecoratedKey valueKey = getIndexKeyFor(columnName, column.value());
-
+ DecoratedKey valueKey = index.getIndexKeyFor(column.value());
((PerColumnSecondaryIndex)index).deleteColumn(valueKey, rowKey, column);
}
}
@@ -468,7 +477,7 @@ public class SecondaryIndexManager
if (column == null || column.isMarkedForDelete())
continue; // null column == row deletion
- SecondaryIndex index = getIndexForColumn(columnName);
+ SecondaryIndex index = getIndexForFullColumnName(columnName);
if (index == null)
{
logger.debug("index on {} removed; skipping remove-old for {}", columnName, ByteBufferUtil.bytesToHex(rowKey));
@@ -486,7 +495,7 @@ public class SecondaryIndexManager
}
else
{
- DecoratedKey valueKey = getIndexKeyFor(columnName, column.value());
+ DecoratedKey valueKey = index.getIndexKeyFor(column.value());
((PerColumnSecondaryIndex)index).insertColumn(valueKey, rowKey, column);
}
@@ -523,7 +532,7 @@ public class SecondaryIndexManager
}
else
{
- DecoratedKey valueKey = getIndexKeyFor(column.name(), column.value());
+ DecoratedKey valueKey = index.getIndexKeyFor(column.value());
((PerColumnSecondaryIndex) index).deleteColumn(valueKey, key.key, column);
}
}
@@ -539,7 +548,6 @@ public class SecondaryIndexManager
{
Map<String, Set<ByteBuffer>> groupByIndexType = new HashMap<String, Set<ByteBuffer>>();
-
//Group columns by type
for (IndexExpression ix : clause)
{
@@ -594,16 +602,27 @@ public class SecondaryIndexManager
return indexSearchers.get(0).search(clause, range, maxResults, dataFilter, maxIsColumns);
}
- public void setIndexBuilt(Collection<ByteBuffer> indexes)
+ private Collection<SecondaryIndex> getIndexesByNames(Set<String> idxNames)
+ {
+ List<SecondaryIndex> result = new ArrayList<SecondaryIndex>();
+ for (SecondaryIndex index : indexesByColumn.values())
+ {
+ if (idxNames.contains(index.getIndexName()))
+ result.add(index);
+ }
+ return result;
+ }
+
+ public void setIndexBuilt(Set<String> idxNames)
{
- for (ByteBuffer colName : indexes)
- indexesByColumn.get(colName).setIndexBuilt(colName);
+ for (SecondaryIndex index : getIndexesByNames(idxNames))
+ index.setIndexBuilt();
}
- public void setIndexRemoved(Collection<ByteBuffer> indexes)
+ public void setIndexRemoved(Set<String> idxNames)
{
- for (ByteBuffer colName : indexes)
- indexesByColumn.get(colName).setIndexRemoved(colName);
+ for (SecondaryIndex index : getIndexesByNames(idxNames))
+ index.setIndexRemoved();
}
public boolean validate(Column column)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/80ea03f5/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
new file mode 100644
index 0000000..e53a3bf
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.index.composites;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.config.ConfigurationException;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.index.AbstractSimplePerColumnSecondaryIndex;
+import org.apache.cassandra.db.index.SecondaryIndex;
+import org.apache.cassandra.db.index.SecondaryIndexSearcher;
+import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.dht.*;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+/**
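+ * Implements a secondary index for a column family with a composite comparator, using a second column family
+ * in which the row keys are indexed values, and column names are the base row key followed by (up to) the first prefix_size components of the base column name.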
+ */
+public class CompositesIndex extends AbstractSimplePerColumnSecondaryIndex
+{
+ public static final String PREFIX_SIZE_OPTION = "prefix_size";
+
+ private CompositeType indexComparator;
+ private int prefixSize;
+
+ public void init(ColumnDefinition columnDef)
+ {
+ assert baseCfs.getComparator() instanceof CompositeType;
+ CompositeType baseComparator = (CompositeType) baseCfs.getComparator();
+ try
+ {
+ prefixSize = Integer.parseInt(columnDef.getIndexOptions().get(PREFIX_SIZE_OPTION));
+ }
+ catch (NumberFormatException e)
+ {
+ // Shouldn't happen since validateOptions must have been called
+ throw new AssertionError(e);
+ }
+
+ indexComparator = (CompositeType)SecondaryIndex.getIndexComparator(baseCfs.metadata, columnDef);
+ }
+
+ protected ByteBuffer makeIndexColumnName(ByteBuffer rowKey, IColumn column)
+ {
+ CompositeType baseComparator = (CompositeType)baseCfs.getComparator();
+ ByteBuffer[] components = baseComparator.split(column.name());
+ CompositeType.Builder builder = new CompositeType.Builder(indexComparator);
+ builder.add(rowKey);
+ for (int i = 0; i < Math.min(prefixSize, components.length); i++)
+ builder.add(components[i]);
+ return builder.build();
+ }
+
+ @Override
+ public boolean indexes(ByteBuffer name)
+ {
+ ColumnDefinition columnDef = columnDefs.iterator().next();
+ CompositeType baseComparator = (CompositeType)baseCfs.getComparator();
+ ByteBuffer[] components = baseComparator.split(name);
+ AbstractType<?> comp = baseCfs.metadata.getColumnDefinitionComparator(columnDef);
+ return components.length > columnDef.componentIndex
+ && comp.compare(components[columnDef.componentIndex], columnDef.name) == 0;
+ }
+
+ public SecondaryIndexSearcher createSecondaryIndexSearcher(Set<ByteBuffer> columns)
+ {
+ return new CompositesSearcher(baseCfs.indexManager, columns, prefixSize);
+ }
+
+ public void validateOptions() throws ConfigurationException
+ {
+ ColumnDefinition columnDef = columnDefs.iterator().next();
+ String option = columnDef.getIndexOptions().get(PREFIX_SIZE_OPTION);
+
+ if (option == null)
+ throw new ConfigurationException("Missing option " + PREFIX_SIZE_OPTION);
+
+ try
+ {
+ Integer.parseInt(option);
+ }
+ catch (NumberFormatException e)
+ {
+ throw new ConfigurationException(String.format("Invalid non integer value for option %s (got '%s')", PREFIX_SIZE_OPTION, option));
+ }
+ }
+}