You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2013/12/17 17:03:46 UTC
[10/13] Push composites support in the storage engine
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/CFRowAdder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CFRowAdder.java b/src/java/org/apache/cassandra/db/CFRowAdder.java
new file mode 100644
index 0000000..7c70cf6
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/CFRowAdder.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.composites.Composite;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.MapType;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+/**
+ * Convenience object to populate a given CQL3 row in a ColumnFamily object.
+ *
+ * This is meant for when performance is not of the utmost importance. When
+ * performance matters, it might be worth allocating such a builder.
+ */
+public class CFRowAdder
+{
+ public final ColumnFamily cf;
+ public final Composite prefix;
+ public final long timestamp;
+ private final int ldt;
+
+ public CFRowAdder(ColumnFamily cf, Composite prefix, long timestamp)
+ {
+ this.cf = cf;
+ this.prefix = prefix;
+ this.timestamp = timestamp;
+ this.ldt = (int) (System.currentTimeMillis() / 1000);
+
+ // If a CQL3 table, add the row marker
+ if (cf.metadata().isCQL3Table())
+ cf.addColumn(new Column(cf.getComparator().rowMarker(prefix), ByteBufferUtil.EMPTY_BYTE_BUFFER, timestamp));
+ }
+
+ public CFRowAdder add(String cql3ColumnName, Object value)
+ {
+ ColumnDefinition def = getDefinition(cql3ColumnName);
+ return add(cf.getComparator().create(prefix, def.name), def, value);
+ }
+
+ public CFRowAdder addMapEntry(String cql3ColumnName, Object key, Object value)
+ {
+ ColumnDefinition def = getDefinition(cql3ColumnName);
+ assert def.type instanceof MapType;
+ MapType mt = (MapType)def.type;
+ CellName name = cf.getComparator().create(prefix, def.name, mt.keys.decompose(key));
+ return add(name, def, value);
+ }
+
+ private ColumnDefinition getDefinition(String name)
+ {
+ return cf.metadata().getColumnDefinition(new ColumnIdentifier(name, false));
+ }
+
+ private CFRowAdder add(CellName name, ColumnDefinition def, Object value)
+ {
+ if (value == null)
+ cf.addColumn(new DeletedColumn(name, ldt, timestamp));
+ else
+ cf.addColumn(new Column(name, ((AbstractType)def.type).decompose(value), timestamp));
+ return this;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/CollationController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CollationController.java b/src/java/org/apache/cassandra/db/CollationController.java
index d07e772..12f53db 100644
--- a/src/java/org/apache/cassandra/db/CollationController.java
+++ b/src/java/org/apache/cassandra/db/CollationController.java
@@ -17,13 +17,13 @@
*/
package org.apache.cassandra.db;
-import java.nio.ByteBuffer;
import java.util.*;
import com.google.common.collect.Iterables;
import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
import org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy;
+import org.apache.cassandra.db.composites.CellName;
import org.apache.cassandra.db.filter.NamesQueryFilter;
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.db.marshal.CounterColumnType;
@@ -94,7 +94,7 @@ public class CollationController
// avoid changing the filter columns of the original filter
// (reduceNameFilter removes columns that are known to be irrelevant)
NamesQueryFilter namesFilter = (NamesQueryFilter) filter.filter;
- TreeSet<ByteBuffer> filterColumns = new TreeSet<ByteBuffer>(namesFilter.columns);
+ TreeSet<CellName> filterColumns = new TreeSet<>(namesFilter.columns);
QueryFilter reducedFilter = new QueryFilter(filter.key, filter.cfName, namesFilter.withUpdatedColumns(filterColumns), filter.timestamp);
/* add the SSTables on disk */
@@ -173,9 +173,9 @@ public class CollationController
if (container == null)
return;
- for (Iterator<ByteBuffer> iterator = ((NamesQueryFilter) filter.filter).columns.iterator(); iterator.hasNext(); )
+ for (Iterator<CellName> iterator = ((NamesQueryFilter) filter.filter).columns.iterator(); iterator.hasNext(); )
{
- ByteBuffer filterColumn = iterator.next();
+ CellName filterColumn = iterator.next();
Column column = container.getColumn(filterColumn);
if (column != null && column.timestamp() > sstableTimestamp)
iterator.remove();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/Column.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Column.java b/src/java/org/apache/cassandra/db/Column.java
index e103cd3..4be1eeb 100644
--- a/src/java/org/apache/cassandra/db/Column.java
+++ b/src/java/org/apache/cassandra/db/Column.java
@@ -20,16 +20,15 @@ package org.apache.cassandra.db;
import java.io.DataInput;
import java.io.IOError;
import java.io.IOException;
-import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.security.MessageDigest;
-import java.util.ArrayList;
import java.util.Iterator;
-import java.util.List;
import com.google.common.collect.AbstractIterator;
import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.composites.CellNameType;
import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.util.DataOutputBuffer;
@@ -46,18 +45,16 @@ public class Column implements OnDiskAtom
{
public static final int MAX_NAME_LENGTH = FBUtilities.MAX_UNSIGNED_SHORT;
- public static final ColumnSerializer serializer = new ColumnSerializer();
-
- public static OnDiskAtom.Serializer onDiskSerializer()
- {
- return OnDiskAtom.Serializer.instance;
- }
-
/**
* For 2.0-formatted sstables (where column count is not stored), @param count should be Integer.MAX_VALUE,
* and we will look for the end-of-row column name marker instead of relying on that.
*/
- public static Iterator<OnDiskAtom> onDiskIterator(final DataInput in, final int count, final ColumnSerializer.Flag flag, final int expireBefore, final Descriptor.Version version)
+ public static Iterator<OnDiskAtom> onDiskIterator(final DataInput in,
+ final int count,
+ final ColumnSerializer.Flag flag,
+ final int expireBefore,
+ final Descriptor.Version version,
+ final CellNameType type)
{
return new AbstractIterator<OnDiskAtom>()
{
@@ -71,7 +68,7 @@ public class Column implements OnDiskAtom
OnDiskAtom atom;
try
{
- atom = onDiskSerializer().deserializeFromSSTable(in, flag, expireBefore, version);
+ atom = type.onDiskAtomSerializer().deserializeFromSSTable(in, flag, expireBefore, version);
}
catch (IOException e)
{
@@ -85,31 +82,30 @@ public class Column implements OnDiskAtom
};
}
- protected final ByteBuffer name;
+ protected final CellName name;
protected final ByteBuffer value;
protected final long timestamp;
- Column(ByteBuffer name)
+ Column(CellName name)
{
this(name, ByteBufferUtil.EMPTY_BYTE_BUFFER);
}
- public Column(ByteBuffer name, ByteBuffer value)
+ public Column(CellName name, ByteBuffer value)
{
this(name, value, 0);
}
- public Column(ByteBuffer name, ByteBuffer value, long timestamp)
+ public Column(CellName name, ByteBuffer value, long timestamp)
{
assert name != null;
assert value != null;
- assert name.remaining() <= Column.MAX_NAME_LENGTH;
this.name = name;
this.value = value;
this.timestamp = timestamp;
}
- public Column withUpdatedName(ByteBuffer newName)
+ public Column withUpdatedName(CellName newName)
{
return new Column(newName, value, timestamp);
}
@@ -119,7 +115,7 @@ public class Column implements OnDiskAtom
return new Column(name, value, newTimestamp);
}
- public ByteBuffer name()
+ public CellName name()
{
return name;
}
@@ -162,10 +158,10 @@ public class Column implements OnDiskAtom
public int dataSize()
{
- return name().remaining() + value.remaining() + TypeSizes.NATIVE.sizeof(timestamp);
+ return name().dataSize() + value.remaining() + TypeSizes.NATIVE.sizeof(timestamp);
}
- public int serializedSize(TypeSizes typeSizes)
+ public int serializedSize(CellNameType type, TypeSizes typeSizes)
{
/*
* Size of a column is =
@@ -175,14 +171,8 @@ public class Column implements OnDiskAtom
* + 4 bytes which basically indicates the size of the byte array
* + entire byte array.
*/
- int nameSize = name.remaining();
int valueSize = value.remaining();
- return typeSizes.sizeof((short) nameSize) + nameSize + 1 + typeSizes.sizeof(timestamp) + typeSizes.sizeof(valueSize) + valueSize;
- }
-
- public long serializedSizeForSSTable()
- {
- return serializedSize(TypeSizes.NATIVE);
+ return ((int)type.cellSerializer().serializedSize(name, typeSizes)) + 1 + typeSizes.sizeof(timestamp) + typeSizes.sizeof(valueSize) + valueSize;
}
public int serializationFlags()
@@ -199,7 +189,7 @@ public class Column implements OnDiskAtom
public void updateDigest(MessageDigest digest)
{
- digest.update(name.duplicate());
+ digest.update(name.toByteBuffer().duplicate());
digest.update(value.duplicate());
DataOutputBuffer buffer = new DataOutputBuffer();
@@ -273,10 +263,10 @@ public class Column implements OnDiskAtom
public Column localCopy(ColumnFamilyStore cfs, Allocator allocator)
{
- return new Column(cfs.internOrCopy(name, allocator), allocator.clone(value), timestamp);
+ return new Column(name.copy(allocator), allocator.clone(value), timestamp);
}
- public String getString(AbstractType<?> comparator)
+ public String getString(CellNameType comparator)
{
StringBuilder sb = new StringBuilder();
sb.append(comparator.getString(name));
@@ -298,7 +288,7 @@ public class Column implements OnDiskAtom
{
validateName(metadata);
- AbstractType<?> valueValidator = metadata.getValueValidatorFromCellName(name());
+ AbstractType<?> valueValidator = metadata.getValueValidator(name());
if (valueValidator != null)
valueValidator.validate(value());
}
@@ -308,7 +298,7 @@ public class Column implements OnDiskAtom
return getLocalDeletionTime() < gcBefore;
}
- public static Column create(ByteBuffer name, ByteBuffer value, long timestamp, int ttl, CFMetaData metadata)
+ public static Column create(CellName name, ByteBuffer value, long timestamp, int ttl, CFMetaData metadata)
{
if (ttl <= 0)
ttl = metadata.getDefaultTimeToLive();
@@ -317,53 +307,4 @@ public class Column implements OnDiskAtom
? new ExpiringColumn(name, value, timestamp, ttl)
: new Column(name, value, timestamp);
}
-
- public static Column create(String value, long timestamp, String... names)
- {
- return new Column(decomposeName(names), UTF8Type.instance.decompose(value), timestamp);
- }
-
- public static Column create(int value, long timestamp, String... names)
- {
- return new Column(decomposeName(names), Int32Type.instance.decompose(value), timestamp);
- }
-
- public static Column create(boolean value, long timestamp, String... names)
- {
- return new Column(decomposeName(names), BooleanType.instance.decompose(value), timestamp);
- }
-
- public static Column create(double value, long timestamp, String... names)
- {
- return new Column(decomposeName(names), DoubleType.instance.decompose(value), timestamp);
- }
-
- public static Column create(ByteBuffer value, long timestamp, String... names)
- {
- return new Column(decomposeName(names), value, timestamp);
- }
-
- public static Column create(InetAddress value, long timestamp, String... names)
- {
- return new Column(decomposeName(names), InetAddressType.instance.decompose(value), timestamp);
- }
-
- static ByteBuffer decomposeName(String... names)
- {
- assert names.length > 0;
-
- if (names.length == 1)
- return UTF8Type.instance.decompose(names[0]);
-
- // not super performant. at this time, only infrequently called schema code uses this.
- List<AbstractType<?>> types = new ArrayList<AbstractType<?>>(names.length);
- for (int i = 0; i < names.length; i++)
- types.add(UTF8Type.instance);
-
- CompositeType.Builder builder = new CompositeType.Builder(CompositeType.getInstance(types));
- for (String name : names)
- builder.add(UTF8Type.instance.decompose(name));
- return builder.build();
- }
}
-
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/ColumnFamily.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamily.java b/src/java/org/apache/cassandra/db/ColumnFamily.java
index 2c00071..c2134c2 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamily.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamily.java
@@ -36,8 +36,10 @@ import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.cassandra.cache.IRowCacheEntry;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.composites.CellNameType;
+import org.apache.cassandra.db.composites.CellNames;
import org.apache.cassandra.db.filter.ColumnSlice;
-import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.io.sstable.ColumnNameHelper;
import org.apache.cassandra.io.sstable.ColumnStats;
import org.apache.cassandra.io.sstable.SSTable;
@@ -116,29 +118,29 @@ public abstract class ColumnFamily implements Iterable<Column>, IRowCacheEntry
addColumn(column, HeapAllocator.instance);
}
- public void addColumn(ByteBuffer name, ByteBuffer value, long timestamp)
+ public void addColumn(CellName name, ByteBuffer value, long timestamp)
{
addColumn(name, value, timestamp, 0);
}
- public void addColumn(ByteBuffer name, ByteBuffer value, long timestamp, int timeToLive)
+ public void addColumn(CellName name, ByteBuffer value, long timestamp, int timeToLive)
{
assert !metadata().getDefaultValidator().isCommutative();
Column column = Column.create(name, value, timestamp, timeToLive, metadata());
addColumn(column);
}
- public void addCounter(ByteBuffer name, long value)
+ public void addCounter(CellName name, long value)
{
addColumn(new CounterUpdateColumn(name, value, System.currentTimeMillis()));
}
- public void addTombstone(ByteBuffer name, ByteBuffer localDeletionTime, long timestamp)
+ public void addTombstone(CellName name, ByteBuffer localDeletionTime, long timestamp)
{
addColumn(new DeletedColumn(name, localDeletionTime, timestamp));
}
- public void addTombstone(ByteBuffer name, int localDeletionTime, long timestamp)
+ public void addTombstone(CellName name, int localDeletionTime, long timestamp)
{
addColumn(new DeletedColumn(name, localDeletionTime, timestamp));
}
@@ -220,13 +222,13 @@ public abstract class ColumnFamily implements Iterable<Column>, IRowCacheEntry
* Get a column given its name, returning null if the column is not
* present.
*/
- public abstract Column getColumn(ByteBuffer name);
+ public abstract Column getColumn(CellName name);
/**
* Returns an iterable with the names of columns in this column map in the same order
* as the underlying columns themselves.
*/
- public abstract Iterable<ByteBuffer> getColumnNames();
+ public abstract Iterable<CellName> getColumnNames();
/**
* Returns the columns of this column map as a collection.
@@ -300,7 +302,7 @@ public abstract class ColumnFamily implements Iterable<Column>, IRowCacheEntry
// takes care of those for us.)
for (Column columnExternal : cfComposite)
{
- ByteBuffer cName = columnExternal.name();
+ CellName cName = columnExternal.name();
Column columnInternal = getColumn(cName);
if (columnInternal == null)
{
@@ -372,7 +374,7 @@ public abstract class ColumnFamily implements Iterable<Column>, IRowCacheEntry
if (isMarkedForDelete())
sb.append(" -").append(deletionInfo()).append("-");
- sb.append(" [").append(getComparator().getColumnsString(this)).append("])");
+ sb.append(" [").append(CellNames.getColumnsString(getComparator(), this)).append("])");
return sb.toString();
}
@@ -440,7 +442,7 @@ public abstract class ColumnFamily implements Iterable<Column>, IRowCacheEntry
/**
* @return the comparator whose sorting order the contained columns conform to
*/
- public AbstractType<?> getComparator()
+ public CellNameType getComparator()
{
return metadata.comparator;
}
@@ -478,9 +480,9 @@ public abstract class ColumnFamily implements Iterable<Column>, IRowCacheEntry
return false;
}
- public Map<ByteBuffer, ByteBuffer> asMap()
+ public Map<CellName, ByteBuffer> asMap()
{
- ImmutableMap.Builder<ByteBuffer, ByteBuffer> builder = ImmutableMap.builder();
+ ImmutableMap.Builder<CellName, ByteBuffer> builder = ImmutableMap.builder();
for (Column column : this)
builder.put(column.name, column.value);
return builder.build();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java b/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
index 3d832b2..13ec6fc 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
@@ -67,8 +67,8 @@ public class ColumnFamilySerializer implements IVersionedSerializer<ColumnFamily
return;
}
- DeletionInfo.serializer().serialize(cf.deletionInfo(), out, version);
- ColumnSerializer columnSerializer = Column.serializer;
+ cf.getComparator().deletionInfoSerializer().serialize(cf.deletionInfo(), out, version);
+ ColumnSerializer columnSerializer = cf.getComparator().columnSerializer();
int count = cf.getColumnCount();
out.writeInt(count);
int written = 0;
@@ -108,9 +108,9 @@ public class ColumnFamilySerializer implements IVersionedSerializer<ColumnFamily
}
else
{
- cf.delete(DeletionInfo.serializer().deserialize(in, version, cf.getComparator()));
+ cf.delete(cf.getComparator().deletionInfoSerializer().deserialize(in, version));
- ColumnSerializer columnSerializer = Column.serializer;
+ ColumnSerializer columnSerializer = cf.getComparator().columnSerializer();
int size = in.readInt();
for (int i = 0; i < size; ++i)
cf.addColumn(columnSerializer.deserialize(in, flag));
@@ -128,10 +128,11 @@ public class ColumnFamilySerializer implements IVersionedSerializer<ColumnFamily
}
else
{
- size += DeletionInfo.serializer().serializedSize(cf.deletionInfo(), typeSizes, version);
+ size += cf.getComparator().deletionInfoSerializer().serializedSize(cf.deletionInfo(), typeSizes, version);
size += typeSizes.sizeof(cf.getColumnCount());
+ ColumnSerializer columnSerializer = cf.getComparator().columnSerializer();
for (Column column : cf)
- size += column.serializedSize(typeSizes);
+ size += columnSerializer.serializedSize(column, typeSizes);
}
return size;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index a98e30b..f00e281 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -33,7 +33,6 @@ import com.google.common.base.Function;
import com.google.common.collect.*;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.Uninterruptibles;
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,10 +50,10 @@ import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.commitlog.ReplayPosition;
import org.apache.cassandra.db.compaction.*;
import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.composites.CellNameType;
+import org.apache.cassandra.db.composites.Composite;
import org.apache.cassandra.db.index.SecondaryIndex;
import org.apache.cassandra.db.index.SecondaryIndexManager;
-import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.marshal.CompositeType;
import org.apache.cassandra.dht.*;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.exceptions.ConfigurationException;
@@ -94,9 +93,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
public final SecondaryIndexManager indexManager;
- private static final int INTERN_CUTOFF = 256;
- public final ConcurrentMap<ByteBuffer, ByteBuffer> internedNames = new NonBlockingHashMap<ByteBuffer, ByteBuffer>();
-
/* These are locally held copies to be changed from the config during runtime */
private volatile DefaultInteger minCompactionThreshold;
private volatile DefaultInteger maxCompactionThreshold;
@@ -932,8 +928,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
// 2. if it has been re-added since then, this particular column was inserted before the last drop
private static boolean isDroppedColumn(Column c, CFMetaData meta)
{
- ByteBuffer cql3ColumnName = ((CompositeType) meta.comparator).extractLastComponent(c.name());
- Long droppedAt = meta.getDroppedColumns().get(meta.getColumnDefinition(cql3ColumnName).name);
+ Long droppedAt = meta.getDroppedColumns().get(c.name().cql3ColumnName());
return droppedAt != null && c.timestamp() <= droppedAt;
}
@@ -1230,8 +1225,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
}
public ColumnFamily getColumnFamily(DecoratedKey key,
- ByteBuffer start,
- ByteBuffer finish,
+ Composite start,
+ Composite finish,
boolean reversed,
int limit,
long timestamp)
@@ -1617,8 +1612,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
*/
public ExtendedFilter makeExtendedFilter(AbstractBounds<RowPosition> keyRange,
SliceQueryFilter columnRange,
- ByteBuffer columnStart,
- ByteBuffer columnStop,
+ Composite columnStart,
+ Composite columnStop,
List<IndexExpression> rowFilter,
int maxResults,
long now)
@@ -1756,7 +1751,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
}
}
- public AbstractType<?> getComparator()
+ public CellNameType getComparator()
{
return metadata.comparator;
}
@@ -2244,35 +2239,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
return partitioner instanceof LocalPartitioner;
}
- private ByteBuffer intern(ByteBuffer name)
- {
- ByteBuffer internedName = internedNames.get(name);
- if (internedName == null)
- {
- internedName = ByteBufferUtil.clone(name);
- ByteBuffer concurrentName = internedNames.putIfAbsent(internedName, internedName);
- if (concurrentName != null)
- internedName = concurrentName;
- }
- return internedName;
- }
-
- public ByteBuffer internOrCopy(ByteBuffer name, Allocator allocator)
- {
- if (internedNames.size() >= INTERN_CUTOFF)
- return allocator.clone(name);
-
- return intern(name);
- }
-
- public ByteBuffer maybeIntern(ByteBuffer name)
- {
- if (internedNames.size() >= INTERN_CUTOFF)
- return null;
-
- return intern(name);
- }
-
public Iterable<ColumnFamilyStore> concatWithIndexes()
{
return Iterables.concat(indexManager.getIndexesBackedByCfs(), Collections.singleton(this));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/ColumnIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnIndex.java b/src/java/org/apache/cassandra/db/ColumnIndex.java
index cd1baca..3e6d55d 100644
--- a/src/java/org/apache/cassandra/db/ColumnIndex.java
+++ b/src/java/org/apache/cassandra/db/ColumnIndex.java
@@ -25,6 +25,7 @@ import java.util.*;
import com.google.common.annotations.VisibleForTesting;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.composites.Composite;
import org.apache.cassandra.io.sstable.IndexHelper;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -53,8 +54,6 @@ public class ColumnIndex
*/
public static class Builder
{
- private static final OnDiskAtom.Serializer atomSerializer = Column.onDiskSerializer();
-
private final ColumnIndex result;
private final long indexOffset;
private long startPosition = -1;
@@ -69,6 +68,8 @@ public class ColumnIndex
private final ByteBuffer key;
private final DeletionInfo deletionInfo; // only used for serializing and calculating row header size
+ private final OnDiskAtom.Serializer atomSerializer;
+
public Builder(ColumnFamily cf,
ByteBuffer key,
DataOutput output)
@@ -83,6 +84,7 @@ public class ColumnIndex
this.result = new ColumnIndex(new ArrayList<IndexHelper.IndexInfo>());
this.output = output;
this.tombstoneTracker = new RangeTombstone.Tracker(cf.getComparator());
+ this.atomSerializer = cf.getComparator().onDiskAtomSerializer();
}
/**
@@ -119,7 +121,7 @@ public class ColumnIndex
public ColumnIndex build(ColumnFamily cf) throws IOException
{
// cf has disentangled the columns and range tombstones, we need to re-interleave them in comparator order
- Comparator<ByteBuffer> comparator = cf.getComparator();
+ Comparator<Composite> comparator = cf.getComparator();
DeletionInfo.InOrderTester tester = cf.deletionInfo().inOrderTester();
Iterator<RangeTombstone> rangeIter = cf.deletionInfo().rangeIterator();
RangeTombstone tombstone = rangeIter.hasNext() ? rangeIter.next() : null;
@@ -183,7 +185,7 @@ public class ColumnIndex
// where we wouldn't make any progress because a block is filled by said marker
}
- long size = column.serializedSizeForSSTable();
+ long size = atomSerializer.serializedSizeForSSTable(column);
endPosition += size;
blockSize += size;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/ColumnSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnSerializer.java b/src/java/org/apache/cassandra/db/ColumnSerializer.java
index fb38b5f..a1c6ebd 100644
--- a/src/java/org/apache/cassandra/db/ColumnSerializer.java
+++ b/src/java/org/apache/cassandra/db/ColumnSerializer.java
@@ -22,9 +22,12 @@ import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.composites.CellNameType;
import org.apache.cassandra.io.ISerializer;
import org.apache.cassandra.io.FSReadError;
import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.utils.ByteBufferUtil;
public class ColumnSerializer implements ISerializer<Column>
@@ -51,10 +54,17 @@ public class ColumnSerializer implements ISerializer<Column>
LOCAL, FROM_REMOTE, PRESERVE_SIZE;
}
+ private final CellNameType type;
+
+ public ColumnSerializer(CellNameType type)
+ {
+ this.type = type;
+ }
+
public void serialize(Column column, DataOutput out) throws IOException
{
- assert column.name().remaining() > 0;
- ByteBufferUtil.writeWithShortLength(column.name(), out);
+ assert !column.name().isEmpty();
+ type.cellSerializer().serialize(column.name(), out);
try
{
out.writeByte(column.serializationFlags());
@@ -93,15 +103,13 @@ public class ColumnSerializer implements ISerializer<Column>
public Column deserialize(DataInput in, ColumnSerializer.Flag flag, int expireBefore) throws IOException
{
- ByteBuffer name = ByteBufferUtil.readWithShortLength(in);
- if (name.remaining() <= 0)
- throw CorruptColumnException.create(in, name);
+ CellName name = type.cellSerializer().deserialize(in);
int b = in.readUnsignedByte();
return deserializeColumnBody(in, name, b, flag, expireBefore);
}
- Column deserializeColumnBody(DataInput in, ByteBuffer name, int mask, ColumnSerializer.Flag flag, int expireBefore) throws IOException
+ Column deserializeColumnBody(DataInput in, CellName name, int mask, ColumnSerializer.Flag flag, int expireBefore) throws IOException
{
if ((mask & COUNTER_MASK) != 0)
{
@@ -130,9 +138,22 @@ public class ColumnSerializer implements ISerializer<Column>
}
}
- public long serializedSize(Column column, TypeSizes type)
+ void skipColumnBody(DataInput in, int mask) throws IOException
+ {
+ if ((mask & COUNTER_MASK) != 0)
+ FileUtils.skipBytesFully(in, 16);
+ else if ((mask & EXPIRATION_MASK) != 0)
+ FileUtils.skipBytesFully(in, 16);
+ else
+ FileUtils.skipBytesFully(in, 8);
+
+ int length = in.readInt();
+ FileUtils.skipBytesFully(in, length);
+ }
+
+ public long serializedSize(Column column, TypeSizes typeSizes)
{
- return column.serializedSize(type);
+ return column.serializedSize(type, typeSizes);
}
public static class CorruptColumnException extends IOException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/CounterColumn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterColumn.java b/src/java/org/apache/cassandra/db/CounterColumn.java
index b470c5a..ac2c88e 100644
--- a/src/java/org/apache/cassandra/db/CounterColumn.java
+++ b/src/java/org/apache/cassandra/db/CounterColumn.java
@@ -31,9 +31,10 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.composites.CellNameType;
import org.apache.cassandra.db.context.CounterContext;
import org.apache.cassandra.db.context.IContext.ContextRelationship;
-import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.exceptions.OverloadedException;
import org.apache.cassandra.exceptions.RequestExecutionException;
import org.apache.cassandra.io.util.DataOutputBuffer;
@@ -53,28 +54,28 @@ public class CounterColumn extends Column
private final long timestampOfLastDelete;
- public CounterColumn(ByteBuffer name, long value, long timestamp)
+ public CounterColumn(CellName name, long value, long timestamp)
{
this(name, contextManager.create(value, HeapAllocator.instance), timestamp);
}
- public CounterColumn(ByteBuffer name, long value, long timestamp, long timestampOfLastDelete)
+ public CounterColumn(CellName name, long value, long timestamp, long timestampOfLastDelete)
{
this(name, contextManager.create(value, HeapAllocator.instance), timestamp, timestampOfLastDelete);
}
- public CounterColumn(ByteBuffer name, ByteBuffer value, long timestamp)
+ public CounterColumn(CellName name, ByteBuffer value, long timestamp)
{
this(name, value, timestamp, Long.MIN_VALUE);
}
- public CounterColumn(ByteBuffer name, ByteBuffer value, long timestamp, long timestampOfLastDelete)
+ public CounterColumn(CellName name, ByteBuffer value, long timestamp, long timestampOfLastDelete)
{
super(name, value, timestamp);
this.timestampOfLastDelete = timestampOfLastDelete;
}
- public static CounterColumn create(ByteBuffer name, ByteBuffer value, long timestamp, long timestampOfLastDelete, ColumnSerializer.Flag flag)
+ public static CounterColumn create(CellName name, ByteBuffer value, long timestamp, long timestampOfLastDelete, ColumnSerializer.Flag flag)
{
// #elt being negative means we have to clean delta
short count = value.getShort(value.position());
@@ -84,7 +85,7 @@ public class CounterColumn extends Column
}
@Override
- public Column withUpdatedName(ByteBuffer newName)
+ public Column withUpdatedName(CellName newName)
{
return new CounterColumn(newName, value, timestamp, timestampOfLastDelete);
}
@@ -110,9 +111,9 @@ public class CounterColumn extends Column
}
@Override
- public int serializedSize(TypeSizes typeSizes)
+ public int serializedSize(CellNameType type, TypeSizes typeSizes)
{
- return super.serializedSize(typeSizes) + typeSizes.sizeof(timestampOfLastDelete);
+ return super.serializedSize(type, typeSizes) + typeSizes.sizeof(timestampOfLastDelete);
}
@Override
@@ -147,7 +148,7 @@ public class CounterColumn extends Column
@Override
public void updateDigest(MessageDigest digest)
{
- digest.update(name.duplicate());
+ digest.update(name.toByteBuffer().duplicate());
// We don't take the deltas into account in a digest
contextManager.updateDigest(digest, value);
DataOutputBuffer buffer = new DataOutputBuffer();
@@ -217,17 +218,17 @@ public class CounterColumn extends Column
@Override
public Column localCopy(ColumnFamilyStore cfs)
{
- return new CounterColumn(cfs.internOrCopy(name, HeapAllocator.instance), ByteBufferUtil.clone(value), timestamp, timestampOfLastDelete);
+ return localCopy(cfs, HeapAllocator.instance);
}
@Override
public Column localCopy(ColumnFamilyStore cfs, Allocator allocator)
{
- return new CounterColumn(cfs.internOrCopy(name, allocator), allocator.clone(value), timestamp, timestampOfLastDelete);
+ return new CounterColumn(name.copy(allocator), allocator.clone(value), timestamp, timestampOfLastDelete);
}
@Override
- public String getString(AbstractType<?> comparator)
+ public String getString(CellNameType comparator)
{
StringBuilder sb = new StringBuilder();
sb.append(comparator.getString(name));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/CounterMutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterMutation.java b/src/java/org/apache/cassandra/db/CounterMutation.java
index fb363c2..eae8e12 100644
--- a/src/java/org/apache/cassandra/db/CounterMutation.java
+++ b/src/java/org/apache/cassandra/db/CounterMutation.java
@@ -30,11 +30,11 @@ import java.util.UUID;
import com.google.common.collect.Iterables;
+import org.apache.cassandra.db.composites.CellName;
import org.apache.cassandra.db.filter.NamesQueryFilter;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.HeapAllocator;
@@ -109,7 +109,7 @@ public class CounterMutation implements IMutation
private void addReadCommandFromColumnFamily(String keyspaceName, ByteBuffer key, ColumnFamily columnFamily, long timestamp, List<ReadCommand> commands)
{
- SortedSet<ByteBuffer> s = new TreeSet<ByteBuffer>(columnFamily.metadata().comparator);
+ SortedSet<CellName> s = new TreeSet<>(columnFamily.metadata().comparator);
Iterables.addAll(s, columnFamily.getColumnNames());
commands.add(new SliceByNamesReadCommand(keyspaceName, key, columnFamily.metadata().cfName, timestamp, new NamesQueryFilter(s)));
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/CounterUpdateColumn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterUpdateColumn.java b/src/java/org/apache/cassandra/db/CounterUpdateColumn.java
index 1ae7dd7..aaf3307 100644
--- a/src/java/org/apache/cassandra/db/CounterUpdateColumn.java
+++ b/src/java/org/apache/cassandra/db/CounterUpdateColumn.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.db;
import java.nio.ByteBuffer;
+import org.apache.cassandra.db.composites.CellName;
import org.apache.cassandra.db.context.CounterContext;
import org.apache.cassandra.utils.Allocator;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -33,12 +34,12 @@ import org.apache.cassandra.utils.HeapAllocator;
*/
public class CounterUpdateColumn extends Column
{
- public CounterUpdateColumn(ByteBuffer name, long value, long timestamp)
+ public CounterUpdateColumn(CellName name, long value, long timestamp)
{
this(name, ByteBufferUtil.bytes(value), timestamp);
}
- public CounterUpdateColumn(ByteBuffer name, ByteBuffer value, long timestamp)
+ public CounterUpdateColumn(CellName name, ByteBuffer value, long timestamp)
{
super(name, value, timestamp);
}
@@ -81,7 +82,7 @@ public class CounterUpdateColumn extends Column
@Override
public CounterColumn localCopy(ColumnFamilyStore cfs)
{
- return new CounterColumn(cfs.internOrCopy(name, HeapAllocator.instance),
+ return new CounterColumn(name.copy(HeapAllocator.instance),
CounterContext.instance().create(delta(), HeapAllocator.instance),
timestamp(),
Long.MIN_VALUE);
@@ -90,7 +91,7 @@ public class CounterUpdateColumn extends Column
@Override
public Column localCopy(ColumnFamilyStore cfs, Allocator allocator)
{
- return new CounterColumn(cfs.internOrCopy(name, allocator),
+ return new CounterColumn(name.copy(allocator),
CounterContext.instance().create(delta(), allocator),
timestamp(),
Long.MIN_VALUE);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/DataRange.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataRange.java b/src/java/org/apache/cassandra/db/DataRange.java
index 713027c..453b16a 100644
--- a/src/java/org/apache/cassandra/db/DataRange.java
+++ b/src/java/org/apache/cassandra/db/DataRange.java
@@ -23,8 +23,9 @@ import java.util.Comparator;
import java.util.List;
import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
+import org.apache.cassandra.db.composites.CellNameType;
+import org.apache.cassandra.db.composites.Composite;
import org.apache.cassandra.db.filter.*;
-import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.dht.*;
/**
@@ -57,8 +58,8 @@ public class DataRange
public static boolean isFullRowSlice(SliceQueryFilter filter)
{
return filter.slices.length == 1
- && filter.start().remaining() == 0
- && filter.finish().remaining() == 0
+ && filter.start().isEmpty()
+ && filter.finish().isEmpty()
&& filter.count == Integer.MAX_VALUE;
}
@@ -124,11 +125,11 @@ public class DataRange
public static class Paging extends DataRange
{
private final SliceQueryFilter sliceFilter;
- private final Comparator<ByteBuffer> comparator;
- private final ByteBuffer columnStart;
- private final ByteBuffer columnFinish;
+ private final Comparator<Composite> comparator;
+ private final Composite columnStart;
+ private final Composite columnFinish;
- private Paging(AbstractBounds<RowPosition> range, SliceQueryFilter filter, ByteBuffer columnStart, ByteBuffer columnFinish, Comparator<ByteBuffer> comparator)
+ private Paging(AbstractBounds<RowPosition> range, SliceQueryFilter filter, Composite columnStart, Composite columnFinish, Comparator<Composite> comparator)
{
super(range, filter);
@@ -142,9 +143,9 @@ public class DataRange
this.columnFinish = columnFinish;
}
- public Paging(AbstractBounds<RowPosition> range, SliceQueryFilter filter, ByteBuffer columnStart, ByteBuffer columnFinish, AbstractType<?> comparator)
+ public Paging(AbstractBounds<RowPosition> range, SliceQueryFilter filter, Composite columnStart, Composite columnFinish, CellNameType comparator)
{
- this(range, filter, columnStart, columnFinish, filter.isReversed() ? comparator.reverseComparator : comparator);
+ this(range, filter, columnStart, columnFinish, filter.isReversed() ? comparator.reverseComparator() : comparator);
}
@Override
@@ -184,8 +185,8 @@ public class DataRange
private ColumnSlice[] slicesForKey(ByteBuffer key)
{
// We don't call that until it's necessary, so assume we have to do some hard work
- ByteBuffer newStart = equals(startKey(), key) ? columnStart : null;
- ByteBuffer newFinish = equals(stopKey(), key) ? columnFinish : null;
+ Composite newStart = equals(startKey(), key) ? columnStart : null;
+ Composite newFinish = equals(stopKey(), key) ? columnFinish : null;
List<ColumnSlice> newSlices = new ArrayList<ColumnSlice>(sliceFilter.slices.length); // in the common case, we'll have the same number of slices
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/DefsTables.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DefsTables.java b/src/java/org/apache/cassandra/db/DefsTables.java
index 828981e..f30e256 100644
--- a/src/java/org/apache/cassandra/db/DefsTables.java
+++ b/src/java/org/apache/cassandra/db/DefsTables.java
@@ -136,19 +136,6 @@ public class DefsTables
return keyspaces;
}
- public static ByteBuffer searchComposite(String name, boolean start)
- {
- assert name != null;
- ByteBuffer nameBytes = UTF8Type.instance.decompose(name);
- int length = nameBytes.remaining();
- byte[] bytes = new byte[2 + length + 1];
- bytes[0] = (byte)((length >> 8) & 0xFF);
- bytes[1] = (byte)(length & 0xFF);
- ByteBufferUtil.arrayCopy(nameBytes, 0, bytes, 2, length);
- bytes[bytes.length - 1] = (byte)(start ? 0 : 1);
- return ByteBuffer.wrap(bytes);
- }
-
private static Row serializedColumnFamilies(DecoratedKey ksNameKey)
{
ColumnFamilyStore cfsStore = SystemKeyspace.schemaCFS(SystemKeyspace.SCHEMA_COLUMNFAMILIES_CF);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/DeletedColumn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DeletedColumn.java b/src/java/org/apache/cassandra/db/DeletedColumn.java
index 377df27..ec88015 100644
--- a/src/java/org/apache/cassandra/db/DeletedColumn.java
+++ b/src/java/org/apache/cassandra/db/DeletedColumn.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
import java.security.MessageDigest;
import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.composites.CellName;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.serializers.MarshalException;
import org.apache.cassandra.utils.Allocator;
@@ -30,18 +31,18 @@ import org.apache.cassandra.utils.HeapAllocator;
public class DeletedColumn extends Column
{
- public DeletedColumn(ByteBuffer name, int localDeletionTime, long timestamp)
+ public DeletedColumn(CellName name, int localDeletionTime, long timestamp)
{
this(name, ByteBufferUtil.bytes(localDeletionTime), timestamp);
}
- public DeletedColumn(ByteBuffer name, ByteBuffer value, long timestamp)
+ public DeletedColumn(CellName name, ByteBuffer value, long timestamp)
{
super(name, value, timestamp);
}
@Override
- public Column withUpdatedName(ByteBuffer newName)
+ public Column withUpdatedName(CellName newName)
{
return new DeletedColumn(newName, value, timestamp);
}
@@ -67,7 +68,7 @@ public class DeletedColumn extends Column
@Override
public void updateDigest(MessageDigest digest)
{
- digest.update(name.duplicate());
+ digest.update(name.toByteBuffer().duplicate());
DataOutputBuffer buffer = new DataOutputBuffer();
try
@@ -99,13 +100,13 @@ public class DeletedColumn extends Column
@Override
public Column localCopy(ColumnFamilyStore cfs)
{
- return new DeletedColumn(cfs.internOrCopy(name, HeapAllocator.instance), ByteBufferUtil.clone(value), timestamp);
+ return new DeletedColumn(name.copy(HeapAllocator.instance), ByteBufferUtil.clone(value), timestamp);
}
@Override
public Column localCopy(ColumnFamilyStore cfs, Allocator allocator)
{
- return new DeletedColumn(cfs.internOrCopy(name, allocator), allocator.clone(value), timestamp);
+ return new DeletedColumn(name.copy(allocator), allocator.clone(value), timestamp);
}
@Override
@@ -123,9 +124,4 @@ public class DeletedColumn extends Column
if (getLocalDeletionTime() < 0)
throw new MarshalException("The local deletion time should not be negative");
}
-
- public static DeletedColumn create(int localDeletionTime, long timestamp, String... names)
- {
- return new DeletedColumn(decomposeName(names), localDeletionTime, timestamp);
- }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/DeletionInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DeletionInfo.java b/src/java/org/apache/cassandra/db/DeletionInfo.java
index 13fc824..0bd0635 100644
--- a/src/java/org/apache/cassandra/db/DeletionInfo.java
+++ b/src/java/org/apache/cassandra/db/DeletionInfo.java
@@ -20,13 +20,13 @@ package org.apache.cassandra.db;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.*;
import com.google.common.base.Objects;
import com.google.common.collect.Iterators;
-import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.composites.CType;
+import org.apache.cassandra.db.composites.Composite;
import org.apache.cassandra.io.IVersionedSerializer;
/**
@@ -35,8 +35,6 @@ import org.apache.cassandra.io.IVersionedSerializer;
*/
public class DeletionInfo
{
- private static final Serializer serializer = new Serializer();
-
/**
* This represents a deletion of the entire row. We can't represent this within the RangeTombstoneList, so it's
* kept separately. This also slightly optimizes the common case of a full row deletion.
@@ -67,13 +65,13 @@ public class DeletionInfo
this(topLevel, null);
}
- public DeletionInfo(ByteBuffer start, ByteBuffer end, Comparator<ByteBuffer> comparator, long markedForDeleteAt, int localDeletionTime)
+ public DeletionInfo(Composite start, Composite end, Comparator<Composite> comparator, long markedForDeleteAt, int localDeletionTime)
{
this(DeletionTime.LIVE, new RangeTombstoneList(comparator, 1));
ranges.add(start, end, markedForDeleteAt, localDeletionTime);
}
- public DeletionInfo(RangeTombstone rangeTombstone, Comparator<ByteBuffer> comparator)
+ public DeletionInfo(RangeTombstone rangeTombstone, Comparator<Composite> comparator)
{
this(rangeTombstone.min, rangeTombstone.max, comparator, rangeTombstone.data.markedForDeleteAt, rangeTombstone.data.localDeletionTime);
}
@@ -92,11 +90,6 @@ public class DeletionInfo
return new DeletionInfo(DeletionTime.LIVE);
}
- public static Serializer serializer()
- {
- return serializer;
- }
-
public DeletionInfo copy()
{
return new DeletionInfo(topLevel, ranges == null ? null : ranges.copy());
@@ -123,7 +116,7 @@ public class DeletionInfo
return isDeleted(column.name(), column.timestamp());
}
- public boolean isDeleted(ByteBuffer name, long timestamp)
+ public boolean isDeleted(Composite name, long timestamp)
{
// We do rely on this test: if topLevel.markedForDeleteAt is MIN_VALUE, we should not
// consider the column deleted even if timestamp=MIN_VALUE, otherwise this break QueryFilter.isRelevant
@@ -194,7 +187,7 @@ public class DeletionInfo
topLevel = newInfo;
}
- public void add(RangeTombstone tombstone, Comparator<ByteBuffer> comparator)
+ public void add(RangeTombstone tombstone, Comparator<Composite> comparator)
{
if (ranges == null)
ranges = new RangeTombstoneList(comparator, 1);
@@ -255,7 +248,7 @@ public class DeletionInfo
return ranges == null ? Iterators.<RangeTombstone>emptyIterator() : ranges.iterator();
}
- public DeletionTime rangeCovering(ByteBuffer name)
+ public DeletionTime rangeCovering(Composite name)
{
return ranges == null ? null : ranges.search(name);
}
@@ -284,15 +277,15 @@ public class DeletionInfo
{
assert !ranges.isEmpty();
StringBuilder sb = new StringBuilder();
- AbstractType at = (AbstractType)ranges.comparator();
- assert at != null;
+ CType type = (CType)ranges.comparator();
+ assert type != null;
Iterator<RangeTombstone> iter = rangeIterator();
while (iter.hasNext())
{
RangeTombstone i = iter.next();
sb.append("[");
- sb.append(at.getString(i.min)).append("-");
- sb.append(at.getString(i.max)).append(", ");
+ sb.append(type.getString(i.min)).append("-");
+ sb.append(type.getString(i.max)).append(", ");
sb.append(i.data);
sb.append("]");
}
@@ -326,32 +319,30 @@ public class DeletionInfo
public static class Serializer implements IVersionedSerializer<DeletionInfo>
{
- public void serialize(DeletionInfo info, DataOutput out, int version) throws IOException
+ private final RangeTombstoneList.Serializer rtlSerializer;
+
+ public Serializer(CType type)
{
- DeletionTime.serializer.serialize(info.topLevel, out);
- RangeTombstoneList.serializer.serialize(info.ranges, out, version);
+ this.rtlSerializer = new RangeTombstoneList.Serializer(type);
}
- /*
- * Range tombstones internally depend on the column family serializer, but it is not serialized.
- * Thus deserialize(DataInput, int, Comparator<ByteBuffer>) should be used instead of this method.
- */
- public DeletionInfo deserialize(DataInput in, int version) throws IOException
+ public void serialize(DeletionInfo info, DataOutput out, int version) throws IOException
{
- throw new UnsupportedOperationException();
+ DeletionTime.serializer.serialize(info.topLevel, out);
+ rtlSerializer.serialize(info.ranges, out, version);
}
- public DeletionInfo deserialize(DataInput in, int version, Comparator<ByteBuffer> comparator) throws IOException
+ public DeletionInfo deserialize(DataInput in, int version) throws IOException
{
DeletionTime topLevel = DeletionTime.serializer.deserialize(in);
- RangeTombstoneList ranges = RangeTombstoneList.serializer.deserialize(in, version, comparator);
+ RangeTombstoneList ranges = rtlSerializer.deserialize(in, version);
return new DeletionInfo(topLevel, ranges);
}
public long serializedSize(DeletionInfo info, TypeSizes typeSizes, int version)
{
long size = DeletionTime.serializer.serializedSize(info.topLevel, typeSizes);
- return size + RangeTombstoneList.serializer.serializedSize(info.ranges, typeSizes, version);
+ return size + rtlSerializer.serializedSize(info.ranges, typeSizes, version);
}
public long serializedSize(DeletionInfo info, int version)
@@ -389,7 +380,7 @@ public class DeletionInfo
return isDeleted(column.name(), column.timestamp());
}
- public boolean isDeleted(ByteBuffer name, long timestamp)
+ public boolean isDeleted(Composite name, long timestamp)
{
if (timestamp <= topLevel.markedForDeleteAt)
return true;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/DeletionTime.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DeletionTime.java b/src/java/org/apache/cassandra/db/DeletionTime.java
index 7952fcb..c60b423 100644
--- a/src/java/org/apache/cassandra/db/DeletionTime.java
+++ b/src/java/org/apache/cassandra/db/DeletionTime.java
@@ -25,6 +25,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import org.apache.cassandra.io.ISerializer;
+import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.utils.ObjectSizes;
/**
@@ -50,7 +51,7 @@ public class DeletionTime implements Comparable<DeletionTime>
*/
public final int localDeletionTime;
- public static final ISerializer<DeletionTime> serializer = new Serializer();
+ public static final Serializer serializer = new Serializer();
@VisibleForTesting
public DeletionTime(long markedForDeleteAt, int localDeletionTime)
@@ -110,7 +111,7 @@ public class DeletionTime implements Comparable<DeletionTime>
return ObjectSizes.getFieldSize(fields);
}
- private static class Serializer implements ISerializer<DeletionTime>
+ public static class Serializer implements ISerializer<DeletionTime>
{
public void serialize(DeletionTime delTime, DataOutput out) throws IOException
{
@@ -128,6 +129,11 @@ public class DeletionTime implements Comparable<DeletionTime>
return new DeletionTime(mfda, ldt);
}
+ public void skip(DataInput in) throws IOException
+ {
+ FileUtils.skipBytesFully(in, 4 + 8);
+ }
+
public long serializedSize(DeletionTime delTime, TypeSizes typeSizes)
{
return typeSizes.sizeof(delTime.localDeletionTime)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/EmptyColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/EmptyColumns.java b/src/java/org/apache/cassandra/db/EmptyColumns.java
index 129ddc5..782ffc9 100644
--- a/src/java/org/apache/cassandra/db/EmptyColumns.java
+++ b/src/java/org/apache/cassandra/db/EmptyColumns.java
@@ -21,12 +21,12 @@ package org.apache.cassandra.db;
*/
-import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.composites.CellName;
import org.apache.cassandra.db.filter.ColumnSlice;
import org.apache.cassandra.utils.Allocator;
@@ -78,12 +78,12 @@ public class EmptyColumns extends AbstractThreadUnsafeSortedColumns
throw new UnsupportedOperationException();
}
- public Column getColumn(ByteBuffer name)
+ public Column getColumn(CellName name)
{
throw new UnsupportedOperationException();
}
- public Iterable<ByteBuffer> getColumnNames()
+ public Iterable<CellName> getColumnNames()
{
return Collections.emptyList();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/ExpiringColumn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ExpiringColumn.java b/src/java/org/apache/cassandra/db/ExpiringColumn.java
index e11567f..92c8306 100644
--- a/src/java/org/apache/cassandra/db/ExpiringColumn.java
+++ b/src/java/org/apache/cassandra/db/ExpiringColumn.java
@@ -22,11 +22,11 @@ import java.nio.ByteBuffer;
import java.security.MessageDigest;
import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.composites.CellNameType;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.serializers.MarshalException;
import org.apache.cassandra.utils.Allocator;
-import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.HeapAllocator;
/**
@@ -46,12 +46,12 @@ public class ExpiringColumn extends Column
private final int localExpirationTime;
private final int timeToLive;
- public ExpiringColumn(ByteBuffer name, ByteBuffer value, long timestamp, int timeToLive)
+ public ExpiringColumn(CellName name, ByteBuffer value, long timestamp, int timeToLive)
{
this(name, value, timestamp, timeToLive, (int) (System.currentTimeMillis() / 1000) + timeToLive);
}
- public ExpiringColumn(ByteBuffer name, ByteBuffer value, long timestamp, int timeToLive, int localExpirationTime)
+ public ExpiringColumn(CellName name, ByteBuffer value, long timestamp, int timeToLive, int localExpirationTime)
{
super(name, value, timestamp);
assert timeToLive > 0 : timeToLive;
@@ -61,7 +61,7 @@ public class ExpiringColumn extends Column
}
/** @return Either a DeletedColumn, or an ExpiringColumn. */
- public static Column create(ByteBuffer name, ByteBuffer value, long timestamp, int timeToLive, int localExpirationTime, int expireBefore, ColumnSerializer.Flag flag)
+ public static Column create(CellName name, ByteBuffer value, long timestamp, int timeToLive, int localExpirationTime, int expireBefore, ColumnSerializer.Flag flag)
{
if (localExpirationTime >= expireBefore || flag == ColumnSerializer.Flag.PRESERVE_SIZE)
return new ExpiringColumn(name, value, timestamp, timeToLive, localExpirationTime);
@@ -78,7 +78,7 @@ public class ExpiringColumn extends Column
}
@Override
- public Column withUpdatedName(ByteBuffer newName)
+ public Column withUpdatedName(CellName newName)
{
return new ExpiringColumn(newName, value, timestamp, timeToLive, localExpirationTime);
}
@@ -96,20 +96,20 @@ public class ExpiringColumn extends Column
}
@Override
- public int serializedSize(TypeSizes typeSizes)
+ public int serializedSize(CellNameType type, TypeSizes typeSizes)
{
/*
* An expired column adds to a Column :
* 4 bytes for the localExpirationTime
* + 4 bytes for the timeToLive
*/
- return super.serializedSize(typeSizes) + typeSizes.sizeof(localExpirationTime) + typeSizes.sizeof(timeToLive);
+ return super.serializedSize(type, typeSizes) + typeSizes.sizeof(localExpirationTime) + typeSizes.sizeof(timeToLive);
}
@Override
public void updateDigest(MessageDigest digest)
{
- digest.update(name.duplicate());
+ digest.update(name.toByteBuffer().duplicate());
digest.update(value.duplicate());
DataOutputBuffer buffer = new DataOutputBuffer();
@@ -135,20 +135,17 @@ public class ExpiringColumn extends Column
@Override
public Column localCopy(ColumnFamilyStore cfs)
{
- return new ExpiringColumn(cfs.internOrCopy(name, HeapAllocator.instance), ByteBufferUtil.clone(value), timestamp, timeToLive, localExpirationTime);
+ return localCopy(cfs, HeapAllocator.instance);
}
@Override
public Column localCopy(ColumnFamilyStore cfs, Allocator allocator)
{
- ByteBuffer clonedName = cfs.maybeIntern(name);
- if (clonedName == null)
- clonedName = allocator.clone(name);
- return new ExpiringColumn(clonedName, allocator.clone(value), timestamp, timeToLive, localExpirationTime);
+ return new ExpiringColumn(name.copy(allocator), allocator.clone(value), timestamp, timeToLive, localExpirationTime);
}
@Override
- public String getString(AbstractType<?> comparator)
+ public String getString(CellNameType comparator)
{
StringBuilder sb = new StringBuilder();
sb.append(super.getString(comparator));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/HintedHandOffManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
index 39afd12..8868286 100644
--- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java
+++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
@@ -41,12 +41,14 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.composites.Composite;
+import org.apache.cassandra.db.composites.Composites;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.filter.*;
-import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.marshal.CompositeType;
import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.db.marshal.UUIDType;
import org.apache.cassandra.dht.IPartitioner;
@@ -102,7 +104,6 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
private volatile boolean hintedHandOffPaused = false;
- static final CompositeType comparator = CompositeType.getInstance(Arrays.<AbstractType<?>>asList(UUIDType.instance, Int32Type.instance));
static final int maxHintTTL = Integer.parseInt(System.getProperty("cassandra.maxHintTTL", String.valueOf(Integer.MAX_VALUE)));
private final NonBlockingHashSet<InetAddress> queuedDeliveries = new NonBlockingHashSet<InetAddress>();
@@ -133,7 +134,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
UUID hintId = UUIDGen.getTimeUUID();
// serialize the hint with id and version as a composite column name
- ByteBuffer name = comparator.decompose(hintId, MessagingService.current_version);
+ CellName name = CFMetaData.HintsCf.comparator.makeCellName(hintId, MessagingService.current_version);
ByteBuffer value = ByteBuffer.wrap(FBUtilities.serialize(mutation, RowMutation.serializer, MessagingService.current_version));
ColumnFamily cf = ArrayBackedSortedColumns.factory.create(Schema.instance.getCFMetaData(Keyspace.SYSTEM_KS, SystemKeyspace.HINTS_CF));
cf.addColumn(name, value, System.currentTimeMillis(), ttl);
@@ -178,7 +179,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
StorageService.optionalTasks.scheduleWithFixedDelay(runnable, 10, 10, TimeUnit.MINUTES);
}
- private static void deleteHint(ByteBuffer tokenBytes, ByteBuffer columnName, long timestamp)
+ private static void deleteHint(ByteBuffer tokenBytes, CellName columnName, long timestamp)
{
RowMutation rm = new RowMutation(Keyspace.SYSTEM_KS, tokenBytes);
rm.delete(SystemKeyspace.HINTS_CF, columnName, timestamp);
@@ -238,11 +239,11 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
return CompactionManager.instance.submitUserDefined(hintStore, descriptors, (int) (System.currentTimeMillis() / 1000));
}
- private static boolean pagingFinished(ColumnFamily hintColumnFamily, ByteBuffer startColumn)
+ private static boolean pagingFinished(ColumnFamily hintColumnFamily, Composite startColumn)
{
// done if no hints found or the start column (same as last column processed in previous iteration) is the only one
return hintColumnFamily == null
- || (hintColumnFamily.getSortedColumns().size() == 1 && hintColumnFamily.getColumn(startColumn) != null);
+ || (hintColumnFamily.getSortedColumns().size() == 1 && hintColumnFamily.getColumn((CellName)startColumn) != null);
}
private int waitForSchemaAgreement(InetAddress endpoint) throws TimeoutException
@@ -325,7 +326,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
DecoratedKey epkey = StorageService.getPartitioner().decorateKey(hostIdBytes);
final AtomicInteger rowsReplayed = new AtomicInteger(0);
- ByteBuffer startColumn = ByteBufferUtil.EMPTY_BYTE_BUFFER;
+ Composite startColumn = Composites.EMPTY;
int pageSize = calculatePageSize();
logger.debug("Using pageSize of {}", pageSize);
@@ -343,7 +344,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
QueryFilter filter = QueryFilter.getSliceFilter(epkey,
SystemKeyspace.HINTS_CF,
startColumn,
- ByteBufferUtil.EMPTY_BYTE_BUFFER,
+ Composites.EMPTY,
false,
pageSize,
now);
@@ -381,8 +382,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
startColumn = hint.name();
- ByteBuffer[] components = comparator.split(hint.name());
- int version = Int32Type.instance.compose(components[1]);
+ int version = Int32Type.instance.compose(hint.name().get(1));
DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(hint.value()));
RowMutation rm;
try
@@ -493,7 +493,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
IPartitioner p = StorageService.getPartitioner();
RowPosition minPos = p.getMinimumToken().minKeyBound();
Range<RowPosition> range = new Range<RowPosition>(minPos, minPos, p);
- IDiskAtomFilter filter = new NamesQueryFilter(ImmutableSortedSet.<ByteBuffer>of());
+ IDiskAtomFilter filter = new NamesQueryFilter(ImmutableSortedSet.<CellName>of());
List<Row> rows = hintStore.getRangeSlice(range, null, filter, Integer.MAX_VALUE, System.currentTimeMillis());
for (Row row : rows)
{
@@ -564,8 +564,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
private List<Row> getHintsSlice(int columnCount)
{
// Get count # of columns...
- SliceQueryFilter predicate = new SliceQueryFilter(ByteBufferUtil.EMPTY_BYTE_BUFFER,
- ByteBufferUtil.EMPTY_BYTE_BUFFER,
+ SliceQueryFilter predicate = new SliceQueryFilter(ColumnSlice.ALL_COLUMNS_ARRAY,
false,
columnCount);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 785f0c2..ffbdd37 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -32,6 +32,7 @@ import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.composites.CellNameType;
import org.apache.cassandra.db.commitlog.ReplayPosition;
import org.apache.cassandra.db.index.SecondaryIndexManager;
import org.apache.cassandra.db.marshal.AbstractType;
@@ -111,7 +112,7 @@ public class Memtable
// Record the comparator of the CFS at the creation of the memtable. This
// is only used when a user update the CF comparator, to know if the
// memtable was created with the new or old comparator.
- public final AbstractType initialComparator;
+ public final CellNameType initialComparator;
public Memtable(ColumnFamilyStore cfs)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/OnDiskAtom.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/OnDiskAtom.java b/src/java/org/apache/cassandra/db/OnDiskAtom.java
index 2956d6b..bda46d5 100644
--- a/src/java/org/apache/cassandra/db/OnDiskAtom.java
+++ b/src/java/org/apache/cassandra/db/OnDiskAtom.java
@@ -18,18 +18,19 @@
package org.apache.cassandra.db;
import java.io.*;
-import java.nio.ByteBuffer;
import java.security.MessageDigest;
import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.composites.CellNameType;
+import org.apache.cassandra.db.composites.Composite;
import org.apache.cassandra.io.ISSTableSerializer;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.serializers.MarshalException;
-import org.apache.cassandra.utils.ByteBufferUtil;
public interface OnDiskAtom
{
- public ByteBuffer name();
+ public Composite name();
/**
* For a standard column, this is the same as timestamp().
@@ -39,28 +40,28 @@ public interface OnDiskAtom
public long maxTimestamp();
public int getLocalDeletionTime(); // for tombstone GC, so int is sufficient granularity
- public int serializedSize(TypeSizes typeSizes);
- public long serializedSizeForSSTable();
-
public void validateFields(CFMetaData metadata) throws MarshalException;
public void updateDigest(MessageDigest digest);
public static class Serializer implements ISSTableSerializer<OnDiskAtom>
{
- public static Serializer instance = new Serializer();
+ private final CellNameType type;
- private Serializer() {}
+ public Serializer(CellNameType type)
+ {
+ this.type = type;
+ }
public void serializeForSSTable(OnDiskAtom atom, DataOutput out) throws IOException
{
if (atom instanceof Column)
{
- Column.serializer.serialize((Column) atom, out);
+ type.columnSerializer().serialize((Column)atom, out);
}
else
{
assert atom instanceof RangeTombstone;
- RangeTombstone.serializer.serializeForSSTable((RangeTombstone)atom, out);
+ type.rangeTombstoneSerializer().serializeForSSTable((RangeTombstone)atom, out);
}
}
@@ -71,8 +72,8 @@ public interface OnDiskAtom
public OnDiskAtom deserializeFromSSTable(DataInput in, ColumnSerializer.Flag flag, int expireBefore, Descriptor.Version version) throws IOException
{
- ByteBuffer name = ByteBufferUtil.readWithShortLength(in);
- if (name.remaining() <= 0)
+ Composite name = type.serializer().deserialize(in);
+ if (name.isEmpty())
{
// SSTableWriter.END_OF_ROW
return null;
@@ -80,9 +81,22 @@ public interface OnDiskAtom
int b = in.readUnsignedByte();
if ((b & ColumnSerializer.RANGE_TOMBSTONE_MASK) != 0)
- return RangeTombstone.serializer.deserializeBody(in, name, version);
+ return type.rangeTombstoneSerializer().deserializeBody(in, name, version);
else
- return Column.serializer.deserializeColumnBody(in, name, b, flag, expireBefore);
+ return type.columnSerializer().deserializeColumnBody(in, (CellName)name, b, flag, expireBefore);
+ }
+
+ public long serializedSizeForSSTable(OnDiskAtom atom)
+ {
+ if (atom instanceof Column)
+ {
+ return type.columnSerializer().serializedSize((Column)atom, TypeSizes.NATIVE);
+ }
+ else
+ {
+ assert atom instanceof RangeTombstone;
+ return type.rangeTombstoneSerializer().serializedSizeForSSTable((RangeTombstone)atom);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/PagedRangeCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PagedRangeCommand.java b/src/java/org/apache/cassandra/db/PagedRangeCommand.java
index 0e24859..1a009d9 100644
--- a/src/java/org/apache/cassandra/db/PagedRangeCommand.java
+++ b/src/java/org/apache/cassandra/db/PagedRangeCommand.java
@@ -20,10 +20,12 @@ package org.apache.cassandra.db;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.composites.Composite;
import org.apache.cassandra.db.filter.*;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.io.IVersionedSerializer;
@@ -35,8 +37,8 @@ public class PagedRangeCommand extends AbstractRangeCommand
{
public static final IVersionedSerializer<PagedRangeCommand> serializer = new Serializer();
- public final ByteBuffer start;
- public final ByteBuffer stop;
+ public final Composite start;
+ public final Composite stop;
public final int limit;
public PagedRangeCommand(String keyspace,
@@ -44,8 +46,8 @@ public class PagedRangeCommand extends AbstractRangeCommand
long timestamp,
AbstractBounds<RowPosition> keyRange,
SliceQueryFilter predicate,
- ByteBuffer start,
- ByteBuffer stop,
+ Composite start,
+ Composite stop,
List<IndexExpression> rowFilter,
int limit)
{
@@ -57,13 +59,13 @@ public class PagedRangeCommand extends AbstractRangeCommand
public MessageOut<PagedRangeCommand> createMessage()
{
- return new MessageOut<PagedRangeCommand>(MessagingService.Verb.PAGED_RANGE, this, serializer);
+ return new MessageOut<>(MessagingService.Verb.PAGED_RANGE, this, serializer);
}
public AbstractRangeCommand forSubRange(AbstractBounds<RowPosition> subRange)
{
- ByteBuffer newStart = subRange.left.equals(keyRange.left) ? start : ((SliceQueryFilter)predicate).start();
- ByteBuffer newStop = subRange.right.equals(keyRange.right) ? stop : ((SliceQueryFilter)predicate).finish();
+ Composite newStart = subRange.left.equals(keyRange.left) ? start : ((SliceQueryFilter)predicate).start();
+ Composite newStop = subRange.right.equals(keyRange.right) ? stop : ((SliceQueryFilter)predicate).finish();
return new PagedRangeCommand(keyspace,
columnFamily,
timestamp,
@@ -125,13 +127,15 @@ public class PagedRangeCommand extends AbstractRangeCommand
AbstractBounds.serializer.serialize(cmd.keyRange, out, version);
+ CFMetaData metadata = Schema.instance.getCFMetaData(cmd.keyspace, cmd.columnFamily);
+
// SliceQueryFilter (the count is not used)
SliceQueryFilter filter = (SliceQueryFilter)cmd.predicate;
- SliceQueryFilter.serializer.serialize(filter, out, version);
+ metadata.comparator.sliceQueryFilterSerializer().serialize(filter, out, version);
// The start and stop of the page
- ByteBufferUtil.writeWithShortLength(cmd.start, out);
- ByteBufferUtil.writeWithShortLength(cmd.stop, out);
+ metadata.comparator.serializer().serialize(cmd.start, out);
+ metadata.comparator.serializer().serialize(cmd.stop, out);
out.writeInt(cmd.rowFilter.size());
for (IndexExpression expr : cmd.rowFilter)
@@ -152,10 +156,12 @@ public class PagedRangeCommand extends AbstractRangeCommand
AbstractBounds<RowPosition> keyRange = AbstractBounds.serializer.deserialize(in, version).toRowBounds();
- SliceQueryFilter predicate = SliceQueryFilter.serializer.deserialize(in, version);
+ CFMetaData metadata = Schema.instance.getCFMetaData(keyspace, columnFamily);
+
+ SliceQueryFilter predicate = metadata.comparator.sliceQueryFilterSerializer().deserialize(in, version);
- ByteBuffer start = ByteBufferUtil.readWithShortLength(in);
- ByteBuffer stop = ByteBufferUtil.readWithShortLength(in);
+ Composite start = metadata.comparator.serializer().deserialize(in);
+ Composite stop = metadata.comparator.serializer().deserialize(in);
int filterCount = in.readInt();
List<IndexExpression> rowFilter = new ArrayList<IndexExpression>(filterCount);
@@ -181,10 +187,12 @@ public class PagedRangeCommand extends AbstractRangeCommand
size += AbstractBounds.serializer.serializedSize(cmd.keyRange, version);
- size += SliceQueryFilter.serializer.serializedSize((SliceQueryFilter)cmd.predicate, version);
+ CFMetaData metadata = Schema.instance.getCFMetaData(cmd.keyspace, cmd.columnFamily);
+
+ size += metadata.comparator.sliceQueryFilterSerializer().serializedSize((SliceQueryFilter)cmd.predicate, version);
- size += TypeSizes.NATIVE.sizeofWithShortLength(cmd.start);
- size += TypeSizes.NATIVE.sizeofWithShortLength(cmd.stop);
+ size += metadata.comparator.serializer().serializedSize(cmd.start, TypeSizes.NATIVE);
+ size += metadata.comparator.serializer().serializedSize(cmd.stop, TypeSizes.NATIVE);
size += TypeSizes.NATIVE.sizeof(cmd.rowFilter.size());
for (IndexExpression expr : cmd.rowFilter)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/RangeSliceCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeSliceCommand.java b/src/java/org/apache/cassandra/db/RangeSliceCommand.java
index a9a48a0..b8f67ba 100644
--- a/src/java/org/apache/cassandra/db/RangeSliceCommand.java
+++ b/src/java/org/apache/cassandra/db/RangeSliceCommand.java
@@ -157,7 +157,9 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm
out.writeUTF(sliceCommand.columnFamily);
out.writeLong(sliceCommand.timestamp);
- IDiskAtomFilter.Serializer.instance.serialize(sliceCommand.predicate, out, version);
+ CFMetaData metadata = Schema.instance.getCFMetaData(sliceCommand.keyspace, sliceCommand.columnFamily);
+
+ metadata.comparator.diskAtomFilterSerializer().serialize(sliceCommand.predicate, out, version);
if (sliceCommand.rowFilter == null)
{
@@ -187,7 +189,7 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm
CFMetaData metadata = Schema.instance.getCFMetaData(keyspace, columnFamily);
- IDiskAtomFilter predicate = IDiskAtomFilter.Serializer.instance.deserialize(in, version, metadata.comparator);
+ IDiskAtomFilter predicate = metadata.comparator.diskAtomFilterSerializer().deserialize(in, version);
List<IndexExpression> rowFilter;
int filterCount = in.readInt();
@@ -214,9 +216,11 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm
size += TypeSizes.NATIVE.sizeof(rsc.columnFamily);
size += TypeSizes.NATIVE.sizeof(rsc.timestamp);
+ CFMetaData metadata = Schema.instance.getCFMetaData(rsc.keyspace, rsc.columnFamily);
+
IDiskAtomFilter filter = rsc.predicate;
- size += IDiskAtomFilter.Serializer.instance.serializedSize(filter, version);
+ size += metadata.comparator.diskAtomFilterSerializer().serializedSize(filter, version);
if (rsc.rowFilter == null)
{