You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2013/12/17 17:03:47 UTC
[11/13] Push composites support in the storage engine
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
index ae6c15c..e0e2693 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
@@ -31,7 +31,6 @@ import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.config.IndexType;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.index.SecondaryIndex;
-import org.apache.cassandra.db.marshal.CompositeType;
import org.apache.cassandra.exceptions.*;
import org.apache.cassandra.cql3.*;
import org.apache.cassandra.service.ClientState;
@@ -91,7 +90,7 @@ public class CreateIndexStatement extends SchemaAlteringStatement
throw new InvalidRequestException("Cannot specify index class for a non-CUSTOM index");
// TODO: we could lift that limitation
- if (cfm.isDense() && cd.kind != ColumnDefinition.Kind.REGULAR)
+ if (cfm.comparator.isDense() && cd.kind != ColumnDefinition.Kind.REGULAR)
throw new InvalidRequestException(String.format("Secondary index on %s column %s is not yet supported for compact table", cd.kind, columnName));
if (cd.kind == ColumnDefinition.Kind.PARTITION_KEY && cd.isOnAllComponents())
@@ -111,7 +110,7 @@ public class CreateIndexStatement extends SchemaAlteringStatement
{
cd.setIndexType(IndexType.CUSTOM, Collections.singletonMap(SecondaryIndex.CUSTOM_INDEX_OPTION_NAME, indexClass));
}
- else if (cfm.hasCompositeComparator())
+ else if (cfm.comparator.isCompound())
{
Map<String, String> options = Collections.emptyMap();
// For now, we only allow indexing values for collections, but we could later allow
@@ -119,8 +118,7 @@ public class CreateIndexStatement extends SchemaAlteringStatement
// lives easier then.
if (cd.type.isCollection())
options = ImmutableMap.of("index_values", "");
-
- cd.setIndexType(IndexType.COMPOSITES, options);
+ cd.setIndexType(IndexType.COMPOSITES, Collections.<String, String>emptyMap());
}
else
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
index 809e0dc..8f934e3 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
@@ -30,6 +30,7 @@ import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.db.composites.*;
import org.apache.cassandra.db.ColumnFamilyType;
import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.exceptions.AlreadyExistsException;
@@ -43,7 +44,7 @@ import org.apache.cassandra.utils.ByteBufferUtil;
/** A <code>CREATE TABLE</code> parsed from a CQL query statement. */
public class CreateTableStatement extends SchemaAlteringStatement
{
- public AbstractType<?> comparator;
+ public CellNameType comparator;
private AbstractType<?> defaultValidator;
private AbstractType<?> keyValidator;
@@ -87,22 +88,12 @@ public class CreateTableStatement extends SchemaAlteringStatement
}
// Column definitions
- private Map<ByteBuffer, ColumnDefinition> getColumns(CFMetaData cfm)
+ private List<ColumnDefinition> getColumns(CFMetaData cfm)
{
- Map<ByteBuffer, ColumnDefinition> columnDefs = new HashMap<ByteBuffer, ColumnDefinition>();
- Integer componentIndex = null;
- if (cfm.hasCompositeComparator())
- {
- CompositeType ct = (CompositeType) comparator;
- componentIndex = ct.types.get(ct.types.size() - 1) instanceof ColumnToCollectionType
- ? ct.types.size() - 2
- : ct.types.size() - 1;
- }
-
+ List<ColumnDefinition> columnDefs = new ArrayList<>(columns.size());
+ Integer componentIndex = comparator.isCompound() ? comparator.clusteringPrefixSize() : null;
for (Map.Entry<ColumnIdentifier, AbstractType> col : columns.entrySet())
- {
- columnDefs.put(col.getKey().bytes, ColumnDefinition.regularDef(cfm, col.getKey().bytes, col.getValue(), componentIndex));
- }
+ columnDefs.add(ColumnDefinition.regularDef(cfm, col.getKey().bytes, col.getValue(), componentIndex));
return columnDefs;
}
@@ -138,8 +129,7 @@ public class CreateTableStatement extends SchemaAlteringStatement
newCFMD = new CFMetaData(keyspace(),
columnFamily(),
ColumnFamilyType.Standard,
- comparator,
- null);
+ comparator);
applyPropertiesTo(newCFMD);
return newCFMD;
}
@@ -148,10 +138,10 @@ public class CreateTableStatement extends SchemaAlteringStatement
{
cfmd.defaultValidator(defaultValidator)
.keyValidator(keyValidator)
- .columnMetadata(getColumns(cfmd));
+ .addAllColumnDefinitions(getColumns(cfmd));
cfmd.addColumnMetadataFromAliases(keyAliases, keyValidator, ColumnDefinition.Kind.PARTITION_KEY);
- cfmd.addColumnMetadataFromAliases(columnAliases, comparator, ColumnDefinition.Kind.CLUSTERING_COLUMN);
+ cfmd.addColumnMetadataFromAliases(columnAliases, comparator.asAbstractType(), ColumnDefinition.Kind.CLUSTERING_COLUMN);
if (valueAlias != null)
cfmd.addColumnMetadataFromAliases(Collections.<ByteBuffer>singletonList(valueAlias), defaultValidator, ColumnDefinition.Kind.COMPACT_VALUE);
@@ -241,15 +231,13 @@ public class CreateTableStatement extends SchemaAlteringStatement
if (definedCollections != null)
throw new InvalidRequestException("Collection types are not supported with COMPACT STORAGE");
- stmt.comparator = UTF8Type.instance;
+ stmt.comparator = new SimpleSparseCellNameType(UTF8Type.instance);
}
else
{
- List<AbstractType<?>> types = new ArrayList<AbstractType<?>>(definedCollections == null ? 1 : 2);
- types.add(UTF8Type.instance);
- if (definedCollections != null)
- types.add(ColumnToCollectionType.getInstance(definedCollections));
- stmt.comparator = CompositeType.getInstance(types);
+ stmt.comparator = definedCollections == null
+ ? new CompoundSparseCellNameType(Collections.<AbstractType<?>>emptyList())
+ : new CompoundSparseCellNameType.WithCollection(Collections.<AbstractType<?>>emptyList(), ColumnToCollectionType.getInstance(definedCollections));
}
}
else
@@ -261,9 +249,10 @@ public class CreateTableStatement extends SchemaAlteringStatement
if (definedCollections != null)
throw new InvalidRequestException("Collection types are not supported with COMPACT STORAGE");
stmt.columnAliases.add(columnAliases.get(0).bytes);
- stmt.comparator = getTypeAndRemove(stmt.columns, columnAliases.get(0));
- if (stmt.comparator instanceof CounterColumnType)
+ AbstractType<?> at = getTypeAndRemove(stmt.columns, columnAliases.get(0));
+ if (at instanceof CounterColumnType)
throw new InvalidRequestException(String.format("counter type is not supported for PRIMARY KEY part %s", stmt.columnAliases.get(0)));
+ stmt.comparator = new SimpleDenseCellNameType(at);
}
else
{
@@ -282,19 +271,15 @@ public class CreateTableStatement extends SchemaAlteringStatement
{
if (definedCollections != null)
throw new InvalidRequestException("Collection types are not supported with COMPACT STORAGE");
+
+ stmt.comparator = new CompoundDenseCellNameType(types);
}
else
{
- // For sparse, we must add the last UTF8 component
- // and the collection type if there is one
- types.add(UTF8Type.instance);
- if (definedCollections != null)
- types.add(ColumnToCollectionType.getInstance(definedCollections));
+ stmt.comparator = definedCollections == null
+ ? new CompoundSparseCellNameType(types)
+ : new CompoundSparseCellNameType.WithCollection(types, ColumnToCollectionType.getInstance(definedCollections));
}
-
- if (types.isEmpty())
- throw new IllegalStateException("Nonsensical empty parameter list for CompositeType");
- stmt.comparator = CompositeType.getInstance(types);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
index db991c0..b465347 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
@@ -24,6 +24,7 @@ import org.apache.cassandra.cql3.*;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.composites.Composite;
import org.apache.cassandra.exceptions.*;
import org.apache.cassandra.utils.Pair;
@@ -42,47 +43,40 @@ public class DeleteStatement extends ModificationStatement
return false;
}
- public ColumnFamily updateForKey(ByteBuffer key, ColumnNameBuilder builder, UpdateParameters params)
+ public ColumnFamily updateForKey(ByteBuffer key, Composite prefix, UpdateParameters params)
throws InvalidRequestException
{
ColumnFamily cf = TreeMapBackedSortedColumns.factory.create(cfm);
List<Operation> deletions = getOperations();
- boolean fullKey = builder.componentCount() == cfm.clusteringColumns().size();
- boolean isRange = cfm.isDense() ? !fullKey : (!fullKey || deletions.isEmpty());
+ if (prefix.size() < cfm.clusteringColumns().size() && !deletions.isEmpty())
+ throw new InvalidRequestException(String.format("Missing mandatory PRIMARY KEY part %s since %s specified", getFirstEmptyKey(), deletions.iterator().next().column.name));
- if (!deletions.isEmpty() && isRange)
- throw new InvalidRequestException(String.format("Missing mandatory PRIMARY KEY part %s since %s specified", getFirstEmptyKey(), deletions.get(0).columnName));
-
- if (deletions.isEmpty() && builder.componentCount() == 0)
+ if (deletions.isEmpty())
{
- // No columns specified, delete the row
- cf.delete(new DeletionInfo(params.timestamp, params.localDeletionTime));
- }
- else
- {
- if (isRange)
+ // We delete the slice selected by the prefix.
+ // However, for performance reasons, we distinguish 2 cases:
+ // - It's a full internal row delete
+ // - It's a full cell name (i.e. it's a dense layout and the prefix is full)
+ if (prefix.isEmpty())
{
- assert deletions.isEmpty();
- ByteBuffer start = builder.build();
- ByteBuffer end = builder.buildAsEndOfRange();
- cf.addAtom(params.makeRangeTombstone(start, end));
+ // No columns specified, delete the row
+ cf.delete(new DeletionInfo(params.timestamp, params.localDeletionTime));
+ }
+ else if (cfm.comparator.isDense() && prefix.size() == cfm.clusteringColumns().size())
+ {
+ cf.addAtom(params.makeTombstone(cfm.comparator.create(prefix, null)));
}
else
{
- // Delete specific columns
- if (cfm.isDense())
- {
- ByteBuffer columnName = builder.build();
- cf.addColumn(params.makeTombstone(columnName));
- }
- else
- {
- for (Operation deletion : deletions)
- deletion.execute(key, cf, builder.copy(), params);
- }
+ cf.addAtom(params.makeRangeTombstone(prefix.slice()));
}
}
+ else
+ {
+ for (Operation op : deletions)
+ op.execute(key, cf, prefix, params);
+ }
return cf;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 25f59c7..2574f73 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -27,10 +27,10 @@ import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.cql3.*;
import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.composites.CBuilder;
+import org.apache.cassandra.db.composites.Composite;
import org.apache.cassandra.db.filter.ColumnSlice;
import org.apache.cassandra.db.filter.SliceQueryFilter;
-import org.apache.cassandra.db.marshal.CompositeType;
-import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.db.marshal.ListType;
import org.apache.cassandra.db.marshal.BooleanType;
import org.apache.cassandra.exceptions.*;
@@ -72,7 +72,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
}
public abstract boolean requireFullClusteringKey();
- public abstract ColumnFamily updateForKey(ByteBuffer key, ColumnNameBuilder builder, UpdateParameters params) throws InvalidRequestException;
+ public abstract ColumnFamily updateForKey(ByteBuffer key, Composite prefix, UpdateParameters params) throws InvalidRequestException;
public int getBoundsTerms()
{
@@ -215,7 +215,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
public List<ByteBuffer> buildPartitionKeyNames(List<ByteBuffer> variables)
throws InvalidRequestException
{
- ColumnNameBuilder keyBuilder = cfm.getKeyNameBuilder();
+ CBuilder keyBuilder = cfm.getKeyValidatorAsCType().builder();
List<ByteBuffer> keys = new ArrayList<ByteBuffer>();
for (ColumnDefinition def : cfm.partitionKeyColumns())
{
@@ -231,7 +231,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
{
if (val == null)
throw new InvalidRequestException(String.format("Invalid null value for partition key part %s", def.name));
- keys.add(keyBuilder.copy().add(val).build());
+ keys.add(keyBuilder.buildWith(val).toByteBuffer());
}
}
else
@@ -247,10 +247,10 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
return keys;
}
- public ColumnNameBuilder createClusteringPrefixBuilder(List<ByteBuffer> variables)
+ public Composite createClusteringPrefix(List<ByteBuffer> variables)
throws InvalidRequestException
{
- ColumnNameBuilder builder = cfm.getColumnNameBuilder();
+ CBuilder builder = cfm.comparator.prefixBuilder();
ColumnDefinition firstEmptyKey = null;
for (ColumnDefinition def : cfm.clusteringColumns())
{
@@ -258,7 +258,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
if (r == null)
{
firstEmptyKey = def;
- if (requireFullClusteringKey() && cfm.hasCompositeComparator() && !cfm.isDense())
+ if (requireFullClusteringKey() && !cfm.comparator.isDense() && cfm.comparator.isCompound())
throw new InvalidRequestException(String.format("Missing mandatory PRIMARY KEY part %s", def.name));
}
else if (firstEmptyKey != null)
@@ -275,7 +275,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
builder.add(val);
}
}
- return builder;
+ return builder.build();
}
protected ColumnDefinition getFirstEmptyKey()
@@ -288,7 +288,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
return null;
}
- protected Map<ByteBuffer, ColumnGroupMap> readRequiredRows(List<ByteBuffer> partitionKeys, ColumnNameBuilder clusteringPrefix, boolean local, ConsistencyLevel cl)
+ protected Map<ByteBuffer, CQL3Row> readRequiredRows(List<ByteBuffer> partitionKeys, Composite clusteringPrefix, boolean local, ConsistencyLevel cl)
throws RequestExecutionException, RequestValidationException
{
// Lists SET operation incurs a read.
@@ -299,14 +299,14 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
{
if (toRead == null)
toRead = new TreeSet<ColumnIdentifier>();
- toRead.add(op.columnName);
+ toRead.add(op.column.name);
}
}
- return toRead == null ? null : readRows(partitionKeys, clusteringPrefix, toRead, (CompositeType)cfm.comparator, local, cl);
+ return toRead == null ? null : readRows(partitionKeys, clusteringPrefix, toRead, cfm, local, cl);
}
- private Map<ByteBuffer, ColumnGroupMap> readRows(List<ByteBuffer> partitionKeys, ColumnNameBuilder clusteringPrefix, Set<ColumnIdentifier> toRead, CompositeType composite, boolean local, ConsistencyLevel cl)
+ protected Map<ByteBuffer, CQL3Row> readRows(List<ByteBuffer> partitionKeys, Composite rowPrefix, Set<ColumnIdentifier> toRead, CFMetaData cfm, boolean local, ConsistencyLevel cl)
throws RequestExecutionException, RequestValidationException
{
try
@@ -321,11 +321,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
ColumnSlice[] slices = new ColumnSlice[toRead.size()];
int i = 0;
for (ColumnIdentifier name : toRead)
- {
- ByteBuffer start = clusteringPrefix.copy().add(name).build();
- ByteBuffer finish = clusteringPrefix.copy().add(name).buildAsEndOfRange();
- slices[i++] = new ColumnSlice(start, finish);
- }
+ slices[i++] = cfm.comparator.create(rowPrefix, name).slice();
List<ReadCommand> commands = new ArrayList<ReadCommand>(partitionKeys.size());
long now = System.currentTimeMillis();
@@ -340,20 +336,19 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
? SelectStatement.readLocally(keyspace(), commands)
: StorageProxy.read(commands, cl);
- Map<ByteBuffer, ColumnGroupMap> map = new HashMap<ByteBuffer, ColumnGroupMap>();
+ Map<ByteBuffer, CQL3Row> map = new HashMap<ByteBuffer, CQL3Row>();
for (Row row : rows)
{
- if (row.cf == null || row.cf.getColumnCount() == 0)
+ if (row.cf == null || row.cf.isEmpty())
continue;
- ColumnGroupMap.Builder groupBuilder = new ColumnGroupMap.Builder(composite, true, now);
- for (Column column : row.cf)
- groupBuilder.add(column);
-
- List<ColumnGroupMap> groups = groupBuilder.groups();
- assert groups.isEmpty() || groups.size() == 1;
- if (!groups.isEmpty())
- map.put(row.key.key, groups.get(0));
+ Iterator<CQL3Row> iter = cfm.comparator.CQL3RowBuilder(now).group(row.cf.getSortedColumns().iterator());
+ if (iter.hasNext())
+ {
+ map.put(row.key.key, iter.next());
+ // We can only update one CQL3Row per partition key at a time (we don't allow IN for clustering key)
+ assert !iter.hasNext();
+ }
}
return map;
}
@@ -402,7 +397,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
if (keys.size() > 1)
throw new InvalidRequestException("IN on the partition key is not supported with conditional updates");
- ColumnNameBuilder clusteringPrefix = createClusteringPrefixBuilder(variables);
+ Composite clusteringPrefix = createClusteringPrefix(variables);
ByteBuffer key = keys.get(0);
ThriftValidation.validateKey(cfm, key);
@@ -467,7 +462,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
{
List<ColumnDefinition> defs = new ArrayList<>(columnConditions.size());
for (Operation condition : columnConditions)
- defs.add(cfm.getColumnDefinition(condition.columnName));
+ defs.add(condition.column);
selection = Selection.forColumns(defs);
}
@@ -503,10 +498,10 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
throws RequestExecutionException, RequestValidationException
{
List<ByteBuffer> keys = buildPartitionKeyNames(variables);
- ColumnNameBuilder clusteringPrefix = createClusteringPrefixBuilder(variables);
+ Composite clusteringPrefix = createClusteringPrefix(variables);
// Some lists operation requires reading
- Map<ByteBuffer, ColumnGroupMap> rows = readRequiredRows(keys, clusteringPrefix, local, cl);
+ Map<ByteBuffer, CQL3Row> rows = readRequiredRows(keys, clusteringPrefix, local, cl);
UpdateParameters params = new UpdateParameters(cfm, variables, getTimestamp(now, variables), getTimeToLive(variables), rows);
Collection<IMutation> mutations = new ArrayList<IMutation>();
@@ -535,7 +530,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
return isCounter() ? new CounterMutation(rm, cl) : rm;
}
- private ColumnFamily buildConditions(ByteBuffer key, ColumnNameBuilder clusteringPrefix, UpdateParameters params)
+ private ColumnFamily buildConditions(ByteBuffer key, Composite clusteringPrefix, UpdateParameters params)
throws InvalidRequestException
{
if (ifNotExists)
@@ -544,15 +539,12 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
ColumnFamily cf = TreeMapBackedSortedColumns.factory.create(cfm);
// CQL row marker
- if (cfm.hasCompositeComparator() && !cfm.isDense() && !cfm.isSuper())
- {
- ByteBuffer name = clusteringPrefix.copy().add(ByteBufferUtil.EMPTY_BYTE_BUFFER).build();
- cf.addColumn(params.makeColumn(name, ByteBufferUtil.EMPTY_BYTE_BUFFER));
- }
+ if (cfm.isCQL3Table())
+ cf.addColumn(params.makeColumn(cfm.comparator.rowMarker(clusteringPrefix), ByteBufferUtil.EMPTY_BYTE_BUFFER));
// Conditions
for (Operation condition : columnConditions)
- condition.execute(key, cf, clusteringPrefix.copy(), params);
+ condition.execute(key, cf, clusteringPrefix, params);
assert !cf.isEmpty();
return cf;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 62ebd21..d9b4e04 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -29,6 +29,7 @@ import org.github.jamm.MemoryMeter;
import org.apache.cassandra.auth.Permission;
import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.db.composites.*;
import org.apache.cassandra.transport.messages.ResultMessage;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
@@ -47,7 +48,6 @@ import org.apache.cassandra.thrift.ThriftValidation;
import org.apache.cassandra.serializers.MarshalException;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.Pair;
/**
* Encapsulates a completely parsed SELECT query, including the target
@@ -77,8 +77,9 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
private boolean isKeyRange;
private boolean keyIsInRelation;
private boolean usesSecondaryIndexing;
+ private boolean lastClusteringIsIn;
- private Map<ColumnDefinition, Integer> orderingIndexes;
+ private Map<ColumnIdentifier, Integer> orderingIndexes;
// Used by forSelection below
private static final Parameters defaultParameters = new Parameters(Collections.<ColumnIdentifier, Boolean>emptyMap(), false, false, null, false);
@@ -366,13 +367,9 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
}
else if (isColumnRange())
{
- // For sparse, we used to ask for 'defined columns' * 'asked limit' (where defined columns includes the row marker)
- // to account for the grouping of columns.
- // Since that doesn't work for maps/sets/lists, we now use the compositesToGroup option of SliceQueryFilter.
- // But we must preserve backward compatibility too (for mixed version cluster that is).
- int toGroup = cfm.isDense() ? -1 : cfm.clusteringColumns().size();
- List<ByteBuffer> startBounds = getRequestedBound(Bound.START, variables);
- List<ByteBuffer> endBounds = getRequestedBound(Bound.END, variables);
+ int toGroup = cfm.comparator.isDense() ? -1 : cfm.clusteringColumns().size();
+ List<Composite> startBounds = getRequestedBound(Bound.START, variables);
+ List<Composite> endBounds = getRequestedBound(Bound.END, variables);
assert startBounds.size() == endBounds.size();
// The case where startBounds == 1 is common enough that it's worth optimizing
@@ -402,7 +399,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
}
else
{
- SortedSet<ByteBuffer> cellNames = getRequestedColumns(variables);
+ SortedSet<CellName> cellNames = getRequestedColumns(variables);
if (cellNames == null) // in case of IN () for the last column of the key
return null;
QueryProcessor.validateCellNames(cellNames);
@@ -444,7 +441,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
private Collection<ByteBuffer> getKeys(final List<ByteBuffer> variables) throws InvalidRequestException
{
List<ByteBuffer> keys = new ArrayList<ByteBuffer>();
- ColumnNameBuilder builder = cfm.getKeyNameBuilder();
+ CBuilder builder = cfm.getKeyValidatorAsCType().builder();
for (ColumnDefinition def : cfm.partitionKeyColumns())
{
Restriction r = keyRestrictions[def.position()];
@@ -458,7 +455,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
{
if (val == null)
throw new InvalidRequestException(String.format("Invalid null value for partition key part %s", def.name));
- keys.add(builder.copy().add(val).build());
+ keys.add(builder.buildWith(val).toByteBuffer());
}
}
else
@@ -484,7 +481,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
return ByteBufferUtil.EMPTY_BYTE_BUFFER;
// We deal with IN queries for keys in other places, so we know buildBound will return only one result
- return buildBound(b, cfm.partitionKeyColumns(), keyRestrictions, false, cfm.getKeyNameBuilder(), variables).get(0);
+ return buildBound(b, cfm.partitionKeyColumns(), keyRestrictions, false, cfm.getKeyValidatorAsCType(), variables).get(0).toByteBuffer();
}
private Token getTokenBound(Bound b, List<ByteBuffer> variables, IPartitioner<?> p) throws InvalidRequestException
@@ -528,8 +525,8 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
{
// Due to CASSANDRA-5762, we always do a slice for CQL3 tables (not dense, composite).
// Static CF (non dense but non composite) never entails a column slice however
- if (!cfm.isDense())
- return cfm.hasCompositeComparator();
+ if (!cfm.comparator.isDense())
+ return cfm.comparator.isCompound();
// Otherwise (i.e. for compact table where we don't have a row marker anyway and thus don't care about CASSANDRA-5762),
// it is a range query if it has at least one the column alias for which no relation is defined or is not EQ.
@@ -541,15 +538,15 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
return false;
}
- private SortedSet<ByteBuffer> getRequestedColumns(List<ByteBuffer> variables) throws InvalidRequestException
+ private SortedSet<CellName> getRequestedColumns(List<ByteBuffer> variables) throws InvalidRequestException
{
assert !isColumnRange();
- ColumnNameBuilder builder = cfm.getColumnNameBuilder();
+ CBuilder builder = cfm.comparator.prefixBuilder();
Iterator<ColumnDefinition> idIter = cfm.clusteringColumns().iterator();
for (Restriction r : columnRestrictions)
{
- ColumnIdentifier id = idIter.next().name;
+ ColumnDefinition def = idIter.next();
assert r != null && !r.isSlice();
List<ByteBuffer> values = r.values(variables);
@@ -557,7 +554,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
{
ByteBuffer val = values.get(0);
if (val == null)
- throw new InvalidRequestException(String.format("Invalid null value for clustering key part %s", id));
+ throw new InvalidRequestException(String.format("Invalid null value for clustering key part %s", def.name));
builder.add(val);
}
else
@@ -567,32 +564,29 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
// for each value of the IN, creates all the columns corresponding to the selection.
if (values.isEmpty())
return null;
- SortedSet<ByteBuffer> columns = new TreeSet<ByteBuffer>(cfm.comparator);
+ SortedSet<CellName> columns = new TreeSet<CellName>(cfm.comparator);
Iterator<ByteBuffer> iter = values.iterator();
while (iter.hasNext())
{
ByteBuffer val = iter.next();
- ColumnNameBuilder b = iter.hasNext() ? builder.copy() : builder;
if (val == null)
- throw new InvalidRequestException(String.format("Invalid null value for clustering key part %s", id));
- b.add(val);
- if (cfm.isDense())
- columns.add(b.build());
- else
- columns.addAll(addSelectedColumns(b));
+ throw new InvalidRequestException(String.format("Invalid null value for clustering key part %s", def.name));
+
+ Composite prefix = builder.buildWith(val);
+ columns.addAll(addSelectedColumns(prefix));
}
return columns;
}
}
- return addSelectedColumns(builder);
+ return addSelectedColumns(builder.build());
}
- private SortedSet<ByteBuffer> addSelectedColumns(ColumnNameBuilder builder)
+ private SortedSet<CellName> addSelectedColumns(Composite prefix)
{
- if (cfm.isDense())
+ if (cfm.comparator.isDense())
{
- return FBUtilities.singleton(builder.build());
+ return FBUtilities.singleton(cfm.comparator.create(prefix, null), cfm.comparator);
}
else
{
@@ -600,31 +594,26 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
// non-know set of columns, so we shouldn't get there
assert !selectACollection();
- SortedSet<ByteBuffer> columns = new TreeSet<ByteBuffer>(cfm.comparator);
+ SortedSet<CellName> columns = new TreeSet<CellName>(cfm.comparator);
// We need to query the selected column as well as the marker
// column (for the case where the row exists but has no columns outside the PK)
// Two exceptions are "static CF" (non-composite non-compact CF) and "super CF"
// that don't have marker and for which we must query all columns instead
- if (cfm.hasCompositeComparator() && !cfm.isSuper())
+ if (cfm.comparator.isCompound() && !cfm.isSuper())
{
// marker
- columns.add(builder.copy().add(ByteBufferUtil.EMPTY_BYTE_BUFFER).build());
+ columns.add(cfm.comparator.rowMarker(prefix));
// selected columns
- for (ColumnIdentifier id : selection.regularColumnsToFetch())
- columns.add(builder.copy().add(id).build());
+ for (ColumnDefinition def : selection.getColumnsList())
+ if (def.kind == ColumnDefinition.Kind.REGULAR)
+ columns.add(cfm.comparator.create(prefix, def.name));
}
else
{
- Iterator<ColumnDefinition> iter = cfm.regularColumns().iterator();
- while (iter.hasNext())
- {
- ColumnDefinition def = iter.next();
- ColumnNameBuilder b = iter.hasNext() ? builder.copy() : builder;
- ByteBuffer cname = b.add(def.name).build();
- columns.add(cname);
- }
+ for (ColumnDefinition def : cfm.regularColumns())
+ columns.add(cfm.comparator.create(prefix, def.name));
}
return columns;
}
@@ -632,7 +621,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
private boolean selectACollection()
{
- if (!cfm.hasCollections())
+ if (!cfm.comparator.hasCollections())
return false;
for (ColumnDefinition def : selection.getColumnsList())
@@ -644,13 +633,15 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
return false;
}
- private List<ByteBuffer> buildBound(Bound bound,
- Collection<ColumnDefinition> defs,
- Restriction[] restrictions,
- boolean isReversed,
- ColumnNameBuilder builder,
- List<ByteBuffer> variables) throws InvalidRequestException
+ private static List<Composite> buildBound(Bound bound,
+ Collection<ColumnDefinition> defs,
+ Restriction[] restrictions,
+ boolean isReversed,
+ CType type,
+ List<ByteBuffer> variables) throws InvalidRequestException
{
+ CBuilder builder = type.builder();
+
// The end-of-component of composite doesn't depend on whether the
// component type is reversed or not (i.e. the ReversedType is applied
// to the component comparator but not to the end-of-component itself),
@@ -668,9 +659,8 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
// There wasn't any non EQ relation on that key, we select all records having the preceding component as prefix.
// For composites, if there was preceding component and we're computing the end, we must change the last component
// End-Of-Component, otherwise we would be selecting only one record.
- return Collections.singletonList(builder.componentCount() > 0 && eocBound == Bound.END
- ? builder.buildAsEndOfRange()
- : builder.build());
+ Composite prefix = builder.build();
+ return Collections.singletonList(!prefix.isEmpty() && eocBound == Bound.END ? prefix.end() : prefix);
}
if (r.isSlice())
@@ -680,7 +670,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
ByteBuffer val = slice.bound(b, variables);
if (val == null)
throw new InvalidRequestException(String.format("Invalid null clustering key part %s", def.name));
- return Collections.singletonList(builder.add(val, slice.getRelation(eocBound, b)).build());
+ return Collections.singletonList(builder.add(val).build().withEOC(eocForRelation(slice.getRelation(eocBound, b))));
}
else
{
@@ -691,16 +681,16 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
assert def.position() == defs.size() - 1;
// The IN query might not have listed the values in comparator order, so we need to re-sort
// the bounds lists to make sure the slices works correctly (also, to avoid duplicates).
- TreeSet<ByteBuffer> s = new TreeSet<ByteBuffer>(isReversed ? cfm.comparator.reverseComparator : cfm.comparator);
+ TreeSet<Composite> s = new TreeSet<Composite>(isReversed ? type.reverseComparator() : type);
for (ByteBuffer val : values)
{
if (val == null)
throw new InvalidRequestException(String.format("Invalid null clustering key part %s", def.name));
- ColumnNameBuilder copy = builder.copy().add(val);
+ Composite prefix = builder.buildWith(val);
// See below for why this
- s.add((bound == Bound.END && copy.remainingCount() > 0) ? copy.buildAsEndOfRange() : copy.build());
+ s.add((bound == Bound.END && builder.remainingCount() > 0) ? prefix.end() : prefix);
}
- return new ArrayList<ByteBuffer>(s);
+ return new ArrayList<Composite>(s);
}
ByteBuffer val = values.get(0);
@@ -714,14 +704,34 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
// it would be harmless to do it. However, we use this method for the partition key too. And when a query
// with 2ndary index is done, and with the partition provided with an EQ, we'll end up here, and in that
// case using the eoc would be bad, since for the random partitioner we have no guarantee that
- // builder.buildAsEndOfRange() will sort after builder.build() (see #5240).
- return Collections.singletonList((bound == Bound.END && builder.remainingCount() > 0) ? builder.buildAsEndOfRange() : builder.build());
+ // prefix.end() will sort after prefix (see #5240).
+ Composite prefix = builder.build();
+ return Collections.singletonList(bound == Bound.END && builder.remainingCount() > 0 ? prefix.end() : prefix);
}
- private List<ByteBuffer> getRequestedBound(Bound b, List<ByteBuffer> variables) throws InvalidRequestException
+ private static Composite.EOC eocForRelation(Relation.Type op)
+ {
+ switch (op)
+ {
+ case LT:
+ // < X => using startOf(X) as finish bound
+ return Composite.EOC.START;
+ case GT:
+ case LTE:
+ // > X => using endOf(X) as start bound
+ // <= X => using endOf(X) as finish bound
+ return Composite.EOC.END;
+ default:
+ // >= X => using X as start bound (could use START_OF too)
+ // = X => using X
+ return Composite.EOC.NONE;
+ }
+ }
+
+ private List<Composite> getRequestedBound(Bound b, List<ByteBuffer> variables) throws InvalidRequestException
{
assert isColumnRange();
- return buildBound(b, cfm.clusteringColumns(), columnRestrictions, isReversed, cfm.getColumnNameBuilder(), variables);
+ return buildBound(b, cfm.clusteringColumns(), columnRestrictions, isReversed, cfm.comparator, variables);
}
public List<IndexExpression> getIndexExpressions(List<ByteBuffer> variables) throws InvalidRequestException
@@ -798,46 +808,30 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
return value;
}
- private Iterable<Column> columnsInOrder(final ColumnFamily cf, final List<ByteBuffer> variables) throws InvalidRequestException
+ private Iterator<Column> applySliceRestriction(final Iterator<Column> cells, final List<ByteBuffer> variables) throws InvalidRequestException
{
- if (columnRestrictions.length == 0)
- return cf.getSortedColumns();
-
- // If the restriction for the last column alias is an IN, respect
- // requested order
- Restriction last = columnRestrictions[columnRestrictions.length - 1];
- if (last == null || last.isSlice())
- return cf.getSortedColumns();
-
- ColumnNameBuilder builder = cfm.getColumnNameBuilder();
- for (int i = 0; i < columnRestrictions.length - 1; i++)
- builder.add(columnRestrictions[i].values(variables).get(0));
-
- List<ByteBuffer> values = last.values(variables);
- final List<ByteBuffer> requested = new ArrayList<ByteBuffer>(values.size());
- Iterator<ByteBuffer> iter = values.iterator();
- while (iter.hasNext())
- {
- ByteBuffer t = iter.next();
- ColumnNameBuilder b = iter.hasNext() ? builder.copy() : builder;
- requested.add(b.add(t).build());
- }
+ assert sliceRestriction != null;
+
+ final CellNameType type = cfm.comparator;
+ final CellName excludedStart = sliceRestriction.isInclusive(Bound.START) ? null : type.makeCellName(sliceRestriction.bound(Bound.START, variables));
+ final CellName excludedEnd = sliceRestriction.isInclusive(Bound.END) ? null : type.makeCellName(sliceRestriction.bound(Bound.END, variables));
- return new Iterable<Column>()
+ return new AbstractIterator<Column>()
{
- public Iterator<Column> iterator()
+ protected Column computeNext()
{
- return new AbstractIterator<Column>()
- {
- Iterator<ByteBuffer> iter = requested.iterator();
- public Column computeNext()
- {
- if (!iter.hasNext())
- return endOfData();
- Column column = cf.getColumn(iter.next());
- return column == null ? computeNext() : column;
- }
- };
+ if (!cells.hasNext())
+ return endOfData();
+
+ Column c = cells.next();
+
+ // For dynamic CF, the column could be out of the requested bounds (because we don't support strict bounds internally (unless
+ // the comparator is composite that is)), filter here
+ if ( (excludedStart != null && type.compare(c.name(), excludedStart) == 0)
+ || (excludedEnd != null && type.compare(c.name(), excludedEnd) == 0) )
+ return computeNext();
+
+ return c;
}
};
}
@@ -856,7 +850,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
ResultSet cqlRows = result.build();
- orderResults(cqlRows);
+ orderResults(cqlRows, variables);
// Internal calls always return columns in the comparator order, even when reverse was set
if (isReversed)
@@ -871,177 +865,131 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
void processColumnFamily(ByteBuffer key, ColumnFamily cf, List<ByteBuffer> variables, long now, Selection.ResultSetBuilder result)
throws InvalidRequestException
{
- ByteBuffer[] keyComponents = cfm.getKeyValidator() instanceof CompositeType
- ? ((CompositeType)cfm.getKeyValidator()).split(key)
- : new ByteBuffer[]{ key };
-
- if (parameters.isDistinct)
+ CFMetaData cfm = cf.metadata();
+ ByteBuffer[] keyComponents = null;
+ if (cfm.getKeyValidator() instanceof CompositeType)
{
- if (!cf.hasOnlyTombstones(now))
- {
- result.newRow();
- // selection.getColumnsList() will contain only the partition key components - all of them.
- for (ColumnDefinition def : selection.getColumnsList())
- result.add(keyComponents[def.position()]);
- }
+ keyComponents = ((CompositeType)cfm.getKeyValidator()).split(key);
}
- else if (cfm.isDense())
+ else
{
- // One cqlRow per column
- for (Column c : columnsInOrder(cf, variables))
- {
- if (c.isMarkedForDelete(now))
- continue;
-
- ByteBuffer[] components = null;
- if (cfm.hasCompositeComparator())
- {
- components = ((CompositeType)cfm.comparator).split(c.name());
- }
- else if (sliceRestriction != null)
- {
- // For dynamic CF, the column could be out of the requested bounds, filter here
- if (!sliceRestriction.isInclusive(Bound.START) && c.name().equals(sliceRestriction.bound(Bound.START, variables)))
- continue;
- if (!sliceRestriction.isInclusive(Bound.END) && c.name().equals(sliceRestriction.bound(Bound.END, variables)))
- continue;
- }
-
- result.newRow();
- // Respect selection order
- for (ColumnDefinition def : selection.getColumnsList())
- {
- switch (def.kind)
- {
- case PARTITION_KEY:
- result.add(keyComponents[def.position()]);
- break;
- case CLUSTERING_COLUMN:
- ByteBuffer val = cfm.hasCompositeComparator()
- ? (def.position() < components.length ? components[def.position()] : null)
- : c.name();
- result.add(val);
- break;
- case COMPACT_VALUE:
- result.add(c);
- break;
- case REGULAR:
- // This should not happen for compact CF
- throw new AssertionError();
- default:
- throw new AssertionError();
- }
- }
- }
+ keyComponents = new ByteBuffer[]{ key };
}
- else if (cfm.hasCompositeComparator())
- {
- // Sparse case: group column in cqlRow when composite prefix is equal
- CompositeType composite = (CompositeType)cfm.comparator;
- ColumnGroupMap.Builder builder = new ColumnGroupMap.Builder(composite, cfm.hasCollections(), now);
+ Iterator<Column> cells = cf.getSortedColumns().iterator();
+ if (sliceRestriction != null)
+ cells = applySliceRestriction(cells, variables);
- for (Column c : cf)
- {
- if (c.isMarkedForDelete(now))
- continue;
-
- builder.add(c);
- }
-
- for (ColumnGroupMap group : builder.groups())
- handleGroup(selection, result, keyComponents, group);
- }
- else
+ for (Iterator<CQL3Row> iter = cfm.comparator.CQL3RowBuilder(now).group(cells); iter.hasNext();)
{
- if (cf.hasOnlyTombstones(now))
- return;
+ CQL3Row cql3Row = iter.next();
- // Static case: One cqlRow for all columns
+ // Respect requested order
result.newRow();
+ // Respect selection order
for (ColumnDefinition def : selection.getColumnsList())
{
- if (def.kind == ColumnDefinition.Kind.PARTITION_KEY)
- result.add(keyComponents[def.position()]);
- else
- result.add(cf.getColumn(def.name.bytes));
- }
+ switch (def.kind)
+ {
+ case PARTITION_KEY:
+ result.add(keyComponents[def.position()]);
+ break;
+ case CLUSTERING_COLUMN:
+ result.add(cql3Row.getClusteringColumn(def.position()));
+ break;
+ case COMPACT_VALUE:
+ result.add(cql3Row.getColumn(null));
+ break;
+ case REGULAR:
+ if (def.type.isCollection())
+ {
+ List<Column> collection = cql3Row.getCollection(def.name);
+ ByteBuffer value = collection == null
+ ? null
+ : ((CollectionType)def.type).serialize(collection);
+ result.add(value);
+ }
+ else
+ {
+ result.add(cql3Row.getColumn(def.name));
+ }
+ break;
+ }
+ }
}
}
/**
* Orders results when multiple keys are selected (using IN)
*/
- private void orderResults(ResultSet cqlRows)
+ private void orderResults(ResultSet cqlRows, List<ByteBuffer> variables) throws InvalidRequestException
{
- // There is nothing to do if
- // a. there are no results,
- // b. no ordering information where given,
- // c. key restriction is a Range or not an IN expression
- if (cqlRows.size() == 0 || parameters.orderings.isEmpty() || isKeyRange || !keyIsInRelation)
+ if (cqlRows.size() == 0)
+ return;
+
+ /*
+ * We need to do post-query ordering in 2 cases:
+ * 1) if the last clustering key is restricted by an IN.
+ * 2) if the row key is restricted by an IN and there are some ORDER BY values
+ */
+ if (!(lastClusteringIsIn || (keyIsInRelation && parameters.orderings.size() > 0)))
return;
assert orderingIndexes != null;
- // optimization when only *one* order condition was given
- // because there is no point of using composite comparator if there is only one order condition
- if (parameters.orderings.size() == 1)
+ List<Integer> idToSort = new ArrayList<Integer>();
+ List<Comparator<ByteBuffer>> sorters = new ArrayList<Comparator<ByteBuffer>>();
+
+ // If the restriction for the last clustering key is an IN, respect requested order
+ if (lastClusteringIsIn)
{
- ColumnDefinition ordering = cfm.getColumnDefinition(parameters.orderings.keySet().iterator().next());
- Collections.sort(cqlRows.rows, new SingleColumnComparator(orderingIndexes.get(ordering), ordering.type));
- return;
+ List<ColumnDefinition> cc = cfm.clusteringColumns();
+ idToSort.add(orderingIndexes.get(cc.get(cc.size() - 1).name));
+ Restriction last = columnRestrictions[columnRestrictions.length - 1];
+ sorters.add(makeComparatorFor(last.values(variables)));
}
- // builds a 'composite' type for multi-column comparison from the comparators of the ordering components
- // and passes collected position information and built composite comparator to CompositeComparator to do
- // an actual comparison of the CQL rows.
- List<AbstractType<?>> types = new ArrayList<AbstractType<?>>(parameters.orderings.size());
- int[] positions = new int[parameters.orderings.size()];
-
- int idx = 0;
+ // Then add the order by
for (ColumnIdentifier identifier : parameters.orderings.keySet())
{
ColumnDefinition orderingColumn = cfm.getColumnDefinition(identifier);
- types.add(orderingColumn.type);
- positions[idx++] = orderingIndexes.get(orderingColumn);
+ idToSort.add(orderingIndexes.get(orderingColumn.name));
+ sorters.add(orderingColumn.type);
}
- Collections.sort(cqlRows.rows, new CompositeComparator(types, positions));
+ Comparator<List<ByteBuffer>> comparator = idToSort.size() == 1
+ ? new SingleColumnComparator(idToSort.get(0), sorters.get(0))
+ : new CompositeComparator(sorters, idToSort);
+ Collections.sort(cqlRows.rows, comparator);
}
- private void handleGroup(Selection selection, Selection.ResultSetBuilder result, ByteBuffer[] keyComponents, ColumnGroupMap columns) throws InvalidRequestException
+ // Comparator used when the last clustering key is an IN, to sort result
+ // rows in the order of the values provided for the IN.
+ private Comparator<ByteBuffer> makeComparatorFor(final List<ByteBuffer> values)
{
- // Respect requested order
- result.newRow();
- for (ColumnDefinition def : selection.getColumnsList())
+ // This may not always be the most efficient, but it probably is if
+ // values is small, which is likely to be the most common case.
+ return new Comparator<ByteBuffer>()
{
- switch (def.kind)
+ public int compare(ByteBuffer b1, ByteBuffer b2)
{
- case PARTITION_KEY:
- result.add(keyComponents[def.position()]);
- break;
- case CLUSTERING_COLUMN:
- result.add(columns.getKeyComponent(def.position()));
- break;
- case COMPACT_VALUE:
- // This should not happen for SPARSE
- throw new AssertionError();
- case REGULAR:
- if (def.type.isCollection())
- {
- List<Pair<ByteBuffer, Column>> collection = columns.getCollection(def.name.bytes);
- ByteBuffer value = collection == null
- ? null
- : ((CollectionType)def.type).serialize(collection);
- result.add(value);
- }
- else
- {
- result.add(columns.getSimple(def.name.bytes));
- }
- break;
+ int idx1 = -1;
+ int idx2 = -1;
+ for (int i = 0; i < values.size(); i++)
+ {
+ ByteBuffer bb = values.get(i);
+ if (bb.equals(b1))
+ idx1 = i;
+ if (bb.equals(b2))
+ idx2 = i;
+
+ if (idx1 >= 0 && idx2 >= 0)
+ break;
+ }
+ assert idx1 >= 0 && idx2 >= 0 : "Got CQL3 row that was not queried in resultset";
+ return idx1 - idx2;
}
- }
+ };
}
private static boolean isReversedType(ColumnDefinition def)
@@ -1263,7 +1211,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
Restriction.Slice slice = (Restriction.Slice)restriction;
// For non-composite slices, we don't support internally the difference between exclusive and
// inclusive bounds, so we deal with it manually.
- if (!cfm.hasCompositeComparator() && (!slice.isInclusive(Bound.START) || !slice.isInclusive(Bound.END)))
+ if (!cfm.comparator.isCompound() && (!slice.isInclusive(Bound.START) || !slice.isInclusive(Bound.END)))
stmt.sliceRestriction = slice;
}
else if (restriction.isIN())
@@ -1274,6 +1222,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
throw new InvalidRequestException(String.format("PRIMARY KEY part %s cannot be restricted by IN relation", cdef.name));
else if (stmt.selectACollection())
throw new InvalidRequestException(String.format("Cannot restrict PRIMARY KEY part %s by IN relation as a collection is selected by the query", cdef.name));
+ stmt.lastClusteringIsIn = true;
}
previous = cdef;
@@ -1311,7 +1260,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
// queried automatically, and then removing it from the resultSet afterwards if needed)
if (stmt.keyIsInRelation)
{
- stmt.orderingIndexes = new HashMap<ColumnDefinition, Integer>();
+ stmt.orderingIndexes = new HashMap<ColumnIdentifier, Integer>();
for (ColumnIdentifier column : stmt.parameters.orderings.keySet())
{
final ColumnDefinition def = cfm.getColumnDefinition(column);
@@ -1325,14 +1274,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
if (selectClause.isEmpty()) // wildcard
{
- stmt.orderingIndexes.put(def, Iterators.indexOf(cfm.allColumnsInSelectOrder(),
- new Predicate<ColumnDefinition>()
- {
- public boolean apply(ColumnDefinition n)
- {
- return def.equals(n);
- }
- }));
+ stmt.orderingIndexes.put(def.name, indexOf(def, cfm.allColumnsInSelectOrder()));
}
else
{
@@ -1342,7 +1284,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
RawSelector selector = selectClause.get(i);
if (def.name.equals(selector.selectable))
{
- stmt.orderingIndexes.put(def, i);
+ stmt.orderingIndexes.put(def.name, i);
hasColumn = true;
break;
}
@@ -1399,6 +1341,16 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
stmt.isReversed = isReversed;
}
+ if (stmt.lastClusteringIsIn)
+ {
+ // This means we'll have to do post-query reordering, so update the orderingIndexes
+ if (stmt.orderingIndexes == null)
+ stmt.orderingIndexes = new HashMap<ColumnIdentifier, Integer>();
+
+ ColumnDefinition last = cfm.clusteringColumns().get(cfm.clusteringColumns().size() - 1);
+ stmt.orderingIndexes.put(last.name, indexOf(last, stmt.selection.getColumnsList().iterator()));
+ }
+
// Make sure this query is allowed (note: non key range non indexed cannot involve filtering underneath)
if (!parameters.allowFiltering && (stmt.isKeyRange || stmt.usesSecondaryIndexing))
{
@@ -1413,6 +1365,17 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
return new ParsedStatement.Prepared(stmt, names);
}
+ private int indexOf(final ColumnDefinition def, Iterator<ColumnDefinition> defs)
+ {
+ return Iterators.indexOf(defs, new Predicate<ColumnDefinition>()
+ {
+ public boolean apply(ColumnDefinition n)
+ {
+ return def.name.equals(n.name);
+ }
+ });
+ }
+
private void validateDistinctSelection(Collection<ColumnDefinition> requestedColumns, Collection<ColumnDefinition> partitionKey)
throws InvalidRequestException
{
@@ -1584,9 +1547,9 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
private static class SingleColumnComparator implements Comparator<List<ByteBuffer>>
{
private final int index;
- private final AbstractType<?> comparator;
+ private final Comparator<ByteBuffer> comparator;
- public SingleColumnComparator(int columnIndex, AbstractType<?> orderer)
+ public SingleColumnComparator(int columnIndex, Comparator<ByteBuffer> orderer)
{
index = columnIndex;
comparator = orderer;
@@ -1603,10 +1566,10 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
*/
private static class CompositeComparator implements Comparator<List<ByteBuffer>>
{
- private final List<AbstractType<?>> orderTypes;
- private final int[] positions;
+ private final List<Comparator<ByteBuffer>> orderTypes;
+ private final List<Integer> positions;
- private CompositeComparator(List<AbstractType<?>> orderTypes, int[] positions)
+ private CompositeComparator(List<Comparator<ByteBuffer>> orderTypes, List<Integer> positions)
{
this.orderTypes = orderTypes;
this.positions = positions;
@@ -1614,10 +1577,10 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
public int compare(List<ByteBuffer> a, List<ByteBuffer> b)
{
- for (int i = 0; i < positions.length; i++)
+ for (int i = 0; i < positions.size(); i++)
{
- AbstractType<?> type = orderTypes.get(i);
- int columnPos = positions[i];
+ Comparator<ByteBuffer> type = orderTypes.get(i);
+ int columnPos = positions.get(i);
ByteBuffer aValue = a.get(columnPos);
ByteBuffer bValue = b.get(columnPos);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/cql3/statements/Selection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/Selection.java b/src/java/org/apache/cassandra/cql3/statements/Selection.java
index a578f3f..6a8cfe6 100644
--- a/src/java/org/apache/cassandra/cql3/statements/Selection.java
+++ b/src/java/org/apache/cassandra/cql3/statements/Selection.java
@@ -235,20 +235,6 @@ public abstract class Selection
protected abstract List<ByteBuffer> handleRow(ResultSetBuilder rs) throws InvalidRequestException;
/**
- * @return the list of CQL3 "regular" (the "COLUMN_METADATA" ones) column names to fetch.
- */
- public List<ColumnIdentifier> regularColumnsToFetch()
- {
- List<ColumnIdentifier> toFetch = new ArrayList<ColumnIdentifier>();
- for (ColumnDefinition def : columnsList)
- {
- if (def.kind == ColumnDefinition.Kind.REGULAR)
- toFetch.add(def.name);
- }
- return toFetch;
- }
-
- /**
* @return the list of CQL3 columns value this SelectionClause needs.
*/
public List<ColumnDefinition> getColumnsList()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
index 7a8340d..6cf0856 100644
--- a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
@@ -24,7 +24,7 @@ import org.apache.cassandra.cql3.*;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.db.composites.Composite;
import org.apache.cassandra.exceptions.*;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.Pair;
@@ -35,7 +35,7 @@ import org.apache.cassandra.utils.Pair;
*/
public class UpdateStatement extends ModificationStatement
{
- private static final Operation setToEmptyOperation = new Constants.Setter(null, new Constants.Value(ByteBufferUtil.EMPTY_BYTE_BUFFER));
+ private static final Constants.Value EMPTY = new Constants.Value(ByteBufferUtil.EMPTY_BYTE_BUFFER);
private UpdateStatement(int boundTerms, CFMetaData cfm, Attributes attrs)
{
@@ -47,7 +47,7 @@ public class UpdateStatement extends ModificationStatement
return true;
}
- public void addUpdateForKey(ColumnFamily cf, ByteBuffer key, ColumnNameBuilder builder, UpdateParameters params)
+ public void addUpdateForKey(ColumnFamily cf, ByteBuffer key, Composite prefix, UpdateParameters params)
throws InvalidRequestException
{
// Inserting the CQL row marker (see #4361)
@@ -61,17 +61,14 @@ public class UpdateStatement extends ModificationStatement
// 'DELETE FROM t WHERE k = 1' does remove the row entirely)
//
// We never insert markers for Super CF as this would confuse the thrift side.
- if (cfm.hasCompositeComparator() && !cfm.isDense() && !cfm.isSuper())
- {
- ByteBuffer name = builder.copy().add(ByteBufferUtil.EMPTY_BYTE_BUFFER).build();
- cf.addColumn(params.makeColumn(name, ByteBufferUtil.EMPTY_BYTE_BUFFER));
- }
+ if (cfm.isCQL3Table())
+ cf.addColumn(params.makeColumn(cfm.comparator.rowMarker(prefix), ByteBufferUtil.EMPTY_BYTE_BUFFER));
List<Operation> updates = getOperations();
- if (cfm.isDense())
+ if (cfm.comparator.isDense())
{
- if (builder.componentCount() == 0)
+ if (prefix.isEmpty())
throw new InvalidRequestException(String.format("Missing PRIMARY KEY part %s", cfm.clusteringColumns().iterator().next()));
// An empty name for the compact value is what we use to recognize the case where there is no column
@@ -80,7 +77,7 @@ public class UpdateStatement extends ModificationStatement
{
// There is no column outside the PK. So no operation could have passed through validation
assert updates.isEmpty();
- setToEmptyOperation.execute(key, cf, builder.copy(), params);
+ new Constants.Setter(cfm.compactValueColumn(), EMPTY).execute(key, cf, prefix, params);
}
else
{
@@ -89,21 +86,21 @@ public class UpdateStatement extends ModificationStatement
throw new InvalidRequestException(String.format("Column %s is mandatory for this COMPACT STORAGE table", cfm.compactValueColumn().name));
for (Operation update : updates)
- update.execute(key, cf, builder.copy(), params);
+ update.execute(key, cf, prefix, params);
}
}
else
{
for (Operation update : updates)
- update.execute(key, cf, builder.copy(), params);
+ update.execute(key, cf, prefix, params);
}
}
- public ColumnFamily updateForKey(ByteBuffer key, ColumnNameBuilder builder, UpdateParameters params)
+ public ColumnFamily updateForKey(ByteBuffer key, Composite prefix, UpdateParameters params)
throws InvalidRequestException
{
ColumnFamily cf = UnsortedColumns.factory.create(cfm);
- addUpdateForKey(cf, key, builder, params);
+ addUpdateForKey(cf, key, prefix, params);
return cf;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java b/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
index d2825c1..432b47e 100644
--- a/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
+++ b/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.db;
-import java.nio.ByteBuffer;
import java.util.*;
import com.google.common.base.Function;
@@ -26,8 +25,10 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.composites.CellNameType;
+import org.apache.cassandra.db.composites.Composite;
import org.apache.cassandra.db.filter.ColumnSlice;
-import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.utils.Allocator;
/**
@@ -79,12 +80,12 @@ public class ArrayBackedSortedColumns extends AbstractThreadUnsafeSortedColumns
return reversed;
}
- private Comparator<ByteBuffer> internalComparator()
+ private Comparator<Composite> internalComparator()
{
- return reversed ? getComparator().reverseComparator : getComparator();
+ return reversed ? getComparator().reverseComparator() : getComparator();
}
- public Column getColumn(ByteBuffer name)
+ public Column getColumn(CellName name)
{
int pos = binarySearch(name);
return pos >= 0 ? columns.get(pos) : null;
@@ -147,7 +148,7 @@ public class ArrayBackedSortedColumns extends AbstractThreadUnsafeSortedColumns
columns.set(i, reconciledColumn);
}
- private int binarySearch(ByteBuffer name)
+ private int binarySearch(CellName name)
{
return binarySearch(columns, internalComparator(), name, 0);
}
@@ -158,7 +159,7 @@ public class ArrayBackedSortedColumns extends AbstractThreadUnsafeSortedColumns
* (We don't use Collections.binarySearch() directly because it would require us to create
* a fake Column (as well as a Column comparator) to do the search, which is ugly.
*/
- private static int binarySearch(List<Column> columns, Comparator<ByteBuffer> comparator, ByteBuffer name, int start)
+ private static int binarySearch(List<Column> columns, Comparator<Composite> comparator, Composite name, int start)
{
int low = start;
int mid = columns.size();
@@ -266,11 +267,11 @@ public class ArrayBackedSortedColumns extends AbstractThreadUnsafeSortedColumns
columns.clear();
}
- public Iterable<ByteBuffer> getColumnNames()
+ public Iterable<CellName> getColumnNames()
{
- return Iterables.transform(columns, new Function<Column, ByteBuffer>()
+ return Iterables.transform(columns, new Function<Column, CellName>()
{
- public ByteBuffer apply(Column column)
+ public CellName apply(Column column)
{
return column.name;
}
@@ -296,17 +297,17 @@ public class ArrayBackedSortedColumns extends AbstractThreadUnsafeSortedColumns
{
private final List<Column> list;
private final ColumnSlice[] slices;
- private final Comparator<ByteBuffer> comparator;
+ private final Comparator<Composite> comparator;
private int idx = 0;
private int previousSliceEnd = 0;
private Iterator<Column> currentSlice;
- public SlicesIterator(List<Column> list, AbstractType<?> comparator, ColumnSlice[] slices, boolean reversed)
+ public SlicesIterator(List<Column> list, CellNameType comparator, ColumnSlice[] slices, boolean reversed)
{
this.list = reversed ? Lists.reverse(list) : list;
this.slices = slices;
- this.comparator = reversed ? comparator.reverseComparator : comparator;
+ this.comparator = reversed ? comparator.reverseComparator() : comparator;
}
protected Column computeNext()
@@ -318,12 +319,12 @@ public class ArrayBackedSortedColumns extends AbstractThreadUnsafeSortedColumns
ColumnSlice slice = slices[idx++];
// The first idx to include
- int startIdx = slice.start.remaining() == 0 ? 0 : binarySearch(list, comparator, slice.start, previousSliceEnd);
+ int startIdx = slice.start.isEmpty() ? 0 : binarySearch(list, comparator, slice.start, previousSliceEnd);
if (startIdx < 0)
startIdx = -startIdx - 1;
// The first idx to exclude
- int finishIdx = slice.finish.remaining() == 0 ? list.size() - 1 : binarySearch(list, comparator, slice.finish, previousSliceEnd);
+ int finishIdx = slice.finish.isEmpty() ? list.size() - 1 : binarySearch(list, comparator, slice.finish, previousSliceEnd);
if (finishIdx >= 0)
finishIdx++;
else
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/AtomDeserializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AtomDeserializer.java b/src/java/org/apache/cassandra/db/AtomDeserializer.java
new file mode 100644
index 0000000..799ed0e
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/AtomDeserializer.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.DataInput;
+import java.io.IOException;
+
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.composites.CellNameType;
+import org.apache.cassandra.db.composites.Composite;
+import org.apache.cassandra.io.sstable.Descriptor;
+
+/**
+ * Helper class to deserialize OnDiskAtom efficiently.
+ *
+ * More precisely, this class is used by the low-level readers
+ * (IndexedSliceReader and SSTableNamesIterator) to ensure we don't
+ * do more work than necessary (i.e. we don't allocate/deserialize
+ * objects for things we don't care about).
+ */
+public class AtomDeserializer
+{
+ private final CellNameType type; // supplies the range tombstone and column serializers used below
+ private final CellNameType.Deserializer nameDeserializer; // created from type over the same input; all name handling is delegated to it
+ private final DataInput in; // input the byte following each name and the atom bodies are read from
+ private final ColumnSerializer.Flag flag; // passed through unchanged to deserializeColumnBody()
+ private final int expireBefore; // passed through unchanged to deserializeColumnBody()
+ private final Descriptor.Version version; // passed through unchanged to the range tombstone serializer
+
+ public AtomDeserializer(CellNameType type, DataInput in, ColumnSerializer.Flag flag, int expireBefore, Descriptor.Version version)
+ {
+ this.type = type;
+ this.nameDeserializer = type.newDeserializer(in); // the name deserializer consumes the same input as the body readers
+ this.in = in;
+ this.flag = flag;
+ this.expireBefore = expireBefore;
+ this.version = version;
+ }
+
+ /**
+ * Whether or not there are more atoms to read (delegates to the name deserializer).
+ */
+ public boolean hasNext() throws IOException
+ {
+ return nameDeserializer.hasNext();
+ }
+
+ /**
+ * Whether or not some atom has been read but not processed (neither readNext() nor
+ * skipNext() has been called for that atom) yet. Delegates to the name deserializer.
+ */
+ public boolean hasUnprocessed() throws IOException
+ {
+ return nameDeserializer.hasUnprocessed();
+ }
+
+ /**
+ * Compare the provided composite to the next atom to read on disk.
+ *
+ * This will not read/deserialize the whole atom but only what is necessary for the
+ * comparison. Once we know what to do with this atom (read it or skip it),
+ * readNext() or skipNext() should be called.
+ */
+ public int compareNextTo(Composite composite) throws IOException
+ {
+ return nameDeserializer.compareNextTo(composite);
+ }
+
+ /**
+ * Returns the next atom, read via the range tombstone serializer if RANGE_TOMBSTONE_MASK is set in the byte following the name, via the column serializer otherwise.
+ */
+ public OnDiskAtom readNext() throws IOException
+ {
+ Composite name = nameDeserializer.readNext();
+ assert !name.isEmpty(); // This would imply hasNext() hasn't been called
+ int b = in.readUnsignedByte(); // byte following the name; its RANGE_TOMBSTONE_MASK bit selects which kind of body is read next
+ if ((b & ColumnSerializer.RANGE_TOMBSTONE_MASK) != 0)
+ return type.rangeTombstoneSerializer().deserializeBody(in, name, version);
+ else
+ return type.columnSerializer().deserializeColumnBody(in, (CellName)name, b, flag, expireBefore);
+ }
+
+ /**
+ * Skips the next atom: its name, the byte that follows it, and then its body.
+ */
+ public void skipNext() throws IOException
+ {
+ nameDeserializer.skipNext();
+ int b = in.readUnsignedByte(); // still has to be read: it decides which skip routine matches the body
+ if ((b & ColumnSerializer.RANGE_TOMBSTONE_MASK) != 0)
+ type.rangeTombstoneSerializer().skipBody(in, version);
+ else
+ type.columnSerializer().skipColumnBody(in, b);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AtomicSortedColumns.java b/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
index b44d8bf..5056c26 100644
--- a/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
+++ b/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.db;
-import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.atomic.AtomicReference;
@@ -27,9 +26,10 @@ import com.google.common.collect.Iterables;
import edu.stanford.ppl.concurrent.SnapTreeMap;
import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.composites.CellNameType;
import org.apache.cassandra.db.filter.ColumnSlice;
import org.apache.cassandra.db.index.SecondaryIndexManager;
-import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.utils.Allocator;
/**
@@ -70,9 +70,9 @@ public class AtomicSortedColumns extends ColumnFamily
this.ref = new AtomicReference<>(holder);
}
- public AbstractType<?> getComparator()
+ public CellNameType getComparator()
{
- return (AbstractType<?>)ref.get().map.comparator();
+ return (CellNameType)ref.get().map.comparator();
}
public ColumnFamily.Factory getFactory()
@@ -233,12 +233,12 @@ public class AtomicSortedColumns extends ColumnFamily
while (!ref.compareAndSet(current, modified));
}
- public Column getColumn(ByteBuffer name)
+ public Column getColumn(CellName name)
{
return ref.get().map.get(name);
}
- public SortedSet<ByteBuffer> getColumnNames()
+ public SortedSet<CellName> getColumnNames()
{
return ref.get().map.keySet();
}
@@ -279,15 +279,15 @@ public class AtomicSortedColumns extends ColumnFamily
// so we can safely alias one DeletionInfo.live() reference and avoid some allocations.
private static final DeletionInfo LIVE = DeletionInfo.live();
- final SnapTreeMap<ByteBuffer, Column> map;
+ final SnapTreeMap<CellName, Column> map;
final DeletionInfo deletionInfo;
- Holder(AbstractType<?> comparator)
+ Holder(CellNameType comparator)
{
- this(new SnapTreeMap<ByteBuffer, Column>(comparator), LIVE);
+ this(new SnapTreeMap<CellName, Column>(comparator), LIVE);
}
- Holder(SnapTreeMap<ByteBuffer, Column> map, DeletionInfo deletionInfo)
+ Holder(SnapTreeMap<CellName, Column> map, DeletionInfo deletionInfo)
{
this.map = map;
this.deletionInfo = deletionInfo;
@@ -303,7 +303,7 @@ public class AtomicSortedColumns extends ColumnFamily
return new Holder(map, info);
}
- Holder with(SnapTreeMap<ByteBuffer, Column> newMap)
+ Holder with(SnapTreeMap<CellName, Column> newMap)
{
return new Holder(newMap, deletionInfo);
}
@@ -312,12 +312,12 @@ public class AtomicSortedColumns extends ColumnFamily
// afterwards.
Holder clear()
{
- return new Holder(new SnapTreeMap<ByteBuffer, Column>(map.comparator()), LIVE);
+ return new Holder(new SnapTreeMap<CellName, Column>(map.comparator()), LIVE);
}
long addColumn(Column column, Allocator allocator, SecondaryIndexManager.Updater indexer)
{
- ByteBuffer name = column.name();
+ CellName name = column.name();
while (true)
{
Column oldColumn = map.putIfAbsent(name, column);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BatchlogManager.java b/src/java/org/apache/cassandra/db/BatchlogManager.java
index bba3ffe..9bc857b 100644
--- a/src/java/org/apache/cassandra/db/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/db/BatchlogManager.java
@@ -35,6 +35,7 @@ import javax.management.ObjectName;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
+import org.apache.cassandra.db.composites.CellName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,7 +46,6 @@ import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.marshal.LongType;
-import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.db.marshal.UUIDType;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.WriteTimeoutException;
@@ -128,9 +128,9 @@ public class BatchlogManager implements BatchlogManagerMBean
ByteBuffer data = serializeRowMutations(mutations);
ColumnFamily cf = ArrayBackedSortedColumns.factory.create(CFMetaData.BatchlogCf);
- cf.addColumn(new Column(columnName(""), ByteBufferUtil.EMPTY_BYTE_BUFFER, timestamp));
- cf.addColumn(new Column(columnName("data"), data, timestamp));
- cf.addColumn(new Column(columnName("written_at"), writtenAt, timestamp));
+ cf.addColumn(new Column(cellName(""), ByteBufferUtil.EMPTY_BYTE_BUFFER, timestamp));
+ cf.addColumn(new Column(cellName("data"), data, timestamp));
+ cf.addColumn(new Column(cellName("written_at"), writtenAt, timestamp));
return new RowMutation(Keyspace.SYSTEM_KS, UUIDType.instance.decompose(uuid), cf);
}
@@ -282,9 +282,9 @@ public class BatchlogManager implements BatchlogManagerMBean
return (int) ((HintedHandOffManager.calculateHintTTL(mutation) * 1000 - (System.currentTimeMillis() - writtenAt)) / 1000);
}
- private static ByteBuffer columnName(String name)
+ private static CellName cellName(String name)
{
- return CFMetaData.BatchlogCf.getColumnNameBuilder().add(UTF8Type.instance.decompose(name)).build();
+ return CFMetaData.BatchlogCf.comparator.makeCellName(name);
}
// force flush + compaction to reclaim space from the replayed batches