You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sa...@apache.org on 2015/08/22 01:48:36 UTC
[05/15] cassandra git commit: New 2i API and implementations for
built in indexes
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
deleted file mode 100644
index d4ca707..0000000
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
+++ /dev/null
@@ -1,244 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.index;
-
-import java.nio.ByteBuffer;
-import java.util.*;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.filter.*;
-import org.apache.cassandra.db.partitions.PartitionIterator;
-import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
-import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.dht.AbstractBounds;
-import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.utils.btree.BTreeSet;
-import org.apache.cassandra.utils.FBUtilities;
-
-public abstract class SecondaryIndexSearcher
-{
- private static final Logger logger = LoggerFactory.getLogger(SecondaryIndexSearcher.class);
-
- protected final SecondaryIndexManager indexManager;
- protected final Set<ColumnDefinition> columns;
- protected final ColumnFamilyStore baseCfs;
-
- public SecondaryIndexSearcher(SecondaryIndexManager indexManager, Set<ColumnDefinition> columns)
- {
- this.indexManager = indexManager;
- this.columns = columns;
- this.baseCfs = indexManager.baseCfs;
- }
-
- public SecondaryIndex highestSelectivityIndex(RowFilter filter)
- {
- RowFilter.Expression expr = highestSelectivityPredicate(filter, false);
- return expr == null ? null : indexManager.getIndexForColumn(expr.column());
- }
-
- public RowFilter.Expression primaryClause(ReadCommand command)
- {
- return highestSelectivityPredicate(command.rowFilter(), false);
- }
-
- @SuppressWarnings("resource") // Both the OpOrder and 'indexIter' are closed on exception, or through the closing of the result
- // of this method.
- public UnfilteredPartitionIterator search(ReadCommand command, ReadOrderGroup orderGroup)
- {
- RowFilter.Expression primary = highestSelectivityPredicate(command.rowFilter(), true);
- assert primary != null;
-
- AbstractSimplePerColumnSecondaryIndex index = (AbstractSimplePerColumnSecondaryIndex)indexManager.getIndexForColumn(primary.column());
- assert index != null && index.getIndexCfs() != null;
-
- if (logger.isDebugEnabled())
- logger.debug("Most-selective indexed predicate is {}", primary);
-
- DecoratedKey indexKey = index.getIndexKeyFor(primary.getIndexValue());
-
- UnfilteredRowIterator indexIter = queryIndex(index, indexKey, command, orderGroup);
- try
- {
- return queryDataFromIndex(index, indexKey, UnfilteredRowIterators.filter(indexIter, command.nowInSec()), command, orderGroup);
- }
- catch (RuntimeException | Error e)
- {
- indexIter.close();
- throw e;
- }
- }
-
- private UnfilteredRowIterator queryIndex(AbstractSimplePerColumnSecondaryIndex index, DecoratedKey indexKey, ReadCommand command, ReadOrderGroup orderGroup)
- {
- ClusteringIndexFilter filter = makeIndexFilter(index, command);
- CFMetaData indexMetadata = index.getIndexCfs().metadata;
- return SinglePartitionReadCommand.create(indexMetadata, command.nowInSec(), indexKey, ColumnFilter.all(indexMetadata), filter)
- .queryMemtableAndDisk(index.getIndexCfs(), orderGroup.indexReadOpOrderGroup());
- }
-
- private ClusteringIndexFilter makeIndexFilter(AbstractSimplePerColumnSecondaryIndex index, ReadCommand command)
- {
- if (command instanceof SinglePartitionReadCommand)
- {
- // Note: as yet there's no route to get here - a 2i query *always* uses a
- // PartitionRangeReadCommand. This is here in preparation for coming changes
- // in SelectStatement.
- SinglePartitionReadCommand sprc = (SinglePartitionReadCommand)command;
- ByteBuffer pk = sprc.partitionKey().getKey();
- ClusteringIndexFilter filter = sprc.clusteringIndexFilter();
-
- if (filter instanceof ClusteringIndexNamesFilter)
- {
- NavigableSet<Clustering> requested = ((ClusteringIndexNamesFilter)filter).requestedRows();
- BTreeSet.Builder<Clustering> clusterings = BTreeSet.builder(index.getIndexComparator());
- for (Clustering c : requested)
- clusterings.add(index.makeIndexClustering(pk, c, (Cell)null));
- return new ClusteringIndexNamesFilter(clusterings.build(), filter.isReversed());
- }
- else
- {
- Slices requested = ((ClusteringIndexSliceFilter)filter).requestedSlices();
- Slices.Builder builder = new Slices.Builder(index.getIndexComparator());
- for (Slice slice : requested)
- builder.add(index.makeIndexBound(pk, slice.start()), index.makeIndexBound(pk, slice.end()));
- return new ClusteringIndexSliceFilter(builder.build(), filter.isReversed());
- }
- }
- else
- {
-
- DataRange dataRange = ((PartitionRangeReadCommand)command).dataRange();
- AbstractBounds<PartitionPosition> range = dataRange.keyRange();
-
- Slice slice = Slice.ALL;
-
- /*
- * XXX: If the range requested is a token range, we'll have to start at the beginning (and stop at the end) of
- * the indexed row unfortunately (which will be inefficient), because we have no way to intuit the smallest possible
- * key having a given token. A potential fix would be to actually store the token alongside the key in the indexed row.
- */
- if (range.left instanceof DecoratedKey)
- {
- // the right hand side of the range may not be a DecoratedKey (for instance if we're paging),
- // but if it is, we can optimise slightly by restricting the slice
- if (range.right instanceof DecoratedKey)
- {
-
- DecoratedKey startKey = (DecoratedKey) range.left;
- DecoratedKey endKey = (DecoratedKey) range.right;
-
- Slice.Bound start = Slice.Bound.BOTTOM;
- Slice.Bound end = Slice.Bound.TOP;
-
- /*
- * For index queries over a range, we can't do a whole lot better than querying everything for the key range, though for
- * slice queries we can slightly restrict the beginning and end.
- */
- if (!dataRange.isNamesQuery())
- {
- ClusteringIndexSliceFilter startSliceFilter = ((ClusteringIndexSliceFilter) dataRange.clusteringIndexFilter(startKey));
- ClusteringIndexSliceFilter endSliceFilter = ((ClusteringIndexSliceFilter) dataRange.clusteringIndexFilter(endKey));
-
- // We can't effectively support reversed queries when we have a range, so we don't support them
- // (or only through post-query reordering) and shouldn't get here.
- assert !startSliceFilter.isReversed() && !endSliceFilter.isReversed();
-
- Slices startSlices = startSliceFilter.requestedSlices();
- Slices endSlices = endSliceFilter.requestedSlices();
-
- if (startSlices.size() > 0)
- start = startSlices.get(0).start();
-
- if (endSlices.size() > 0)
- end = endSlices.get(endSlices.size() - 1).end();
- }
-
- slice = Slice.make(index.makeIndexBound(startKey.getKey(), start),
- index.makeIndexBound(endKey.getKey(), end));
- }
- else
- {
- // otherwise, just start the index slice from the key we do have
- slice = Slice.make(index.makeIndexBound(((DecoratedKey)range.left).getKey(), Slice.Bound.BOTTOM),
- Slice.Bound.TOP);
- }
- }
- return new ClusteringIndexSliceFilter(Slices.with(index.getIndexComparator(), slice), false);
- }
- }
-
- protected abstract UnfilteredPartitionIterator queryDataFromIndex(AbstractSimplePerColumnSecondaryIndex index,
- DecoratedKey indexKey,
- RowIterator indexHits,
- ReadCommand command,
- ReadOrderGroup orderGroup);
-
- protected RowFilter.Expression highestSelectivityPredicate(RowFilter filter, boolean includeInTrace)
- {
- RowFilter.Expression best = null;
- int bestMeanCount = Integer.MAX_VALUE;
- Map<SecondaryIndex, Integer> candidates = new HashMap<>();
-
- for (RowFilter.Expression expression : filter)
- {
- // skip columns belonging to a different index type
- if (!columns.contains(expression.column()))
- continue;
-
- SecondaryIndex index = indexManager.getIndexForColumn(expression.column());
- if (index == null || index.getIndexCfs() == null || !index.supportsOperator(expression.operator()))
- continue;
-
- int columns = index.getIndexCfs().getMeanColumns();
- candidates.put(index, columns);
- if (columns < bestMeanCount)
- {
- best = expression;
- bestMeanCount = columns;
- }
- }
-
- if (includeInTrace)
- {
- if (best == null)
- Tracing.trace("No applicable indexes found");
- else if (Tracing.isTracing())
- // pay for an additional threadlocal get() rather than build the strings unnecessarily
- Tracing.trace("Candidate index mean cardinalities are {}. Scanning with {}.", FBUtilities.toString(candidates), indexManager.getIndexForColumn(best.column()).getIndexName());
- }
- return best;
- }
-
- /**
- * Post-process the result of an index query. This is done by the coordinator node after it has reconciled
- * the replica responses.
- *
- * @param filter The {@code RowFilter} used for the query.
- * @param result The index query results to be post-processed
- * @return The post-processed results
- */
- public PartitionIterator postReconciliationProcessing(RowFilter filter, PartitionIterator result)
- {
- return result;
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
deleted file mode 100644
index abc5344..0000000
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.index.composites;
-
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.cassandra.schema.IndexMetadata;
-import org.apache.cassandra.utils.concurrent.OpOrder;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.partitions.*;
-import org.apache.cassandra.db.index.AbstractSimplePerColumnSecondaryIndex;
-import org.apache.cassandra.db.index.SecondaryIndex;
-import org.apache.cassandra.db.index.SecondaryIndexManager;
-import org.apache.cassandra.db.index.SecondaryIndexSearcher;
-import org.apache.cassandra.db.marshal.CollectionType;
-import org.apache.cassandra.exceptions.ConfigurationException;
-
-/**
- * Base class for internal secondary indexes (this could be merged with AbstractSimplePerColumnSecondaryIndex).
- */
-public abstract class CompositesIndex extends AbstractSimplePerColumnSecondaryIndex
-{
- public static CompositesIndex create(IndexMetadata indexDef, CFMetaData baseMetadata)
- {
- ColumnDefinition cfDef = indexDef.indexedColumn(baseMetadata);
- if (cfDef.type.isCollection() && cfDef.type.isMultiCell())
- {
- switch (((CollectionType)cfDef.type).kind)
- {
- case LIST:
- return new CompositesIndexOnCollectionValue();
- case SET:
- return new CompositesIndexOnCollectionKey();
- case MAP:
- if (indexDef.options.containsKey(SecondaryIndex.INDEX_KEYS_OPTION_NAME))
- return new CompositesIndexOnCollectionKey();
- else if (indexDef.options.containsKey(SecondaryIndex.INDEX_ENTRIES_OPTION_NAME))
- return new CompositesIndexOnCollectionKeyAndValue();
- else
- return new CompositesIndexOnCollectionValue();
- }
- }
-
- switch (cfDef.kind)
- {
- case CLUSTERING:
- return new CompositesIndexOnClusteringKey();
- case REGULAR:
- return new CompositesIndexOnRegular();
- case PARTITION_KEY:
- return new CompositesIndexOnPartitionKey();
- //case COMPACT_VALUE:
- // return new CompositesIndexOnCompactValue();
- }
- throw new AssertionError();
- }
-
- public static void addIndexClusteringColumns(CFMetaData.Builder indexMetadata, CFMetaData baseMetadata, IndexMetadata indexDef)
- {
- ColumnDefinition cfDef = indexDef.indexedColumn(baseMetadata);
- if (cfDef.type.isCollection() && cfDef.type.isMultiCell())
- {
- CollectionType type = (CollectionType)cfDef.type;
- if (type.kind == CollectionType.Kind.LIST
- || (type.kind == CollectionType.Kind.MAP && indexDef.options.containsKey(SecondaryIndex.INDEX_VALUES_OPTION_NAME)))
- {
- CompositesIndexOnCollectionValue.addClusteringColumns(indexMetadata, baseMetadata, cfDef);
- }
- else
- {
- addGenericClusteringColumns(indexMetadata, baseMetadata, cfDef);
- }
- }
- else if (cfDef.isClusteringColumn())
- {
- CompositesIndexOnClusteringKey.addClusteringColumns(indexMetadata, baseMetadata, cfDef);
- }
- else
- {
- addGenericClusteringColumns(indexMetadata, baseMetadata, cfDef);
- }
- }
-
- protected static void addGenericClusteringColumns(CFMetaData.Builder indexMetadata, CFMetaData baseMetadata, ColumnDefinition columnDef)
- {
- indexMetadata.addClusteringColumn("partition_key", baseMetadata.partitioner.partitionOrdering());
- for (ColumnDefinition def : baseMetadata.clusteringColumns())
- indexMetadata.addClusteringColumn(def.name, def.type);
- }
-
- public abstract IndexedEntry decodeEntry(DecoratedKey indexedValue, Row indexEntry);
-
- public abstract boolean isStale(Row row, ByteBuffer indexValue, int nowInSec);
-
- public void delete(IndexedEntry entry, OpOrder.Group opGroup, int nowInSec)
- {
- Row row = BTreeRow.emptyDeletedRow(entry.indexClustering, new DeletionTime(entry.timestamp, nowInSec));
- PartitionUpdate upd = PartitionUpdate.singleRowUpdate(indexCfs.metadata, entry.indexValue, row);
- indexCfs.apply(upd, SecondaryIndexManager.nullUpdater, opGroup, null);
-
- if (logger.isDebugEnabled())
- logger.debug("removed index entry for cleaned-up value {}:{}", entry.indexValue, upd);
- }
-
- public SecondaryIndexSearcher createSecondaryIndexSearcher(Set<ColumnDefinition> columns)
- {
- return new CompositesSearcher(baseCfs.indexManager, columns);
- }
-
- public void validateOptions(CFMetaData baseCfm, IndexMetadata indexMetadata) throws ConfigurationException
- {
- ColumnDefinition columnDef = indexMetadata.indexedColumn(baseCfm);
- Map<String, String> options = new HashMap<String, String>(indexMetadata.options);
-
- // We used to have an option called "prefix_size" so skip it silently for backward compatibility's sake.
- options.remove("prefix_size");
-
- if (columnDef.type.isCollection())
- {
- options.remove(SecondaryIndex.INDEX_VALUES_OPTION_NAME);
- options.remove(SecondaryIndex.INDEX_KEYS_OPTION_NAME);
- options.remove(SecondaryIndex.INDEX_ENTRIES_OPTION_NAME);
- }
-
- if (!options.isEmpty())
- throw new ConfigurationException("Unknown options provided for COMPOSITES index: " + options.keySet());
- }
-
- public static class IndexedEntry
- {
- public final DecoratedKey indexValue;
- public final Clustering indexClustering;
- public final long timestamp;
-
- public final ByteBuffer indexedKey;
- public final Clustering indexedEntryClustering;
-
- public IndexedEntry(DecoratedKey indexValue, Clustering indexClustering, long timestamp, ByteBuffer indexedKey, Clustering indexedEntryClustering)
- {
- this.indexValue = indexValue;
- this.indexClustering = indexClustering;
- this.timestamp = timestamp;
- this.indexedKey = indexedKey;
- this.indexedEntryClustering = indexedEntryClustering;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/db/index/composites/CompositesIndexIncludingCollectionKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexIncludingCollectionKey.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexIncludingCollectionKey.java
deleted file mode 100644
index 7624c1f..0000000
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexIncludingCollectionKey.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.index.composites;
-
-import java.nio.ByteBuffer;
-
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.rows.*;
-
-/**
- * Common superclass for indexes that capture collection keys, including
- * indexes on such keys themselves.
- *
- * A cell indexed by this index will have the general form:
- * ck_0 ... ck_n c_name [col_elt] : v
- * where ck_i are the cluster keys, c_name the CQL3 column name, col_elt the
- * collection element that we want to index (which may or may not be there depending
- * on whether c_name is the collection we're indexing), and v the cell value.
- *
- * Such a cell is indexed if c_name is the indexed collection (in which case we are guaranteed to have
- * col_elt). The index entry can be viewed in the following way:
- * - the row key is determined by subclasses of this type.
- * - the cell name will be 'rk ck_0 ... ck_n' where rk is the row key of the initial cell.
- */
-public abstract class CompositesIndexIncludingCollectionKey extends CompositesIndex
-{
- protected CBuilder buildIndexClusteringPrefix(ByteBuffer rowKey, ClusteringPrefix prefix, CellPath path)
- {
- CBuilder builder = CBuilder.create(getIndexComparator());
- builder.add(rowKey);
- for (int i = 0; i < prefix.size(); i++)
- builder.add(prefix.get(i));
- return builder;
- }
-
- public IndexedEntry decodeEntry(DecoratedKey indexedValue, Row indexEntry)
- {
- int count = 1 + baseCfs.metadata.clusteringColumns().size();
- Clustering clustering = indexEntry.clustering();
- CBuilder builder = CBuilder.create(baseCfs.getComparator());
- for (int i = 0; i < count - 1; i++)
- builder.add(clustering.get(i + 1));
- return new IndexedEntry(indexedValue, clustering, indexEntry.primaryKeyLivenessInfo().timestamp(), clustering.get(0), builder.build());
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java
deleted file mode 100644
index cd4aff9..0000000
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.index.composites;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.utils.concurrent.OpOrder;
-
-/**
- * Index on a CLUSTERING column definition.
- *
- * A cell indexed by this index will have the general form:
- * ck_0 ... ck_n c_name : v
- * where ck_i are the cluster keys, c_name the last component of the cell
- * composite name (or second to last if collections are in use, but this
- * has no impact) and v the cell value.
- *
- * Such a cell is always indexed by this index (or rather, it is indexed if
- * n >= columnDef.componentIndex, which will always be the case in practice)
- * and it will generate (makeIndexColumnName()) an index entry whose:
- * - row key will be ck_i (getIndexedValue()) where i == columnDef.componentIndex.
- * - cell name will be
- * rk ck_0 ... ck_{i-1} ck_{i+1} ck_n
- * where rk is the row key of the initial cell and i == columnDef.componentIndex.
- */
-public class CompositesIndexOnClusteringKey extends CompositesIndex
-{
- public static void addClusteringColumns(CFMetaData.Builder indexMetadata, CFMetaData baseMetadata, ColumnDefinition columnDef)
- {
- indexMetadata.addClusteringColumn("partition_key", baseMetadata.partitioner.partitionOrdering());
-
- List<ColumnDefinition> cks = baseMetadata.clusteringColumns();
- for (int i = 0; i < columnDef.position(); i++)
- {
- ColumnDefinition def = cks.get(i);
- indexMetadata.addClusteringColumn(def.name, def.type);
- }
- for (int i = columnDef.position() + 1; i < cks.size(); i++)
- {
- ColumnDefinition def = cks.get(i);
- indexMetadata.addClusteringColumn(def.name, def.type);
- }
- }
-
- protected ByteBuffer getIndexedValue(ByteBuffer rowKey, Clustering clustering, ByteBuffer cellValue, CellPath path)
- {
- return clustering.get(columnDef.position());
- }
-
- protected CBuilder buildIndexClusteringPrefix(ByteBuffer rowKey, ClusteringPrefix prefix, CellPath path)
- {
- CBuilder builder = CBuilder.create(getIndexComparator());
- builder.add(rowKey);
- for (int i = 0; i < Math.min(columnDef.position(), prefix.size()); i++)
- builder.add(prefix.get(i));
- for (int i = columnDef.position() + 1; i < prefix.size(); i++)
- builder.add(prefix.get(i));
- return builder;
- }
-
- public IndexedEntry decodeEntry(DecoratedKey indexedValue, Row indexEntry)
- {
- int ckCount = baseCfs.metadata.clusteringColumns().size();
-
- Clustering clustering = indexEntry.clustering();
- CBuilder builder = CBuilder.create(baseCfs.getComparator());
- for (int i = 0; i < columnDef.position(); i++)
- builder.add(clustering.get(i + 1));
-
- builder.add(indexedValue.getKey());
-
- for (int i = columnDef.position() + 1; i < ckCount; i++)
- builder.add(clustering.get(i));
-
- return new IndexedEntry(indexedValue, clustering, indexEntry.primaryKeyLivenessInfo().timestamp(), clustering.get(0), builder.build());
- }
-
- @Override
- protected boolean indexPrimaryKeyColumn()
- {
- return true;
- }
-
- @Override
- public boolean indexes(ColumnDefinition c)
- {
- // Actual indexing for this index type is done through maybeIndex
- return false;
- }
-
- public boolean isStale(Row data, ByteBuffer indexValue, int nowInSec)
- {
- return !data.hasLiveData(nowInSec);
- }
-
- @Override
- public void maybeIndex(ByteBuffer partitionKey, Clustering clustering, long timestamp, int ttl, OpOrder.Group opGroup, int nowInSec)
- {
- if (clustering != Clustering.STATIC_CLUSTERING && clustering.get(columnDef.position()) != null)
- insert(partitionKey, clustering, null, LivenessInfo.create(indexCfs.metadata, timestamp, ttl, nowInSec), opGroup);
- }
-
- @Override
- public void maybeDelete(ByteBuffer partitionKey, Clustering clustering, DeletionTime deletion, OpOrder.Group opGroup)
- {
- if (clustering.get(columnDef.position()) != null && !deletion.isLive())
- delete(partitionKey, clustering, null, null, deletion, opGroup);
- }
-
- @Override
- public void delete(ByteBuffer rowKey, Clustering clustering, Cell cell, OpOrder.Group opGroup, int nowInSec)
- {
- // We only know that one column of the CQL row has been updated/deleted, but we don't know if the
- // full row has been deleted so we should not do anything. If it ends up that the whole row has
- // been deleted, it will be eventually cleaned up on read because the entry will be detected stale.
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionKey.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionKey.java
deleted file mode 100644
index 50e81c4..0000000
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionKey.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.index.composites;
-
-import java.nio.ByteBuffer;
-
-import org.apache.cassandra.cql3.Operator;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.marshal.*;
-
-/**
- * Index on the collection element of the cell name of a collection.
- *
- * The row keys for this index are given by the collection element for
- * indexed columns.
- */
-public class CompositesIndexOnCollectionKey extends CompositesIndexIncludingCollectionKey
-{
- @Override
- protected AbstractType<?> getIndexKeyComparator()
- {
- return ((CollectionType)columnDef.type).nameComparator();
- }
-
- protected ByteBuffer getIndexedValue(ByteBuffer rowKey, Clustering clustering, ByteBuffer cellValue, CellPath path)
- {
- return path.get(0);
- }
-
- @Override
- public boolean supportsOperator(Operator operator)
- {
- return operator == Operator.CONTAINS_KEY ||
- operator == Operator.CONTAINS && columnDef.type instanceof SetType;
- }
-
- public boolean isStale(Row data, ByteBuffer indexValue, int nowInSec)
- {
- Cell cell = data.getCell(columnDef, CellPath.create(indexValue));
- return cell == null || !cell.isLive(nowInSec);
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionKeyAndValue.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionKeyAndValue.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionKeyAndValue.java
deleted file mode 100644
index 766f803..0000000
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionKeyAndValue.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.index.composites;
-
-import java.nio.ByteBuffer;
-
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.marshal.*;
-
-/**
- * Index on the element and value of cells participating in a collection.
- *
- * The row keys for this index are a composite of the collection element
- * and value of indexed columns.
- */
-public class CompositesIndexOnCollectionKeyAndValue extends CompositesIndexIncludingCollectionKey
-{
- @Override
- protected AbstractType<?> getIndexKeyComparator()
- {
- CollectionType colType = (CollectionType)columnDef.type;
- return CompositeType.getInstance(colType.nameComparator(), colType.valueComparator());
- }
-
- protected ByteBuffer getIndexedValue(ByteBuffer rowKey, Clustering clustering, ByteBuffer cellValue, CellPath path)
- {
- return CompositeType.build(path.get(0), cellValue);
- }
-
- public boolean isStale(Row data, ByteBuffer indexValue, int nowInSec)
- {
- ByteBuffer[] components = ((CompositeType)getIndexKeyComparator()).split(indexValue);
- ByteBuffer mapKey = components[0];
- ByteBuffer mapValue = components[1];
-
- Cell cell = data.getCell(columnDef, CellPath.create(mapKey));
- if (cell == null || !cell.isLive(nowInSec))
- return true;
-
- AbstractType<?> valueComparator = ((CollectionType)columnDef.type).valueComparator();
- return valueComparator.compare(mapValue, cell.value()) != 0;
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionValue.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionValue.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionValue.java
deleted file mode 100644
index 30391cf..0000000
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionValue.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.index.composites;
-
-import java.nio.ByteBuffer;
-import java.util.Iterator;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.cql3.Operator;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.marshal.*;
-
-/**
- * Index the value of a collection cell.
- *
- * This is a lot like an index on REGULAR, except that we also need to make
- * the collection key part of the index entry so that:
- * 1) we don't have to scan the whole collection at query time to know whether
- * the entry is stale and whether it still satisfies the query.
- * 2) if a collection contains the same value multiple times, we need one entry
- * for each occurrence so that deleting only one of those values deletes only
- * the entry corresponding to that value.
- */
-public class CompositesIndexOnCollectionValue extends CompositesIndex
-{
- public static void addClusteringColumns(CFMetaData.Builder indexMetadata, CFMetaData baseMetadata, ColumnDefinition columnDef)
- {
- addGenericClusteringColumns(indexMetadata, baseMetadata, columnDef);
-
- // collection key
- indexMetadata.addClusteringColumn("cell_path", ((CollectionType)columnDef.type).nameComparator());
- }
-
- @Override
- protected AbstractType<?> getIndexKeyComparator()
- {
- return ((CollectionType)columnDef.type).valueComparator();
- }
-
- protected ByteBuffer getIndexedValue(ByteBuffer rowKey, Clustering clustering, ByteBuffer cellValue, CellPath path)
- {
- return cellValue;
- }
-
- protected CBuilder buildIndexClusteringPrefix(ByteBuffer rowKey, ClusteringPrefix prefix, CellPath path)
- {
- CBuilder builder = CBuilder.create(getIndexComparator());
- builder.add(rowKey);
- for (int i = 0; i < prefix.size(); i++)
- builder.add(prefix.get(i));
-
- // When indexing, cell will be present, but when searching, it won't (CASSANDRA-7525)
- if (prefix.size() == baseCfs.metadata.clusteringColumns().size() && path != null)
- builder.add(path.get(0));
-
- return builder;
- }
-
- public IndexedEntry decodeEntry(DecoratedKey indexedValue, Row indexEntry)
- {
- Clustering clustering = indexEntry.clustering();
- CBuilder builder = CBuilder.create(baseCfs.getComparator());
- for (int i = 0; i < baseCfs.getComparator().size(); i++)
- builder.add(clustering.get(i + 1));
- return new IndexedEntry(indexedValue, clustering, indexEntry.primaryKeyLivenessInfo().timestamp(), clustering.get(0), builder.build());
- }
-
- @Override
- public boolean supportsOperator(Operator operator)
- {
- return operator == Operator.CONTAINS && !(columnDef.type instanceof SetType);
- }
-
- public boolean isStale(Row data, ByteBuffer indexValue, int nowInSec)
- {
- ComplexColumnData complexData = data.getComplexColumnData(columnDef);
- for (Cell cell : complexData)
- {
- if (cell.isLive(nowInSec) && ((CollectionType) columnDef.type).valueComparator().compare(indexValue, cell.value()) == 0)
- return false;
- }
- return true;
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnPartitionKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnPartitionKey.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnPartitionKey.java
deleted file mode 100644
index a93f8e1..0000000
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnPartitionKey.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.index.composites;
-
-import java.nio.ByteBuffer;
-
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.marshal.*;
-import org.apache.cassandra.utils.concurrent.OpOrder;
-
-/**
- * Index on a PARTITION_KEY column definition.
- *
- * This supposes a composite row key:
- * rk = rk_0 ... rk_n
- *
- * The corresponding index entry will be:
- * - index row key will be rk_i (where i == columnDef.position())
- * - cell name will be: rk ck
- * where rk is the full partition key and ck the clustering keys of the
- * original cell names (thus excluding the last column name as we want to refer to
- * the whole CQL3 row, not just the cell itself)
- *
- * Note that contrary to other types of index, we repeat the indexed value in
- * the index cell name (we use the whole partition key). The reason is that we
- * want to order the index cell name by partitioner first, and skipping a part
- * of the row key would change the order.
- */
-public class CompositesIndexOnPartitionKey extends CompositesIndex
-{
- protected ByteBuffer getIndexedValue(ByteBuffer rowKey, Clustering clustering, ByteBuffer cellValue, CellPath path)
- {
- CompositeType keyComparator = (CompositeType)baseCfs.metadata.getKeyValidator();
- ByteBuffer[] components = keyComparator.split(rowKey);
- return components[columnDef.position()];
- }
-
- protected CBuilder buildIndexClusteringPrefix(ByteBuffer rowKey, ClusteringPrefix prefix, CellPath path)
- {
- CBuilder builder = CBuilder.create(getIndexComparator());
- builder.add(rowKey);
- for (int i = 0; i < prefix.size(); i++)
- builder.add(prefix.get(i));
- return builder;
- }
-
- public IndexedEntry decodeEntry(DecoratedKey indexedValue, Row indexEntry)
- {
- int ckCount = baseCfs.metadata.clusteringColumns().size();
- Clustering clustering = indexEntry.clustering();
- CBuilder builder = CBuilder.create(baseCfs.getComparator());
- for (int i = 0; i < ckCount; i++)
- builder.add(clustering.get(i + 1));
-
- return new IndexedEntry(indexedValue, clustering, indexEntry.primaryKeyLivenessInfo().timestamp(), clustering.get(0), builder.build());
- }
-
- @Override
- protected boolean indexPrimaryKeyColumn()
- {
- return true;
- }
-
- @Override
- public boolean indexes(ColumnDefinition c)
- {
- // Actual indexing for this index type is done through maybeIndex
- return false;
- }
-
- public boolean isStale(Row data, ByteBuffer indexValue, int nowInSec)
- {
- return !data.hasLiveData(nowInSec);
- }
-
- @Override
- public void maybeIndex(ByteBuffer partitionKey, Clustering clustering, long timestamp, int ttl, OpOrder.Group opGroup, int nowInSec)
- {
- insert(partitionKey, clustering, null, LivenessInfo.create(indexCfs.metadata, timestamp, ttl, nowInSec), opGroup);
- }
-
- @Override
- public void delete(ByteBuffer rowKey, Clustering clustering, Cell cell, OpOrder.Group opGroup, int nowInSec)
- {
- // We only know that one column of the CQL row has been updated/deleted, but we don't know if the
- // full row has been deleted so we should not do anything. If it ends up that the whole row has
- // been deleted, it will be eventually cleaned up on read because the entry will be detected stale.
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnRegular.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnRegular.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnRegular.java
deleted file mode 100644
index a88502a..0000000
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnRegular.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.index.composites;
-
-import java.nio.ByteBuffer;
-
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.rows.*;
-
-/**
- * Index on a REGULAR column definition on a composite type.
- *
- * A cell indexed by this index will have the general form:
- * ck_0 ... ck_n c_name : v
- * where ck_i are the cluster keys, c_name the last component of the cell
- * composite name (or second to last if collections are in use, but this
- * has no impact) and v the cell value.
- *
- * Such a cell is indexed if c_name == columnDef.name, and it will generate
- * (buildIndexClusteringPrefix()) an index entry whose:
- * - row key will be the value v (getIndexedValue()).
- * - cell name will be
- * rk ck_0 ... ck_n
- * where rk is the row key of the initial cell. I.e. the index entry stores
- * all the information required to locate the indexed cell.
- */
-public class CompositesIndexOnRegular extends CompositesIndex
-{
- protected ByteBuffer getIndexedValue(ByteBuffer rowKey, Clustering clustering, ByteBuffer cellValue, CellPath path)
- {
- return cellValue;
- }
-
- protected CBuilder buildIndexClusteringPrefix(ByteBuffer rowKey, ClusteringPrefix prefix, CellPath path)
- {
- CBuilder builder = CBuilder.create(getIndexComparator());
- builder.add(rowKey);
- for (int i = 0; i < prefix.size(); i++)
- builder.add(prefix.get(i));
- return builder;
- }
-
- public IndexedEntry decodeEntry(DecoratedKey indexedValue, Row indexEntry)
- {
- Clustering clustering = indexEntry.clustering();
- ClusteringComparator baseComparator = baseCfs.getComparator();
- CBuilder builder = CBuilder.create(baseComparator);
- for (int i = 0; i < baseComparator.size(); i++)
- builder.add(clustering.get(i + 1));
- return new IndexedEntry(indexedValue, clustering, indexEntry.primaryKeyLivenessInfo().timestamp(), clustering.get(0), builder.build());
- }
-
- public boolean isStale(Row data, ByteBuffer indexValue, int nowInSec)
- {
- Cell cell = data.getCell(columnDef);
- return cell == null
- || !cell.isLive(nowInSec)
- || columnDef.type.compare(indexValue, cell.value()) != 0;
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
deleted file mode 100644
index b76bf7e..0000000
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.index.composites;
-
-import java.nio.ByteBuffer;
-import java.util.*;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.filter.*;
-import org.apache.cassandra.db.index.*;
-import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
-import org.apache.cassandra.utils.btree.BTreeSet;
-import org.apache.cassandra.utils.concurrent.OpOrder;
-
-
-public class CompositesSearcher extends SecondaryIndexSearcher
-{
- private static final Logger logger = LoggerFactory.getLogger(CompositesSearcher.class);
-
- public CompositesSearcher(SecondaryIndexManager indexManager, Set<ColumnDefinition> columns)
- {
- super(indexManager, columns);
- }
-
- private boolean isMatchingEntry(DecoratedKey partitionKey, CompositesIndex.IndexedEntry entry, ReadCommand command)
- {
- return command.selects(partitionKey, entry.indexedEntryClustering);
- }
-
- protected UnfilteredPartitionIterator queryDataFromIndex(AbstractSimplePerColumnSecondaryIndex secondaryIdx,
- final DecoratedKey indexKey,
- final RowIterator indexHits,
- final ReadCommand command,
- final ReadOrderGroup orderGroup)
- {
- assert indexHits.staticRow() == Rows.EMPTY_STATIC_ROW;
-
- assert secondaryIdx instanceof CompositesIndex;
- final CompositesIndex index = (CompositesIndex)secondaryIdx;
-
- return new UnfilteredPartitionIterator()
- {
- private CompositesIndex.IndexedEntry nextEntry;
-
- private UnfilteredRowIterator next;
-
- public boolean isForThrift()
- {
- return command.isForThrift();
- }
-
- public CFMetaData metadata()
- {
- return command.metadata();
- }
-
- public boolean hasNext()
- {
- return prepareNext();
- }
-
- public UnfilteredRowIterator next()
- {
- if (next == null)
- prepareNext();
-
- UnfilteredRowIterator toReturn = next;
- next = null;
- return toReturn;
- }
-
- private boolean prepareNext()
- {
- if (next != null)
- return true;
-
- if (nextEntry == null)
- {
- if (!indexHits.hasNext())
- return false;
-
- nextEntry = index.decodeEntry(indexKey, indexHits.next());
- }
-
- // Gather all index hits belonging to the same partition and query the data for those hits.
- // TODO: it's much more efficient to do 1 read for all hits to the same partition than doing
- // 1 read per index hit. However, this basically mean materializing all hits for a partition
- // in memory so we should consider adding some paging mechanism. However, index hits should
- // be relatively small so it's much better than the previous code that was materializing all
- // *data* for a given partition.
- BTreeSet.Builder<Clustering> clusterings = BTreeSet.builder(baseCfs.getComparator());
- List<CompositesIndex.IndexedEntry> entries = new ArrayList<>();
- DecoratedKey partitionKey = baseCfs.decorateKey(nextEntry.indexedKey);
-
- while (nextEntry != null && partitionKey.getKey().equals(nextEntry.indexedKey))
- {
- // We've queried a slice of the index, but some hits may not match some of the clustering column constraints
- if (isMatchingEntry(partitionKey, nextEntry, command))
- {
- clusterings.add(nextEntry.indexedEntryClustering);
- entries.add(nextEntry);
- }
-
- nextEntry = indexHits.hasNext() ? index.decodeEntry(indexKey, indexHits.next()) : null;
- }
-
- // Because we've eliminated entries that don't match the clustering columns, it's possible we added nothing
- if (clusterings.isEmpty())
- return prepareNext();
-
- // Query the gathered index hits. We still need to filter stale hits from the resulting query.
- ClusteringIndexNamesFilter filter = new ClusteringIndexNamesFilter(clusterings.build(), false);
- SinglePartitionReadCommand dataCmd = new SinglePartitionNamesCommand(metadata(),
- command.nowInSec(),
- command.columnFilter(),
- command.rowFilter(),
- DataLimits.NONE,
- partitionKey,
- filter);
- @SuppressWarnings("resource") // We close right away if empty, and if it's assign to next it will be called either
- // by the next caller of next, or through closing this iterator if this comes before.
- UnfilteredRowIterator dataIter = filterStaleEntries(dataCmd.queryMemtableAndDisk(baseCfs, orderGroup.baseReadOpOrderGroup()),
- index,
- indexKey.getKey(),
- entries,
- orderGroup.writeOpOrderGroup(),
- command.nowInSec());
- if (dataIter.isEmpty())
- {
- dataIter.close();
- return prepareNext();
- }
-
- next = dataIter;
- return true;
- }
-
- public void remove()
- {
- throw new UnsupportedOperationException();
- }
-
- public void close()
- {
- indexHits.close();
- if (next != null)
- next.close();
- }
- };
- }
-
- private UnfilteredRowIterator filterStaleEntries(UnfilteredRowIterator dataIter,
- final CompositesIndex index,
- final ByteBuffer indexValue,
- final List<CompositesIndex.IndexedEntry> entries,
- final OpOrder.Group writeOp,
- final int nowInSec)
- {
- return new AlteringUnfilteredRowIterator(dataIter)
- {
- private int entriesIdx;
-
- @Override
- protected Row computeNext(Row row)
- {
- CompositesIndex.IndexedEntry entry = findEntry(row.clustering(), writeOp, nowInSec);
- if (!index.isStale(row, indexValue, nowInSec))
- return row;
-
- // The entry is stale: delete the entry and ignore otherwise
- index.delete(entry, writeOp, nowInSec);
- return null;
- }
-
- private CompositesIndex.IndexedEntry findEntry(Clustering clustering, OpOrder.Group writeOp, int nowInSec)
- {
- assert entriesIdx < entries.size();
- while (entriesIdx < entries.size())
- {
- CompositesIndex.IndexedEntry entry = entries.get(entriesIdx++);
- // The entries are in clustering order, so the requested entry should be the
- // next entry, the one at 'entriesIdx'. However, we can have stale entries, entries
- // that have no corresponding row in the base table typically because of a range
- // tombstone or partition level deletion. Delete such stale entries.
- int cmp = metadata().comparator.compare(entry.indexedEntryClustering, clustering);
- assert cmp <= 0; // this would means entries are not in clustering order, which shouldn't happen
- if (cmp == 0)
- return entry;
- else
- index.delete(entry, writeOp, nowInSec);
- }
- // entries correspond to the rows we've queried, so we shouldn't have a row that has no corresponding entry.
- throw new AssertionError();
- }
- };
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java b/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
deleted file mode 100644
index 384b29d..0000000
--- a/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.index.keys;
-
-import java.nio.ByteBuffer;
-import java.util.Set;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.index.AbstractSimplePerColumnSecondaryIndex;
-import org.apache.cassandra.db.index.SecondaryIndexSearcher;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.schema.IndexMetadata;
-import org.apache.cassandra.utils.concurrent.OpOrder;
-
-/**
- * Implements a secondary index for a column family using a second column family.
- * The design uses inverted index http://en.wikipedia.org/wiki/Inverted_index.
- * The row key is the indexed value. For example, if we're indexing a column named
- * city, the index value of city is the row key.
- * The column names are the keys of the records. To see a detailed example, please
- * refer to wikipedia.
- */
-public class KeysIndex extends AbstractSimplePerColumnSecondaryIndex
-{
- public static void addIndexClusteringColumns(CFMetaData.Builder indexMetadata, CFMetaData baseMetadata)
- {
- indexMetadata.addClusteringColumn("partition_key", baseMetadata.partitioner.partitionOrdering());
- }
-
- @Override
- public void indexRow(DecoratedKey key, Row row, OpOrder.Group opGroup, int nowInSec)
- {
- super.indexRow(key, row, opGroup, nowInSec);
-
- // This is used when building indexes, in particular when the index is first created. On thrift, this
- // potentially means the column definition just got created, and so we need to check if it's not a "dynamic"
- // row that actually corresponds to the index definition.
- assert baseCfs.metadata.isCompactTable();
- if (!row.isStatic())
- {
- Clustering clustering = row.clustering();
- if (clustering.get(0).equals(columnDef.name.bytes))
- {
- Cell cell = row.getCell(baseCfs.metadata.compactValueColumn());
- if (cell != null && cell.isLive(nowInSec))
- insert(key.getKey(), clustering, cell, opGroup);
- }
- }
- }
-
- protected ByteBuffer getIndexedValue(ByteBuffer rowKey, Clustering clustering, ByteBuffer cellValue, CellPath path)
- {
- return cellValue;
- }
-
- protected CBuilder buildIndexClusteringPrefix(ByteBuffer rowKey, ClusteringPrefix prefix, CellPath path)
- {
- CBuilder builder = CBuilder.create(getIndexComparator());
- builder.add(rowKey);
- return builder;
- }
-
- public SecondaryIndexSearcher createSecondaryIndexSearcher(Set<ColumnDefinition> columns)
- {
- return new KeysSearcher(baseCfs.indexManager, columns);
- }
-
- public void validateOptions(CFMetaData baseCfm, IndexMetadata def) throws ConfigurationException
- {
- // no options used
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
deleted file mode 100644
index 4b70dcf..0000000
--- a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.index.keys;
-
-import java.nio.ByteBuffer;
-import java.util.Set;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.partitions.*;
-import org.apache.cassandra.db.index.*;
-import org.apache.cassandra.db.filter.*;
-import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
-import org.apache.cassandra.utils.concurrent.OpOrder;
-
-public class KeysSearcher extends SecondaryIndexSearcher
-{
- private static final Logger logger = LoggerFactory.getLogger(KeysSearcher.class);
-
- public KeysSearcher(SecondaryIndexManager indexManager, Set<ColumnDefinition> columns)
- {
- super(indexManager, columns);
- }
-
- protected UnfilteredPartitionIterator queryDataFromIndex(final AbstractSimplePerColumnSecondaryIndex index,
- final DecoratedKey indexKey,
- final RowIterator indexHits,
- final ReadCommand command,
- final ReadOrderGroup orderGroup)
- {
- assert indexHits.staticRow() == Rows.EMPTY_STATIC_ROW;
-
- return new UnfilteredPartitionIterator()
- {
- private UnfilteredRowIterator next;
-
- public boolean isForThrift()
- {
- return command.isForThrift();
- }
-
- public CFMetaData metadata()
- {
- return command.metadata();
- }
-
- public boolean hasNext()
- {
- return prepareNext();
- }
-
- public UnfilteredRowIterator next()
- {
- if (next == null)
- prepareNext();
-
- UnfilteredRowIterator toReturn = next;
- next = null;
- return toReturn;
- }
-
- private boolean prepareNext()
- {
- while (next == null && indexHits.hasNext())
- {
- Row hit = indexHits.next();
- DecoratedKey key = baseCfs.decorateKey(hit.clustering().get(0));
-
- SinglePartitionReadCommand dataCmd = SinglePartitionReadCommand.create(isForThrift(),
- baseCfs.metadata,
- command.nowInSec(),
- command.columnFilter(),
- command.rowFilter(),
- DataLimits.NONE,
- key,
- command.clusteringIndexFilter(key));
- @SuppressWarnings("resource") // filterIfStale closes it's iterator if either it materialize it or if it returns null.
- // Otherwise, we close right away if empty, and if it's assigned to next, close will be called either
- // by the next caller of next, or through closing this iterator if this comes before.
- UnfilteredRowIterator dataIter = filterIfStale(dataCmd.queryMemtableAndDisk(baseCfs, orderGroup.baseReadOpOrderGroup()),
- index,
- hit,
- indexKey.getKey(),
- orderGroup.writeOpOrderGroup(),
- isForThrift(),
- command.nowInSec());
-
- if (dataIter != null)
- {
- if (dataIter.isEmpty())
- dataIter.close();
- else
- next = dataIter;
- }
- }
- return next != null;
- }
-
- public void remove()
- {
- throw new UnsupportedOperationException();
- }
-
- public void close()
- {
- indexHits.close();
- if (next != null)
- next.close();
- }
- };
- }
-
- private UnfilteredRowIterator filterIfStale(UnfilteredRowIterator iterator,
- AbstractSimplePerColumnSecondaryIndex index,
- Row indexHit,
- ByteBuffer indexedValue,
- OpOrder.Group writeOp,
- boolean isForThrift,
- int nowInSec)
- {
- if (isForThrift)
- {
- // The data we got has gone through ThriftResultsMerger, so we're looking for the row whose clustering
- // is the indexed name. And so we need to materialize the partition.
- ImmutableBTreePartition result = ImmutableBTreePartition.create(iterator);
- iterator.close();
- Row data = result.getRow(new Clustering(index.indexedColumn().name.bytes));
- Cell cell = data == null ? null : data.getCell(baseCfs.metadata.compactValueColumn());
- return deleteIfStale(iterator.partitionKey(), cell, index, indexHit, indexedValue, writeOp, nowInSec)
- ? null
- : result.unfilteredIterator();
- }
- else
- {
- assert iterator.metadata().isCompactTable();
- Row data = iterator.staticRow();
- Cell cell = data == null ? null : data.getCell(index.indexedColumn());
- if (deleteIfStale(iterator.partitionKey(), cell, index, indexHit, indexedValue, writeOp, nowInSec))
- {
- iterator.close();
- return null;
- }
- else
- {
- return iterator;
- }
- }
- }
-
- private boolean deleteIfStale(DecoratedKey partitionKey,
- Cell cell,
- AbstractSimplePerColumnSecondaryIndex index,
- Row indexHit,
- ByteBuffer indexedValue,
- OpOrder.Group writeOp,
- int nowInSec)
- {
- if (cell == null || !cell.isLive(nowInSec) || index.indexedColumn().type.compare(indexedValue, cell.value()) != 0)
- {
- // Index is stale, remove the index entry and ignore
- index.delete(partitionKey.getKey(),
- new Clustering(index.indexedColumn().name.bytes),
- indexedValue,
- null,
- new DeletionTime(indexHit.primaryKeyLivenessInfo().timestamp(), nowInSec),
- writeOp);
- return true;
- }
- return false;
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java b/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
index 0b73292..d279a6b 100644
--- a/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.db.partitions;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
@@ -27,17 +26,18 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.index.SecondaryIndexManager;
+import org.apache.cassandra.db.rows.EncodingStats;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.Rows;
+import org.apache.cassandra.index.transactions.UpdateTransaction;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.ObjectSizes;
import org.apache.cassandra.utils.btree.BTree;
import org.apache.cassandra.utils.btree.UpdateFunction;
-import org.apache.cassandra.utils.ObjectSizes;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.concurrent.OpOrder;
import org.apache.cassandra.utils.concurrent.Locks;
-import org.apache.cassandra.utils.memory.MemtableAllocator;
+import org.apache.cassandra.utils.concurrent.OpOrder;
import org.apache.cassandra.utils.memory.HeapAllocator;
-import static org.apache.cassandra.db.index.SecondaryIndexManager.Updater;
+import org.apache.cassandra.utils.memory.MemtableAllocator;
/**
* A thread-safe and atomic Partition implementation.
@@ -86,7 +86,7 @@ public class AtomicBTreePartition extends AbstractBTreePartition
public AtomicBTreePartition(CFMetaData metadata, DecoratedKey partitionKey, MemtableAllocator allocator)
{
- // TODO: is this a potential bug? partition columns may be a subset if we alter columns while it's in memtable
+ // potential bug? partition columns may be a subset if we alter columns while it's in the memtable
super(metadata, partitionKey, metadata.partitionColumns());
this.allocator = allocator;
this.ref = EMPTY;
@@ -108,11 +108,10 @@ public class AtomicBTreePartition extends AbstractBTreePartition
* @return an array containing first the difference in size seen after merging the updates, and second the minimum
* time delta between updates.
*/
- public long[] addAllWithSizeDelta(final PartitionUpdate update, OpOrder.Group writeOp, Updater indexer)
+ public long[] addAllWithSizeDelta(final PartitionUpdate update, OpOrder.Group writeOp, UpdateTransaction indexer)
{
RowUpdater updater = new RowUpdater(this, allocator, writeOp, indexer);
DeletionInfo inputDeletionInfoCopy = null;
-
boolean monitorOwned = false;
try
{
@@ -121,12 +120,21 @@ public class AtomicBTreePartition extends AbstractBTreePartition
Locks.monitorEnterUnsafe(this);
monitorOwned = true;
}
+
+ indexer.start();
+
while (true)
{
Holder current = ref;
updater.ref = current;
updater.reset();
+ if (!update.deletionInfo().isLive())
+ indexer.onPartitionDeletion(update.deletionInfo().getPartitionDeletion());
+
+ if (update.deletionInfo().hasRanges())
+ update.deletionInfo().rangeIterator(false).forEachRemaining(indexer::onRangeTombstone);
+
DeletionInfo deletionInfo;
if (update.deletionInfo().mayModify(current.deletionInfo))
{
@@ -150,7 +158,6 @@ public class AtomicBTreePartition extends AbstractBTreePartition
if (tree != null && refUpdater.compareAndSet(this, current, new Holder(tree, deletionInfo, staticRow, newStats)))
{
- indexer.updateRowLevelIndexes();
updater.finish();
return new long[]{ updater.dataSize, updater.colUpdateTimeDelta };
}
@@ -171,6 +178,7 @@ public class AtomicBTreePartition extends AbstractBTreePartition
}
finally
{
+ indexer.commit();
if (monitorOwned)
Locks.monitorExitUnsafe(this);
}
@@ -230,7 +238,7 @@ public class AtomicBTreePartition extends AbstractBTreePartition
final AtomicBTreePartition updating;
final MemtableAllocator allocator;
final OpOrder.Group writeOp;
- final Updater indexer;
+ final UpdateTransaction indexer;
final int nowInSec;
Holder ref;
Row.Builder regularBuilder;
@@ -240,7 +248,7 @@ public class AtomicBTreePartition extends AbstractBTreePartition
final MemtableAllocator.DataReclaimer reclaimer;
List<Row> inserted; // TODO: replace with walk of aborted BTree
- private RowUpdater(AtomicBTreePartition updating, MemtableAllocator allocator, OpOrder.Group writeOp, Updater indexer)
+ private RowUpdater(AtomicBTreePartition updating, MemtableAllocator allocator, OpOrder.Group writeOp, UpdateTransaction indexer)
{
this.updating = updating;
this.allocator = allocator;
@@ -265,7 +273,7 @@ public class AtomicBTreePartition extends AbstractBTreePartition
public Row apply(Row insert)
{
Row data = Rows.copy(insert, builder(insert.clustering())).build();
- insertIntoIndexes(data);
+ indexer.onInserted(insert);
this.dataSize += data.dataSize();
this.heapSize += data.unsharedHeapSizeExcludingData();
@@ -280,10 +288,12 @@ public class AtomicBTreePartition extends AbstractBTreePartition
Columns mergedColumns = existing.columns().mergeTo(update.columns());
Row.Builder builder = builder(existing.clustering());
- colUpdateTimeDelta = Math.min(colUpdateTimeDelta, Rows.merge(existing, update, mergedColumns, builder, nowInSec, indexer));
+ colUpdateTimeDelta = Math.min(colUpdateTimeDelta, Rows.merge(existing, update, mergedColumns, builder, nowInSec));
Row reconciled = builder.build();
+ indexer.onUpdated(existing, reconciled);
+
dataSize += reconciled.dataSize() - existing.dataSize();
heapSize += reconciled.unsharedHeapSizeExcludingData() - existing.unsharedHeapSizeExcludingData();
if (inserted == null)
@@ -294,41 +304,6 @@ public class AtomicBTreePartition extends AbstractBTreePartition
return reconciled;
}
- private void insertIntoIndexes(Row toInsert)
- {
- if (indexer == SecondaryIndexManager.nullUpdater)
- return;
-
- maybeIndexPrimaryKeyColumns(toInsert);
- Clustering clustering = toInsert.clustering();
- for (Cell cell : toInsert.cells())
- indexer.insert(clustering, cell);
- }
-
- private void maybeIndexPrimaryKeyColumns(Row row)
- {
- // We want to update a primary key index with the most up to date info contains in that inserted row (if only for
- // backward compatibility). Note that if there is an index but not a partition key or clustering column one, we've
- // wasting this work. We might be able to avoid that if row indexing was pushed in the index updater.
- long timestamp = row.primaryKeyLivenessInfo().timestamp();
- int ttl = row.primaryKeyLivenessInfo().ttl();
-
- for (Cell cell : row.cells())
- {
- long cellTimestamp = cell.timestamp();
- if (cell.isLive(nowInSec))
- {
- if (cellTimestamp > timestamp)
- {
- timestamp = cellTimestamp;
- ttl = cell.ttl();
- }
- }
- }
-
- indexer.maybeIndex(row.clustering(), timestamp, ttl, row.deletion());
- }
-
protected void reset()
{
this.dataSize = 0;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
index 2f0a4ec..3d2c94b 100644
--- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
@@ -19,30 +19,23 @@ package org.apache.cassandra.db.partitions;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
import com.google.common.collect.Iterables;
-import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.context.CounterContext;
import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.io.util.DataInputBuffer;
-import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.io.util.DataOutputBuffer;
-import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.io.util.*;
import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.MergeIterator;
-import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.btree.BTree;
import org.apache.cassandra.utils.btree.UpdateFunction;