You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2013/01/02 13:41:16 UTC
[6/9] Replace supercolumns internally by composites
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/SuperColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SuperColumns.java b/src/java/org/apache/cassandra/db/SuperColumns.java
new file mode 100644
index 0000000..c45f9f4
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/SuperColumns.java
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOError;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+public class SuperColumns
+{
+ public static Iterator<OnDiskAtom> onDiskIterator(DataInput dis, int superColumnCount, ColumnSerializer.Flag flag, int expireBefore)
+ {
+ return new SCIterator(dis, superColumnCount, flag, expireBefore);
+ }
+
+ public static void serializeSuperColumnFamily(ColumnFamily scf, DataOutput dos, int version) throws IOException
+ {
+ /*
+ * There is 2 complications:
+ * 1) We need to know the number of super columns in the column
+ * family to write in the header (so we do a first pass to group
+ * columns before serializing).
+ * 2) For deletion infos, we need to figure out which are top-level
+ * deletions and which are super columns deletions (i.e. the
+ * subcolumns range deletions).
+ */
+ DeletionInfo delInfo = scf.deletionInfo();
+ Map<ByteBuffer, List<Column>> scMap = groupSuperColumns(scf);
+
+ // Actually Serialize
+ DeletionInfo.serializer().serialize(new DeletionInfo(delInfo.getTopLevelDeletion()), dos, version);
+ dos.writeInt(scMap.size());
+
+ for (Map.Entry<ByteBuffer, List<Column>> entry : scMap.entrySet())
+ {
+ ByteBufferUtil.writeWithShortLength(entry.getKey(), dos);
+
+ List<DeletionTime> delTimes = delInfo.rangeCovering(entry.getKey());
+ assert delTimes.size() <= 1; // We're supposed to have either no deletion, or a full SC deletion.
+ DeletionInfo scDelInfo = delTimes.isEmpty() ? DeletionInfo.LIVE : new DeletionInfo(delTimes.get(0));
+ DeletionInfo.serializer().serialize(scDelInfo, dos, MessagingService.VERSION_10);
+
+ dos.writeInt(entry.getValue().size());
+ for (Column subColumn : entry.getValue())
+ Column.serializer().serialize(subColumn, dos);
+ }
+ }
+
+ private static Map<ByteBuffer, List<Column>> groupSuperColumns(ColumnFamily scf)
+ {
+ CompositeType type = (CompositeType)scf.getComparator();
+ // The order of insertion matters!
+ Map<ByteBuffer, List<Column>> scMap = new LinkedHashMap<ByteBuffer, List<Column>>();
+
+ ByteBuffer scName = null;
+ List<Column> subColumns = null;
+ for (Column column : scf)
+ {
+ ByteBuffer newScName = scName(column.name());
+ ByteBuffer newSubName = subName(column.name());
+
+ if (scName == null || type.types.get(0).compare(scName, newScName) != 0)
+ {
+ // new super column
+ scName = newScName;
+ subColumns = new ArrayList<Column>();
+ scMap.put(scName, subColumns);
+ }
+
+ subColumns.add(((Column)column).withUpdatedName(newSubName));
+ }
+ return scMap;
+ }
+
+ public static void deserializerSuperColumnFamily(DataInput dis, ColumnFamily cf, ColumnSerializer.Flag flag, int expireBefore, int version) throws IOException
+ {
+ // Note that there was no way to insert a range tombstone in a SCF in 1.2
+ cf.delete(DeletionInfo.serializer().deserialize(dis, version, cf.getComparator()));
+ assert !cf.deletionInfo().rangeIterator().hasNext();
+
+ Iterator<OnDiskAtom> iter = onDiskIterator(dis, dis.readInt(), flag, expireBefore);
+ while (iter.hasNext())
+ cf.addAtom(iter.next());
+ }
+
+ public static long serializedSize(ColumnFamily scf, TypeSizes typeSizes, int version)
+ {
+ Map<ByteBuffer, List<Column>> scMap = groupSuperColumns(scf);
+ DeletionInfo delInfo = scf.deletionInfo();
+
+ // Actually Serialize
+ long size = DeletionInfo.serializer().serializedSize(new DeletionInfo(delInfo.getTopLevelDeletion()), version);
+ for (Map.Entry<ByteBuffer, List<Column>> entry : scMap.entrySet())
+ {
+ int nameSize = entry.getKey().remaining();
+ size += typeSizes.sizeof((short) nameSize) + nameSize;
+
+ List<DeletionTime> delTimes = delInfo.rangeCovering(entry.getKey());
+ assert delTimes.size() <= 1; // We're supposed to have either no deletion, or a full SC deletion.
+ DeletionInfo scDelInfo = delTimes.isEmpty() ? DeletionInfo.LIVE : new DeletionInfo(delTimes.get(0));
+ size += DeletionInfo.serializer().serializedSize(scDelInfo, MessagingService.VERSION_10);
+
+ size += typeSizes.sizeof(entry.getValue().size());
+ for (Column subColumn : entry.getValue())
+ size += Column.serializer().serializedSize(subColumn, typeSizes);
+ }
+ return size;
+ }
+
+ private static class SCIterator implements Iterator<OnDiskAtom>
+ {
+ private final DataInput dis;
+ private final int scCount;
+
+ private final ColumnSerializer.Flag flag;
+ private final int expireBefore;
+
+ private int read;
+ private ByteBuffer scName;
+ private Iterator<Column> subColumnsIterator;
+
+ private SCIterator(DataInput dis, int superColumnCount, ColumnSerializer.Flag flag, int expireBefore)
+ {
+ this.dis = dis;
+ this.scCount = superColumnCount;
+ this.flag = flag;
+ this.expireBefore = expireBefore;
+ }
+
+ public boolean hasNext()
+ {
+ return (subColumnsIterator != null && subColumnsIterator.hasNext()) || read < scCount;
+ }
+
+ public OnDiskAtom next()
+ {
+ try
+ {
+ if (subColumnsIterator != null && subColumnsIterator.hasNext())
+ {
+ Column c = subColumnsIterator.next();
+ return c.withUpdatedName(CompositeType.build(scName, c.name()));
+ }
+
+ // Read one more super column
+ ++read;
+
+ scName = ByteBufferUtil.readWithShortLength(dis);
+ DeletionInfo delInfo = DeletionInfo.serializer().deserialize(dis, MessagingService.VERSION_10, null);
+ assert !delInfo.rangeIterator().hasNext(); // We assume no range tombstone (there was no way to insert some in a SCF in 1.2)
+
+ /* read the number of columns */
+ int size = dis.readInt();
+ List<Column> subColumns = new ArrayList<Column>(size);
+
+ for (int i = 0; i < size; ++i)
+ subColumns.add(Column.serializer().deserialize(dis, flag, expireBefore));
+
+ subColumnsIterator = subColumns.iterator();
+
+ // If the SC was deleted, return that first, otherwise return the first subcolumn
+ DeletionTime dtime = delInfo.getTopLevelDeletion();
+ if (!dtime.equals(DeletionTime.LIVE))
+ return new RangeTombstone(startOf(scName), endOf(scName), dtime);
+
+ return next();
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
+ }
+ }
+
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ public static AbstractType<?> getComparatorFor(CFMetaData metadata, ByteBuffer superColumn)
+ {
+ return getComparatorFor(metadata, superColumn != null);
+ }
+
+ public static AbstractType<?> getComparatorFor(CFMetaData metadata, boolean subColumn)
+ {
+ return metadata.isSuper()
+ ? ((CompositeType)metadata.comparator).types.get(subColumn ? 1 : 0)
+ : metadata.comparator;
+ }
+
+ // Extract the first component of a columnName, i.e. the super column name
+ public static ByteBuffer scName(ByteBuffer columnName)
+ {
+ return CompositeType.extractComponent(columnName, 0);
+ }
+
+ // Extract the 2nd component of a columnName, i.e. the sub-column name
+ public static ByteBuffer subName(ByteBuffer columnName)
+ {
+ return CompositeType.extractComponent(columnName, 1);
+ }
+
+ // We don't use CompositeType.Builder mostly because we want to avoid having to provide the comparator.
+ public static ByteBuffer startOf(ByteBuffer scName)
+ {
+ int length = scName.remaining();
+ ByteBuffer bb = ByteBuffer.allocate(2 + length + 1);
+
+ bb.put((byte) ((length >> 8) & 0xFF));
+ bb.put((byte) (length & 0xFF));
+ bb.put(scName.duplicate());
+ bb.put((byte) 0);
+ bb.flip();
+ return bb;
+ }
+
+ public static ByteBuffer endOf(ByteBuffer scName)
+ {
+ ByteBuffer bb = startOf(scName);
+ bb.put(bb.remaining() - 1, (byte)1);
+ return bb;
+ }
+
+ public static SCFilter filterToSC(CompositeType type, IDiskAtomFilter filter)
+ {
+ if (filter instanceof NamesQueryFilter)
+ return namesFilterToSC(type, (NamesQueryFilter)filter);
+ else
+ return sliceFilterToSC(type, (SliceQueryFilter)filter);
+ }
+
+ public static SCFilter namesFilterToSC(CompositeType type, NamesQueryFilter filter)
+ {
+ ByteBuffer scName = null;
+ SortedSet<ByteBuffer> newColumns = new TreeSet<ByteBuffer>(filter.columns.comparator());
+ for (ByteBuffer name : filter.columns)
+ {
+ ByteBuffer newScName = scName(name);
+
+ if (scName == null)
+ {
+ scName = newScName;
+ }
+ else if (type.types.get(0).compare(scName, newScName) != 0)
+ {
+ // If we're selecting column across multiple SC, it's not something we can translate for an old node
+ throw new RuntimeException("Cannot convert filter to old super column format. Update all nodes to Cassandra 2.0 first.");
+ }
+
+ newColumns.add(subName(name));
+ }
+ return new SCFilter(scName, new NamesQueryFilter(newColumns));
+ }
+
+ public static SCFilter sliceFilterToSC(CompositeType type, SliceQueryFilter filter)
+ {
+ /*
+ * There is 3 main cases that we can translate back into super column
+ * queries:
+ * 1) We have only one slice where the first component of start and
+ * finish is the same, we translate as a slice query on one SC.
+ * 2) We have only one slice, neither the start and finish have a 2nd
+ * component, and end has the 'end of component' set, we translate
+ * as a slice of SCs.
+ * 3) Each slice has the same first component for start and finish, no
+ * 2nd component and each finish has the 'end of component' set, we
+ * translate as a names query of SCs (the filter must then not be reversed).
+ * Otherwise, we can't do much.
+ */
+
+ boolean reversed = filter.reversed;
+ if (filter.slices.length == 1)
+ {
+ ByteBuffer start = filter.slices[0].start;
+ ByteBuffer finish = filter.slices[0].start;
+
+ if (filter.compositesToGroup == 1)
+ {
+ // Note: all the resulting filter must have compositeToGroup == 0 because this
+ // make no sense for super column on the destination node otherwise
+ if (start.remaining() == 0)
+ {
+ if (finish.remaining() == 0)
+ // An 'IdentityFilter', keep as is (except for the compositeToGroup)
+ return new SCFilter(null, new SliceQueryFilter(filter.start(), filter.finish(), reversed, filter.count));
+
+ if (subName(finish) == null
+ && ((!reversed && !firstEndOfComponent(finish)) || (reversed && firstEndOfComponent(finish))))
+ return new SCFilter(null, new SliceQueryFilter(ByteBufferUtil.EMPTY_BYTE_BUFFER, scName(finish), reversed, filter.count));
+ }
+ else if (finish.remaining() == 0)
+ {
+ if (subName(start) == null
+ && ((!reversed && firstEndOfComponent(start)) || (reversed && !firstEndOfComponent(start))))
+ return new SCFilter(null, new SliceQueryFilter(scName(start), ByteBufferUtil.EMPTY_BYTE_BUFFER, reversed, filter.count));
+ }
+ else if (subName(start) == null && subName(finish) == null
+ && (( reversed && !firstEndOfComponent(start) && firstEndOfComponent(finish))
+ || (!reversed && firstEndOfComponent(start) && !firstEndOfComponent(finish))))
+ {
+ // A slice of supercolumns
+ return new SCFilter(null, new SliceQueryFilter(scName(start), scName(finish), reversed, filter.count));
+ }
+ }
+ else if (filter.compositesToGroup == 0 && type.types.get(0).compare(scName(start), scName(finish)) == 0)
+ {
+ // A slice of subcolumns
+ return new SCFilter(scName(start), filter.withUpdatedSlice(subName(start), subName(finish)));
+ }
+ }
+ else if (!reversed)
+ {
+ SortedSet<ByteBuffer> columns = new TreeSet<ByteBuffer>(type.types.get(0));
+ for (int i = 0; i < filter.slices.length; ++i)
+ {
+ ByteBuffer start = filter.slices[i].start;
+ ByteBuffer finish = filter.slices[i].finish;
+
+ if (subName(start) != null || subName(finish) != null
+ || type.types.get(0).compare(scName(start), scName(finish)) != 0
+ || firstEndOfComponent(start) || !firstEndOfComponent(finish))
+ throw new RuntimeException("Cannot convert filter to old super column format. Update all nodes to Cassandra 2.0 first.");
+
+ columns.add(scName(start));
+ }
+ return new SCFilter(null, new NamesQueryFilter(columns));
+ }
+ throw new RuntimeException("Cannot convert filter to old super column format. Update all nodes to Cassandra 2.0 first.");
+ }
+
+ public static IDiskAtomFilter fromSCFilter(CompositeType type, ByteBuffer scName, IDiskAtomFilter filter)
+ {
+ if (filter instanceof NamesQueryFilter)
+ return fromSCNamesFilter(type, scName, (NamesQueryFilter)filter);
+ else
+ return fromSCSliceFilter(type, scName, (SliceQueryFilter)filter);
+ }
+
+ public static IDiskAtomFilter fromSCNamesFilter(CompositeType type, ByteBuffer scName, NamesQueryFilter filter)
+ {
+ if (scName == null)
+ {
+ ColumnSlice[] slices = new ColumnSlice[filter.columns.size()];
+ int i = 0;
+ for (ByteBuffer bb : filter.columns)
+ {
+ CompositeType.Builder builder = type.builder().add(bb);
+ slices[i++] = new ColumnSlice(builder.build(), builder.buildAsEndOfRange());
+ }
+ return new SliceQueryFilter(slices, false, slices.length, 1, 1);
+ }
+ else
+ {
+ SortedSet<ByteBuffer> newColumns = new TreeSet<ByteBuffer>(type);
+ for (ByteBuffer c : filter.columns)
+ newColumns.add(CompositeType.build(scName, c));
+ return filter.withUpdatedColumns(newColumns);
+ }
+ }
+
+ public static SliceQueryFilter fromSCSliceFilter(CompositeType type, ByteBuffer scName, SliceQueryFilter filter)
+ {
+ assert filter.slices.length == 1;
+ if (scName == null)
+ {
+ ByteBuffer start = filter.start().remaining() == 0
+ ? filter.start()
+ : (filter.reversed ? type.builder().add(filter.start()).buildAsEndOfRange()
+ : type.builder().add(filter.start()).build());
+ ByteBuffer finish = filter.finish().remaining() == 0
+ ? filter.finish()
+ : (filter.reversed ? type.builder().add(filter.finish()).build()
+ : type.builder().add(filter.finish()).buildAsEndOfRange());
+ return new SliceQueryFilter(start, finish, filter.reversed, filter.count, 1);
+ }
+ else
+ {
+ CompositeType.Builder builder = type.builder().add(scName);
+ ByteBuffer start = filter.start().remaining() == 0
+ ? filter.reversed ? builder.buildAsEndOfRange() : builder.build()
+ : builder.copy().add(filter.start()).build();
+ ByteBuffer end = filter.finish().remaining() == 0
+ ? filter.reversed ? builder.build() : builder.buildAsEndOfRange()
+ : builder.add(filter.finish()).build();
+ return new SliceQueryFilter(start, end, filter.reversed, filter.count);
+ }
+ }
+
+ private static boolean firstEndOfComponent(ByteBuffer bb)
+ {
+ bb = bb.duplicate();
+ int length = (bb.get() & 0xFF) << 8;
+ length |= (bb.get() & 0xFF);
+
+ return bb.get(length + 2) == 1;
+ }
+
+ public static class SCFilter
+ {
+ public final ByteBuffer scName;
+ public final IDiskAtomFilter updatedFilter;
+
+ public SCFilter(ByteBuffer scName, IDiskAtomFilter updatedFilter)
+ {
+ this.scName = scName;
+ this.updatedFilter = updatedFilter;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/SystemTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemTable.java b/src/java/org/apache/cassandra/db/SystemTable.java
index 8f60904..940300c 100644
--- a/src/java/org/apache/cassandra/db/SystemTable.java
+++ b/src/java/org/apache/cassandra/db/SystemTable.java
@@ -43,7 +43,6 @@ import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
import org.apache.cassandra.db.filter.QueryFilter;
-import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.db.marshal.AsciiType;
import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.db.marshal.UTF8Type;
@@ -156,9 +155,9 @@ public class SystemTable
SortedSet<ByteBuffer> cols = new TreeSet<ByteBuffer>(BytesType.instance);
cols.add(ByteBufferUtil.bytes("ClusterName"));
cols.add(ByteBufferUtil.bytes("Token"));
- QueryFilter filter = QueryFilter.getNamesFilter(decorate(ByteBufferUtil.bytes("L")), new QueryPath(OLD_STATUS_CF), cols);
+ QueryFilter filter = QueryFilter.getNamesFilter(decorate(ByteBufferUtil.bytes("L")), OLD_STATUS_CF, cols);
ColumnFamily oldCf = oldStatusCfs.getColumnFamily(filter);
- Iterator<IColumn> oldColumns = oldCf.columns.iterator();
+ Iterator<Column> oldColumns = oldCf.columns.iterator();
String clusterName = null;
try
@@ -514,7 +513,7 @@ public class SystemTable
{
ColumnFamilyStore cfs = Table.open(Table.SYSTEM_KS).getColumnFamilyStore(INDEX_CF);
QueryFilter filter = QueryFilter.getNamesFilter(decorate(ByteBufferUtil.bytes(table)),
- new QueryPath(INDEX_CF),
+ INDEX_CF,
ByteBufferUtil.bytes(indexName));
return ColumnFamilyStore.removeDeleted(cfs.getColumnFamily(filter), Integer.MAX_VALUE) != null;
}
@@ -532,7 +531,7 @@ public class SystemTable
public static void setIndexRemoved(String table, String indexName)
{
RowMutation rm = new RowMutation(Table.SYSTEM_KS, ByteBufferUtil.bytes(table));
- rm.delete(new QueryPath(INDEX_CF, null, ByteBufferUtil.bytes(indexName)), FBUtilities.timestampMicros());
+ rm.delete(INDEX_CF, ByteBufferUtil.bytes(indexName), FBUtilities.timestampMicros());
rm.apply();
forceBlockingFlush(INDEX_CF);
}
@@ -573,7 +572,7 @@ public class SystemTable
// Get the last CounterId (since CounterId are timeuuid is thus ordered from the older to the newer one)
QueryFilter filter = QueryFilter.getSliceFilter(decorate(ALL_LOCAL_NODE_ID_KEY),
- new QueryPath(COUNTER_ID_CF),
+ COUNTER_ID_CF,
ByteBufferUtil.EMPTY_BYTE_BUFFER,
ByteBufferUtil.EMPTY_BYTE_BUFFER,
true,
@@ -611,11 +610,11 @@ public class SystemTable
List<CounterId.CounterIdRecord> l = new ArrayList<CounterId.CounterIdRecord>();
Table table = Table.open(Table.SYSTEM_KS);
- QueryFilter filter = QueryFilter.getIdentityFilter(decorate(ALL_LOCAL_NODE_ID_KEY), new QueryPath(COUNTER_ID_CF));
+ QueryFilter filter = QueryFilter.getIdentityFilter(decorate(ALL_LOCAL_NODE_ID_KEY), COUNTER_ID_CF);
ColumnFamily cf = table.getColumnFamilyStore(COUNTER_ID_CF).getColumnFamily(filter);
CounterId previous = null;
- for (IColumn c : cf)
+ for (Column c : cf)
{
if (previous != null)
l.add(new CounterId.CounterIdRecord(previous, c.timestamp()));
@@ -655,8 +654,7 @@ public class SystemTable
{
Token minToken = StorageService.getPartitioner().getMinimumToken();
- return schemaCFS(schemaCfName).getRangeSlice(null,
- new Range<RowPosition>(minToken.minKeyBound(),
+ return schemaCFS(schemaCfName).getRangeSlice(new Range<RowPosition>(minToken.minKeyBound(),
minToken.maxKeyBound()),
Integer.MAX_VALUE,
new IdentityQueryFilter(),
@@ -713,7 +711,7 @@ public class SystemTable
DecoratedKey key = StorageService.getPartitioner().decorateKey(getSchemaKSKey(ksName));
ColumnFamilyStore schemaCFS = SystemTable.schemaCFS(SCHEMA_KEYSPACES_CF);
- ColumnFamily result = schemaCFS.getColumnFamily(QueryFilter.getIdentityFilter(key, new QueryPath(SCHEMA_KEYSPACES_CF)));
+ ColumnFamily result = schemaCFS.getColumnFamily(QueryFilter.getIdentityFilter(key, SCHEMA_KEYSPACES_CF));
return new Row(key, result);
}
@@ -724,7 +722,6 @@ public class SystemTable
ColumnFamilyStore schemaCFS = SystemTable.schemaCFS(SCHEMA_COLUMNFAMILIES_CF);
ColumnFamily result = schemaCFS.getColumnFamily(key,
- new QueryPath(SCHEMA_COLUMNFAMILIES_CF),
DefsTable.searchComposite(cfName, true),
DefsTable.searchComposite(cfName, false),
false,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/Table.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Table.java b/src/java/org/apache/cassandra/db/Table.java
index 786ed43..361e3b2 100644
--- a/src/java/org/apache/cassandra/db/Table.java
+++ b/src/java/org/apache/cassandra/db/Table.java
@@ -425,7 +425,7 @@ public class Table
{
ColumnFamily cf = pager.next();
ColumnFamily cf2 = cf.cloneMeShallow();
- for (IColumn column : cf)
+ for (Column column : cf)
{
if (cfs.indexManager.indexes(column.name(), indexes))
cf2.addColumn(column);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/ThreadSafeSortedColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ThreadSafeSortedColumns.java b/src/java/org/apache/cassandra/db/ThreadSafeSortedColumns.java
index 6aa19f0..776e543 100644
--- a/src/java/org/apache/cassandra/db/ThreadSafeSortedColumns.java
+++ b/src/java/org/apache/cassandra/db/ThreadSafeSortedColumns.java
@@ -33,7 +33,7 @@ import org.apache.cassandra.utils.Allocator;
public class ThreadSafeSortedColumns extends AbstractThreadUnsafeSortedColumns implements ISortedColumns
{
- private final ConcurrentSkipListMap<ByteBuffer, IColumn> map;
+ private final ConcurrentSkipListMap<ByteBuffer, Column> map;
public static final ISortedColumns.Factory factory = new Factory()
{
@@ -42,7 +42,7 @@ public class ThreadSafeSortedColumns extends AbstractThreadUnsafeSortedColumns i
return new ThreadSafeSortedColumns(comparator);
}
- public ISortedColumns fromSorted(SortedMap<ByteBuffer, IColumn> sortedMap, boolean insertReversed)
+ public ISortedColumns fromSorted(SortedMap<ByteBuffer, Column> sortedMap, boolean insertReversed)
{
return new ThreadSafeSortedColumns(sortedMap);
}
@@ -60,12 +60,12 @@ public class ThreadSafeSortedColumns extends AbstractThreadUnsafeSortedColumns i
private ThreadSafeSortedColumns(AbstractType<?> comparator)
{
- this.map = new ConcurrentSkipListMap<ByteBuffer, IColumn>(comparator);
+ this.map = new ConcurrentSkipListMap<ByteBuffer, Column>(comparator);
}
- private ThreadSafeSortedColumns(SortedMap<ByteBuffer, IColumn> columns)
+ private ThreadSafeSortedColumns(SortedMap<ByteBuffer, Column> columns)
{
- this.map = new ConcurrentSkipListMap<ByteBuffer, IColumn>(columns);
+ this.map = new ConcurrentSkipListMap<ByteBuffer, Column>(columns);
}
public ISortedColumns.Factory getFactory()
@@ -87,59 +87,49 @@ public class ThreadSafeSortedColumns extends AbstractThreadUnsafeSortedColumns i
* If we find an old column that has the same name
* the ask it to resolve itself else add the new column
*/
- public void addColumn(IColumn column, Allocator allocator)
+ public void addColumn(Column column, Allocator allocator)
{
addColumnInternal(column, allocator);
}
- private long addColumnInternal(IColumn column, Allocator allocator)
+ private long addColumnInternal(Column column, Allocator allocator)
{
ByteBuffer name = column.name();
while (true)
{
- IColumn oldColumn = map.putIfAbsent(name, column);
+ Column oldColumn = map.putIfAbsent(name, column);
if (oldColumn == null)
return column.dataSize();
- if (oldColumn instanceof SuperColumn)
- {
- assert column instanceof SuperColumn;
- long previousSize = oldColumn.dataSize();
- ((SuperColumn) oldColumn).putColumn((SuperColumn)column, allocator);
- return oldColumn.dataSize() - previousSize;
- }
- else
- {
- // calculate reconciled col from old (existing) col and new col
- IColumn reconciledColumn = column.reconcile(oldColumn, allocator);
- if (map.replace(name, oldColumn, reconciledColumn))
- return reconciledColumn.dataSize() - oldColumn.dataSize();
-
- // We failed to replace column due to a concurrent update or a concurrent removal. Keep trying.
- // (Currently, concurrent removal should not happen (only updates), but let us support that anyway.)
- }
+ // calculate reconciled col from old (existing) col and new col
+ Column reconciledColumn = column.reconcile(oldColumn, allocator);
+ if (map.replace(name, oldColumn, reconciledColumn))
+ return reconciledColumn.dataSize() - oldColumn.dataSize();
+
+ // We failed to replace column due to a concurrent update or a concurrent removal. Keep trying.
+ // (Currently, concurrent removal should not happen (only updates), but let us support that anyway.)
}
}
/**
* We need to go through each column in the column container and resolve it before adding
*/
- public void addAll(ISortedColumns cm, Allocator allocator, Function<IColumn, IColumn> transformation)
+ public void addAll(ISortedColumns cm, Allocator allocator, Function<Column, Column> transformation)
{
addAllWithSizeDelta(cm, allocator, transformation, null);
}
@Override
- public long addAllWithSizeDelta(ISortedColumns cm, Allocator allocator, Function<IColumn, IColumn> transformation, SecondaryIndexManager.Updater indexer)
+ public long addAllWithSizeDelta(ISortedColumns cm, Allocator allocator, Function<Column, Column> transformation, SecondaryIndexManager.Updater indexer)
{
delete(cm.getDeletionInfo());
long sizeDelta = 0;
- for (IColumn column : cm.getSortedColumns())
+ for (Column column : cm.getSortedColumns())
sizeDelta += addColumnInternal(transformation.apply(column), allocator);
return sizeDelta;
}
- public boolean replace(IColumn oldColumn, IColumn newColumn)
+ public boolean replace(Column oldColumn, Column newColumn)
{
if (!oldColumn.name().equals(newColumn.name()))
throw new IllegalArgumentException();
@@ -147,7 +137,7 @@ public class ThreadSafeSortedColumns extends AbstractThreadUnsafeSortedColumns i
return map.replace(oldColumn.name(), oldColumn, newColumn);
}
- public IColumn getColumn(ByteBuffer name)
+ public Column getColumn(ByteBuffer name)
{
return map.get(name);
}
@@ -167,12 +157,12 @@ public class ThreadSafeSortedColumns extends AbstractThreadUnsafeSortedColumns i
return map.size();
}
- public Collection<IColumn> getSortedColumns()
+ public Collection<Column> getSortedColumns()
{
return map.values();
}
- public Collection<IColumn> getReverseSortedColumns()
+ public Collection<Column> getReverseSortedColumns()
{
return map.descendingMap().values();
}
@@ -182,17 +172,17 @@ public class ThreadSafeSortedColumns extends AbstractThreadUnsafeSortedColumns i
return map.navigableKeySet();
}
- public Iterator<IColumn> iterator()
+ public Iterator<Column> iterator()
{
return map.values().iterator();
}
- public Iterator<IColumn> iterator(ColumnSlice[] slices)
+ public Iterator<Column> iterator(ColumnSlice[] slices)
{
return new ColumnSlice.NavigableMapIterator(map, slices);
}
- public Iterator<IColumn> reverseIterator(ColumnSlice[] slices)
+ public Iterator<Column> reverseIterator(ColumnSlice[] slices)
{
return new ColumnSlice.NavigableMapIterator(map.descendingMap(), slices);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/TreeMapBackedSortedColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/TreeMapBackedSortedColumns.java b/src/java/org/apache/cassandra/db/TreeMapBackedSortedColumns.java
index c4ec52f..20cbd90 100644
--- a/src/java/org/apache/cassandra/db/TreeMapBackedSortedColumns.java
+++ b/src/java/org/apache/cassandra/db/TreeMapBackedSortedColumns.java
@@ -33,7 +33,7 @@ import org.apache.cassandra.utils.Allocator;
public class TreeMapBackedSortedColumns extends AbstractThreadUnsafeSortedColumns implements ISortedColumns
{
- private final TreeMap<ByteBuffer, IColumn> map;
+ private final TreeMap<ByteBuffer, Column> map;
public static final ISortedColumns.Factory factory = new Factory()
{
@@ -42,7 +42,7 @@ public class TreeMapBackedSortedColumns extends AbstractThreadUnsafeSortedColumn
return new TreeMapBackedSortedColumns(comparator);
}
- public ISortedColumns fromSorted(SortedMap<ByteBuffer, IColumn> sortedMap, boolean insertReversed)
+ public ISortedColumns fromSorted(SortedMap<ByteBuffer, Column> sortedMap, boolean insertReversed)
{
return new TreeMapBackedSortedColumns(sortedMap);
}
@@ -60,12 +60,12 @@ public class TreeMapBackedSortedColumns extends AbstractThreadUnsafeSortedColumn
private TreeMapBackedSortedColumns(AbstractType<?> comparator)
{
- this.map = new TreeMap<ByteBuffer, IColumn>(comparator);
+ this.map = new TreeMap<ByteBuffer, Column>(comparator);
}
- private TreeMapBackedSortedColumns(SortedMap<ByteBuffer, IColumn> columns)
+ private TreeMapBackedSortedColumns(SortedMap<ByteBuffer, Column> columns)
{
- this.map = new TreeMap<ByteBuffer, IColumn>(columns);
+ this.map = new TreeMap<ByteBuffer, Column>(columns);
}
public ISortedColumns.Factory getFactory()
@@ -83,7 +83,7 @@ public class TreeMapBackedSortedColumns extends AbstractThreadUnsafeSortedColumn
return false;
}
- public void addColumn(IColumn column, Allocator allocator)
+ public void addColumn(Column column, Allocator allocator)
{
addColumn(column, allocator, SecondaryIndexManager.nullUpdater);
}
@@ -92,47 +92,33 @@ public class TreeMapBackedSortedColumns extends AbstractThreadUnsafeSortedColumn
* If we find an old column that has the same name
* the ask it to resolve itself else add the new column
*/
- public long addColumn(IColumn column, Allocator allocator, SecondaryIndexManager.Updater indexer)
+ public long addColumn(Column column, Allocator allocator, SecondaryIndexManager.Updater indexer)
{
ByteBuffer name = column.name();
// this is a slightly unusual way to structure this; a more natural way is shown in ThreadSafeSortedColumns,
// but TreeMap lacks putAbsent. Rather than split it into a "get, then put" check, we do it as follows,
// which saves the extra "get" in the no-conflict case [for both normal and super columns],
// in exchange for a re-put in the SuperColumn case.
- IColumn oldColumn = map.put(name, column);
+ Column oldColumn = map.put(name, column);
if (oldColumn == null)
return column.dataSize();
- if (oldColumn instanceof SuperColumn)
- {
- assert column instanceof SuperColumn;
- long previousSize = oldColumn.dataSize();
- // since oldColumn is where we've been accumulating results, it's usually going to be faster to
- // add the new one to the old, then place old back in the Map, rather than copy the old contents
- // into the new Map entry.
- ((SuperColumn) oldColumn).putColumn((SuperColumn)column, allocator);
- map.put(name, oldColumn);
- return oldColumn.dataSize() - previousSize;
- }
+ // calculate reconciled col from old (existing) col and new col
+ Column reconciledColumn = column.reconcile(oldColumn, allocator);
+ map.put(name, reconciledColumn);
+ // for memtable updates we only care about oldcolumn, reconciledcolumn, but when compacting
+ // we need to make sure we update indexes no matter the order we merge
+ if (reconciledColumn == column)
+ indexer.update(oldColumn, reconciledColumn);
else
- {
- // calculate reconciled col from old (existing) col and new col
- IColumn reconciledColumn = column.reconcile(oldColumn, allocator);
- map.put(name, reconciledColumn);
- // for memtable updates we only care about oldcolumn, reconciledcolumn, but when compacting
- // we need to make sure we update indexes no matter the order we merge
- if (reconciledColumn == column)
- indexer.update(oldColumn, reconciledColumn);
- else
- indexer.update(column, reconciledColumn);
- return reconciledColumn.dataSize() - oldColumn.dataSize();
- }
+ indexer.update(column, reconciledColumn);
+ return reconciledColumn.dataSize() - oldColumn.dataSize();
}
- public long addAllWithSizeDelta(ISortedColumns cm, Allocator allocator, Function<IColumn, IColumn> transformation, SecondaryIndexManager.Updater indexer)
+ public long addAllWithSizeDelta(ISortedColumns cm, Allocator allocator, Function<Column, Column> transformation, SecondaryIndexManager.Updater indexer)
{
delete(cm.getDeletionInfo());
- for (IColumn column : cm.getSortedColumns())
+ for (Column column : cm.getSortedColumns())
addColumn(transformation.apply(column), allocator, indexer);
// we don't use this for memtables, so we don't bother computing size
@@ -142,12 +128,12 @@ public class TreeMapBackedSortedColumns extends AbstractThreadUnsafeSortedColumn
/**
* We need to go through each column in the column container and resolve it before adding
*/
- public void addAll(ISortedColumns cm, Allocator allocator, Function<IColumn, IColumn> transformation)
+ public void addAll(ISortedColumns cm, Allocator allocator, Function<Column, Column> transformation)
{
addAllWithSizeDelta(cm, allocator, transformation, SecondaryIndexManager.nullUpdater);
}
- public boolean replace(IColumn oldColumn, IColumn newColumn)
+ public boolean replace(Column oldColumn, Column newColumn)
{
if (!oldColumn.name().equals(newColumn.name()))
throw new IllegalArgumentException();
@@ -156,7 +142,7 @@ public class TreeMapBackedSortedColumns extends AbstractThreadUnsafeSortedColumn
// column or the column was not equal to oldColumn (to be coherent
// with other implementation). We optimize for the common case where
// oldColumn do is present though.
- IColumn previous = map.put(oldColumn.name(), newColumn);
+ Column previous = map.put(oldColumn.name(), newColumn);
if (previous == null)
{
map.remove(oldColumn.name());
@@ -170,7 +156,7 @@ public class TreeMapBackedSortedColumns extends AbstractThreadUnsafeSortedColumn
return true;
}
- public IColumn getColumn(ByteBuffer name)
+ public Column getColumn(ByteBuffer name)
{
return map.get(name);
}
@@ -190,12 +176,12 @@ public class TreeMapBackedSortedColumns extends AbstractThreadUnsafeSortedColumn
return map.size();
}
- public Collection<IColumn> getSortedColumns()
+ public Collection<Column> getSortedColumns()
{
return map.values();
}
- public Collection<IColumn> getReverseSortedColumns()
+ public Collection<Column> getReverseSortedColumns()
{
return map.descendingMap().values();
}
@@ -205,17 +191,17 @@ public class TreeMapBackedSortedColumns extends AbstractThreadUnsafeSortedColumn
return map.navigableKeySet();
}
- public Iterator<IColumn> iterator()
+ public Iterator<Column> iterator()
{
return map.values().iterator();
}
- public Iterator<IColumn> iterator(ColumnSlice[] slices)
+ public Iterator<Column> iterator(ColumnSlice[] slices)
{
return new ColumnSlice.NavigableMapIterator(map, slices);
}
- public Iterator<IColumn> reverseIterator(ColumnSlice[] slices)
+ public Iterator<Column> reverseIterator(ColumnSlice[] slices)
{
return new ColumnSlice.NavigableMapIterator(map.descendingMap(), slices);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/columniterator/IdentityQueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/IdentityQueryFilter.java b/src/java/org/apache/cassandra/db/columniterator/IdentityQueryFilter.java
index 9c9682f..9697335 100644
--- a/src/java/org/apache/cassandra/db/columniterator/IdentityQueryFilter.java
+++ b/src/java/org/apache/cassandra/db/columniterator/IdentityQueryFilter.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.db.columniterator;
-import org.apache.cassandra.db.SuperColumn;
import org.apache.cassandra.db.filter.SliceQueryFilter;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -30,10 +29,4 @@ public class IdentityQueryFilter extends SliceQueryFilter
{
super(ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, Integer.MAX_VALUE);
}
-
- public SuperColumn filterSuperColumn(SuperColumn superColumn, int gcBefore)
- {
- // no filtering done, deliberately
- return superColumn;
- }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java b/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java
index db0130e..ac03583 100644
--- a/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java
+++ b/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;
+import java.util.Iterator;
import java.util.List;
import com.google.common.collect.AbstractIterator;
@@ -352,7 +353,9 @@ class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskA
if (file == null)
file = originalInput == null ? sstable.getFileDataInput(positionToSeek) : originalInput;
- OnDiskAtom.Serializer atomSerializer = emptyColumnFamily.getOnDiskSerializer();
+ // Give a bogus atom count since we'll deserialize as long as we're
+ // within the index block but we don't know how much atom is there
+ Iterator<OnDiskAtom> atomIterator = emptyColumnFamily.metadata().getOnDiskIterator(file, Integer.MAX_VALUE, sstable.descriptor.version);
file.seek(positionToSeek);
FileMark mark = file.mark();
@@ -365,7 +368,7 @@ class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskA
{
// Only fetch a new column if we haven't dealt with the previous one.
if (column == null)
- column = atomSerializer.deserializeFromSSTable(file, sstable.descriptor.version);
+ column = atomIterator.next();
// col is before slice
// (If in slice, don't bother checking that until we change slice)
@@ -434,12 +437,10 @@ class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskA
// We remenber when we are whithin a slice to avoid some comparison
boolean inSlice = false;
- OnDiskAtom.Serializer atomSerializer = emptyColumnFamily.getOnDiskSerializer();
- int columns = file.readInt();
-
- for (int i = 0; i < columns; i++)
+ Iterator<OnDiskAtom> atomIterator = emptyColumnFamily.metadata().getOnDiskIterator(file, file.readInt(), sstable.descriptor.version);
+ while (atomIterator.hasNext())
{
- OnDiskAtom column = atomSerializer.deserializeFromSSTable(file, sstable.descriptor.version);
+ OnDiskAtom column = atomIterator.next();
// col is before slice
// (If in slice, don't bother checking that until we change slice)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java b/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java
index da4631d..389fbb2 100644
--- a/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java
@@ -26,7 +26,7 @@ import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.ColumnFamilySerializer;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.DeletionInfo;
-import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.Column;
import org.apache.cassandra.db.RowIndexEntry;
import org.apache.cassandra.db.OnDiskAtom;
import org.apache.cassandra.db.marshal.AbstractType;
@@ -196,13 +196,12 @@ public class SSTableNamesIterator extends SimpleAbstractColumnIterator implement
private void readSimpleColumns(FileDataInput file, SortedSet<ByteBuffer> columnNames, List<ByteBuffer> filteredColumnNames, List<OnDiskAtom> result) throws IOException
{
- OnDiskAtom.Serializer atomSerializer = cf.getOnDiskSerializer();
- int columns = file.readInt();
+ Iterator<OnDiskAtom> atomIterator = cf.metadata().getOnDiskIterator(file, file.readInt(), sstable.descriptor.version);
int n = 0;
- for (int i = 0; i < columns; i++)
+ while (atomIterator.hasNext())
{
- OnDiskAtom column = atomSerializer.deserializeFromSSTable(file, sstable.descriptor.version);
- if (column instanceof IColumn)
+ OnDiskAtom column = atomIterator.next();
+ if (column instanceof Column)
{
if (columnNames.contains(column.name()))
{
@@ -255,15 +254,16 @@ public class SSTableNamesIterator extends SimpleAbstractColumnIterator implement
if (file == null)
file = createFileDataInput(positionToSeek);
- OnDiskAtom.Serializer atomSerializer = cf.getOnDiskSerializer();
+ // We'll read as much atom as there is in the index block, so provide a bogus atom count
+ Iterator<OnDiskAtom> atomIterator = cf.metadata().getOnDiskIterator(file, Integer.MAX_VALUE, sstable.descriptor.version);
file.seek(positionToSeek);
FileMark mark = file.mark();
// TODO only completely deserialize columns we are interested in
while (file.bytesPastMark(mark) < indexInfo.width)
{
- OnDiskAtom column = atomSerializer.deserializeFromSSTable(file, sstable.descriptor.version);
+ OnDiskAtom column = atomIterator.next();
// we check vs the original Set, not the filtered List, for efficiency
- if (!(column instanceof IColumn) || columnNames.contains(column.name()))
+ if (!(column instanceof Column) || columnNames.contains(column.name()))
result.add(column);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java b/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java
index d19e6a5..652d60c 100644
--- a/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java
+++ b/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.db.columniterator;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Iterator;
import com.google.common.collect.AbstractIterator;
@@ -43,10 +44,9 @@ class SimpleSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskAt
private final ByteBuffer finishColumn;
private final AbstractType<?> comparator;
private final ColumnFamily emptyColumnFamily;
- private final int columns;
private int i;
private FileMark mark;
- private final OnDiskAtom.Serializer atomSerializer;
+ private final Iterator<OnDiskAtom> atomIterator;
public SimpleSliceReader(SSTableReader sstable, RowIndexEntry indexEntry, FileDataInput input, ByteBuffer finishColumn)
{
@@ -79,8 +79,7 @@ class SimpleSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskAt
emptyColumnFamily = ColumnFamily.create(sstable.metadata);
emptyColumnFamily.delete(DeletionInfo.serializer().deserializeFromSSTable(file, sstable.descriptor.version));
- atomSerializer = emptyColumnFamily.getOnDiskSerializer();
- columns = file.readInt();
+ atomIterator = emptyColumnFamily.metadata().getOnDiskIterator(file, file.readInt(), sstable.descriptor.version);
mark = file.mark();
}
catch (IOException e)
@@ -92,14 +91,14 @@ class SimpleSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskAt
protected OnDiskAtom computeNext()
{
- if (i++ >= columns)
+ if (!atomIterator.hasNext())
return endOfData();
OnDiskAtom column;
try
{
file.reset(mark);
- column = atomSerializer.deserializeFromSSTable(file, sstable.descriptor.version);
+ column = atomIterator.next();
}
catch (IOException e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
index 17f3519..533219c 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
@@ -35,11 +35,12 @@ public class CommitLogDescriptor
public static final int LEGACY_VERSION = 1;
public static final int VERSION_12 = 2;
+ public static final int VERSION_20 = 3;
/**
* Increment this number if there is a changes in the commit log disc layout or MessagingVersion changes.
* Note: make sure to handle {@link #getMessagingVersion()}
*/
- public static final int current_version = VERSION_12;
+ public static final int current_version = VERSION_20;
private final int version;
public final long id;
@@ -75,13 +76,15 @@ public class CommitLogDescriptor
public int getMessagingVersion()
{
- assert MessagingService.current_version == MessagingService.VERSION_12;
+ assert MessagingService.current_version == MessagingService.VERSION_20;
switch (version)
{
case LEGACY_VERSION:
return MessagingService.VERSION_11;
case VERSION_12:
return MessagingService.VERSION_12;
+ case VERSION_20:
+ return MessagingService.VERSION_20;
default:
throw new IllegalStateException("Unknown commitlog version " + version);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index 9f949d0..3ef3693 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -36,7 +36,6 @@ import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.*;
-import org.apache.cassandra.io.IColumnSerializer;
import org.apache.cassandra.io.util.FastByteArrayInputStream;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.io.util.RandomAccessReader;
@@ -201,7 +200,7 @@ public class CommitLogReplayer
{
// assuming version here. We've gone to lengths to make sure what gets written to the CL is in
// the current version. so do make sure the CL is drained prior to upgrading a node.
- rm = RowMutation.serializer.deserialize(new DataInputStream(bufIn), version, IColumnSerializer.Flag.LOCAL);
+ rm = RowMutation.serializer.deserialize(new DataInputStream(bufIn), version, ColumnSerializer.Flag.LOCAL);
}
catch (UnknownColumnFamilyException ex)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index eec4b4c..e93fdc1 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -603,7 +603,7 @@ public class CompactionManager implements CompactionManagerMBean
SSTableScanner scanner = sstable.getDirectScanner();
long rowsRead = 0;
- List<IColumn> indexedColumnsInRow = null;
+ List<Column> indexedColumnsInRow = null;
CleanupInfo ci = new CleanupInfo(sstable, scanner);
metrics.beginCompaction(ci);
@@ -637,12 +637,12 @@ public class CompactionManager implements CompactionManagerMBean
OnDiskAtom column = row.next();
if (column instanceof CounterColumn)
renewer.maybeRenew((CounterColumn) column);
- if (column instanceof IColumn && cfs.indexManager.indexes((IColumn)column))
+ if (column instanceof Column && cfs.indexManager.indexes((Column)column))
{
if (indexedColumnsInRow == null)
- indexedColumnsInRow = new ArrayList<IColumn>();
+ indexedColumnsInRow = new ArrayList<Column>();
- indexedColumnsInRow.add((IColumn)column);
+ indexedColumnsInRow.add((Column)column);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
index 7981d88..6ff9a50 100644
--- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
@@ -252,7 +252,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
}
else
{
- IColumn column = (IColumn) current;
+ Column column = (Column) current;
container.addColumn(column);
if (container.getColumn(column.name()) != column)
indexer.remove(column);
@@ -284,7 +284,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
container.clear();
return null;
}
- IColumn reduced = purged.iterator().next();
+ Column reduced = purged.iterator().next();
container.clear();
// PrecompactedRow.removeDeletedAndOldShards have only checked the top-level CF deletion times,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
index 7e1983c..58227f6 100644
--- a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
+++ b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
@@ -207,7 +207,7 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable
{
// addAll is ok even if cf is an ArrayBackedSortedColumns
SecondaryIndexManager.Updater indexer = controller.cfs.indexManager.updaterFor(row.key, false);
- cf.addAllWithSizeDelta(thisCF, HeapAllocator.instance, Functions.<IColumn>identity(), indexer);
+ cf.addAllWithSizeDelta(thisCF, HeapAllocator.instance, Functions.<Column>identity(), indexer);
}
}
@@ -218,7 +218,7 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable
private class DeserializedColumnIterator implements ICountableColumnIterator
{
private final Row row;
- private Iterator<IColumn> iter;
+ private Iterator<Column> iter;
public DeserializedColumnIterator(Row row)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java b/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
index be4b20e..213bb8e 100644
--- a/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
@@ -121,7 +121,7 @@ public class PrecompactedRow extends AbstractCompactedRow
else
{
// addAll is ok even if cf is an ArrayBackedSortedColumns
- cf.addAllWithSizeDelta(thisCF, HeapAllocator.instance, Functions.<IColumn>identity(), indexer);
+ cf.addAllWithSizeDelta(thisCF, HeapAllocator.instance, Functions.<Column>identity(), indexer);
}
}
return cf;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/filter/ColumnCounter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ColumnCounter.java b/src/java/org/apache/cassandra/db/filter/ColumnCounter.java
index ccf83d9..697ad02 100644
--- a/src/java/org/apache/cassandra/db/filter/ColumnCounter.java
+++ b/src/java/org/apache/cassandra/db/filter/ColumnCounter.java
@@ -22,8 +22,8 @@ package org.apache.cassandra.db.filter;
import java.nio.ByteBuffer;
-import org.apache.cassandra.db.IColumn;
-import org.apache.cassandra.db.IColumnContainer;
+import org.apache.cassandra.db.Column;
+import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.marshal.CompositeType;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -32,7 +32,7 @@ public class ColumnCounter
protected int live;
protected int ignored;
- public void count(IColumn column, IColumnContainer container)
+ public void count(Column column, ColumnFamily container)
{
if (!isLive(column, container))
ignored++;
@@ -40,7 +40,7 @@ public class ColumnCounter
live++;
}
- protected static boolean isLive(IColumn column, IColumnContainer container)
+ protected static boolean isLive(Column column, ColumnFamily container)
{
return column.isLive() && (!container.deletionInfo().isDeleted(column));
}
@@ -79,7 +79,7 @@ public class ColumnCounter
assert toGroup == 0 || type != null;
}
- public void count(IColumn column, IColumnContainer container)
+ public void count(Column column, ColumnFamily container)
{
if (!isLive(column, container))
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/filter/ColumnSlice.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ColumnSlice.java b/src/java/org/apache/cassandra/db/filter/ColumnSlice.java
index cdcdc17..e62a436 100644
--- a/src/java/org/apache/cassandra/db/filter/ColumnSlice.java
+++ b/src/java/org/apache/cassandra/db/filter/ColumnSlice.java
@@ -140,21 +140,21 @@ public class ColumnSlice
}
}
- public static class NavigableMapIterator extends AbstractIterator<IColumn>
+ public static class NavigableMapIterator extends AbstractIterator<Column>
{
- private final NavigableMap<ByteBuffer, IColumn> map;
+ private final NavigableMap<ByteBuffer, Column> map;
private final ColumnSlice[] slices;
private int idx = 0;
- private Iterator<IColumn> currentSlice;
+ private Iterator<Column> currentSlice;
- public NavigableMapIterator(NavigableMap<ByteBuffer, IColumn> map, ColumnSlice[] slices)
+ public NavigableMapIterator(NavigableMap<ByteBuffer, Column> map, ColumnSlice[] slices)
{
this.map = map;
this.slices = slices;
}
- protected IColumn computeNext()
+ protected Column computeNext()
{
if (currentSlice == null)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
index 4772c53..41f9d28 100644
--- a/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
@@ -281,7 +281,7 @@ public abstract class ExtendedFilter
{
// check column data vs expression
ByteBuffer colName = builder == null ? expression.column_name : builder.copy().add(expression.column_name).build();
- IColumn column = data.getColumn(colName);
+ Column column = data.getColumn(colName);
if (column == null)
return false;
int v = data.metadata().getValueValidator(expression.column_name).compare(column.value(), expression.value);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java b/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java
index f1d9611..5115e95 100644
--- a/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java
@@ -66,15 +66,9 @@ public interface IDiskAtomFilter
* by the filter code, which should have some limit on the number of columns
* to avoid running out of memory on large rows.
*/
- public void collectReducedColumns(IColumnContainer container, Iterator<IColumn> reducedColumns, int gcBefore);
+ public void collectReducedColumns(ColumnFamily container, Iterator<Column> reducedColumns, int gcBefore);
- /**
- * subcolumns of a supercolumn are unindexed, so to pick out parts of those we operate in-memory.
- * @param superColumn may be modified by filtering op.
- */
- public SuperColumn filterSuperColumn(SuperColumn superColumn, int gcBefore);
-
- public Comparator<IColumn> getColumnComparator(AbstractType<?> comparator);
+ public Comparator<Column> getColumnComparator(AbstractType<?> comparator);
public boolean isReversed();
public void updateColumnsLimit(int newLimit);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java b/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
index 0581e12..5d0bc49 100644
--- a/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
@@ -86,29 +86,17 @@ public class NamesQueryFilter implements IDiskAtomFilter
return new SSTableNamesIterator(sstable, file, key, columns, indexEntry);
}
- public SuperColumn filterSuperColumn(SuperColumn superColumn, int gcBefore)
- {
- for (IColumn column : superColumn.getSubColumns())
- {
- if (!columns.contains(column.name()) || !QueryFilter.isRelevant(column, superColumn, gcBefore))
- {
- superColumn.remove(column.name());
- }
- }
- return superColumn;
- }
-
- public void collectReducedColumns(IColumnContainer container, Iterator<IColumn> reducedColumns, int gcBefore)
+ public void collectReducedColumns(ColumnFamily container, Iterator<Column> reducedColumns, int gcBefore)
{
while (reducedColumns.hasNext())
{
- IColumn column = reducedColumns.next();
+ Column column = reducedColumns.next();
if (QueryFilter.isRelevant(column, container, gcBefore))
container.addColumn(column);
}
}
- public Comparator<IColumn> getColumnComparator(AbstractType<?> comparator)
+ public Comparator<Column> getColumnComparator(AbstractType<?> comparator)
{
return comparator.columnComparator;
}
@@ -136,7 +124,7 @@ public class NamesQueryFilter implements IDiskAtomFilter
return cf.hasOnlyTombstones() ? 0 : 1;
int count = 0;
- for (IColumn column : cf)
+ for (Column column : cf)
{
if (column.isLive())
count++;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/filter/QueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/QueryFilter.java b/src/java/org/apache/cassandra/db/filter/QueryFilter.java
index 31b9db7..c6414fa 100644
--- a/src/java/org/apache/cassandra/db/filter/QueryFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/QueryFilter.java
@@ -33,16 +33,14 @@ import org.apache.cassandra.utils.MergeIterator;
public class QueryFilter
{
public final DecoratedKey key;
- public final QueryPath path;
+ public final String cfName;
public final IDiskAtomFilter filter;
- private final IDiskAtomFilter superFilter;
- public QueryFilter(DecoratedKey key, QueryPath path, IDiskAtomFilter filter)
+ public QueryFilter(DecoratedKey key, String cfName, IDiskAtomFilter filter)
{
this.key = key;
- this.path = path;
+ this.cfName = cfName;
this.filter = filter;
- superFilter = path.superColumnName == null ? null : new NamesQueryFilter(path.superColumnName);
}
public OnDiskAtomIterator getMemtableColumnIterator(Memtable memtable)
@@ -56,95 +54,62 @@ public class QueryFilter
public OnDiskAtomIterator getMemtableColumnIterator(ColumnFamily cf, DecoratedKey key)
{
assert cf != null;
- if (path.superColumnName == null)
- return filter.getMemtableColumnIterator(cf, key);
- return superFilter.getMemtableColumnIterator(cf, key);
+ return filter.getMemtableColumnIterator(cf, key);
}
- // TODO move gcBefore into a field
public ISSTableColumnIterator getSSTableColumnIterator(SSTableReader sstable)
{
- if (path.superColumnName == null)
- return filter.getSSTableColumnIterator(sstable, key);
- return superFilter.getSSTableColumnIterator(sstable, key);
+ return filter.getSSTableColumnIterator(sstable, key);
}
public ISSTableColumnIterator getSSTableColumnIterator(SSTableReader sstable, FileDataInput file, DecoratedKey key, RowIndexEntry indexEntry)
{
- if (path.superColumnName == null)
- return filter.getSSTableColumnIterator(sstable, file, key, indexEntry);
- return superFilter.getSSTableColumnIterator(sstable, file, key, indexEntry);
+ return filter.getSSTableColumnIterator(sstable, file, key, indexEntry);
}
public void collateOnDiskAtom(final ColumnFamily returnCF, List<? extends CloseableIterator<OnDiskAtom>> toCollate, final int gcBefore)
{
- List<CloseableIterator<IColumn>> filteredIterators = new ArrayList<CloseableIterator<IColumn>>(toCollate.size());
+ List<CloseableIterator<Column>> filteredIterators = new ArrayList<CloseableIterator<Column>>(toCollate.size());
for (CloseableIterator<OnDiskAtom> iter : toCollate)
filteredIterators.add(gatherTombstones(returnCF, iter));
collateColumns(returnCF, filteredIterators, gcBefore);
}
- // TODO move gcBefore into a field
- public void collateColumns(final ColumnFamily returnCF, List<? extends CloseableIterator<IColumn>> toCollate, final int gcBefore)
+ public void collateColumns(final ColumnFamily returnCF, List<? extends CloseableIterator<Column>> toCollate, final int gcBefore)
{
- IDiskAtomFilter topLevelFilter = (superFilter == null ? filter : superFilter);
-
- Comparator<IColumn> fcomp = topLevelFilter.getColumnComparator(returnCF.getComparator());
+ Comparator<Column> fcomp = filter.getColumnComparator(returnCF.getComparator());
// define a 'reduced' iterator that merges columns w/ the same name, which
// greatly simplifies computing liveColumns in the presence of tombstones.
- MergeIterator.Reducer<IColumn, IColumn> reducer = new MergeIterator.Reducer<IColumn, IColumn>()
+ MergeIterator.Reducer<Column, Column> reducer = new MergeIterator.Reducer<Column, Column>()
{
ColumnFamily curCF = returnCF.cloneMeShallow();
- public void reduce(IColumn current)
+ public void reduce(Column current)
{
- if (curCF.isSuper() && curCF.isEmpty())
- {
- // If it is the first super column we add, we must clone it since other super column may modify
- // it otherwise and it could be aliased in a memtable somewhere. We'll also don't have to care about what
- // consumers make of the result (for instance CFS.getColumnFamily() call removeDeleted() on the
- // result which removes column; which shouldn't be done on the original super column).
- assert current instanceof SuperColumn;
- curCF.addColumn(((SuperColumn) current).cloneMe());
- }
- else
- {
- curCF.addColumn(current);
- }
+ curCF.addColumn(current);
}
- protected IColumn getReduced()
+ protected Column getReduced()
{
- IColumn c = curCF.iterator().next();
- if (superFilter != null)
- {
- // filterSuperColumn only looks at immediate parent (the supercolumn) when determining if a subcolumn
- // is still live, i.e., not shadowed by the parent's tombstone. so, bump it up temporarily to the tombstone
- // time of the cf, if that is greater.
- DeletionInfo delInfo = ((SuperColumn) c).deletionInfo();
- ((SuperColumn) c).delete(returnCF.deletionInfo());
- c = filter.filterSuperColumn((SuperColumn) c, gcBefore);
- ((SuperColumn) c).setDeletionInfo(delInfo); // reset sc tombstone time to what it should be
- }
+ Column c = curCF.iterator().next();
curCF.clear();
-
return c;
}
};
- Iterator<IColumn> reduced = MergeIterator.get(toCollate, fcomp, reducer);
+ Iterator<Column> reduced = MergeIterator.get(toCollate, fcomp, reducer);
- topLevelFilter.collectReducedColumns(returnCF, reduced, gcBefore);
+ filter.collectReducedColumns(returnCF, reduced, gcBefore);
}
/**
* Given an iterator of on disk atom, returns an iterator that filters the tombstone range
* markers adding them to {@code returnCF} and returns the normal column.
*/
- public static CloseableIterator<IColumn> gatherTombstones(final ColumnFamily returnCF, final CloseableIterator<OnDiskAtom> iter)
+ public static CloseableIterator<Column> gatherTombstones(final ColumnFamily returnCF, final CloseableIterator<OnDiskAtom> iter)
{
- return new CloseableIterator<IColumn>()
+ return new CloseableIterator<Column>()
{
- private IColumn next;
+ private Column next;
public boolean hasNext()
{
@@ -155,13 +120,13 @@ public class QueryFilter
return next != null;
}
- public IColumn next()
+ public Column next()
{
if (next == null)
getNext();
assert next != null;
- IColumn toReturn = next;
+ Column toReturn = next;
next = null;
return toReturn;
}
@@ -172,9 +137,9 @@ public class QueryFilter
{
OnDiskAtom atom = iter.next();
- if (atom instanceof IColumn)
+ if (atom instanceof Column)
{
- next = (IColumn)atom;
+ next = (Column)atom;
break;
}
else
@@ -198,10 +163,10 @@ public class QueryFilter
public String getColumnFamilyName()
{
- return path.columnFamilyName;
+ return cfName;
}
- public static boolean isRelevant(IColumn column, IColumnContainer container, int gcBefore)
+ public static boolean isRelevant(Column column, ColumnFamily container, int gcBefore)
{
// the column itself must be not gc-able (it is live, or a still relevant tombstone, or has live subcolumns), (1)
// and if its container is deleted, the column must be changed more recently than the container tombstone (2)
@@ -214,51 +179,48 @@ public class QueryFilter
/**
* @return a QueryFilter object to satisfy the given slice criteria:
* @param key the row to slice
- * @param path path to the level to slice at (CF or SuperColumn)
+ * @param cfName column family to query
* @param start column to start slice at, inclusive; empty for "the first column"
* @param finish column to stop slice at, inclusive; empty for "the last column"
* @param reversed true to start with the largest column (as determined by configured sort order) instead of smallest
* @param limit maximum number of non-deleted columns to return
*/
- public static QueryFilter getSliceFilter(DecoratedKey key, QueryPath path, ByteBuffer start, ByteBuffer finish, boolean reversed, int limit)
+ public static QueryFilter getSliceFilter(DecoratedKey key, String cfName, ByteBuffer start, ByteBuffer finish, boolean reversed, int limit)
{
- return new QueryFilter(key, path, new SliceQueryFilter(start, finish, reversed, limit));
+ return new QueryFilter(key, cfName, new SliceQueryFilter(start, finish, reversed, limit));
}
/**
* return a QueryFilter object that includes every column in the row.
* This is dangerous on large rows; avoid except for test code.
*/
- public static QueryFilter getIdentityFilter(DecoratedKey key, QueryPath path)
+ public static QueryFilter getIdentityFilter(DecoratedKey key, String cfName)
{
- return new QueryFilter(key, path, new IdentityQueryFilter());
+ return new QueryFilter(key, cfName, new IdentityQueryFilter());
}
/**
* @return a QueryFilter object that will return columns matching the given names
* @param key the row to slice
- * @param path path to the level to slice at (CF or SuperColumn)
+ * @param cfName column family to query
* @param columns the column names to restrict the results to, sorted in comparator order
*/
- public static QueryFilter getNamesFilter(DecoratedKey key, QueryPath path, SortedSet<ByteBuffer> columns)
+ public static QueryFilter getNamesFilter(DecoratedKey key, String cfName, SortedSet<ByteBuffer> columns)
{
- return new QueryFilter(key, path, new NamesQueryFilter(columns));
+ return new QueryFilter(key, cfName, new NamesQueryFilter(columns));
}
/**
* convenience method for creating a name filter matching a single column
*/
- public static QueryFilter getNamesFilter(DecoratedKey key, QueryPath path, ByteBuffer column)
+ public static QueryFilter getNamesFilter(DecoratedKey key, String cfName, ByteBuffer column)
{
- return new QueryFilter(key, path, new NamesQueryFilter(column));
+ return new QueryFilter(key, cfName, new NamesQueryFilter(column));
}
@Override
- public String toString() {
- return getClass().getSimpleName() + "(key=" + key +
- ", path=" + path +
- (filter == null ? "" : ", filter=" + filter) +
- (superFilter == null ? "" : ", superFilter=" + superFilter) +
- ")";
+ public String toString()
+ {
+ return getClass().getSimpleName() + "(key=" + key + ", cfName=" + cfName + (filter == null ? "" : ", filter=" + filter) + ")";
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/filter/QueryPath.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/QueryPath.java b/src/java/org/apache/cassandra/db/filter/QueryPath.java
index 0bae441..022631e 100644
--- a/src/java/org/apache/cassandra/db/filter/QueryPath.java
+++ b/src/java/org/apache/cassandra/db/filter/QueryPath.java
@@ -21,10 +21,12 @@ import java.io.*;
import java.nio.ByteBuffer;
import org.apache.cassandra.db.TypeSizes;
-import org.apache.cassandra.thrift.ColumnParent;
-import org.apache.cassandra.thrift.ColumnPath;
import org.apache.cassandra.utils.ByteBufferUtil;
+/**
+ * This class is obsolete internally, but kept for wire compatibility with
+ * older nodes. I.e. we kept it only for the serialization part.
+ */
public class QueryPath
{
public final String columnFamilyName;
@@ -38,31 +40,11 @@ public class QueryPath
this.columnName = columnName;
}
- public QueryPath(ColumnParent columnParent)
- {
- this(columnParent.column_family, columnParent.super_column, null);
- }
-
public QueryPath(String columnFamilyName, ByteBuffer superColumnName)
{
this(columnFamilyName, superColumnName, null);
}
- public QueryPath(String columnFamilyName)
- {
- this(columnFamilyName, null);
- }
-
- public QueryPath(ColumnPath column_path)
- {
- this(column_path.column_family, column_path.super_column, column_path.column);
- }
-
- public static QueryPath column(ByteBuffer columnName)
- {
- return new QueryPath(null, null, columnName);
- }
-
@Override
public String toString()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
index 2971151..25e7053 100644
--- a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
@@ -46,7 +46,7 @@ public class SliceQueryFilter implements IDiskAtomFilter
public final ColumnSlice[] slices;
public final boolean reversed;
public volatile int count;
- private final int compositesToGroup;
+ public final int compositesToGroup;
// This is a hack to allow rolling upgrade with pre-1.2 nodes
private final int countMutliplierForCompatibility;
@@ -91,6 +91,11 @@ public class SliceQueryFilter implements IDiskAtomFilter
return new SliceQueryFilter(newSlices, reversed, count, compositesToGroup, countMutliplierForCompatibility);
}
+ public SliceQueryFilter withUpdatedSlice(ByteBuffer start, ByteBuffer finish)
+ {
+ return new SliceQueryFilter(new ColumnSlice[]{ new ColumnSlice(start, finish) }, reversed, count, compositesToGroup, countMutliplierForCompatibility);
+ }
+
public OnDiskAtomIterator getMemtableColumnIterator(ColumnFamily cf, DecoratedKey key)
{
return Memtable.getSliceIterator(key, cf, this);
@@ -106,57 +111,18 @@ public class SliceQueryFilter implements IDiskAtomFilter
return new SSTableSliceIterator(sstable, file, key, slices, reversed, indexEntry);
}
- public SuperColumn filterSuperColumn(SuperColumn superColumn, int gcBefore)
- {
- // we clone shallow, then add, under the theory that generally we're interested in a relatively small number of subcolumns.
- // this may be a poor assumption.
- SuperColumn scFiltered = superColumn.cloneMeShallow();
- final Iterator<IColumn> subcolumns;
- if (reversed)
- {
- List<IColumn> columnsAsList = new ArrayList<IColumn>(superColumn.getSubColumns());
- subcolumns = Lists.reverse(columnsAsList).iterator();
- }
- else
- {
- subcolumns = superColumn.getSubColumns().iterator();
- }
- final Comparator<ByteBuffer> comparator = reversed ? superColumn.getComparator().reverseComparator : superColumn.getComparator();
- Iterator<IColumn> results = new AbstractIterator<IColumn>()
- {
- protected IColumn computeNext()
- {
- while (subcolumns.hasNext())
- {
- IColumn subcolumn = subcolumns.next();
- // iterate until we get to the "real" start column
- if (comparator.compare(subcolumn.name(), start()) < 0)
- continue;
- // exit loop when columns are out of the range.
- if (finish().remaining() > 0 && comparator.compare(subcolumn.name(), finish()) > 0)
- break;
- return subcolumn;
- }
- return endOfData();
- }
- };
- // subcolumns is either empty now, or has been redefined in the loop above. either is ok.
- collectReducedColumns(scFiltered, results, gcBefore);
- return scFiltered;
- }
-
- public Comparator<IColumn> getColumnComparator(AbstractType<?> comparator)
+ public Comparator<Column> getColumnComparator(AbstractType<?> comparator)
{
return reversed ? comparator.columnReverseComparator : comparator.columnComparator;
}
- public void collectReducedColumns(IColumnContainer container, Iterator<IColumn> reducedColumns, int gcBefore)
+ public void collectReducedColumns(ColumnFamily container, Iterator<Column> reducedColumns, int gcBefore)
{
columnCounter = getColumnCounter(container);
while (reducedColumns.hasNext())
{
- IColumn column = reducedColumns.next();
+ Column column = reducedColumns.next();
if (logger.isTraceEnabled())
logger.trace(String.format("collecting %s of %s: %s",
columnCounter.live(), count, column.getString(container.getComparator())));
@@ -177,12 +143,12 @@ public class SliceQueryFilter implements IDiskAtomFilter
public int getLiveCount(ColumnFamily cf)
{
ColumnCounter counter = getColumnCounter(cf);
- for (IColumn column : cf)
+ for (Column column : cf)
counter.count(column, cf);
return counter.live();
}
- private ColumnCounter getColumnCounter(IColumnContainer container)
+ private ColumnCounter getColumnCounter(ColumnFamily container)
{
AbstractType<?> comparator = container.getComparator();
if (compositesToGroup < 0)
@@ -200,11 +166,11 @@ public class SliceQueryFilter implements IDiskAtomFilter
Collection<ByteBuffer> toRemove = null;
boolean trimRemaining = false;
- Collection<IColumn> columns = reversed
- ? cf.getReverseSortedColumns()
- : cf.getSortedColumns();
+ Collection<Column> columns = reversed
+ ? cf.getReverseSortedColumns()
+ : cf.getSortedColumns();
- for (IColumn column : columns)
+ for (Column column : columns)
{
if (trimRemaining)
{