Posted to commits@cassandra.apache.org by sl...@apache.org on 2015/06/30 12:47:51 UTC
[27/51] [partial] cassandra git commit: Storage engine refactor, a.k.a. CASSANDRA-8099
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java b/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java
deleted file mode 100644
index a541d5e..0000000
--- a/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.filter;
-
-import java.io.DataInput;
-import java.io.IOException;
-import java.util.Comparator;
-import java.util.Iterator;
-
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
-import org.apache.cassandra.db.composites.CellNameType;
-import org.apache.cassandra.db.composites.Composite;
-import org.apache.cassandra.db.composites.CType;
-import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.io.util.FileDataInput;
-
-/**
- * Given an implementation-specific description of what columns to look for, provides methods
- * to extract the desired columns from a Memtable, SSTable, or SuperColumn. Either the get*ColumnIterator
- * methods will be called, or filterSuperColumn, but not both on the same object. QueryFilter
- * takes care of putting the two together if subcolumn filtering needs to be done, based on the
- * query path that it knows (but that IFilter implementations are oblivious to).
- */
-public interface IDiskAtomFilter
-{
- /**
- * Returns an iterator over the columns of the given columnFamily that
- * match the filter criteria, in sorted order.
- */
- public Iterator<Cell> getColumnIterator(ColumnFamily cf);
-
- public OnDiskAtomIterator getColumnIterator(DecoratedKey key, ColumnFamily cf);
-
- /**
- * Returns an iterator over the columns of the given SSTable, read through the
- * already-opened file, that match the filter criteria, in sorted order.
- * @param sstable the SSTable to read from
- * @param file already opened file data input, saves us opening another one
- * @param key the key of the row we are about to iterate over
- */
- public OnDiskAtomIterator getSSTableColumnIterator(SSTableReader sstable, FileDataInput file, DecoratedKey key, RowIndexEntry indexEntry);
-
- /**
- * Returns an iterator over the columns of the given SSTable that
- * match the filter criteria, in sorted order.
- */
- public OnDiskAtomIterator getSSTableColumnIterator(SSTableReader sstable, DecoratedKey key);
-
- /**
- * Collects columns from reducedColumns into container. Termination is determined
- * by the filter code, which should have some limit on the number of columns
- * to avoid running out of memory on large rows.
- */
- public void collectReducedColumns(ColumnFamily container, Iterator<Cell> reducedColumns, DecoratedKey key, int gcBefore, long now);
-
- public Comparator<Cell> getColumnComparator(CellNameType comparator);
-
- public boolean isReversed();
- public void updateColumnsLimit(int newLimit);
-
- public int getLiveCount(ColumnFamily cf, long now);
- public ColumnCounter columnCounter(CellNameType comparator, long now);
-
- public IDiskAtomFilter cloneShallow();
- public boolean maySelectPrefix(CType type, Composite prefix);
-
- public boolean shouldInclude(SSTableReader sstable);
-
- public boolean countCQL3Rows(CellNameType comparator);
-
- public boolean isHeadFilter();
-
- /**
- * Whether the provided cf, which is assumed to contain the head of the
- * partition, contains enough data to cover this filter.
- */
- public boolean isFullyCoveredBy(ColumnFamily cf, long now);
-
- public static class Serializer implements IVersionedSerializer<IDiskAtomFilter>
- {
- private final CellNameType type;
-
- public Serializer(CellNameType type)
- {
- this.type = type;
- }
-
- public void serialize(IDiskAtomFilter filter, DataOutputPlus out, int version) throws IOException
- {
- if (filter instanceof SliceQueryFilter)
- {
- out.writeByte(0);
- type.sliceQueryFilterSerializer().serialize((SliceQueryFilter)filter, out, version);
- }
- else
- {
- out.writeByte(1);
- type.namesQueryFilterSerializer().serialize((NamesQueryFilter)filter, out, version);
- }
- }
-
- public IDiskAtomFilter deserialize(DataInput in, int version) throws IOException
- {
- int b = in.readByte();
- if (b == 0)
- {
- return type.sliceQueryFilterSerializer().deserialize(in, version);
- }
- else
- {
- assert b == 1;
- return type.namesQueryFilterSerializer().deserialize(in, version);
- }
- }
-
- public long serializedSize(IDiskAtomFilter filter, int version)
- {
- int size = 1;
- if (filter instanceof SliceQueryFilter)
- size += type.sliceQueryFilterSerializer().serializedSize((SliceQueryFilter)filter, version);
- else
- size += type.namesQueryFilterSerializer().serializedSize((NamesQueryFilter)filter, version);
- return size;
- }
- }
-
- public Iterator<RangeTombstone> getRangeTombstoneIterator(ColumnFamily source);
-}
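
The Serializer nested in IDiskAtomFilter dispatches on a leading type byte: 0 means the payload is a SliceQueryFilter, 1 means a NamesQueryFilter, and serializedSize() mirrors the same branch so the computed size always matches the bytes written. Below is a minimal, self-contained sketch of that tag-byte pattern using plain java.io streams; Filter, SliceFilter and NamesFilter are hypothetical stand-ins, not the Cassandra classes.

    import java.io.*;

    interface Filter {}

    class SliceFilter implements Filter { final int count; SliceFilter(int count) { this.count = count; } }
    class NamesFilter implements Filter { final String name; NamesFilter(String name) { this.name = name; } }

    class FilterSerializer
    {
        public void serialize(Filter filter, DataOutput out) throws IOException
        {
            if (filter instanceof SliceFilter)
            {
                out.writeByte(0);                              // leading tag identifies the concrete type
                out.writeInt(((SliceFilter) filter).count);
            }
            else
            {
                out.writeByte(1);
                out.writeUTF(((NamesFilter) filter).name);
            }
        }

        public Filter deserialize(DataInput in) throws IOException
        {
            int b = in.readByte();                             // read the tag back before anything else
            if (b == 0)
                return new SliceFilter(in.readInt());
            assert b == 1;
            return new NamesFilter(in.readUTF());
        }
    }

Round-tripping a filter through a DataOutputStream/DataInputStream pair over a byte array exercises both directions of the dispatch.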
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java b/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
deleted file mode 100644
index c8f63bb..0000000
--- a/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
+++ /dev/null
@@ -1,301 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.filter;
-
-import java.io.DataInput;
-import java.io.IOException;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.SortedSet;
-import java.util.TreeSet;
-
-import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.commons.lang3.StringUtils;
-import com.google.common.collect.AbstractIterator;
-import com.google.common.collect.Iterators;
-
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.db.composites.CellNameType;
-import org.apache.cassandra.db.composites.Composite;
-import org.apache.cassandra.db.composites.CType;
-import org.apache.cassandra.io.ISerializer;
-import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.io.util.FileDataInput;
-import org.apache.cassandra.utils.SearchIterator;
-
-public class NamesQueryFilter implements IDiskAtomFilter
-{
- public final SortedSet<CellName> columns;
-
- // If true, getLiveCount will always return either 0 or 1. This uses the fact that we know
- // CQL3 will never use a name filter with cell names spanning multiple CQL3 rows.
- private final boolean countCQL3Rows;
-
- public NamesQueryFilter(SortedSet<CellName> columns)
- {
- this(columns, false);
- }
-
- public NamesQueryFilter(SortedSet<CellName> columns, boolean countCQL3Rows)
- {
- this.columns = columns;
- this.countCQL3Rows = countCQL3Rows;
- }
-
- public NamesQueryFilter cloneShallow()
- {
- // NQF is immutable as far as shallow cloning is concerned, so we save the allocation.
- return this;
- }
-
- public NamesQueryFilter withUpdatedColumns(SortedSet<CellName> newColumns)
- {
- return new NamesQueryFilter(newColumns, countCQL3Rows);
- }
-
- @SuppressWarnings("unchecked")
- public Iterator<Cell> getColumnIterator(ColumnFamily cf)
- {
- assert cf != null;
- return (Iterator<Cell>) (Iterator<?>) new ByNameColumnIterator(columns.iterator(), null, cf);
- }
-
- public OnDiskAtomIterator getColumnIterator(DecoratedKey key, ColumnFamily cf)
- {
- assert cf != null;
- return new ByNameColumnIterator(columns.iterator(), key, cf);
- }
-
- public OnDiskAtomIterator getSSTableColumnIterator(SSTableReader sstable, DecoratedKey key)
- {
- return sstable.iterator(key, columns);
- }
-
- public OnDiskAtomIterator getSSTableColumnIterator(SSTableReader sstable, FileDataInput file, DecoratedKey key, RowIndexEntry indexEntry)
- {
- return sstable.iterator(file, key, columns, indexEntry);
- }
-
- public void collectReducedColumns(ColumnFamily container, Iterator<Cell> reducedColumns, DecoratedKey key, int gcBefore, long now)
- {
- DeletionInfo.InOrderTester tester = container.inOrderDeletionTester();
- while (reducedColumns.hasNext())
- container.maybeAppendColumn(reducedColumns.next(), tester, gcBefore);
- }
-
- public Comparator<Cell> getColumnComparator(CellNameType comparator)
- {
- return comparator.columnComparator(false);
- }
-
- @Override
- public String toString()
- {
- return "NamesQueryFilter(" +
- "columns=" + StringUtils.join(columns, ",") +
- ')';
- }
-
- public boolean isReversed()
- {
- return false;
- }
-
- public void updateColumnsLimit(int newLimit)
- {
- }
-
- public int getLiveCount(ColumnFamily cf, long now)
- {
- // Note: we could use columnCounter(), but this is simple enough that we save the object allocation
-
- if (countCQL3Rows)
- return cf.hasOnlyTombstones(now) ? 0 : 1;
-
- int count = 0;
- for (Cell cell : cf)
- {
- if (cell.isLive(now))
- count++;
- }
- return count;
- }
-
- public boolean maySelectPrefix(CType type, Composite prefix)
- {
- for (CellName column : columns)
- {
- if (prefix.isPrefixOf(type, column))
- return true;
- }
- return false;
- }
-
- public boolean shouldInclude(SSTableReader sstable)
- {
- return true;
- }
-
- public boolean isFullyCoveredBy(ColumnFamily cf, long now)
- {
- // cf will cover all the requested columns if the range it covers includes
- // all said columns
- CellName first = cf.iterator(ColumnSlice.ALL_COLUMNS_ARRAY).next().name();
- CellName last = cf.reverseIterator(ColumnSlice.ALL_COLUMNS_ARRAY).next().name();
-
- return cf.getComparator().compare(first, columns.first()) <= 0
- && cf.getComparator().compare(columns.last(), last) <= 0;
- }
-
- public boolean isHeadFilter()
- {
- return false;
- }
-
- public boolean countCQL3Rows(CellNameType comparator)
- {
- return countCQL3Rows;
- }
-
- public boolean countCQL3Rows()
- {
- return countCQL3Rows(null);
- }
-
- public ColumnCounter columnCounter(CellNameType comparator, long now)
- {
- return countCQL3Rows
- ? new ColumnCounter.GroupByPrefix(now, null, 0)
- : new ColumnCounter(now);
- }
-
- private static class ByNameColumnIterator extends AbstractIterator<OnDiskAtom> implements OnDiskAtomIterator
- {
- private final ColumnFamily cf;
- private final DecoratedKey key;
- private final Iterator<CellName> names;
- private final SearchIterator<CellName, Cell> cells;
-
- public ByNameColumnIterator(Iterator<CellName> names, DecoratedKey key, ColumnFamily cf)
- {
- this.names = names;
- this.cf = cf;
- this.key = key;
- this.cells = cf.searchIterator();
- }
-
- protected OnDiskAtom computeNext()
- {
- while (names.hasNext() && cells.hasNext())
- {
- CellName current = names.next();
- Cell cell = cells.next(current);
- if (cell != null)
- return cell;
- }
- return endOfData();
- }
-
- public ColumnFamily getColumnFamily()
- {
- return cf;
- }
-
- public DecoratedKey getKey()
- {
- return key;
- }
-
- public void close() throws IOException { }
- }
-
- public static class Serializer implements IVersionedSerializer<NamesQueryFilter>
- {
- private CellNameType type;
-
- public Serializer(CellNameType type)
- {
- this.type = type;
- }
-
- public void serialize(NamesQueryFilter f, DataOutputPlus out, int version) throws IOException
- {
- out.writeInt(f.columns.size());
- ISerializer<CellName> serializer = type.cellSerializer();
- for (CellName cName : f.columns)
- {
- serializer.serialize(cName, out);
- }
- out.writeBoolean(f.countCQL3Rows);
- }
-
- public NamesQueryFilter deserialize(DataInput in, int version) throws IOException
- {
- int size = in.readInt();
- SortedSet<CellName> columns = new TreeSet<>(type);
- ISerializer<CellName> serializer = type.cellSerializer();
- for (int i = 0; i < size; ++i)
- columns.add(serializer.deserialize(in));
- boolean countCQL3Rows = in.readBoolean();
- return new NamesQueryFilter(columns, countCQL3Rows);
- }
-
- public long serializedSize(NamesQueryFilter f, int version)
- {
- TypeSizes sizes = TypeSizes.NATIVE;
- int size = sizes.sizeof(f.columns.size());
- ISerializer<CellName> serializer = type.cellSerializer();
- for (CellName cName : f.columns)
- size += serializer.serializedSize(cName, sizes);
- size += sizes.sizeof(f.countCQL3Rows);
- return size;
- }
- }
-
- public Iterator<RangeTombstone> getRangeTombstoneIterator(final ColumnFamily source)
- {
- if (!source.deletionInfo().hasRanges())
- return Iterators.emptyIterator();
-
- return new AbstractIterator<RangeTombstone>()
- {
- private final Iterator<CellName> names = columns.iterator();
- private RangeTombstone lastFindRange;
-
- protected RangeTombstone computeNext()
- {
- while (names.hasNext())
- {
- CellName next = names.next();
- if (lastFindRange != null && lastFindRange.includes(source.getComparator(), next))
- return lastFindRange;
-
- // We keep the last range around because, since names are in sorted order,
- // it may match the next name too.
- lastFindRange = source.deletionInfo().rangeCovering(next);
- if (lastFindRange != null)
- return lastFindRange;
- }
- return endOfData();
- }
- };
- }
-}
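
The getRangeTombstoneIterator implementation above exploits the fact that the queried names arrive in sorted order: the range tombstone that covered the previous name may well cover the next one, so it is cached in lastFindRange and retried before searching again. Here is a standalone sketch of that caching idea; the integer-bounded Range and the linear scan are hypothetical stand-ins for Cassandra's RangeTombstone and DeletionInfo.rangeCovering().

    import java.util.*;

    class RangeLookup
    {
        static class Range
        {
            final int start, end;                              // inclusive bounds
            Range(int start, int end) { this.start = start; this.end = end; }
            boolean includes(int name) { return start <= name && name <= end; }
        }

        // For each queried name (in sorted order), find a covering range.
        static List<Range> covering(List<Range> ranges, SortedSet<Integer> names)
        {
            List<Range> result = new ArrayList<>();
            Range last = null;                                 // cache the previous hit
            for (int name : names)
            {
                if (last != null && last.includes(name))
                {
                    result.add(last);                          // reuse it without searching again
                    continue;
                }
                last = null;
                for (Range r : ranges)                         // linear stand-in for rangeCovering()
                {
                    if (r.includes(name)) { last = r; break; }
                }
                if (last != null)
                    result.add(last);
            }
            return result;
        }
    }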
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/filter/QueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/QueryFilter.java b/src/java/org/apache/cassandra/db/filter/QueryFilter.java
deleted file mode 100644
index 15ee33d..0000000
--- a/src/java/org/apache/cassandra/db/filter/QueryFilter.java
+++ /dev/null
@@ -1,262 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.filter;
-
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.List;
-import java.util.SortedSet;
-
-import org.apache.cassandra.db.Cell;
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.DeletionInfo;
-import org.apache.cassandra.db.OnDiskAtom;
-import org.apache.cassandra.db.RangeTombstone;
-import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
-import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.db.composites.Composite;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.utils.MergeIterator;
-
-public class QueryFilter
-{
- public final DecoratedKey key;
- public final String cfName;
- public final IDiskAtomFilter filter;
- public final long timestamp;
-
- public QueryFilter(DecoratedKey key, String cfName, IDiskAtomFilter filter, long timestamp)
- {
- this.key = key;
- this.cfName = cfName;
- this.filter = filter;
- this.timestamp = timestamp;
- }
-
- public Iterator<Cell> getIterator(ColumnFamily cf)
- {
- assert cf != null;
- return filter.getColumnIterator(cf);
- }
-
- public OnDiskAtomIterator getSSTableColumnIterator(SSTableReader sstable)
- {
- return filter.getSSTableColumnIterator(sstable, key);
- }
-
- public void collateOnDiskAtom(ColumnFamily returnCF,
- List<? extends Iterator<? extends OnDiskAtom>> toCollate,
- int gcBefore)
- {
- collateOnDiskAtom(returnCF, toCollate, filter, this.key, gcBefore, timestamp);
- }
-
- public static void collateOnDiskAtom(ColumnFamily returnCF,
- List<? extends Iterator<? extends OnDiskAtom>> toCollate,
- IDiskAtomFilter filter,
- DecoratedKey key,
- int gcBefore,
- long timestamp)
- {
- List<Iterator<Cell>> filteredIterators = new ArrayList<>(toCollate.size());
- for (Iterator<? extends OnDiskAtom> iter : toCollate)
- filteredIterators.add(gatherTombstones(returnCF, iter));
- collateColumns(returnCF, filteredIterators, filter, key, gcBefore, timestamp);
- }
-
- // When there is only a single source of atoms, we can skip the collate step
- public void collateOnDiskAtom(ColumnFamily returnCF, Iterator<? extends OnDiskAtom> toCollate, int gcBefore)
- {
- filter.collectReducedColumns(returnCF, gatherTombstones(returnCF, toCollate), this.key, gcBefore, timestamp);
- }
-
- public void collateColumns(ColumnFamily returnCF, List<? extends Iterator<Cell>> toCollate, int gcBefore)
- {
- collateColumns(returnCF, toCollate, filter, this.key, gcBefore, timestamp);
- }
-
- public static void collateColumns(ColumnFamily returnCF,
- List<? extends Iterator<Cell>> toCollate,
- IDiskAtomFilter filter,
- DecoratedKey key,
- int gcBefore,
- long timestamp)
- {
- Comparator<Cell> comparator = filter.getColumnComparator(returnCF.getComparator());
-
- Iterator<Cell> reduced = toCollate.size() == 1
- ? toCollate.get(0)
- : MergeIterator.get(toCollate, comparator, getReducer(comparator));
-
- filter.collectReducedColumns(returnCF, reduced, key, gcBefore, timestamp);
- }
-
- private static MergeIterator.Reducer<Cell, Cell> getReducer(final Comparator<Cell> comparator)
- {
- // define a 'reduced' iterator that merges columns w/ the same name, which
- // greatly simplifies computing liveColumns in the presence of tombstones.
- return new MergeIterator.Reducer<Cell, Cell>()
- {
- Cell current;
-
- public void reduce(Cell next)
- {
- assert current == null || comparator.compare(current, next) == 0;
- current = current == null ? next : current.reconcile(next);
- }
-
- protected Cell getReduced()
- {
- assert current != null;
- Cell toReturn = current;
- current = null;
- return toReturn;
- }
-
- @Override
- public boolean trivialReduceIsTrivial()
- {
- return true;
- }
- };
- }
-
- /**
- * Given an iterator of on-disk atoms, returns an iterator that filters out the range
- * tombstone markers, adding them to {@code returnCF}, and returns the normal columns.
- */
- public static Iterator<Cell> gatherTombstones(final ColumnFamily returnCF, final Iterator<? extends OnDiskAtom> iter)
- {
- return new Iterator<Cell>()
- {
- private Cell next;
-
- public boolean hasNext()
- {
- if (next != null)
- return true;
-
- getNext();
- return next != null;
- }
-
- public Cell next()
- {
- if (next == null)
- getNext();
-
- assert next != null;
- Cell toReturn = next;
- next = null;
- return toReturn;
- }
-
- private void getNext()
- {
- while (iter.hasNext())
- {
- OnDiskAtom atom = iter.next();
-
- if (atom instanceof Cell)
- {
- next = (Cell)atom;
- break;
- }
- else
- {
- returnCF.addAtom(atom);
- }
- }
- }
-
- public void remove()
- {
- throw new UnsupportedOperationException();
- }
- };
- }
-
- public String getColumnFamilyName()
- {
- return cfName;
- }
-
- /**
- * @return a QueryFilter object to satisfy the given slice criteria
- * @param key the row to slice
- * @param cfName column family to query
- * @param start column to start slice at, inclusive; empty for "the first column"
- * @param finish column to stop slice at, inclusive; empty for "the last column"
- * @param reversed true to start with the largest column (as determined by configured sort order) instead of smallest
- * @param limit maximum number of non-deleted columns to return
- * @param timestamp time to use for determining expiring columns' state
- */
- public static QueryFilter getSliceFilter(DecoratedKey key,
- String cfName,
- Composite start,
- Composite finish,
- boolean reversed,
- int limit,
- long timestamp)
- {
- return new QueryFilter(key, cfName, new SliceQueryFilter(start, finish, reversed, limit), timestamp);
- }
-
- /**
- * Returns a QueryFilter object that includes every column in the row.
- * This is dangerous on large rows; avoid except for test code.
- */
- public static QueryFilter getIdentityFilter(DecoratedKey key, String cfName, long timestamp)
- {
- return new QueryFilter(key, cfName, new IdentityQueryFilter(), timestamp);
- }
-
- /**
- * @return a QueryFilter object that will return columns matching the given names
- * @param key the row to slice
- * @param cfName column family to query
- * @param columns the column names to restrict the results to, sorted in comparator order
- */
- public static QueryFilter getNamesFilter(DecoratedKey key, String cfName, SortedSet<CellName> columns, long timestamp)
- {
- return new QueryFilter(key, cfName, new NamesQueryFilter(columns), timestamp);
- }
-
- @Override
- public String toString()
- {
- return getClass().getSimpleName() + "(key=" + key + ", cfName=" + cfName + (filter == null ? "" : ", filter=" + filter) + ")";
- }
-
- public boolean shouldInclude(SSTableReader sstable)
- {
- return filter.shouldInclude(sstable);
- }
-
- public void delete(DeletionInfo target, ColumnFamily source)
- {
- target.add(source.deletionInfo().getTopLevelDeletion());
- // source is the CF currently in the memtable, and it can be large compared to what the filter selects,
- // so only consider those range tombstones that the filter does select.
- for (Iterator<RangeTombstone> iter = filter.getRangeTombstoneIterator(source); iter.hasNext(); )
- target.add(iter.next(), source.getComparator());
- }
-}
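
The distinctive piece of QueryFilter is gatherTombstones: it adapts a mixed stream of on-disk atoms into a cell-only iterator, diverting range tombstone markers into the return ColumnFamily as a side effect so the downstream filter code only ever sees cells. The sketch below shows the same split-and-side-channel shape with hypothetical Atom/Cell/Tombstone types and a Consumer in place of ColumnFamily.addAtom().

    import java.util.*;
    import java.util.function.Consumer;

    class GatherExample
    {
        interface Atom {}
        static class Cell implements Atom { final String name; Cell(String name) { this.name = name; } }
        static class Tombstone implements Atom {}

        static Iterator<Cell> gather(Iterator<Atom> atoms, Consumer<Tombstone> sink)
        {
            return new Iterator<Cell>()
            {
                private Cell next;

                public boolean hasNext()
                {
                    while (next == null && atoms.hasNext())
                    {
                        Atom atom = atoms.next();
                        if (atom instanceof Cell)
                            next = (Cell) atom;                // hand cells to the caller
                        else
                            sink.accept((Tombstone) atom);     // side-channel the markers
                    }
                    return next != null;
                }

                public Cell next()
                {
                    if (!hasNext())
                        throw new NoSuchElementException();
                    Cell cell = next;
                    next = null;
                    return cell;
                }
            };
        }
    }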
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/filter/RowFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/RowFilter.java b/src/java/org/apache/cassandra/db/filter/RowFilter.java
new file mode 100644
index 0000000..aff8d16
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/filter/RowFilter.java
@@ -0,0 +1,784 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.filter;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import com.google.common.base.Objects;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.cql3.Operator;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.apache.cassandra.cql3.statements.RequestValidations.*;
+
+/**
+ * A filter on which rows a given query should include or exclude.
+ * <p>
+ * This corresponds to the restrictions on rows that are not handled by the query
+ * {@link ClusteringIndexFilter}. Some of the expressions of this filter may
+ * be handled by a secondary index, and the rest is simply filtered out of the
+ * result set (the latter can only happen if the query was using ALLOW FILTERING).
+ */
+public abstract class RowFilter implements Iterable<RowFilter.Expression>
+{
+ public static final Serializer serializer = new Serializer();
+ public static final RowFilter NONE = new CQLFilter(Collections.<Expression>emptyList());
+
+ protected final List<Expression> expressions;
+
+ protected RowFilter(List<Expression> expressions)
+ {
+ this.expressions = expressions;
+ }
+
+ public static RowFilter create()
+ {
+ return new CQLFilter(new ArrayList<Expression>());
+ }
+
+ public static RowFilter create(int capacity)
+ {
+ return new CQLFilter(new ArrayList<Expression>(capacity));
+ }
+
+ public static RowFilter forThrift(int capacity)
+ {
+ return new ThriftFilter(new ArrayList<Expression>(capacity));
+ }
+
+ public void add(ColumnDefinition def, Operator op, ByteBuffer value)
+ {
+ expressions.add(new SimpleExpression(def, op, value));
+ }
+
+ public void addMapEquality(ColumnDefinition def, ByteBuffer key, Operator op, ByteBuffer value)
+ {
+ expressions.add(new MapEqualityExpression(def, key, op, value));
+ }
+
+ public void addThriftExpression(CFMetaData metadata, ByteBuffer name, Operator op, ByteBuffer value)
+ {
+ assert (this instanceof ThriftFilter);
+ expressions.add(new ThriftExpression(metadata, name, op, value));
+ }
+
+ /**
+ * Filters the provided iterator so that only the rows satisfying the expressions of this filter
+ * are included in the resulting iterator.
+ *
+ * @param iter the iterator to filter
+ * @param nowInSec the time of the query in seconds.
+ * @return the filtered iterator.
+ */
+ public abstract UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iter, int nowInSec);
+
+ /**
+ * Returns this filter but without the provided expression. This method
+ * *assumes* that the filter contains the provided expression.
+ */
+ public RowFilter without(Expression expression)
+ {
+ assert expressions.contains(expression);
+ if (expressions.size() == 1)
+ return RowFilter.NONE;
+
+ List<Expression> newExpressions = new ArrayList<>(expressions.size() - 1);
+ for (Expression e : expressions)
+ if (!e.equals(expression))
+ newExpressions.add(e);
+
+ return withNewExpressions(newExpressions);
+ }
+
+ protected abstract RowFilter withNewExpressions(List<Expression> expressions);
+
+ public boolean isEmpty()
+ {
+ return expressions.isEmpty();
+ }
+
+ public Iterator<Expression> iterator()
+ {
+ return expressions.iterator();
+ }
+
+ private static Clustering makeCompactClustering(CFMetaData metadata, ByteBuffer name)
+ {
+ assert metadata.isCompactTable();
+ if (metadata.isCompound())
+ {
+ List<ByteBuffer> values = CompositeType.splitName(name);
+ return new SimpleClustering(values.toArray(new ByteBuffer[metadata.comparator.size()]));
+ }
+ else
+ {
+ return new SimpleClustering(name);
+ }
+ }
+
+ @Override
+ public String toString()
+ {
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < expressions.size(); i++)
+ {
+ if (i > 0)
+ sb.append(" AND ");
+ sb.append(expressions.get(i));
+ }
+ return sb.toString();
+ }
+
+ private static class CQLFilter extends RowFilter
+ {
+ private CQLFilter(List<Expression> expressions)
+ {
+ super(expressions);
+ }
+
+ public UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iter, final int nowInSec)
+ {
+ if (expressions.isEmpty())
+ return iter;
+
+ return new WrappingUnfilteredPartitionIterator(iter)
+ {
+ @Override
+ public UnfilteredRowIterator computeNext(final UnfilteredRowIterator iter)
+ {
+ return new FilteringRowIterator(iter)
+ {
+ // We filter tombstones when passing the row to isSatisfiedBy so that the method doesn't have to bother with them.
+ // (we should however not filter them in the output of the method, hence it's not used as the row filter for the
+ // FilteringRowIterator)
+ private final TombstoneFilteringRow filter = new TombstoneFilteringRow(nowInSec);
+
+ protected boolean includeRow(Row row)
+ {
+ return CQLFilter.this.isSatisfiedBy(iter.partitionKey(), filter.setTo(row));
+ }
+ };
+ }
+ };
+ }
+
+ /**
+ * Returns whether the provided row (with its partition key) satisfies
+ * this row filter or not (that is, if it satisfies all of its expressions).
+ */
+ private boolean isSatisfiedBy(DecoratedKey partitionKey, Row row)
+ {
+ for (Expression e : expressions)
+ if (!e.isSatisfiedBy(partitionKey, row))
+ return false;
+
+ return true;
+ }
+
+ protected RowFilter withNewExpressions(List<Expression> expressions)
+ {
+ return new CQLFilter(expressions);
+ }
+ }
+
+ private static class ThriftFilter extends RowFilter
+ {
+ private ThriftFilter(List<Expression> expressions)
+ {
+ super(expressions);
+ }
+
+ public UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iter, final int nowInSec)
+ {
+ if (expressions.isEmpty())
+ return iter;
+
+ return new WrappingUnfilteredPartitionIterator(iter)
+ {
+ @Override
+ public UnfilteredRowIterator computeNext(final UnfilteredRowIterator iter)
+ {
+ // Thrift does not filter rows; it filters out the entire partition if any of the expressions is not
+ // satisfied, which forces us to materialize the result (in theory we could materialize only
+ // what we need, which might or might not be everything, but we keep it simple since in practice
+ // the added complexity has never been worth it).
+ ArrayBackedPartition result = ArrayBackedPartition.create(iter);
+
+ // The partition needs to have a row for every expression, and the expression needs to be valid.
+ for (Expression expr : expressions)
+ {
+ assert expr instanceof ThriftExpression;
+ Row row = result.getRow(makeCompactClustering(iter.metadata(), expr.column().name.bytes));
+ if (row == null || !expr.isSatisfiedBy(iter.partitionKey(), row))
+ return null;
+ }
+ // If we get here, it means all expressions were satisfied, so return the original result
+ return result.unfilteredIterator();
+ }
+ };
+ }
+
+ protected RowFilter withNewExpressions(List<Expression> expressions)
+ {
+ return new ThriftFilter(expressions);
+ }
+ }
+
+ public static abstract class Expression
+ {
+ private static final Serializer serializer = new Serializer();
+
+ // Note: the order of this enum matters; it's used for serialization
+ protected enum Kind { SIMPLE, MAP_EQUALITY, THRIFT_DYN_EXPR }
+
+ abstract Kind kind();
+ protected final ColumnDefinition column;
+ protected final Operator operator;
+ protected final ByteBuffer value;
+
+ protected Expression(ColumnDefinition column, Operator operator, ByteBuffer value)
+ {
+ this.column = column;
+ this.operator = operator;
+ this.value = value;
+ }
+
+ public ColumnDefinition column()
+ {
+ return column;
+ }
+
+ public Operator operator()
+ {
+ return operator;
+ }
+
+ /**
+ * Checks if the operator of this <code>Expression</code> is a <code>CONTAINS</code> operator.
+ *
+ * @return <code>true</code> if the operator of this <code>Expression</code> is a <code>CONTAINS</code>
+ * operator, <code>false</code> otherwise.
+ */
+ public boolean isContains()
+ {
+ return Operator.CONTAINS == operator;
+ }
+
+ /**
+ * Checks if the operator of this <code>Expression</code> is a <code>CONTAINS_KEY</code> operator.
+ *
+ * @return <code>true</code> if the operator of this <code>Expression</code> is a <code>CONTAINS_KEY</code>
+ * operator, <code>false</code> otherwise.
+ */
+ public boolean isContainsKey()
+ {
+ return Operator.CONTAINS_KEY == operator;
+ }
+
+ /**
+ * If this expression is used to query an index, the value to use as the
+ * partition key for that index query.
+ */
+ public ByteBuffer getIndexValue()
+ {
+ return value;
+ }
+
+ public void validateForIndexing() throws InvalidRequestException
+ {
+ checkNotNull(value, "Unsupported null value for indexed column %s", column.name);
+ checkBindValueSet(value, "Unsupported unset value for indexed column %s", column.name);
+ checkFalse(value.remaining() > FBUtilities.MAX_UNSIGNED_SHORT, "Index expression values may not be larger than 64K");
+ }
+
+ /**
+ * Returns whether the provided row satisfies this expression or not.
+ *
+ * @param partitionKey the partition key for the row to check.
+ * @param row the row to check. It should *not* contain deleted cells
+ * (i.e. it should come from a RowIterator).
+ * @return whether the row satisfies this expression.
+ */
+ public abstract boolean isSatisfiedBy(DecoratedKey partitionKey, Row row);
+
+ protected ByteBuffer getValue(DecoratedKey partitionKey, Row row)
+ {
+ switch (column.kind)
+ {
+ case PARTITION_KEY:
+ return column.isOnAllComponents()
+ ? partitionKey.getKey()
+ : CompositeType.extractComponent(partitionKey.getKey(), column.position());
+ case CLUSTERING_COLUMN:
+ return row.clustering().get(column.position());
+ default:
+ Cell cell = row.getCell(column);
+ return cell == null ? null : cell.value();
+ }
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o)
+ return true;
+
+ if (!(o instanceof Expression))
+ return false;
+
+ Expression that = (Expression)o;
+
+ return Objects.equal(this.kind(), that.kind())
+ && Objects.equal(this.column.name, that.column.name)
+ && Objects.equal(this.operator, that.operator)
+ && Objects.equal(this.value, that.value);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hashCode(column.name, operator, value);
+ }
+
+ private static class Serializer
+ {
+ public void serialize(Expression expression, DataOutputPlus out, int version) throws IOException
+ {
+ ByteBufferUtil.writeWithShortLength(expression.column.name.bytes, out);
+ expression.operator.writeTo(out);
+
+ if (version >= MessagingService.VERSION_30)
+ out.writeByte(expression.kind().ordinal());
+
+ switch (expression.kind())
+ {
+ case SIMPLE:
+ ByteBufferUtil.writeWithShortLength(((SimpleExpression)expression).value, out);
+ break;
+ case MAP_EQUALITY:
+ MapEqualityExpression mexpr = (MapEqualityExpression)expression;
+ if (version < MessagingService.VERSION_30)
+ {
+ ByteBufferUtil.writeWithShortLength(mexpr.getIndexValue(), out);
+ }
+ else
+ {
+ ByteBufferUtil.writeWithShortLength(mexpr.key, out);
+ ByteBufferUtil.writeWithShortLength(mexpr.value, out);
+ }
+ break;
+ case THRIFT_DYN_EXPR:
+ ByteBufferUtil.writeWithShortLength(((ThriftExpression)expression).value, out);
+ break;
+ }
+ }
+
+ public Expression deserialize(DataInput in, int version, CFMetaData metadata) throws IOException
+ {
+ ByteBuffer name = ByteBufferUtil.readWithShortLength(in);
+ Operator operator = Operator.readFrom(in);
+
+ ColumnDefinition column = metadata.getColumnDefinition(name);
+ if (!metadata.isCompactTable() && column == null)
+ throw new RuntimeException("Unknown (or dropped) column " + UTF8Type.instance.getString(name) + " during deserialization");
+
+ Kind kind;
+ if (version >= MessagingService.VERSION_30)
+ {
+ kind = Kind.values()[in.readByte()];
+ }
+ else
+ {
+ if (column == null)
+ kind = Kind.THRIFT_DYN_EXPR;
+ else if (column.type instanceof MapType && operator == Operator.EQ)
+ kind = Kind.MAP_EQUALITY;
+ else
+ kind = Kind.SIMPLE;
+ }
+
+ switch (kind)
+ {
+ case SIMPLE:
+ return new SimpleExpression(column, operator, ByteBufferUtil.readWithShortLength(in));
+ case MAP_EQUALITY:
+ ByteBuffer key, value;
+ if (version < MessagingService.VERSION_30)
+ {
+ ByteBuffer composite = ByteBufferUtil.readWithShortLength(in);
+ key = CompositeType.extractComponent(composite, 0);
+ value = CompositeType.extractComponent(composite, 1); // the value is the second component
+ }
+ else
+ {
+ key = ByteBufferUtil.readWithShortLength(in);
+ value = ByteBufferUtil.readWithShortLength(in);
+ }
+ return new MapEqualityExpression(column, key, operator, value);
+ case THRIFT_DYN_EXPR:
+ return new ThriftExpression(metadata, name, operator, ByteBufferUtil.readWithShortLength(in));
+ }
+ throw new AssertionError();
+ }
+
+ public long serializedSize(Expression expression, int version)
+ {
+ TypeSizes sizes = TypeSizes.NATIVE;
+ long size = ByteBufferUtil.serializedSizeWithShortLength(expression.column().name.bytes, sizes)
+ + expression.operator.serializedSize();
+
+ switch (expression.kind())
+ {
+ case SIMPLE:
+ size += ByteBufferUtil.serializedSizeWithShortLength(((SimpleExpression)expression).value, sizes);
+ break;
+ case MAP_EQUALITY:
+ MapEqualityExpression mexpr = (MapEqualityExpression)expression;
+ if (version < MessagingService.VERSION_30)
+ size += ByteBufferUtil.serializedSizeWithShortLength(mexpr.getIndexValue(), sizes);
+ else
+ size += ByteBufferUtil.serializedSizeWithShortLength(mexpr.key, sizes)
+ + ByteBufferUtil.serializedSizeWithShortLength(mexpr.value, sizes);
+ break;
+ case THRIFT_DYN_EXPR:
+ size += ByteBufferUtil.serializedSizeWithShortLength(((ThriftExpression)expression).value, sizes);
+ break;
+ }
+ return size;
+ }
+ }
+ }
+
+ /**
+ * An expression of the form 'column' 'op' 'value'.
+ */
+ private static class SimpleExpression extends Expression
+ {
+ public SimpleExpression(ColumnDefinition column, Operator operator, ByteBuffer value)
+ {
+ super(column, operator, value);
+ }
+
+ public boolean isSatisfiedBy(DecoratedKey partitionKey, Row row)
+ {
+ // We support null conditions for LWT (in ColumnCondition) but not for RowFilter.
+ // TODO: we should try to merge both code paths someday.
+ assert value != null;
+
+ if (row.isStatic() != column.isStatic())
+ return true;
+
+ switch (operator)
+ {
+ case EQ:
+ case LT:
+ case LTE:
+ case GTE:
+ case GT:
+ case NEQ:
+ {
+ assert !column.isComplex() : "Only CONTAINS and CONTAINS_KEY are supported for 'complex' types";
+ ByteBuffer foundValue = getValue(partitionKey, row);
+ // Note that CQL expressions are always of the form 'x < 4', i.e. the tested value is on the left.
+ return foundValue != null && operator.isSatisfiedBy(column.type, foundValue, value);
+ }
+ case CONTAINS:
+ assert column.type.isCollection();
+ CollectionType<?> type = (CollectionType<?>)column.type;
+ if (column.isComplex())
+ {
+ Iterator<Cell> iter = row.getCells(column);
+ while (iter.hasNext())
+ {
+ Cell cell = iter.next();
+ if (type.kind == CollectionType.Kind.SET)
+ {
+ if (type.nameComparator().compare(cell.path().get(0), value) == 0)
+ return true;
+ }
+ else
+ {
+ if (type.valueComparator().compare(cell.value(), value) == 0)
+ return true;
+ }
+ }
+ return false;
+ }
+ else
+ {
+ ByteBuffer foundValue = getValue(partitionKey, row);
+ if (foundValue == null)
+ return false;
+
+ switch (type.kind)
+ {
+ case LIST:
+ ListType<?> listType = (ListType<?>)type;
+ return listType.compose(foundValue).contains(listType.getElementsType().compose(value));
+ case SET:
+ SetType<?> setType = (SetType<?>)type;
+ return setType.compose(foundValue).contains(setType.getElementsType().compose(value));
+ case MAP:
+ MapType<?,?> mapType = (MapType<?, ?>)type;
+ return mapType.compose(foundValue).containsValue(mapType.getValuesType().compose(value));
+ }
+ throw new AssertionError();
+ }
+ case CONTAINS_KEY:
+ assert column.type.isCollection() && column.type instanceof MapType;
+ MapType<?, ?> mapType = (MapType<?, ?>)column.type;
+ if (column.isComplex())
+ {
+ return row.getCell(column, CellPath.create(value)) != null;
+ }
+ else
+ {
+ ByteBuffer foundValue = getValue(partitionKey, row);
+ return foundValue != null && mapType.getSerializer().getSerializedValue(foundValue, value, mapType.getKeysType()) != null;
+ }
+
+ case IN:
+ // It wouldn't be terribly hard to support this (though doing so would imply supporting
+ // IN for secondary indexes) but currently we don't.
+ throw new AssertionError();
+ }
+ throw new AssertionError();
+ }
+
+ @Override
+ public String toString()
+ {
+ AbstractType<?> type = column.type;
+ switch (operator)
+ {
+ case CONTAINS:
+ assert type instanceof CollectionType;
+ CollectionType<?> ct = (CollectionType<?>)type;
+ type = ct.kind == CollectionType.Kind.SET ? ct.nameComparator() : ct.valueComparator();
+ break;
+ case CONTAINS_KEY:
+ assert type instanceof MapType;
+ type = ((MapType<?, ?>)type).nameComparator();
+ break;
+ case IN:
+ type = ListType.getInstance(type, false);
+ break;
+ default:
+ break;
+ }
+ return String.format("%s %s %s", column.name, operator, type.getString(value));
+ }
+
+ @Override
+ Kind kind()
+ {
+ return Kind.SIMPLE;
+ }
+ }
+
+ /**
+ * An expression of the form 'column' ['key'] = 'value' (which is only
+ * supported when 'column' is a map).
+ */
+ private static class MapEqualityExpression extends Expression
+ {
+ private final ByteBuffer key;
+
+ public MapEqualityExpression(ColumnDefinition column, ByteBuffer key, Operator operator, ByteBuffer value)
+ {
+ super(column, operator, value);
+ assert column.type instanceof MapType && operator == Operator.EQ;
+ this.key = key;
+ }
+
+ @Override
+ public void validateForIndexing() throws InvalidRequestException
+ {
+ super.validateForIndexing();
+ checkNotNull(key, "Unsupported null value for key of map column %s", column.name);
+ checkBindValueSet(key, "Unsupported unset value for key of map column %s", column.name);
+ }
+
+ @Override
+ public ByteBuffer getIndexValue()
+ {
+ return CompositeType.build(key, value);
+ }
+
+ public boolean isSatisfiedBy(DecoratedKey partitionKey, Row row)
+ {
+ assert key != null;
+ // We support null conditions for LWT (in ColumnCondition) but not for RowFilter.
+ // TODO: we should try to merge both code paths someday.
+ assert value != null;
+
+ if (row.isStatic() != column.isStatic())
+ return true;
+
+ MapType<?, ?> mt = (MapType<?, ?>)column.type;
+ if (column.isComplex())
+ {
+ Cell cell = row.getCell(column, CellPath.create(key));
+ return cell != null && mt.valueComparator().compare(cell.value(), value) == 0;
+ }
+ else
+ {
+ ByteBuffer serializedMap = getValue(partitionKey, row);
+ if (serializedMap == null)
+ return false;
+
+ ByteBuffer foundValue = mt.getSerializer().getSerializedValue(serializedMap, key, mt.getKeysType());
+ return foundValue != null && mt.valueComparator().compare(foundValue, value) == 0;
+ }
+ }
+
+ @Override
+ public String toString()
+ {
+ MapType<?, ?> mt = (MapType<?, ?>)column.type;
+ return String.format("%s[%s] = %s", column.name, mt.nameComparator().getString(key), mt.valueComparator().getString(value));
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o)
+ return true;
+
+ if (!(o instanceof MapEqualityExpression))
+ return false;
+
+ MapEqualityExpression that = (MapEqualityExpression)o;
+
+ return Objects.equal(this.column.name, that.column.name)
+ && Objects.equal(this.operator, that.operator)
+ && Objects.equal(this.key, that.key)
+ && Objects.equal(this.value, that.value);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hashCode(column.name, operator, key, value);
+ }
+
+ @Override
+ Kind kind()
+ {
+ return Kind.MAP_EQUALITY;
+ }
+ }
+
+ /**
+ * An expression of the form 'name' = 'value', but where 'name' is actually the
+ * clustering value for a compact table. This is only for thrift.
+ */
+ private static class ThriftExpression extends Expression
+ {
+ private final CFMetaData metadata;
+
+ public ThriftExpression(CFMetaData metadata, ByteBuffer name, Operator operator, ByteBuffer value)
+ {
+ super(makeDefinition(metadata, name), operator, value);
+ assert metadata.isCompactTable();
+ this.metadata = metadata;
+ }
+
+ private static ColumnDefinition makeDefinition(CFMetaData metadata, ByteBuffer name)
+ {
+ ColumnDefinition def = metadata.getColumnDefinition(name);
+ if (def != null)
+ return def;
+
+ // In thrift, we actually allow expressions on undefined columns for the sake of filtering. To accommodate
+ // this we create a "fake" definition. This is messy, but it works, so it is probably good enough.
+ return ColumnDefinition.regularDef(metadata, name, metadata.compactValueColumn().type, null);
+ }
+
+ public boolean isSatisfiedBy(DecoratedKey partitionKey, Row row)
+ {
+ assert value != null;
+
+ // On thrift queries, even if the column expression is a "static" one, we'll have converted it to a "dynamic"
+ // one in ThriftResultsMerger, so we always expect it to be a dynamic one. Further, we expect this is only
+ // called when the row clustering does match the column (see ThriftFilter above).
+ assert row.clustering().equals(makeCompactClustering(metadata, column.name.bytes));
+ Cell cell = row.getCell(metadata.compactValueColumn());
+ return cell != null && operator.isSatisfiedBy(column.type, cell.value(), value);
+ }
+
+ @Override
+ public String toString()
+ {
+ return String.format("%s %s %s", column.name, operator, column.type.getString(value));
+ }
+
+ @Override
+ Kind kind()
+ {
+ return Kind.THRIFT_DYN_EXPR;
+ }
+ }
+
+ public static class Serializer
+ {
+ public void serialize(RowFilter filter, DataOutputPlus out, int version) throws IOException
+ {
+ out.writeBoolean(filter instanceof ThriftFilter);
+ out.writeShort(filter.expressions.size());
+ for (Expression expr : filter.expressions)
+ Expression.serializer.serialize(expr, out, version);
+ }
+
+ public RowFilter deserialize(DataInput in, int version, CFMetaData metadata) throws IOException
+ {
+ boolean forThrift = in.readBoolean();
+ int size = in.readUnsignedShort();
+ List<Expression> expressions = new ArrayList<>(size);
+ for (int i = 0; i < size; i++)
+ expressions.add(Expression.serializer.deserialize(in, version, metadata));
+ return forThrift
+ ? new ThriftFilter(expressions)
+ : new CQLFilter(expressions);
+ }
+
+ public long serializedSize(RowFilter filter, int version)
+ {
+ TypeSizes sizes = TypeSizes.NATIVE;
+ long size = 1 // forThrift
+ + sizes.sizeof((short)filter.expressions.size());
+ for (Expression expr : filter.expressions)
+ size += Expression.serializer.serializedSize(expr, version);
+ return size;
+ }
+ }
+}
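
At its core, the new RowFilter is a conjunction: a row is kept only if every expression is satisfied, and without() rebuilds the filter minus one expression once, say, a secondary index has already enforced it. A reduced sketch of that contract, with java.util.function.Predicate as a hypothetical stand-in for the Expression class:

    import java.util.*;
    import java.util.function.Predicate;

    class ConjunctionFilter<R>
    {
        private final List<Predicate<R>> expressions;

        ConjunctionFilter(List<Predicate<R>> expressions) { this.expressions = expressions; }

        // AND semantics, as in CQLFilter.isSatisfiedBy().
        boolean isSatisfiedBy(R row)
        {
            for (Predicate<R> e : expressions)
                if (!e.test(row))
                    return false;
            return true;
        }

        // Counterpart of RowFilter.without(): drop one expression, keep the rest.
        ConjunctionFilter<R> without(Predicate<R> expression)
        {
            List<Predicate<R>> rest = new ArrayList<>(expressions);
            rest.remove(expression);
            return new ConjunctionFilter<>(rest);
        }
    }

The CQL variant applies this test row by row inside the iterator, while the Thrift variant materializes the partition and drops it wholesale on the first failed expression.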
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
deleted file mode 100644
index 4571161..0000000
--- a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
+++ /dev/null
@@ -1,583 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.filter;
-
-import java.nio.ByteBuffer;
-import java.io.DataInput;
-import java.io.IOException;
-import java.util.*;
-
-import com.google.common.collect.AbstractIterator;
-import com.google.common.collect.Iterators;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.composites.*;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.Pair;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
-import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.io.util.FileDataInput;
-import org.apache.cassandra.service.ClientWarn;
-import org.apache.cassandra.tracing.Tracing;
-
-public class SliceQueryFilter implements IDiskAtomFilter
-{
- private static final Logger logger = LoggerFactory.getLogger(SliceQueryFilter.class);
-
- /**
- * A special value for compositesToGroup that indicates that partition tombstones should not be included in results
- * or counted towards the limit. See CASSANDRA-8490 for more details on why this is needed (and done this way).
- **/
- public static final int IGNORE_TOMBSTONED_PARTITIONS = -2;
-
- public final ColumnSlice[] slices;
- public final boolean reversed;
- public volatile int count;
- public final int compositesToGroup;
-
- // Not serialized, just a hack for range slices to find the number of live columns counted, even when we group
- private ColumnCounter columnCounter;
-
- public SliceQueryFilter(Composite start, Composite finish, boolean reversed, int count)
- {
- this(new ColumnSlice(start, finish), reversed, count);
- }
-
- public SliceQueryFilter(Composite start, Composite finish, boolean reversed, int count, int compositesToGroup)
- {
- this(new ColumnSlice(start, finish), reversed, count, compositesToGroup);
- }
-
- public SliceQueryFilter(ColumnSlice slice, boolean reversed, int count)
- {
- this(new ColumnSlice[]{ slice }, reversed, count);
- }
-
- public SliceQueryFilter(ColumnSlice slice, boolean reversed, int count, int compositesToGroup)
- {
- this(new ColumnSlice[]{ slice }, reversed, count, compositesToGroup);
- }
-
- /**
- * Constructor that accepts multiple slices. All slices are assumed to be in the same direction (forward or
- * reversed).
- */
- public SliceQueryFilter(ColumnSlice[] slices, boolean reversed, int count)
- {
- this(slices, reversed, count, -1);
- }
-
- public SliceQueryFilter(ColumnSlice[] slices, boolean reversed, int count, int compositesToGroup)
- {
- this.slices = slices;
- this.reversed = reversed;
- this.count = count;
- this.compositesToGroup = compositesToGroup;
- }
-
- public SliceQueryFilter cloneShallow()
- {
- return new SliceQueryFilter(slices, reversed, count, compositesToGroup);
- }
-
- public SliceQueryFilter withUpdatedCount(int newCount)
- {
- return new SliceQueryFilter(slices, reversed, newCount, compositesToGroup);
- }
-
- public SliceQueryFilter withUpdatedSlices(ColumnSlice[] newSlices)
- {
- return new SliceQueryFilter(newSlices, reversed, count, compositesToGroup);
- }
-
- /** Returns true if the slice includes static columns, false otherwise. */
- private boolean sliceIncludesStatics(ColumnSlice slice, CFMetaData cfm)
- {
- return cfm.hasStaticColumns() &&
- slice.includes(reversed ? cfm.comparator.reverseComparator() : cfm.comparator, cfm.comparator.staticPrefix().end());
- }
-
- public boolean hasStaticSlice(CFMetaData cfm)
- {
- for (ColumnSlice slice : slices)
- if (sliceIncludesStatics(slice, cfm))
- return true;
-
- return false;
- }
-
- /**
- * Splits this filter into two SliceQueryFilters: one that slices only the static columns, and one that slices the
- * remainder of the normal data.
- *
- * This should only be called when the filter is reversed and the filter is known to cover static columns (through
- * hasStaticSlice()).
- *
- * @return a pair of (static, normal) SliceQueryFilters
- */
- public Pair<SliceQueryFilter, SliceQueryFilter> splitOutStaticSlice(CFMetaData cfm)
- {
- assert reversed;
-
- Composite staticSliceEnd = cfm.comparator.staticPrefix().end();
- List<ColumnSlice> nonStaticSlices = new ArrayList<>(slices.length);
- for (ColumnSlice slice : slices)
- {
- if (sliceIncludesStatics(slice, cfm))
- nonStaticSlices.add(new ColumnSlice(slice.start, staticSliceEnd));
- else
- nonStaticSlices.add(slice);
- }
-
- return Pair.create(
- new SliceQueryFilter(staticSliceEnd, Composites.EMPTY, true, count, compositesToGroup),
- new SliceQueryFilter(nonStaticSlices.toArray(new ColumnSlice[nonStaticSlices.size()]), true, count, compositesToGroup));
- }
-
- public SliceQueryFilter withUpdatedStart(Composite newStart, CFMetaData cfm)
- {
- Comparator<Composite> cmp = reversed ? cfm.comparator.reverseComparator() : cfm.comparator;
-
- // Check our slices to see if any fall before the new start (in which case they can be removed) or
- // if they contain the new start (in which case they should start from the page start). However, if the
- // slices would include static columns, we need to ensure they are also fetched, and so a separate
- // slice for the static columns may be required.
- // Note that if the query is reversed, we can't handle statics by simply adding a separate slice here, so
- // the reversed case is handled by SliceFromReadCommand instead. See CASSANDRA-8502 for more details.
- List<ColumnSlice> newSlices = new ArrayList<>();
- boolean pastNewStart = false;
- for (ColumnSlice slice : slices)
- {
- if (pastNewStart)
- {
- newSlices.add(slice);
- continue;
- }
-
- if (slice.isBefore(cmp, newStart))
- {
- if (!reversed && sliceIncludesStatics(slice, cfm))
- newSlices.add(new ColumnSlice(Composites.EMPTY, cfm.comparator.staticPrefix().end()));
-
- continue;
- }
- else if (slice.includes(cmp, newStart))
- {
- if (!reversed && sliceIncludesStatics(slice, cfm) && !newStart.isEmpty())
- newSlices.add(new ColumnSlice(Composites.EMPTY, cfm.comparator.staticPrefix().end()));
-
- newSlices.add(new ColumnSlice(newStart, slice.finish));
- }
- else
- {
- newSlices.add(slice);
- }
-
- pastNewStart = true;
- }
- return withUpdatedSlices(newSlices.toArray(new ColumnSlice[newSlices.size()]));
- }
-
- public Iterator<Cell> getColumnIterator(ColumnFamily cf)
- {
- assert cf != null;
- return reversed ? cf.reverseIterator(slices) : cf.iterator(slices);
- }
-
- public OnDiskAtomIterator getColumnIterator(final DecoratedKey key, final ColumnFamily cf)
- {
- assert cf != null;
- final Iterator<Cell> iter = getColumnIterator(cf);
-
- return new OnDiskAtomIterator()
- {
- public ColumnFamily getColumnFamily()
- {
- return cf;
- }
-
- public DecoratedKey getKey()
- {
- return key;
- }
-
- public boolean hasNext()
- {
- return iter.hasNext();
- }
-
- public OnDiskAtom next()
- {
- return iter.next();
- }
-
- public void close() throws IOException { }
-
- public void remove()
- {
- throw new UnsupportedOperationException();
- }
- };
- }
-
- public OnDiskAtomIterator getSSTableColumnIterator(SSTableReader sstable, DecoratedKey key)
- {
- return sstable.iterator(key, slices, reversed);
- }
-
- public OnDiskAtomIterator getSSTableColumnIterator(SSTableReader sstable, FileDataInput file, DecoratedKey key, RowIndexEntry indexEntry)
- {
- return sstable.iterator(file, key, slices, reversed, indexEntry);
- }
-
- public Comparator<Cell> getColumnComparator(CellNameType comparator)
- {
- return reversed ? comparator.columnReverseComparator() : comparator.columnComparator(false);
- }
-
- public void collectReducedColumns(ColumnFamily container, Iterator<Cell> reducedColumns, DecoratedKey key, int gcBefore, long now)
- {
- columnCounter = columnCounter(container.getComparator(), now);
- DeletionInfo.InOrderTester tester = container.deletionInfo().inOrderTester(reversed);
-
- while (reducedColumns.hasNext())
- {
- Cell cell = reducedColumns.next();
-
- if (logger.isTraceEnabled())
- logger.trace("collecting {} of {}: {}", columnCounter.live(), count, cell.getString(container.getComparator()));
-
- // An expired tombstone will be immediately discarded in memory, and needn't be counted.
- // Neither should any cell shadowed by a range or a partition tombstone.
- if (cell.getLocalDeletionTime() < gcBefore || !columnCounter.count(cell, tester))
- continue;
-
- if (columnCounter.live() > count)
- break;
-
- if (respectTombstoneThresholds() && columnCounter.tombstones() > DatabaseDescriptor.getTombstoneFailureThreshold())
- {
- Tracing.trace("Scanned over {} tombstones; query aborted (see tombstone_failure_threshold); slices={}",
- DatabaseDescriptor.getTombstoneFailureThreshold(), getSlicesInfo(container));
-
- throw new TombstoneOverwhelmingException(columnCounter.tombstones(),
- count,
- container.metadata().ksName,
- container.metadata().cfName,
- container.getComparator().getString(cell.name()),
- getSlicesInfo(container));
- }
-
- container.appendColumn(cell);
- }
-
- boolean warnTombstones = logger.isWarnEnabled() && respectTombstoneThresholds() && columnCounter.tombstones() > DatabaseDescriptor.getTombstoneWarnThreshold();
- if (warnTombstones)
- {
- String msg = String.format("Read %d live and %d tombstone cells in %s.%s for key: %1.512s (see tombstone_warn_threshold). %d columns were requested, slices=%1.512s",
- columnCounter.live(),
- columnCounter.tombstones(),
- container.metadata().ksName,
- container.metadata().cfName,
- container.metadata().getKeyValidator().getString(key.getKey()),
- count,
- getSlicesInfo(container));
- ClientWarn.warn(msg);
- logger.warn(msg);
- }
- Tracing.trace("Read {} live and {} tombstone cells{}",
- columnCounter.live(),
- columnCounter.tombstones(),
- warnTombstones ? " (see tombstone_warn_threshold)" : "");
- }
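
The method above implements the tombstone guardrails: it counts live cells and
tombstones while streaming results, aborts the query once tombstones exceed the
hard failure threshold, and logs a client warning past the soft threshold when
the scan finishes. A minimal sketch of that pattern in plain Java (the
threshold constants are assumed values for illustration; the real ones come
from cassandra.yaml):

    public class TombstoneThresholdSketch
    {
        static final int WARN_THRESHOLD = 1000;      // assumed value for illustration
        static final int FAILURE_THRESHOLD = 100000; // assumed value for illustration

        // Scan a stream of cells (true = tombstone) up to 'limit' live results.
        public static void scan(boolean[] cells, int limit)
        {
            int live = 0, tombstones = 0;
            for (boolean tombstone : cells)
            {
                if (tombstone)
                {
                    if (++tombstones > FAILURE_THRESHOLD)
                        throw new RuntimeException("scanned " + tombstones + " tombstones; query aborted");
                }
                else if (++live > limit)
                    break; // enough live results; stop scanning
            }
            if (tombstones > WARN_THRESHOLD)
                System.err.println("read " + live + " live and " + tombstones + " tombstone cells");
        }
    }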
-
- private String getSlicesInfo(ColumnFamily container)
- {
- StringBuilder sb = new StringBuilder();
- CellNameType type = container.metadata().comparator;
- for (ColumnSlice sl : slices)
- {
- assert sl != null;
-
- sb.append('[');
- sb.append(type.getString(sl.start));
- sb.append('-');
- sb.append(type.getString(sl.finish));
- sb.append(']');
- }
- return sb.toString();
- }
-
- protected boolean respectTombstoneThresholds()
- {
- return true;
- }
-
- public int getLiveCount(ColumnFamily cf, long now)
- {
- return columnCounter(cf.getComparator(), now).countAll(cf).live();
- }
-
- public ColumnCounter columnCounter(CellNameType comparator, long now)
- {
- if (compositesToGroup < 0)
- return new ColumnCounter(now);
- else if (compositesToGroup == 0)
- return new ColumnCounter.GroupByPrefix(now, null, 0);
- else if (reversed)
- return new ColumnCounter.GroupByPrefixReversed(now, comparator, compositesToGroup);
- else
- return new ColumnCounter.GroupByPrefix(now, comparator, compositesToGroup);
- }
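
The factory above decides what "count" means: with compositesToGroup < 0 every
cell counts on its own, with 0 the whole partition counts as one group, and
otherwise cells sharing a clustering prefix collapse into a single CQL3 row.
Roughly, with strings standing in for cell names (an illustration, not the real
ColumnCounter):

    import java.util.List;

    public class GroupByPrefixSketch
    {
        // Count "rows" by grouping consecutive cells on their first name
        // component, as GroupByPrefix does for compositesToGroup == 1.
        public static int countGroups(List<String> cellNames)
        {
            int groups = 0;
            String currentPrefix = null;
            for (String name : cellNames)
            {
                String prefix = name.split(":", 2)[0]; // first clustering component
                if (!prefix.equals(currentPrefix))
                {
                    groups++;
                    currentPrefix = prefix;
                }
            }
            return groups;
        }

        public static void main(String[] args)
        {
            // Five cells, but only two CQL3 rows ("a" and "b").
            System.out.println(countGroups(List.of("a:c1", "a:c2", "a:c3", "b:c1", "b:c2"))); // 2
        }
    }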
-
- public void trim(ColumnFamily cf, int trimTo, long now)
- {
- // each cell can increment the count by at most one, so if we have fewer cells than trimTo, we can skip trimming
- if (cf.getColumnCount() < trimTo)
- return;
-
- ColumnCounter counter = columnCounter(cf.getComparator(), now);
-
- Collection<Cell> cells = reversed
- ? cf.getReverseSortedColumns()
- : cf.getSortedColumns();
-
- DeletionInfo.InOrderTester tester = cf.deletionInfo().inOrderTester(reversed);
-
- for (Iterator<Cell> iter = cells.iterator(); iter.hasNext(); )
- {
- Cell cell = iter.next();
- counter.count(cell, tester);
-
- if (counter.live() > trimTo)
- {
- iter.remove();
- while (iter.hasNext())
- {
- iter.next();
- iter.remove();
- }
- }
- }
- }
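
trim above counts cells in iteration order and, as soon as the live count
passes trimTo, removes the offending cell and everything after it. The same
removal pattern on a plain list, where every element counts as live (a sketch,
not the real Cell/ColumnCounter types):

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    public class TrimSketch
    {
        public static void trim(List<String> cells, int trimTo)
        {
            int live = 0;
            for (Iterator<String> iter = cells.iterator(); iter.hasNext(); )
            {
                iter.next();
                if (++live > trimTo)
                {
                    iter.remove(); // this cell crossed the limit
                    while (iter.hasNext()) { iter.next(); iter.remove(); } // and drop the rest
                }
            }
        }

        public static void main(String[] args)
        {
            List<String> cells = new ArrayList<>(List.of("a", "b", "c", "d"));
            trim(cells, 2);
            System.out.println(cells); // [a, b]
        }
    }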
-
- public Composite start()
- {
- return this.slices[0].start;
- }
-
- public Composite finish()
- {
- return this.slices[slices.length - 1].finish;
- }
-
- public void setStart(Composite start)
- {
- assert slices.length == 1;
- this.slices[0] = new ColumnSlice(start, this.slices[0].finish);
- }
-
- public int lastCounted()
- {
- // If we have a slice limit set, columnCounter.live() can overcount by one because we have to call
- // columnCounter.count() before we can tell if we've exceeded the slice limit (and accordingly, should not
- // add the cells to the returned container). To deal with this overcounting, we take the min of the slice
- // limit and the counter's count.
- return columnCounter == null ? 0 : Math.min(columnCounter.live(), count);
- }
-
- public int lastTombstones()
- {
- return columnCounter == null ? 0 : columnCounter.tombstones();
- }
-
- public int lastLive()
- {
- return columnCounter == null ? 0 : columnCounter.live();
- }
-
- @Override
- public String toString()
- {
- return "SliceQueryFilter [reversed=" + reversed + ", slices=" + Arrays.toString(slices) + ", count=" + count + ", toGroup = " + compositesToGroup + "]";
- }
-
- public boolean isReversed()
- {
- return reversed;
- }
-
- public void updateColumnsLimit(int newLimit)
- {
- count = newLimit;
- }
-
- public boolean maySelectPrefix(CType type, Composite prefix)
- {
- for (ColumnSlice slice : slices)
- if (slice.includes(type, prefix))
- return true;
- return false;
- }
-
- public boolean shouldInclude(SSTableReader sstable)
- {
- List<ByteBuffer> minColumnNames = sstable.getSSTableMetadata().minColumnNames;
- List<ByteBuffer> maxColumnNames = sstable.getSSTableMetadata().maxColumnNames;
- CellNameType comparator = sstable.metadata.comparator;
-
- if (minColumnNames.isEmpty() || maxColumnNames.isEmpty())
- return true;
-
- for (ColumnSlice slice : slices)
- if (slice.intersects(minColumnNames, maxColumnNames, comparator, reversed))
- return true;
-
- return false;
- }
-
- public boolean isHeadFilter()
- {
- return slices.length == 1 && slices[0].start.isEmpty() && !reversed;
- }
-
- public boolean countCQL3Rows(CellNameType comparator)
- {
- // If the comparator is dense, a cell == a CQL3 row, so we're always counting CQL3 rows.
- // Otherwise, we do so only if we group the cells into CQL3 rows.
- return comparator.isDense() || compositesToGroup >= 0;
- }
-
- public boolean isFullyCoveredBy(ColumnFamily cf, long now)
- {
- // cf is the beginning of a partition. It covers this filter if:
- // 1) either this filter requests the head of the partition and requests less
- // than what cf has to offer (note: we do need to use getLiveCount() for that,
- // as it knows whether the filter counts cells or CQL3 rows), or
- // 2) the start and finish bounds of this filter are included in cf.
- if (isHeadFilter() && count <= getLiveCount(cf, now))
- return true;
-
- if (start().isEmpty() || finish().isEmpty() || !cf.hasColumns())
- return false;
-
- Composite low = isReversed() ? finish() : start();
- Composite high = isReversed() ? start() : finish();
-
- CellName first = cf.iterator(ColumnSlice.ALL_COLUMNS_ARRAY).next().name();
- CellName last = cf.reverseIterator(ColumnSlice.ALL_COLUMNS_ARRAY).next().name();
-
- return cf.getComparator().compare(first, low) <= 0
- && cf.getComparator().compare(high, last) <= 0;
- }
-
- public static class Serializer implements IVersionedSerializer<SliceQueryFilter>
- {
- private CType type;
-
- public Serializer(CType type)
- {
- this.type = type;
- }
-
- public void serialize(SliceQueryFilter f, DataOutputPlus out, int version) throws IOException
- {
- out.writeInt(f.slices.length);
- for (ColumnSlice slice : f.slices)
- type.sliceSerializer().serialize(slice, out, version);
- out.writeBoolean(f.reversed);
- int count = f.count;
- out.writeInt(count);
-
- out.writeInt(f.compositesToGroup);
- }
-
- public SliceQueryFilter deserialize(DataInput in, int version) throws IOException
- {
- ColumnSlice[] slices = new ColumnSlice[in.readInt()];
- for (int i = 0; i < slices.length; i++)
- slices[i] = type.sliceSerializer().deserialize(in, version);
- boolean reversed = in.readBoolean();
- int count = in.readInt();
- int compositesToGroup = in.readInt();
-
- return new SliceQueryFilter(slices, reversed, count, compositesToGroup);
- }
-
- public long serializedSize(SliceQueryFilter f, int version)
- {
- TypeSizes sizes = TypeSizes.NATIVE;
-
- int size = 0;
- size += sizes.sizeof(f.slices.length);
- for (ColumnSlice slice : f.slices)
- size += type.sliceSerializer().serializedSize(slice, version);
- size += sizes.sizeof(f.reversed);
- size += sizes.sizeof(f.count);
-
- size += sizes.sizeof(f.compositesToGroup);
- return size;
- }
- }
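
The serializer writes a length-prefixed list of slices followed by the reversed
flag, the cell count and compositesToGroup; deserialization reads the same
fields back in the same order. A toy round-trip with ints standing in for
serialized slices (illustrative only, not the actual wire format):

    import java.io.*;
    import java.util.Arrays;

    public class FilterSerializationSketch
    {
        public static void main(String[] args) throws IOException
        {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);

            int[] slices = { 1, 2, 3 };
            out.writeInt(slices.length);
            for (int s : slices)
                out.writeInt(s);     // stands in for sliceSerializer().serialize(...)
            out.writeBoolean(false); // reversed
            out.writeInt(100);       // count
            out.writeInt(1);         // compositesToGroup

            DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
            int[] read = new int[in.readInt()];
            for (int i = 0; i < read.length; i++)
                read[i] = in.readInt();
            System.out.println("slices=" + Arrays.toString(read));
            System.out.printf("reversed=%b count=%d group=%d%n",
                              in.readBoolean(), in.readInt(), in.readInt());
        }
    }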
-
- public Iterator<RangeTombstone> getRangeTombstoneIterator(final ColumnFamily source)
- {
- final DeletionInfo delInfo = source.deletionInfo();
- if (!delInfo.hasRanges() || slices.length == 0)
- return Iterators.emptyIterator();
-
- return new AbstractIterator<RangeTombstone>()
- {
- private int sliceIdx = 0;
- private Iterator<RangeTombstone> sliceIter = currentRangeIter();
-
- protected RangeTombstone computeNext()
- {
- while (true)
- {
- if (sliceIter.hasNext())
- return sliceIter.next();
-
- if (!nextSlice())
- return endOfData();
-
- sliceIter = currentRangeIter();
- }
- }
-
- private Iterator<RangeTombstone> currentRangeIter()
- {
- ColumnSlice slice = slices[reversed ? (slices.length - 1 - sliceIdx) : sliceIdx];
- return reversed ? delInfo.rangeIterator(slice.finish, slice.start)
- : delInfo.rangeIterator(slice.start, slice.finish);
- }
-
- private boolean nextSlice()
- {
- return ++sliceIdx < slices.length;
- }
- };
- }
-}
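
One note on the final removed method: getRangeTombstoneIterator lazily chains
one range-tombstone iterator per slice, taking slices in reverse order when the
filter is reversed (the real code also walks each slice's range backwards via
delInfo.rangeIterator(finish, start)). The chaining reduces to the shape below,
with plain lists in place of DeletionInfo (illustrative types only):

    import java.util.Collections;
    import java.util.Iterator;
    import java.util.List;
    import java.util.NoSuchElementException;

    public class ChainedSliceIterator<T> implements Iterator<T>
    {
        private final List<List<T>> perSlice;
        private final boolean reversed;
        private int sliceIdx = 0;
        private Iterator<T> current;

        public ChainedSliceIterator(List<List<T>> perSlice, boolean reversed)
        {
            this.perSlice = perSlice;
            this.reversed = reversed;
            this.current = perSlice.isEmpty() ? Collections.emptyIterator() : sliceIterator(0);
        }

        private Iterator<T> sliceIterator(int idx)
        {
            // Walk slices back to front when reversed, as the removed code does.
            int i = reversed ? perSlice.size() - 1 - idx : idx;
            return perSlice.get(i).iterator();
        }

        public boolean hasNext()
        {
            while (!current.hasNext() && ++sliceIdx < perSlice.size())
                current = sliceIterator(sliceIdx);
            return current.hasNext();
        }

        public T next()
        {
            if (!hasNext())
                throw new NoSuchElementException();
            return current.next();
        }

        public static void main(String[] args)
        {
            Iterator<Integer> it = new ChainedSliceIterator<>(List.of(List.of(1, 2), List.of(3)), true);
            while (it.hasNext())
                System.out.print(it.next() + " "); // 3 1 2
        }
    }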
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/filter/TombstoneOverwhelmingException.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/TombstoneOverwhelmingException.java b/src/java/org/apache/cassandra/db/filter/TombstoneOverwhelmingException.java
index 7624e1b..98b539e 100644
--- a/src/java/org/apache/cassandra/db/filter/TombstoneOverwhelmingException.java
+++ b/src/java/org/apache/cassandra/db/filter/TombstoneOverwhelmingException.java
@@ -18,49 +18,51 @@
*/
package org.apache.cassandra.db.filter;
-import org.apache.cassandra.db.DecoratedKey;
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.marshal.*;
public class TombstoneOverwhelmingException extends RuntimeException
{
- private final int numTombstones;
- private final int numRequested;
- private final String ksName;
- private final String cfName;
- private final String lastCellName;
- private final String slicesInfo;
- private String partitionKey = null;
-
- public TombstoneOverwhelmingException(int numTombstones,
- int numRequested,
- String ksName,
- String cfName,
- String lastCellName,
- String slicesInfo)
+ public TombstoneOverwhelmingException(int numTombstones, String query, CFMetaData metadata, DecoratedKey lastPartitionKey, ClusteringPrefix lastClustering)
{
- this.numTombstones = numTombstones;
- this.numRequested = numRequested;
- this.ksName = ksName;
- this.cfName = cfName;
- this.lastCellName = lastCellName;
- this.slicesInfo = slicesInfo;
+ super(String.format("Scanned over %d tombstones during query '%s' (last scanned row partition key was (%s)); query aborted",
+ numTombstones, query, makePKString(metadata, lastPartitionKey.getKey(), lastClustering)));
}
- public void setKey(DecoratedKey key)
+ private static String makePKString(CFMetaData metadata, ByteBuffer partitionKey, ClusteringPrefix clustering)
{
- if (key != null)
- partitionKey = key.toString();
- }
+ StringBuilder sb = new StringBuilder();
- public String getLocalizedMessage()
- {
- return getMessage();
- }
+ if (clustering.size() > 0)
+ sb.append("(");
- public String getMessage()
- {
- return String.format(
- "Scanned over %d tombstones in %s.%s; %d columns were requested; query aborted " +
- "(see tombstone_failure_threshold); partitionKey=%s; lastCell=%s; slices=%s",
- numTombstones, ksName, cfName, numRequested, partitionKey, lastCellName, slicesInfo);
+ // TODO: We should probably make that a lot easier/transparent for partition keys
+ AbstractType<?> pkType = metadata.getKeyValidator();
+ if (pkType instanceof CompositeType)
+ {
+ CompositeType ct = (CompositeType)pkType;
+ ByteBuffer[] values = ct.split(partitionKey);
+ for (int i = 0; i < values.length; i++)
+ {
+ if (i > 0)
+ sb.append(", ");
+ sb.append(ct.types.get(i).getString(values[i]));
+ }
+ }
+ else
+ {
+ sb.append(pkType.getString(partitionKey));
+ }
+
+ if (clustering.size() > 0)
+ sb.append(")");
+
+ for (int i = 0; i < clustering.size(); i++)
+ sb.append(", ").append(metadata.comparator.subtype(i).getString(clustering.get(i)));
+
+ return sb.toString();
}
}
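
The new makePKString renders a possibly-composite partition key, plus any
clustering values, into the tuple that appears in the exception message:
composite key components joined by ", " and parenthesised when a clustering
follows, with the clustering values appended after. Its output shape,
approximated with plain strings (a sketch, not the CompositeType API):

    import java.util.List;

    public class PKStringSketch
    {
        public static String render(List<String> pkComponents, List<String> clustering)
        {
            StringBuilder sb = new StringBuilder();
            if (!clustering.isEmpty())
                sb.append("(");
            sb.append(String.join(", ", pkComponents));
            if (!clustering.isEmpty())
                sb.append(")");
            for (String c : clustering)
                sb.append(", ").append(c);
            return sb.toString();
        }

        public static void main(String[] args)
        {
            System.out.println(render(List.of("k1", "k2"), List.of("c1"))); // (k1, k2), c1
        }
    }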