You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2013/01/02 13:41:16 UTC
[7/9] Replace supercolumns internally by composites
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/HintedHandOffManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
index 476af88..3d726a0 100644
--- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java
+++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
@@ -129,7 +129,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
private static void deleteHint(ByteBuffer tokenBytes, ByteBuffer columnName, long timestamp) throws IOException
{
RowMutation rm = new RowMutation(Table.SYSTEM_KS, tokenBytes);
- rm.delete(new QueryPath(SystemTable.HINTS_CF, null, columnName), timestamp);
+ rm.delete(SystemTable.HINTS_CF, columnName, timestamp);
rm.applyUnsafe(); // don't bother with commitlog since we're going to flush as soon as we're done with delivery
}
@@ -155,7 +155,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
UUID hostId = StorageService.instance.getTokenMetadata().getHostId(endpoint);
ByteBuffer hostIdBytes = ByteBuffer.wrap(UUIDGen.decompose(hostId));
final RowMutation rm = new RowMutation(Table.SYSTEM_KS, hostIdBytes);
- rm.delete(new QueryPath(SystemTable.HINTS_CF), System.currentTimeMillis());
+ rm.delete(SystemTable.HINTS_CF, System.currentTimeMillis());
// execute asynchronously to avoid blocking caller (which may be processing gossip)
Runnable runnable = new Runnable()
@@ -303,7 +303,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
while (true)
{
- QueryFilter filter = QueryFilter.getSliceFilter(epkey, new QueryPath(SystemTable.HINTS_CF), startColumn, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, pageSize);
+ QueryFilter filter = QueryFilter.getSliceFilter(epkey, SystemTable.HINTS_CF, startColumn, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, pageSize);
ColumnFamily hintsPage = ColumnFamilyStore.removeDeleted(hintStore.getColumnFamily(filter), (int)(System.currentTimeMillis() / 1000));
if (pagingFinished(hintsPage, startColumn))
{
@@ -322,7 +322,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
}
- for (final IColumn hint : hintsPage.getSortedColumns())
+ for (final Column hint : hintsPage.getSortedColumns())
{
// Skip tombstones:
// if we iterate quickly enough, it's possible that we could request a new page in the same millisecond
@@ -400,7 +400,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
RowPosition minPos = p.getMinimumToken().minKeyBound();
Range<RowPosition> range = new Range<RowPosition>(minPos, minPos, p);
IDiskAtomFilter filter = new NamesQueryFilter(ImmutableSortedSet.<ByteBuffer>of());
- List<Row> rows = hintStore.getRangeSlice(null, range, Integer.MAX_VALUE, filter, null);
+ List<Row> rows = hintStore.getRangeSlice(range, Integer.MAX_VALUE, filter, null);
for (Row row : rows)
{
UUID hostId = UUIDGen.getUUID(row.key.key);
@@ -473,9 +473,6 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
private List<Row> getHintsSlice(int columnCount)
{
- // ColumnParent for HintsCF...
- ColumnParent parent = new ColumnParent(SystemTable.HINTS_CF);
-
// Get count # of columns...
SliceQueryFilter predicate = new SliceQueryFilter(ByteBufferUtil.EMPTY_BYTE_BUFFER,
ByteBufferUtil.EMPTY_BYTE_BUFFER,
@@ -491,7 +488,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
List<Row> rows;
try
{
- rows = StorageProxy.getRangeSlice(new RangeSliceCommand(Table.SYSTEM_KS, parent, predicate, range, null, LARGE_NUMBER), ConsistencyLevel.ONE);
+ rows = StorageProxy.getRangeSlice(new RangeSliceCommand(Table.SYSTEM_KS, SystemTable.HINTS_CF, predicate, range, null, LARGE_NUMBER), ConsistencyLevel.ONE);
}
catch (Exception e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/IColumn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/IColumn.java b/src/java/org/apache/cassandra/db/IColumn.java
deleted file mode 100644
index 0a7fe7a..0000000
--- a/src/java/org/apache/cassandra/db/IColumn.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.nio.ByteBuffer;
-import java.util.Collection;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.marshal.MarshalException;
-import org.apache.cassandra.utils.Allocator;
-import org.apache.cassandra.utils.FBUtilities;
-
-/** TODO: rename */
-public interface IColumn extends OnDiskAtom
-{
- public static final int MAX_NAME_LENGTH = FBUtilities.MAX_UNSIGNED_SHORT;
-
- /**
- * @return true if the column has been deleted (is a tombstone). This depends on comparing the server clock
- * with getLocalDeletionTime, so it can change during a single request if you're not careful.
- */
- public boolean isMarkedForDelete();
-
- public long getMarkedForDeleteAt();
- public long mostRecentLiveChangeAt();
- public long mostRecentNonGCableChangeAt(int gcbefore);
- /** the size of user-provided data, not including internal overhead */
- public int dataSize();
- public int serializationFlags();
- public long timestamp();
- public ByteBuffer value();
- public Collection<IColumn> getSubColumns();
- public IColumn getSubColumn(ByteBuffer columnName);
- public void addColumn(IColumn column);
- public void addColumn(IColumn column, Allocator allocator);
- public IColumn diff(IColumn column);
- public IColumn reconcile(IColumn column);
- public IColumn reconcile(IColumn column, Allocator allocator);
- public String getString(AbstractType<?> comparator);
- public void validateFields(CFMetaData metadata) throws MarshalException;
-
- /** clones the column for the row cache, interning column names and making copies of other underlying byte buffers */
- IColumn localCopy(ColumnFamilyStore cfs);
-
- /**
- * clones the column for the memtable, interning column names and making copies of other underlying byte buffers.
- * Unlike the other localCopy, this uses Allocator to allocate values in contiguous memory regions,
- * which helps avoid heap fragmentation.
- */
- IColumn localCopy(ColumnFamilyStore cfs, Allocator allocator);
-
- /**
- * For a simple column, live == !isMarkedForDelete.
- * For a supercolumn, live means it has at least one subcolumn whose timestamp is greater than the
- * supercolumn deleted-at time.
- */
- boolean isLive();
-
- /**
- * For a standard column, this is the same as timestamp().
- * For a super column, this is the max column timestamp of the sub columns.
- */
- public long maxTimestamp();
-
- /**
- * @return true if the column or any its subcolumns is no longer relevant after @param gcBefore
- */
- public boolean hasIrrelevantData(int gcBefore);
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/IColumnContainer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/IColumnContainer.java b/src/java/org/apache/cassandra/db/IColumnContainer.java
deleted file mode 100644
index a3bd210..0000000
--- a/src/java/org/apache/cassandra/db/IColumnContainer.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.nio.ByteBuffer;
-import java.util.Collection;
-
-import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.utils.Allocator;
-
-public interface IColumnContainer
-{
- public void addColumn(IColumn column);
- public void addColumn(IColumn column, Allocator allocator);
- public void remove(ByteBuffer columnName);
-
- /**
- * Replace oldColumn if represent by newColumn.
- * Returns true if oldColumn was present (and thus replaced)
- * oldColumn and newColumn should have the same name.
- * !NOTE! This should only be used if you know this is what you need. To
- * add a column such that it use the usual column resolution rules in a
- * thread safe manner, use addColumn.
- */
- public boolean replace(IColumn oldColumn, IColumn newColumn);
-
- public boolean isMarkedForDelete();
- public DeletionInfo deletionInfo();
- public boolean hasIrrelevantData(int gcBefore);
-
- public AbstractType<?> getComparator();
-
- public Collection<IColumn> getSortedColumns();
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/ISortedColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ISortedColumns.java b/src/java/org/apache/cassandra/db/ISortedColumns.java
index 5ccba19..df288aa 100644
--- a/src/java/org/apache/cassandra/db/ISortedColumns.java
+++ b/src/java/org/apache/cassandra/db/ISortedColumns.java
@@ -62,7 +62,7 @@ public interface ISortedColumns extends IIterableColumns
* If a column with the same name is already present in the map, it will
* be replaced by the newly added column.
*/
- public void addColumn(IColumn column, Allocator allocator);
+ public void addColumn(Column column, Allocator allocator);
/**
* Adds all the columns of a given column map to this column map.
@@ -75,19 +75,19 @@ public interface ISortedColumns extends IIterableColumns
*
* @return the difference in size seen after merging the given columns
*/
- public long addAllWithSizeDelta(ISortedColumns cm, Allocator allocator, Function<IColumn, IColumn> transformation, SecondaryIndexManager.Updater indexer);
+ public long addAllWithSizeDelta(ISortedColumns cm, Allocator allocator, Function<Column, Column> transformation, SecondaryIndexManager.Updater indexer);
/**
* Adds the columns without necessarily computing the size delta
*/
- public void addAll(ISortedColumns cm, Allocator allocator, Function<IColumn, IColumn> transformation);
+ public void addAll(ISortedColumns cm, Allocator allocator, Function<Column, Column> transformation);
/**
* Replace oldColumn if present by newColumn.
* Returns true if oldColumn was present and thus replaced.
* oldColumn and newColumn should have the same name.
*/
- public boolean replace(IColumn oldColumn, IColumn newColumn);
+ public boolean replace(Column oldColumn, Column newColumn);
/**
* Remove if present a column by name.
@@ -103,7 +103,7 @@ public interface ISortedColumns extends IIterableColumns
* Get a column given its name, returning null if the column is not
* present.
*/
- public IColumn getColumn(ByteBuffer name);
+ public Column getColumn(ByteBuffer name);
/**
* Returns a set with the names of columns in this column map.
@@ -117,14 +117,14 @@ public interface ISortedColumns extends IIterableColumns
* The columns in the returned collection should be sorted as the columns
* in this map.
*/
- public Collection<IColumn> getSortedColumns();
+ public Collection<Column> getSortedColumns();
/**
* Returns the columns of this column map as a collection.
* The columns in the returned collection should be sorted in reverse
* order of the columns in this map.
*/
- public Collection<IColumn> getReverseSortedColumns();
+ public Collection<Column> getReverseSortedColumns();
/**
* Returns the number of columns in this map.
@@ -140,13 +140,13 @@ public interface ISortedColumns extends IIterableColumns
* Returns an iterator over the columns of this map that returns only the matching @param slices.
* The provided slices must be in order and must be non-overlapping.
*/
- public Iterator<IColumn> iterator(ColumnSlice[] slices);
+ public Iterator<Column> iterator(ColumnSlice[] slices);
/**
* Returns a reversed iterator over the columns of this map that returns only the matching @param slices.
* The provided slices must be in reversed order and must be non-overlapping.
*/
- public Iterator<IColumn> reverseIterator(ColumnSlice[] slices);
+ public Iterator<Column> reverseIterator(ColumnSlice[] slices);
/**
* Returns if this map only support inserts in reverse order.
@@ -171,6 +171,6 @@ public interface ISortedColumns extends IIterableColumns
* columns in the provided sorted map.
* See {@code create} for the description of {@code insertReversed}
*/
- public ISortedColumns fromSorted(SortedMap<ByteBuffer, IColumn> sm, boolean insertReversed);
+ public ISortedColumns fromSorted(SortedMap<ByteBuffer, Column> sm, boolean insertReversed);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 6b708d5..32e6852 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -93,9 +93,9 @@ public class Memtable
private final SlabAllocator allocator = new SlabAllocator();
// We really only need one column by allocator but one by memtable is not a big waste and avoids needing allocators to know about CFS
- private final Function<IColumn, IColumn> localCopyFunction = new Function<IColumn, IColumn>()
+ private final Function<Column, Column> localCopyFunction = new Function<Column, Column>()
{
- public IColumn apply(IColumn c)
+ public Column apply(Column c)
{
return c.localCopy(cfs, allocator);
};
@@ -226,7 +226,7 @@ public class Memtable
if (previous == null)
{
// AtomicSortedColumns doesn't work for super columns (see #3821)
- ColumnFamily empty = cf.cloneMeShallow(cf.isSuper() ? ThreadSafeSortedColumns.factory() : AtomicSortedColumns.factory(), false);
+ ColumnFamily empty = cf.cloneMeShallow(AtomicSortedColumns.factory(), false);
// We'll add the columns later. This avoids wasting works if we get beaten in the putIfAbsent
previous = columnFamilies.putIfAbsent(new DecoratedKey(key.token, allocator.clone(key.key)), empty);
if (previous == null)
@@ -316,7 +316,7 @@ public class Memtable
public static OnDiskAtomIterator getSliceIterator(final DecoratedKey key, final ColumnFamily cf, SliceQueryFilter filter)
{
assert cf != null;
- final Iterator<IColumn> filteredIter = filter.reversed ? cf.reverseIterator(filter.slices) : cf.iterator(filter.slices);
+ final Iterator<Column> filteredIter = filter.reversed ? cf.reverseIterator(filter.slices) : cf.iterator(filter.slices);
return new AbstractColumnIterator()
{
@@ -345,7 +345,6 @@ public class Memtable
public static OnDiskAtomIterator getNamesIterator(final DecoratedKey key, final ColumnFamily cf, final NamesQueryFilter filter)
{
assert cf != null;
- final boolean isStandard = !cf.isSuper();
return new SimpleAbstractColumnIterator()
{
@@ -366,10 +365,9 @@ public class Memtable
while (iter.hasNext())
{
ByteBuffer current = iter.next();
- IColumn column = cf.getColumn(current);
+ Column column = cf.getColumn(current);
if (column != null)
- // clone supercolumns so caller can freely removeDeleted or otherwise mutate it
- return isStandard ? column : ((SuperColumn)column).cloneMe();
+ return column;
}
return endOfData();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/OnDiskAtom.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/OnDiskAtom.java b/src/java/org/apache/cassandra/db/OnDiskAtom.java
index 7501d83..cc997ca 100644
--- a/src/java/org/apache/cassandra/db/OnDiskAtom.java
+++ b/src/java/org/apache/cassandra/db/OnDiskAtom.java
@@ -23,7 +23,6 @@ import java.security.MessageDigest;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.marshal.MarshalException;
-import org.apache.cassandra.io.IColumnSerializer;
import org.apache.cassandra.io.ISSTableSerializer;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -48,18 +47,15 @@ public interface OnDiskAtom
public static class Serializer implements ISSTableSerializer<OnDiskAtom>
{
- private final IColumnSerializer columnSerializer;
+ public static Serializer instance = new Serializer();
- public Serializer(IColumnSerializer columnSerializer)
- {
- this.columnSerializer = columnSerializer;
- }
+ private Serializer() {}
public void serializeForSSTable(OnDiskAtom atom, DataOutput dos) throws IOException
{
- if (atom instanceof IColumn)
+ if (atom instanceof Column)
{
- columnSerializer.serialize((IColumn)atom, dos);
+ Column.serializer().serialize((Column)atom, dos);
}
else
{
@@ -70,27 +66,20 @@ public interface OnDiskAtom
public OnDiskAtom deserializeFromSSTable(DataInput dis, Descriptor.Version version) throws IOException
{
- return deserializeFromSSTable(dis, IColumnSerializer.Flag.LOCAL, (int)(System.currentTimeMillis() / 1000), version);
+ return deserializeFromSSTable(dis, ColumnSerializer.Flag.LOCAL, (int)(System.currentTimeMillis() / 1000), version);
}
- public OnDiskAtom deserializeFromSSTable(DataInput dis, IColumnSerializer.Flag flag, int expireBefore, Descriptor.Version version) throws IOException
+ public OnDiskAtom deserializeFromSSTable(DataInput dis, ColumnSerializer.Flag flag, int expireBefore, Descriptor.Version version) throws IOException
{
- if (columnSerializer instanceof SuperColumnSerializer)
- {
- return columnSerializer.deserialize(dis, flag, expireBefore);
- }
- else
- {
- ByteBuffer name = ByteBufferUtil.readWithShortLength(dis);
- if (name.remaining() <= 0)
- throw ColumnSerializer.CorruptColumnException.create(dis, name);
+ ByteBuffer name = ByteBufferUtil.readWithShortLength(dis);
+ if (name.remaining() <= 0)
+ throw ColumnSerializer.CorruptColumnException.create(dis, name);
- int b = dis.readUnsignedByte();
- if ((b & ColumnSerializer.RANGE_TOMBSTONE_MASK) != 0)
- return RangeTombstone.serializer.deserializeBody(dis, name, version);
- else
- return ((ColumnSerializer)columnSerializer).deserializeColumnBody(dis, name, b, flag, expireBefore);
- }
+ int b = dis.readUnsignedByte();
+ if ((b & ColumnSerializer.RANGE_TOMBSTONE_MASK) != 0)
+ return RangeTombstone.serializer.deserializeBody(dis, name, version);
+ else
+ return Column.serializer().deserializeColumnBody(dis, name, b, flag, expireBefore);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/RangeSliceCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeSliceCommand.java b/src/java/org/apache/cassandra/db/RangeSliceCommand.java
index 1748abd..e365b00 100644
--- a/src/java/org/apache/cassandra/db/RangeSliceCommand.java
+++ b/src/java/org/apache/cassandra/db/RangeSliceCommand.java
@@ -49,13 +49,13 @@ import org.apache.cassandra.db.filter.IDiskAtomFilter;
import org.apache.cassandra.db.filter.NamesQueryFilter;
import org.apache.cassandra.db.filter.SliceQueryFilter;
import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CompositeType;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.exceptions.*;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.IReadCommand;
-import org.apache.cassandra.thrift.ColumnParent;
import org.apache.cassandra.thrift.IndexClause;
import org.apache.cassandra.thrift.IndexExpression;
import org.apache.cassandra.thrift.IndexOperator;
@@ -76,7 +76,6 @@ public class RangeSliceCommand implements IReadCommand
public final String keyspace;
public final String column_family;
- public final ByteBuffer super_column;
public final IDiskAtomFilter predicate;
public final List<IndexExpression> row_filter;
@@ -86,26 +85,20 @@ public class RangeSliceCommand implements IReadCommand
public final boolean countCQL3Rows;
public final boolean isPaging;
- public RangeSliceCommand(String keyspace, String column_family, ByteBuffer super_column, IDiskAtomFilter predicate, AbstractBounds<RowPosition> range, int maxResults)
+ public RangeSliceCommand(String keyspace, String column_family, IDiskAtomFilter predicate, AbstractBounds<RowPosition> range, int maxResults)
{
- this(keyspace, column_family, super_column, predicate, range, null, maxResults, false, false);
+ this(keyspace, column_family, predicate, range, null, maxResults, false, false);
}
- public RangeSliceCommand(String keyspace, ColumnParent column_parent, IDiskAtomFilter predicate, AbstractBounds<RowPosition> range, List<IndexExpression> row_filter, int maxResults)
+ public RangeSliceCommand(String keyspace, String column_family, IDiskAtomFilter predicate, AbstractBounds<RowPosition> range, List<IndexExpression> row_filter, int maxResults)
{
- this(keyspace, column_parent.getColumn_family(), column_parent.super_column, predicate, range, row_filter, maxResults, false, false);
+ this(keyspace, column_family, predicate, range, row_filter, maxResults, false, false);
}
- public RangeSliceCommand(String keyspace, String column_family, ByteBuffer super_column, IDiskAtomFilter predicate, AbstractBounds<RowPosition> range, List<IndexExpression> row_filter, int maxResults)
- {
- this(keyspace, column_family, super_column, predicate, range, row_filter, maxResults, false, false);
- }
-
- public RangeSliceCommand(String keyspace, String column_family, ByteBuffer super_column, IDiskAtomFilter predicate, AbstractBounds<RowPosition> range, List<IndexExpression> row_filter, int maxResults, boolean countCQL3Rows, boolean isPaging)
+ public RangeSliceCommand(String keyspace, String column_family, IDiskAtomFilter predicate, AbstractBounds<RowPosition> range, List<IndexExpression> row_filter, int maxResults, boolean countCQL3Rows, boolean isPaging)
{
this.keyspace = keyspace;
this.column_family = column_family;
- this.super_column = super_column;
this.predicate = predicate;
this.range = range;
this.row_filter = row_filter;
@@ -125,7 +118,6 @@ public class RangeSliceCommand implements IReadCommand
return "RangeSliceCommand{" +
"keyspace='" + keyspace + '\'' +
", column_family='" + column_family + '\'' +
- ", super_column=" + super_column +
", predicate=" + predicate +
", range=" + range +
", row_filter =" + row_filter +
@@ -198,10 +190,26 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm
{
dos.writeUTF(sliceCommand.keyspace);
dos.writeUTF(sliceCommand.column_family);
- ByteBuffer sc = sliceCommand.super_column;
- dos.writeInt(sc == null ? 0 : sc.remaining());
- if (sc != null)
- ByteBufferUtil.write(sc, dos);
+
+ IDiskAtomFilter filter = sliceCommand.predicate;
+ if (version < MessagingService.VERSION_20)
+ {
+ // Pre-2.0, we need to know if it's a super column. If it is, we
+ // must extract the super column name from the predicate (and
+ // modify the predicate accordingly)
+ ByteBuffer sc = null;
+ CFMetaData metadata = Schema.instance.getCFMetaData(sliceCommand.getKeyspace(), sliceCommand.column_family);
+ if (metadata.cfType == ColumnFamilyType.Super)
+ {
+ SuperColumns.SCFilter scFilter = SuperColumns.filterToSC((CompositeType)metadata.comparator, filter);
+ sc = scFilter.scName;
+ filter = scFilter.updatedFilter;
+ }
+
+ dos.writeInt(sc == null ? 0 : sc.remaining());
+ if (sc != null)
+ ByteBufferUtil.write(sc, dos);
+ }
if (version < MessagingService.VERSION_12)
{
@@ -250,26 +258,48 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm
String keyspace = dis.readUTF();
String columnFamily = dis.readUTF();
- int scLength = dis.readInt();
- ByteBuffer superColumn = null;
- if (scLength > 0)
- {
- byte[] buf = new byte[scLength];
- dis.readFully(buf);
- superColumn = ByteBuffer.wrap(buf);
- }
+ CFMetaData metadata = Schema.instance.getCFMetaData(keyspace, columnFamily);
IDiskAtomFilter predicate;
- AbstractType<?> comparator = ColumnFamily.getComparatorFor(keyspace, columnFamily, superColumn);
- if (version < MessagingService.VERSION_12)
+ if (version < MessagingService.VERSION_20)
{
- SlicePredicate pred = new SlicePredicate();
- FBUtilities.deserialize(new TDeserializer(new TBinaryProtocol.Factory()), pred, dis);
- predicate = ThriftValidation.asIFilter(pred, comparator);
+ int scLength = dis.readInt();
+ ByteBuffer superColumn = null;
+ if (scLength > 0)
+ {
+ byte[] buf = new byte[scLength];
+ dis.readFully(buf);
+ superColumn = ByteBuffer.wrap(buf);
+ }
+
+ AbstractType<?> comparator;
+ if (metadata.cfType == ColumnFamilyType.Super)
+ {
+ CompositeType type = (CompositeType)metadata.comparator;
+ comparator = superColumn == null ? type.types.get(0) : type.types.get(1);
+ }
+ else
+ {
+ comparator = metadata.comparator;
+ }
+
+ if (version < MessagingService.VERSION_12)
+ {
+ SlicePredicate pred = new SlicePredicate();
+ FBUtilities.deserialize(new TDeserializer(new TBinaryProtocol.Factory()), pred, dis);
+ predicate = ThriftValidation.asIFilter(pred, metadata, superColumn);
+ }
+ else
+ {
+ predicate = IDiskAtomFilter.Serializer.instance.deserialize(dis, version, comparator);
+ }
+
+ if (metadata.cfType == ColumnFamilyType.Super)
+ predicate = SuperColumns.fromSCFilter((CompositeType)metadata.comparator, superColumn, predicate);
}
else
{
- predicate = IDiskAtomFilter.Serializer.instance.deserialize(dis, version, comparator);
+ predicate = IDiskAtomFilter.Serializer.instance.deserialize(dis, version, metadata.comparator);
}
List<IndexExpression> rowFilter = null;
@@ -304,7 +334,7 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm
countCQL3Rows = dis.readBoolean();
isPaging = dis.readBoolean();
}
- return new RangeSliceCommand(keyspace, columnFamily, superColumn, predicate, range, rowFilter, maxResults, countCQL3Rows, isPaging);
+ return new RangeSliceCommand(keyspace, columnFamily, predicate, range, rowFilter, maxResults, countCQL3Rows, isPaging);
}
public long serializedSize(RangeSliceCommand rsc, int version)
@@ -312,15 +342,27 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm
long size = TypeSizes.NATIVE.sizeof(rsc.keyspace);
size += TypeSizes.NATIVE.sizeof(rsc.column_family);
- ByteBuffer sc = rsc.super_column;
- if (sc != null)
- {
- size += TypeSizes.NATIVE.sizeof(sc.remaining());
- size += sc.remaining();
- }
- else
+ IDiskAtomFilter filter = rsc.predicate;
+ if (version < MessagingService.VERSION_20)
{
- size += TypeSizes.NATIVE.sizeof(0);
+ ByteBuffer sc = null;
+ CFMetaData metadata = Schema.instance.getCFMetaData(rsc.keyspace, rsc.column_family);
+ if (metadata.cfType == ColumnFamilyType.Super)
+ {
+ SuperColumns.SCFilter scFilter = SuperColumns.filterToSC((CompositeType)metadata.comparator, filter);
+ sc = scFilter.scName;
+ filter = scFilter.updatedFilter;
+ }
+
+ if (sc != null)
+ {
+ size += TypeSizes.NATIVE.sizeof(sc.remaining());
+ size += sc.remaining();
+ }
+ else
+ {
+ size += TypeSizes.NATIVE.sizeof(0);
+ }
}
if (version < MessagingService.VERSION_12)
@@ -328,7 +370,7 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm
TSerializer ser = new TSerializer(new TBinaryProtocol.Factory());
try
{
- int predicateLength = ser.serialize(asSlicePredicate(rsc.predicate)).length;
+ int predicateLength = ser.serialize(asSlicePredicate(filter)).length;
if (version < MessagingService.VERSION_12)
size += TypeSizes.NATIVE.sizeof(predicateLength);
size += predicateLength;
@@ -340,7 +382,7 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm
}
else
{
- size += IDiskAtomFilter.Serializer.instance.serializedSize(rsc.predicate, version);
+ size += IDiskAtomFilter.Serializer.instance.serializedSize(filter, version);
}
if (version >= MessagingService.VERSION_11)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/RangeTombstone.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeTombstone.java b/src/java/org/apache/cassandra/db/RangeTombstone.java
index 1d472c3..bcddfea 100644
--- a/src/java/org/apache/cassandra/db/RangeTombstone.java
+++ b/src/java/org/apache/cassandra/db/RangeTombstone.java
@@ -83,9 +83,8 @@ public class RangeTombstone extends Interval<ByteBuffer, DeletionTime> implement
public void validateFields(CFMetaData metadata) throws MarshalException
{
- AbstractType<?> nameValidator = metadata.cfType == ColumnFamilyType.Super ? metadata.subcolumnComparator : metadata.comparator;
- nameValidator.validate(min);
- nameValidator.validate(max);
+ metadata.comparator.validate(min);
+ metadata.comparator.validate(max);
}
public void updateDigest(MessageDigest digest)
@@ -192,7 +191,7 @@ public class RangeTombstone extends Interval<ByteBuffer, DeletionTime> implement
/**
* Update this tracker given an {@code atom}.
- * If column is a IColumn, check if any tracked range is useless and
+ * If atom is a Column, check if any tracked range is useless and
* can be removed. If it is a RangeTombstone, add it to this tracker.
*/
public void update(OnDiskAtom atom)
@@ -219,7 +218,7 @@ public class RangeTombstone extends Interval<ByteBuffer, DeletionTime> implement
}
else
{
- assert atom instanceof IColumn;
+ assert atom instanceof Column;
Iterator<RangeTombstone> iter = maxOrderingSet.iterator();
while (iter.hasNext())
{
@@ -240,7 +239,7 @@ public class RangeTombstone extends Interval<ByteBuffer, DeletionTime> implement
}
}
- public boolean isDeleted(IColumn column)
+ public boolean isDeleted(Column column)
{
for (RangeTombstone tombstone : ranges)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index f3494e5..8b68d5b 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -24,10 +24,14 @@ import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
+import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.filter.IDiskAtomFilter;
-import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.db.filter.NamesQueryFilter;
+import org.apache.cassandra.db.filter.SliceQueryFilter;
import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CompositeType;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
@@ -38,8 +42,22 @@ import org.apache.cassandra.utils.IFilter;
public abstract class ReadCommand implements IReadCommand
{
- public static final byte CMD_TYPE_GET_SLICE_BY_NAMES = 1;
- public static final byte CMD_TYPE_GET_SLICE = 2;
+ /**
+ * Kind of read carried by a ReadCommand, paired with the single byte
+ * that identifies it in the serialized form.
+ */
+ public enum Type {
+ GET_BY_NAMES((byte)1),
+ GET_SLICES((byte)2);
+
+ public final byte serializedValue;
+
+ private Type(byte b)
+ {
+ this.serializedValue = b;
+ }
+
+ /**
+ * Maps a serialized byte back to its Type.
+ *
+ * @param b the byte read from the stream
+ * @return the Type whose serializedValue equals b
+ * @throws IllegalArgumentException if b matches no Type; an unknown byte
+ * is rejected instead of being silently treated as GET_SLICES
+ */
+ public static Type fromSerializedValue(byte b)
+ {
+ for (Type type : values())
+ {
+ if (type.serializedValue == b)
+ return type;
+ }
+ throw new IllegalArgumentException("Unknown ReadCommand type: " + b);
+ }
+ }
public static final ReadCommandSerializer serializer = new ReadCommandSerializer();
@@ -48,20 +66,28 @@ public abstract class ReadCommand implements IReadCommand
return new MessageOut<ReadCommand>(MessagingService.Verb.READ, this, serializer);
}
- public final QueryPath queryPath;
public final String table;
+ public final String cfName;
public final ByteBuffer key;
private boolean isDigestQuery = false;
- protected final byte commandType;
+ protected final Type commandType;
- protected ReadCommand(String table, ByteBuffer key, QueryPath queryPath, byte cmdType)
+ protected ReadCommand(String table, ByteBuffer key, String cfName, Type cmdType) // takes the column family name and a Type instead of a QueryPath and a raw byte
{
this.table = table;
this.key = key;
- this.queryPath = queryPath;
+ this.cfName = cfName;
this.commandType = cmdType;
}
+ public static ReadCommand create(String table, ByteBuffer key, String cfName, IDiskAtomFilter filter) // picks the concrete command class from the runtime type of the filter
+ {
+ if (filter instanceof SliceQueryFilter)
+ return new SliceFromReadCommand(table, key, cfName, (SliceQueryFilter)filter);
+ else // every other filter is cast to NamesQueryFilter, so a third filter type would fail here with ClassCastException
+ return new SliceByNamesReadCommand(table, key, cfName, (NamesQueryFilter)filter);
+ }
+
public boolean isDigestQuery()
{
return isDigestQuery;
@@ -74,7 +100,7 @@ public abstract class ReadCommand implements IReadCommand
public String getColumnFamilyName()
{
- return queryPath.columnFamilyName;
+ return cfName; // the name is now read from the cfName field instead of from a QueryPath
}
public abstract ReadCommand copy();
@@ -83,11 +109,6 @@ public abstract class ReadCommand implements IReadCommand
public abstract IDiskAtomFilter filter();
- protected AbstractType<?> getComparator()
- {
- return ColumnFamily.getComparatorFor(table, getColumnFamilyName(), queryPath.superColumnName);
- }
-
public String getKeyspace()
{
return table;
@@ -113,28 +134,77 @@ public abstract class ReadCommand implements IReadCommand
class ReadCommandSerializer implements IVersionedSerializer<ReadCommand>
{
- private static final Map<Byte, IVersionedSerializer<ReadCommand>> CMD_SERIALIZER_MAP = new HashMap<Byte, IVersionedSerializer<ReadCommand>>();
- static
- {
- CMD_SERIALIZER_MAP.put(ReadCommand.CMD_TYPE_GET_SLICE_BY_NAMES, new SliceByNamesReadCommandSerializer());
- CMD_SERIALIZER_MAP.put(ReadCommand.CMD_TYPE_GET_SLICE, new SliceFromReadCommandSerializer());
- }
-
-
public void serialize(ReadCommand command, DataOutput dos, int version) throws IOException
{
- dos.writeByte(command.commandType);
- CMD_SERIALIZER_MAP.get(command.commandType).serialize(command, dos, version);
+ // For super columns, when talking to an older node, we need to translate the filter used.
+ // That translation can change the filter type (names -> slice), and so change the command type.
+ // Hence we need to detect that early on, before we've written the command type.
+ ReadCommand newCommand = command;
+ ByteBuffer superColumn = null;
+ if (version < MessagingService.VERSION_20)
+ {
+ CFMetaData metadata = Schema.instance.getCFMetaData(command.table, command.cfName);
+ if (metadata.cfType == ColumnFamilyType.Super)
+ {
+ SuperColumns.SCFilter scFilter = SuperColumns.filterToSC((CompositeType)metadata.comparator, command.filter());
+ newCommand = ReadCommand.create(command.table, command.key, command.cfName, scFilter.updatedFilter);
+ newCommand.setDigestQuery(command.isDigestQuery());
+ superColumn = scFilter.scName;
+ }
+ }
+
+ dos.writeByte(newCommand.commandType.serializedValue);
+ // Dispatch on the translated command, not the original one: each serializer casts its
+ // argument to its own command class, so it must match the type byte written just above.
+ switch (newCommand.commandType)
+ {
+ case GET_BY_NAMES:
+ SliceByNamesReadCommand.serializer.serialize(newCommand, superColumn, dos, version);
+ break;
+ case GET_SLICES:
+ SliceFromReadCommand.serializer.serialize(newCommand, superColumn, dos, version);
+ break;
+ default:
+ throw new AssertionError();
+ }
}
public ReadCommand deserialize(DataInput dis, int version) throws IOException
{
- byte msgType = dis.readByte();
- return CMD_SERIALIZER_MAP.get(msgType).deserialize(dis, version);
+ ReadCommand.Type msgType = ReadCommand.Type.fromSerializedValue(dis.readByte()); // first byte on the wire is the command type
+ switch (msgType)
+ {
+ case GET_BY_NAMES:
+ return SliceByNamesReadCommand.serializer.deserialize(dis, version);
+ case GET_SLICES:
+ return SliceFromReadCommand.serializer.deserialize(dis, version);
+ default:
+ throw new AssertionError();
+ }
}
public long serializedSize(ReadCommand command, int version)
{
- return 1 + CMD_SERIALIZER_MAP.get(command.commandType).serializedSize(command, version);
+ // Same translation as in serialize(): the size must be computed for the command that is actually written.
+ ReadCommand newCommand = command;
+ ByteBuffer superColumn = null;
+ if (version < MessagingService.VERSION_20)
+ {
+ CFMetaData metadata = Schema.instance.getCFMetaData(command.table, command.cfName);
+ if (metadata.cfType == ColumnFamilyType.Super)
+ {
+ SuperColumns.SCFilter scFilter = SuperColumns.filterToSC((CompositeType)metadata.comparator, command.filter());
+ newCommand = ReadCommand.create(command.table, command.key, command.cfName, scFilter.updatedFilter);
+ newCommand.setDigestQuery(command.isDigestQuery());
+ superColumn = scFilter.scName;
+ }
+ }
+
+ // Dispatch on the translated command so the serializer's cast matches its class; the leading 1 is the type byte.
+ switch (newCommand.commandType)
+ {
+ case GET_BY_NAMES:
+ return 1 + SliceByNamesReadCommand.serializer.serializedSize(newCommand, superColumn, version);
+ case GET_SLICES:
+ return 1 + SliceFromReadCommand.serializer.serializedSize(newCommand, superColumn, version);
+ default:
+ throw new AssertionError();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/ReadResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadResponse.java b/src/java/org/apache/cassandra/db/ReadResponse.java
index 1f21006..95f36e3 100644
--- a/src/java/org/apache/cassandra/db/ReadResponse.java
+++ b/src/java/org/apache/cassandra/db/ReadResponse.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.db;
import java.io.*;
import java.nio.ByteBuffer;
-import org.apache.cassandra.io.IColumnSerializer;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -94,7 +93,7 @@ class ReadResponseSerializer implements IVersionedSerializer<ReadResponse>
if (!isDigest)
{
// This is coming from a remote host
- row = Row.serializer.deserialize(dis, version, IColumnSerializer.Flag.FROM_REMOTE, ArrayBackedSortedColumns.factory());
+ row = Row.serializer.deserialize(dis, version, ColumnSerializer.Flag.FROM_REMOTE, ArrayBackedSortedColumns.factory());
}
return isDigest ? new ReadResponse(ByteBuffer.wrap(digest)) : new ReadResponse(row);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/RetriedSliceFromReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RetriedSliceFromReadCommand.java b/src/java/org/apache/cassandra/db/RetriedSliceFromReadCommand.java
index e7e99fe..b948460 100644
--- a/src/java/org/apache/cassandra/db/RetriedSliceFromReadCommand.java
+++ b/src/java/org/apache/cassandra/db/RetriedSliceFromReadCommand.java
@@ -29,21 +29,16 @@ public class RetriedSliceFromReadCommand extends SliceFromReadCommand
static final Logger logger = LoggerFactory.getLogger(RetriedSliceFromReadCommand.class);
public final int originalCount;
- public RetriedSliceFromReadCommand(String table, ByteBuffer key, ColumnParent column_parent, SliceQueryFilter filter, int originalCount)
+ public RetriedSliceFromReadCommand(String table, ByteBuffer key, String cfName, SliceQueryFilter filter, int originalCount) // single remaining constructor; the ColumnParent and QueryPath overloads are removed
{
- this(table, key, new QueryPath(column_parent), filter, originalCount);
- }
-
- public RetriedSliceFromReadCommand(String table, ByteBuffer key, QueryPath path, SliceQueryFilter filter, int originalCount)
- {
- super(table, key, path, filter);
+ super(table, key, cfName, filter);
this.originalCount = originalCount;
}
@Override
public ReadCommand copy()
{
- ReadCommand readCommand = new RetriedSliceFromReadCommand(table, key, queryPath, filter, originalCount);
+ ReadCommand readCommand = new RetriedSliceFromReadCommand(table, key, cfName, filter, originalCount); // new instance built from the same field values; the digest flag is carried over on the next line
readCommand.setDigestQuery(isDigestQuery());
return readCommand;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/Row.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Row.java b/src/java/org/apache/cassandra/db/Row.java
index 74cd906..54c5efe 100644
--- a/src/java/org/apache/cassandra/db/Row.java
+++ b/src/java/org/apache/cassandra/db/Row.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.db;
import java.io.*;
import org.apache.cassandra.db.filter.IDiskAtomFilter;
-import org.apache.cassandra.io.IColumnSerializer;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -62,7 +61,7 @@ public class Row
ColumnFamily.serializer.serialize(row.cf, dos, version);
}
- public Row deserialize(DataInput dis, int version, IColumnSerializer.Flag flag, ISortedColumns.Factory factory) throws IOException
+ public Row deserialize(DataInput dis, int version, ColumnSerializer.Flag flag, ISortedColumns.Factory factory) throws IOException
{
return new Row(StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(dis)),
ColumnFamily.serializer.deserialize(dis, flag, factory, version));
@@ -70,7 +69,7 @@ public class Row
public Row deserialize(DataInput dis, int version) throws IOException
{
- return deserialize(dis, version, IColumnSerializer.Flag.LOCAL, TreeMapBackedSortedColumns.factory());
+ return deserialize(dis, version, ColumnSerializer.Flag.LOCAL, TreeMapBackedSortedColumns.factory()); // delegates with the LOCAL flag and the TreeMapBackedSortedColumns factory
}
public long serializedSize(Row row, int version)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/RowIteratorFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowIteratorFactory.java b/src/java/org/apache/cassandra/db/RowIteratorFactory.java
index 937c496..38d0fc1 100644
--- a/src/java/org/apache/cassandra/db/RowIteratorFactory.java
+++ b/src/java/org/apache/cassandra/db/RowIteratorFactory.java
@@ -107,7 +107,7 @@ public class RowIteratorFactory
}
else
{
- QueryFilter keyFilter = new QueryFilter(key, filter.path, filter.filter);
+ QueryFilter keyFilter = new QueryFilter(key, filter.cfName, filter.filter);
returnCF = cfs.filterColumnFamily(cached, keyFilter, gcBefore);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/RowMutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowMutation.java b/src/java/org/apache/cassandra/db/RowMutation.java
index b4666b5..7ad3100 100644
--- a/src/java/org/apache/cassandra/db/RowMutation.java
+++ b/src/java/org/apache/cassandra/db/RowMutation.java
@@ -29,9 +29,7 @@ import org.apache.commons.lang.StringUtils;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.KSMetaData;
import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.db.marshal.UUIDType;
-import org.apache.cassandra.io.IColumnSerializer;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.FastByteArrayInputStream;
import org.apache.cassandra.net.MessageOut;
@@ -115,8 +113,9 @@ public class RowMutation implements IMutation
ttl = Math.min(ttl, cf.metadata().getGcGraceSeconds());
// serialize the hint with id and version as a composite column name
- QueryPath path = new QueryPath(SystemTable.HINTS_CF, null, HintedHandOffManager.comparator.decompose(hintId, MessagingService.current_version));
- rm.add(path, ByteBuffer.wrap(mutation.getSerializedBuffer(MessagingService.current_version)), System.currentTimeMillis(), ttl);
+ ByteBuffer name = HintedHandOffManager.comparator.decompose(hintId, MessagingService.current_version);
+ ByteBuffer value = ByteBuffer.wrap(mutation.getSerializedBuffer(MessagingService.current_version));
+ rm.add(SystemTable.HINTS_CF, name, value, System.currentTimeMillis(), ttl);
return rm;
}
@@ -156,81 +155,37 @@ public class RowMutation implements IMutation
return modifications.isEmpty();
}
- /*
- * Specify a column name and a corresponding value for
- * the column. Column name is specified as <column family>:column.
- * This will result in a ColumnFamily associated with
- * <column family> as name and a Column with <column>
- * as name. The column can be further broken up
- * as super column name : columnname in case of super columns
- *
- * param @ cf - column name as <column family>:<column>
- * param @ value - value associated with the column
- * param @ timestamp - timestamp associated with this data.
- * param @ timeToLive - ttl for the column, 0 for standard (non expiring) columns
- *
- * @Deprecated this tends to be low-performance; we're doing two hash lookups,
- * one of which instantiates a Pair, and callers tend to instantiate new QP objects
- * for each call as well. Use the add(ColumnFamily) overload instead.
- */
- public void add(QueryPath path, ByteBuffer value, long timestamp, int timeToLive)
+ public void add(String cfName, ByteBuffer name, ByteBuffer value, long timestamp, int timeToLive) // column family and column name are now passed separately instead of through a QueryPath
{
- UUID id = Schema.instance.getId(table, path.columnFamilyName);
- ColumnFamily columnFamily = modifications.get(id);
-
- if (columnFamily == null)
- {
- columnFamily = ColumnFamily.create(table, path.columnFamilyName);
- modifications.put(id, columnFamily);
- }
- columnFamily.addColumn(path, value, timestamp, timeToLive);
+ addOrGet(cfName).addColumn(name, value, timestamp, timeToLive); // addOrGet is defined outside this hunk; presumably it replaces the removed lookup-or-create code above — confirm
}
- public void addCounter(QueryPath path, long value)
+ public void addCounter(String cfName, ByteBuffer name, long value) // column family and column name are now passed separately instead of through a QueryPath
{
- UUID id = Schema.instance.getId(table, path.columnFamilyName);
- ColumnFamily columnFamily = modifications.get(id);
-
- if (columnFamily == null)
- {
- columnFamily = ColumnFamily.create(table, path.columnFamilyName);
- modifications.put(id, columnFamily);
- }
- columnFamily.addCounter(path, value);
+ addOrGet(cfName).addCounter(name, value); // addOrGet is defined outside this hunk; presumably it replaces the removed lookup-or-create code above — confirm
}
- public void add(QueryPath path, ByteBuffer value, long timestamp)
+ public void add(String cfName, ByteBuffer name, ByteBuffer value, long timestamp)
{
- add(path, value, timestamp, 0);
+ add(cfName, name, value, timestamp, 0); // delegates to the five-argument overload with a timeToLive of 0
}
- public void delete(QueryPath path, long timestamp)
+ public void delete(String cfName, long timestamp) // whole-column-family form: applies a DeletionInfo to the column family
{
- UUID id = Schema.instance.getId(table, path.columnFamilyName);
-
int localDeleteTime = (int) (System.currentTimeMillis() / 1000);
+ addOrGet(cfName).delete(new DeletionInfo(timestamp, localDeleteTime));
+ }
- ColumnFamily columnFamily = modifications.get(id);
- if (columnFamily == null)
- {
- columnFamily = ColumnFamily.create(table, path.columnFamilyName);
- modifications.put(id, columnFamily);
- }
+ public void delete(String cfName, ByteBuffer name, long timestamp) // single-column form: adds a tombstone for the named column
+ {
+ int localDeleteTime = (int) (System.currentTimeMillis() / 1000); // current wall-clock time in whole seconds
+ addOrGet(cfName).addTombstone(name, localDeleteTime, timestamp);
+ }
- if (path.superColumnName == null && path.columnName == null)
- {
- columnFamily.delete(new DeletionInfo(timestamp, localDeleteTime));
- }
- else if (path.columnName == null)
- {
- SuperColumn sc = new SuperColumn(path.superColumnName, columnFamily.getSubComparator());
- sc.delete(new DeletionInfo(timestamp, localDeleteTime));
- columnFamily.addColumn(sc);
- }
- else
- {
- columnFamily.addTombstone(path, localDeleteTime, timestamp);
- }
+ public void deleteRange(String cfName, ByteBuffer start, ByteBuffer end, long timestamp) // range form: adds a RangeTombstone built from start and end
+ {
+ int localDeleteTime = (int) (System.currentTimeMillis() / 1000); // current wall-clock time in whole seconds
+ addOrGet(cfName).addAtom(new RangeTombstone(start, end, timestamp, localDeleteTime));
}
public void addAll(IMutation m)
@@ -314,49 +269,6 @@ public class RowMutation implements IMutation
return buff.append("])").toString();
}
- public void addColumnOrSuperColumn(String cfName, ColumnOrSuperColumn cosc)
- {
- if (cosc.super_column != null)
- {
- for (org.apache.cassandra.thrift.Column column : cosc.super_column.columns)
- {
- add(new QueryPath(cfName, cosc.super_column.name, column.name), column.value, column.timestamp, column.ttl);
- }
- }
- else if (cosc.column != null)
- {
- add(new QueryPath(cfName, null, cosc.column.name), cosc.column.value, cosc.column.timestamp, cosc.column.ttl);
- }
- else if (cosc.counter_super_column != null)
- {
- for (org.apache.cassandra.thrift.CounterColumn column : cosc.counter_super_column.columns)
- {
- addCounter(new QueryPath(cfName, cosc.counter_super_column.name, column.name), column.value);
- }
- }
- else // cosc.counter_column != null
- {
- addCounter(new QueryPath(cfName, null, cosc.counter_column.name), cosc.counter_column.value);
- }
- }
-
- public void deleteColumnOrSuperColumn(String cfName, Deletion del)
- {
- if (del.predicate != null && del.predicate.column_names != null)
- {
- for(ByteBuffer c : del.predicate.column_names)
- {
- if (del.super_column == null && Schema.instance.getColumnFamilyType(table, cfName) == ColumnFamilyType.Super)
- delete(new QueryPath(cfName, c), del.timestamp);
- else
- delete(new QueryPath(cfName, del.super_column, c), del.timestamp);
- }
- }
- else
- {
- delete(new QueryPath(cfName, del.super_column), del.timestamp);
- }
- }
public static RowMutation fromBytes(byte[] raw, int version) throws IOException
{
@@ -396,7 +308,7 @@ public class RowMutation implements IMutation
}
}
- public RowMutation deserialize(DataInput dis, int version, IColumnSerializer.Flag flag) throws IOException
+ public RowMutation deserialize(DataInput dis, int version, ColumnSerializer.Flag flag) throws IOException
{
String table = dis.readUTF();
ByteBuffer key = ByteBufferUtil.readWithShortLength(dis);
@@ -417,7 +329,7 @@ public class RowMutation implements IMutation
public RowMutation deserialize(DataInput dis, int version) throws IOException
{
- return deserialize(dis, version, IColumnSerializer.Flag.FROM_REMOTE);
+ return deserialize(dis, version, ColumnSerializer.Flag.FROM_REMOTE); // delegates to the three-argument overload with the FROM_REMOTE flag
}
public long serializedSize(RowMutation rm, int version)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java b/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
index 2b3b0b1..0273cbc 100644
--- a/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
@@ -21,39 +21,31 @@ import java.io.*;
import java.nio.ByteBuffer;
import java.util.*;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.filter.*;
import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CompositeType;
import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.thrift.ColumnParent;
import org.apache.cassandra.utils.ByteBufferUtil;
public class SliceByNamesReadCommand extends ReadCommand
{
- public final NamesQueryFilter filter;
-
- public SliceByNamesReadCommand(String table, ByteBuffer key, ColumnParent column_parent, Collection<ByteBuffer> columnNames)
- {
- this(table, key, new QueryPath(column_parent), columnNames);
- }
+ static final SliceByNamesReadCommandSerializer serializer = new SliceByNamesReadCommandSerializer();
- public SliceByNamesReadCommand(String table, ByteBuffer key, QueryPath path, Collection<ByteBuffer> columnNames)
- {
- super(table, key, path, CMD_TYPE_GET_SLICE_BY_NAMES);
- SortedSet s = new TreeSet<ByteBuffer>(getComparator());
- s.addAll(columnNames);
- this.filter = new NamesQueryFilter(s);
- }
+ public final NamesQueryFilter filter;
- public SliceByNamesReadCommand(String table, ByteBuffer key, QueryPath path, NamesQueryFilter filter)
+ public SliceByNamesReadCommand(String table, ByteBuffer key, String cfName, NamesQueryFilter filter) // callers now supply a ready-made NamesQueryFilter and the column family name
{
- super(table, key, path, CMD_TYPE_GET_SLICE_BY_NAMES);
+ super(table, key, cfName, Type.GET_BY_NAMES); // registers this command under the GET_BY_NAMES type
this.filter = filter;
}
public ReadCommand copy()
{
- ReadCommand readCommand= new SliceByNamesReadCommand(table, key, queryPath, filter);
+ ReadCommand readCommand= new SliceByNamesReadCommand(table, key, cfName, filter); // new instance built from the same field values; the digest flag is carried over on the next line
readCommand.setDigestQuery(isDigestQuery());
return readCommand;
}
@@ -61,7 +53,7 @@ public class SliceByNamesReadCommand extends ReadCommand
public Row getRow(Table table) throws IOException
{
DecoratedKey dk = StorageService.getPartitioner().decorateKey(key);
- return table.getRow(new QueryFilter(dk, queryPath, filter));
+ return table.getRow(new QueryFilter(dk, cfName, filter)); // the QueryFilter is now built from the column family name rather than a QueryPath
}
@Override
@@ -70,7 +62,7 @@ public class SliceByNamesReadCommand extends ReadCommand
return "SliceByNamesReadCommand(" +
"table='" + table + '\'' +
", key=" + ByteBufferUtil.bytesToHex(key) +
- ", columnParent='" + queryPath + '\'' +
+ ", cfName='" + cfName + '\'' +
", filter=" + filter +
')';
}
@@ -85,30 +77,86 @@ class SliceByNamesReadCommandSerializer implements IVersionedSerializer<ReadComm
{
public void serialize(ReadCommand cmd, DataOutput dos, int version) throws IOException
{
+ serialize(cmd, null, dos, version); // IVersionedSerializer entry point: delegates with no super column name
+ }
+
+ public void serialize(ReadCommand cmd, ByteBuffer superColumn, DataOutput dos, int version) throws IOException
+ {
SliceByNamesReadCommand command = (SliceByNamesReadCommand) cmd;
dos.writeBoolean(command.isDigestQuery());
dos.writeUTF(command.table);
ByteBufferUtil.writeWithShortLength(command.key, dos);
- command.queryPath.serialize(dos);
+
+ if (version < MessagingService.VERSION_20) // versions before VERSION_20 get a QueryPath built from the cf name and the super column name
+ new QueryPath(command.cfName, superColumn).serialize(dos);
+ else // VERSION_20 and later get only the cf name
+ dos.writeUTF(command.cfName);
+
NamesQueryFilter.serializer.serialize(command.filter, dos, version);
}
- public SliceByNamesReadCommand deserialize(DataInput dis, int version) throws IOException
+ public ReadCommand deserialize(DataInput dis, int version) throws IOException // return type widened: for versions before VERSION_20 the result may be a SliceFromReadCommand
{
boolean isDigest = dis.readBoolean();
String table = dis.readUTF();
ByteBuffer key = ByteBufferUtil.readWithShortLength(dis);
- QueryPath columnParent = QueryPath.deserialize(dis);
- AbstractType<?> comparator = ColumnFamily.getComparatorFor(table, columnParent.columnFamilyName, columnParent.superColumnName);
- NamesQueryFilter filter = NamesQueryFilter.serializer.deserialize(dis, version, comparator);
- SliceByNamesReadCommand command = new SliceByNamesReadCommand(table, key, columnParent, filter);
+ String cfName;
+ ByteBuffer sc = null;
+ if (version < MessagingService.VERSION_20) // versions before VERSION_20 send a QueryPath carrying the cf name and an optional super column name
+ {
+ QueryPath path = QueryPath.deserialize(dis);
+ cfName = path.columnFamilyName;
+ sc = path.superColumnName;
+ }
+ else
+ {
+ cfName = dis.readUTF();
+ }
+
+ CFMetaData metadata = Schema.instance.getCFMetaData(table, cfName); // NOTE(review): metadata is dereferenced below without a null check — confirm getCFMetaData cannot return null here
+ ReadCommand command;
+ if (version < MessagingService.VERSION_20)
+ {
+ AbstractType<?> comparator;
+ if (metadata.cfType == ColumnFamilyType.Super)
+ {
+ CompositeType type = (CompositeType)metadata.comparator;
+ comparator = sc == null ? type.types.get(0) : type.types.get(1); // without a super column name the first component type is used, otherwise the second
+ }
+ else
+ {
+ comparator = metadata.comparator;
+ }
+
+ IDiskAtomFilter filter = NamesQueryFilter.serializer.deserialize(dis, version, comparator);
+
+ if (metadata.cfType == ColumnFamilyType.Super)
+ filter = SuperColumns.fromSCFilter((CompositeType)metadata.comparator, sc, filter); // converts the filter received for a Super column family; the result's type is checked below
+
+ // Due to SC compat, it's possible we get back a slice filter at this point
+ if (filter instanceof NamesQueryFilter)
+ command = new SliceByNamesReadCommand(table, key, cfName, (NamesQueryFilter)filter);
+ else
+ command = new SliceFromReadCommand(table, key, cfName, (SliceQueryFilter)filter);
+ }
+ else
+ {
+ NamesQueryFilter filter = NamesQueryFilter.serializer.deserialize(dis, version, metadata.comparator); // VERSION_20 and later: no translation, the filter is read with the cf comparator
+ command = new SliceByNamesReadCommand(table, key, cfName, filter);
+ }
+
command.setDigestQuery(isDigest);
return command;
}
public long serializedSize(ReadCommand cmd, int version)
{
+ return serializedSize(cmd, null, version);
+ }
+
+ public long serializedSize(ReadCommand cmd, ByteBuffer superColumn, int version)
+ {
TypeSizes sizes = TypeSizes.NATIVE;
SliceByNamesReadCommand command = (SliceByNamesReadCommand) cmd;
int size = sizes.sizeof(command.isDigestQuery());
@@ -116,7 +164,16 @@ class SliceByNamesReadCommandSerializer implements IVersionedSerializer<ReadComm
size += sizes.sizeof(command.table);
size += sizes.sizeof((short)keySize) + keySize;
- size += command.queryPath.serializedSize(sizes);
+
+ if (version < MessagingService.VERSION_20)
+ {
+ size += new QueryPath(command.cfName, superColumn).serializedSize(sizes);
+ }
+ else
+ {
+ size += sizes.sizeof(command.cfName);
+ }
+
size += NamesQueryFilter.serializer.serializedSize(command.filter, version);
return size;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SliceFromReadCommand.java b/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
index d52826b..3a05ce0 100644
--- a/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
@@ -25,41 +25,37 @@ import java.nio.ByteBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.filter.IDiskAtomFilter;
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.db.filter.SliceQueryFilter;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CompositeType;
import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.RepairCallback;
import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.thrift.ColumnParent;
import org.apache.cassandra.utils.ByteBufferUtil;
public class SliceFromReadCommand extends ReadCommand
{
static final Logger logger = LoggerFactory.getLogger(SliceFromReadCommand.class);
- public final SliceQueryFilter filter;
+ static final SliceFromReadCommandSerializer serializer = new SliceFromReadCommandSerializer();
- public SliceFromReadCommand(String table, ByteBuffer key, ColumnParent column_parent, ByteBuffer start, ByteBuffer finish, boolean reversed, int count)
- {
- this(table, key, new QueryPath(column_parent), start, finish, reversed, count);
- }
-
- public SliceFromReadCommand(String table, ByteBuffer key, QueryPath path, ByteBuffer start, ByteBuffer finish, boolean reversed, int count)
- {
- this(table, key, path, new SliceQueryFilter(start, finish, reversed, count));
- }
+ public final SliceQueryFilter filter;
- public SliceFromReadCommand(String table, ByteBuffer key, QueryPath path, SliceQueryFilter filter)
+ public SliceFromReadCommand(String table, ByteBuffer key, String cfName, SliceQueryFilter filter) // callers now supply a ready-made SliceQueryFilter and the column family name
{
- super(table, key, path, CMD_TYPE_GET_SLICE);
+ super(table, key, cfName, Type.GET_SLICES); // registers this command under the GET_SLICES type
this.filter = filter;
}
public ReadCommand copy()
{
- ReadCommand readCommand = new SliceFromReadCommand(table, key, queryPath, filter);
+ ReadCommand readCommand = new SliceFromReadCommand(table, key, cfName, filter); // new instance built from the same field values; the digest flag is carried over on the next line
readCommand.setDigestQuery(isDigestQuery());
return readCommand;
}
@@ -67,7 +63,7 @@ public class SliceFromReadCommand extends ReadCommand
public Row getRow(Table table)
{
DecoratedKey dk = StorageService.getPartitioner().decorateKey(key);
- return table.getRow(new QueryFilter(dk, queryPath, filter));
+ return table.getRow(new QueryFilter(dk, cfName, filter)); // the QueryFilter is now built from the column family name rather than a QueryPath
}
@Override
@@ -91,7 +87,7 @@ public class SliceFromReadCommand extends ReadCommand
// round we want to ask x column so that x * (l/t) == t, i.e. x = t^2/l.
int retryCount = liveCountInRow == 0 ? count + 1 : ((count * count) / liveCountInRow) + 1;
SliceQueryFilter newFilter = filter.withUpdatedCount(retryCount);
- return new RetriedSliceFromReadCommand(table, key, queryPath, newFilter, getOriginalRequestedCount());
+ return new RetriedSliceFromReadCommand(table, key, cfName, newFilter, getOriginalRequestedCount());
}
return null;
@@ -127,7 +123,7 @@ public class SliceFromReadCommand extends ReadCommand
return "SliceFromReadCommand(" +
"table='" + table + '\'' +
", key='" + ByteBufferUtil.bytesToHex(key) + '\'' +
- ", column_parent='" + queryPath + '\'' +
+ ", cfName='" + cfName + '\'' +
", filter='" + filter + '\'' +
')';
}
@@ -137,11 +133,21 @@ class SliceFromReadCommandSerializer implements IVersionedSerializer<ReadCommand
{
public void serialize(ReadCommand rm, DataOutput dos, int version) throws IOException
{
+ serialize(rm, null, dos, version); // IVersionedSerializer entry point: delegates with no super column name
+ }
+
+ public void serialize(ReadCommand rm, ByteBuffer superColumn, DataOutput dos, int version) throws IOException
+ {
SliceFromReadCommand realRM = (SliceFromReadCommand)rm;
dos.writeBoolean(realRM.isDigestQuery());
dos.writeUTF(realRM.table);
ByteBufferUtil.writeWithShortLength(realRM.key, dos);
- realRM.queryPath.serialize(dos);
+
+ if (version < MessagingService.VERSION_20) // versions before VERSION_20 get a QueryPath built from the cf name and the super column name
+ new QueryPath(realRM.cfName, superColumn).serialize(dos);
+ else // VERSION_20 and later get only the cf name
+ dos.writeUTF(realRM.cfName);
+
SliceQueryFilter.serializer.serialize(realRM.filter, dos, version);
}
@@ -150,15 +156,46 @@ class SliceFromReadCommandSerializer implements IVersionedSerializer<ReadCommand
boolean isDigest = dis.readBoolean();
String table = dis.readUTF();
ByteBuffer key = ByteBufferUtil.readWithShortLength(dis);
- QueryPath path = QueryPath.deserialize(dis);
- SliceQueryFilter filter = SliceQueryFilter.serializer.deserialize(dis, version);
- SliceFromReadCommand rm = new SliceFromReadCommand(table, key, path, filter);
- rm.setDigestQuery(isDigest);
- return rm;
+
+ String cfName;
+ ByteBuffer sc = null;
+ if (version < MessagingService.VERSION_20)
+ {
+ QueryPath path = QueryPath.deserialize(dis);
+ cfName = path.columnFamilyName;
+ sc = path.superColumnName;
+ }
+ else
+ {
+ cfName = dis.readUTF();
+ }
+
+ CFMetaData metadata = Schema.instance.getCFMetaData(table, cfName);
+ SliceQueryFilter filter;
+ if (version < MessagingService.VERSION_20)
+ {
+ filter = SliceQueryFilter.serializer.deserialize(dis, version);
+
+ if (metadata.cfType == ColumnFamilyType.Super)
+ filter = SuperColumns.fromSCSliceFilter((CompositeType)metadata.comparator, sc, filter);
+ }
+ else
+ {
+ filter = SliceQueryFilter.serializer.deserialize(dis, version);
+ }
+
+ ReadCommand command = new SliceFromReadCommand(table, key, cfName, filter);
+ command.setDigestQuery(isDigest);
+ return command;
}
public long serializedSize(ReadCommand cmd, int version)
{
+ return serializedSize(cmd, null, version);
+ }
+
+ public long serializedSize(ReadCommand cmd, ByteBuffer superColumn, int version)
+ {
TypeSizes sizes = TypeSizes.NATIVE;
SliceFromReadCommand command = (SliceFromReadCommand) cmd;
int keySize = command.key.remaining();
@@ -166,7 +203,16 @@ class SliceFromReadCommandSerializer implements IVersionedSerializer<ReadCommand
int size = sizes.sizeof(cmd.isDigestQuery()); // boolean
size += sizes.sizeof(command.table);
size += sizes.sizeof((short) keySize) + keySize;
- size += command.queryPath.serializedSize(sizes);
+
+ if (version < MessagingService.VERSION_20)
+ {
+ size += new QueryPath(command.cfName, superColumn).serializedSize(sizes);
+ }
+ else
+ {
+ size += sizes.sizeof(command.cfName);
+ }
+
size += SliceQueryFilter.serializer.serializedSize(command.filter, version);
return size;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/SliceQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SliceQueryPager.java b/src/java/org/apache/cassandra/db/SliceQueryPager.java
index cd555a5..551b50d 100644
--- a/src/java/org/apache/cassandra/db/SliceQueryPager.java
+++ b/src/java/org/apache/cassandra/db/SliceQueryPager.java
@@ -51,9 +51,8 @@ public class SliceQueryPager implements Iterator<ColumnFamily>
if (exhausted)
return null;
- QueryPath path = new QueryPath(cfs.name);
SliceQueryFilter sliceFilter = new SliceQueryFilter(slices, false, DEFAULT_PAGE_SIZE);
- QueryFilter filter = new QueryFilter(key, path, sliceFilter);
+ QueryFilter filter = new QueryFilter(key, cfs.name, sliceFilter);
ColumnFamily cf = cfs.getColumnFamily(filter);
if (cf == null || sliceFilter.getLiveCount(cf) < DEFAULT_PAGE_SIZE)
{
@@ -61,8 +60,8 @@ public class SliceQueryPager implements Iterator<ColumnFamily>
}
else
{
- Iterator<IColumn> iter = cf.getReverseSortedColumns().iterator();
- IColumn lastColumn = iter.next();
+ Iterator<Column> iter = cf.getReverseSortedColumns().iterator();
+ Column lastColumn = iter.next();
while (lastColumn.isMarkedForDelete())
lastColumn = iter.next();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/SuperColumn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SuperColumn.java b/src/java/org/apache/cassandra/db/SuperColumn.java
deleted file mode 100644
index 57e87c4..0000000
--- a/src/java/org/apache/cassandra/db/SuperColumn.java
+++ /dev/null
@@ -1,432 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.security.MessageDigest;
-import java.util.Collection;
-import java.util.Comparator;
-
-import com.google.common.base.Objects;
-import com.google.common.collect.Iterables;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.marshal.MarshalException;
-import org.apache.cassandra.io.IColumnSerializer;
-import org.apache.cassandra.io.util.ColumnSortedMap;
-import org.apache.cassandra.io.util.DataOutputBuffer;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.Allocator;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.HeapAllocator;
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
-
-public class SuperColumn extends AbstractColumnContainer implements IColumn
-{
- private static final NonBlockingHashMap<Comparator, SuperColumnSerializer> serializers = new NonBlockingHashMap<Comparator, SuperColumnSerializer>();
- public static SuperColumnSerializer serializer(AbstractType<?> comparator)
- {
- SuperColumnSerializer serializer = serializers.get(comparator);
- if (serializer == null)
- {
- serializer = new SuperColumnSerializer(comparator);
- serializers.put(comparator, serializer);
- }
- return serializer;
- }
-
- public static OnDiskAtom.Serializer onDiskSerializer(AbstractType<?> comparator)
- {
- return new OnDiskAtom.Serializer(serializer(comparator));
- }
-
- private final ByteBuffer name;
-
- public SuperColumn(ByteBuffer name, AbstractType<?> comparator)
- {
- this(name, AtomicSortedColumns.factory().create(comparator, false));
- }
-
- SuperColumn(ByteBuffer name, ISortedColumns columns)
- {
- super(columns);
- assert name != null;
- assert name.remaining() <= IColumn.MAX_NAME_LENGTH;
- this.name = name;
- }
-
- public SuperColumn cloneMeShallow()
- {
- SuperColumn sc = new SuperColumn(name, getComparator());
- sc.delete(this);
- return sc;
- }
-
- public IColumn cloneMe()
- {
- SuperColumn sc = new SuperColumn(name, columns.cloneMe());
- sc.delete(this);
- return sc;
- }
-
- public ByteBuffer name()
- {
- return name;
- }
-
- public Collection<IColumn> getSubColumns()
- {
- return getSortedColumns();
- }
-
- public IColumn getSubColumn(ByteBuffer columnName)
- {
- IColumn column = columns.getColumn(columnName);
- assert column == null || column instanceof Column;
- return column;
- }
-
- /**
- * This calculates the exact size of the sub columns on the fly
- */
- public int dataSize()
- {
- int size = TypeSizes.NATIVE.sizeof(getMarkedForDeleteAt());
- for (IColumn subColumn : getSubColumns())
- size += subColumn.dataSize();
- return size;
- }
-
- /**
- * This returns the size of the super-column when serialized.
- * @see org.apache.cassandra.db.IColumn#serializedSize(TypeSizes)
- */
- public int serializedSize(TypeSizes typeSizes)
- {
- /*
- * We need to keep the way we are calculating the column size in sync with the
- * way we are calculating the size for the column family serializer.
- *
- * 2 bytes for name size
- * n bytes for the name
- * 4 bytes for getLocalDeletionTime
- * 8 bytes for getMarkedForDeleteAt
- * 4 bytes for the subcolumns size
- * size(constantSize) of subcolumns.
- */
- int nameSize = name.remaining();
- int subColumnsSize = 0;
- for (IColumn subColumn : getSubColumns())
- subColumnsSize += subColumn.serializedSize(typeSizes);
- int size = typeSizes.sizeof((short) nameSize) + nameSize
- + typeSizes.sizeof(getLocalDeletionTime())
- + typeSizes.sizeof(getMarkedForDeleteAt())
- + typeSizes.sizeof(subColumnsSize) + subColumnsSize;
- return size;
- }
-
- public long serializedSizeForSSTable()
- {
- return serializedSize(TypeSizes.NATIVE);
- }
-
- public long timestamp()
- {
- throw new UnsupportedOperationException("This operation is not supported for Super Columns.");
- }
-
- public long minTimestamp()
- {
- long minTimestamp = getMarkedForDeleteAt();
- for (IColumn subColumn : getSubColumns())
- minTimestamp = Math.min(minTimestamp, subColumn.minTimestamp());
- return minTimestamp;
- }
-
- public long maxTimestamp()
- {
- long maxTimestamp = getMarkedForDeleteAt();
- for (IColumn subColumn : getSubColumns())
- maxTimestamp = Math.max(maxTimestamp, subColumn.maxTimestamp());
- return maxTimestamp;
- }
-
- public long mostRecentLiveChangeAt()
- {
- long max = Long.MIN_VALUE;
- for (IColumn column : getSubColumns())
- {
- if (!column.isMarkedForDelete() && column.timestamp() > max)
- {
- max = column.timestamp();
- }
- }
- return max;
- }
-
- public long getMarkedForDeleteAt()
- {
- return deletionInfo().getTopLevelDeletion().markedForDeleteAt;
- }
-
- public int getLocalDeletionTime()
- {
- return deletionInfo().getTopLevelDeletion().localDeletionTime;
- }
-
- public long mostRecentNonGCableChangeAt(int gcbefore)
- {
- long max = Long.MIN_VALUE;
- for (IColumn column : getSubColumns())
- {
- if (column.getLocalDeletionTime() >= gcbefore && column.timestamp() > max)
- {
- max = column.timestamp();
- }
- }
- return max;
- }
-
- public ByteBuffer value()
- {
- throw new UnsupportedOperationException("This operation is not supported for Super Columns.");
- }
-
- @Override
- public void addColumn(IColumn column, Allocator allocator)
- {
- assert column instanceof Column : "A super column can only contain simple columns";
- super.addColumn(column, allocator);
- }
-
- /*
- * Go through each sub column if it exists then as it to resolve itself
- * if the column does not exist then create it.
- */
- void putColumn(SuperColumn column, Allocator allocator)
- {
- for (IColumn subColumn : column.getSubColumns())
- {
- addColumn(subColumn, allocator);
- }
- delete(column);
- }
-
- public IColumn diff(IColumn columnNew)
- {
- IColumn columnDiff = new SuperColumn(columnNew.name(), ((SuperColumn)columnNew).getComparator());
- ((SuperColumn)columnDiff).delete(((SuperColumn)columnNew).deletionInfo());
-
- // (don't need to worry about columnNew containing subColumns that are shadowed by
- // the delete tombstone, since columnNew was generated by CF.resolve, which
- // takes care of those for us.)
- for (IColumn subColumn : columnNew.getSubColumns())
- {
- IColumn columnInternal = columns.getColumn(subColumn.name());
- if(columnInternal == null )
- {
- columnDiff.addColumn(subColumn);
- }
- else
- {
- IColumn subColumnDiff = columnInternal.diff(subColumn);
- if(subColumnDiff != null)
- {
- columnDiff.addColumn(subColumnDiff);
- }
- }
- }
-
- if (!columnDiff.getSubColumns().isEmpty() || columnNew.isMarkedForDelete())
- return columnDiff;
- else
- return null;
- }
-
- public void updateDigest(MessageDigest digest)
- {
- assert name != null;
- digest.update(name.duplicate());
- DataOutputBuffer buffer = new DataOutputBuffer();
- try
- {
- buffer.writeLong(getMarkedForDeleteAt());
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- digest.update(buffer.getData(), 0, buffer.getLength());
- for (IColumn column : getSubColumns())
- {
- column.updateDigest(digest);
- }
- }
-
- public String getString(AbstractType<?> comparator)
- {
- StringBuilder sb = new StringBuilder();
- sb.append("SuperColumn(");
- sb.append(comparator.getString(name));
-
- if (isMarkedForDelete()) {
- sb.append(" -delete at ").append(getMarkedForDeleteAt()).append("-");
- }
-
- sb.append(" [");
- sb.append(getComparator().getColumnsString(getSubColumns()));
- sb.append("])");
-
- return sb.toString();
- }
-
- public boolean isLive()
- {
- return mostRecentLiveChangeAt() > getMarkedForDeleteAt();
- }
-
- public IColumn localCopy(ColumnFamilyStore cfs)
- {
- return localCopy(cfs, HeapAllocator.instance);
- }
-
- public IColumn localCopy(ColumnFamilyStore cfs, Allocator allocator)
- {
- // we don't try to intern supercolumn names, because if we're using Cassandra correctly it's almost
- // certainly just going to pollute our interning map with unique, dynamic values
- SuperColumn sc = new SuperColumn(allocator.clone(name), this.getComparator());
- sc.delete(this);
-
- for(IColumn c : columns)
- {
- sc.addColumn(c.localCopy(cfs, allocator));
- }
-
- return sc;
- }
-
- public IColumn reconcile(IColumn c)
- {
- return reconcile(null, null);
- }
-
- public IColumn reconcile(IColumn c, Allocator allocator)
- {
- throw new UnsupportedOperationException("This operation is unsupported on super columns.");
- }
-
- public int serializationFlags()
- {
- throw new UnsupportedOperationException("Super columns don't have a serialization mask");
- }
-
- public void validateFields(CFMetaData metadata) throws MarshalException
- {
- metadata.comparator.validate(name());
- for (IColumn column : getSubColumns())
- {
- column.validateFields(metadata);
- }
- }
-
- @Override
- public boolean equals(Object o)
- {
- if (this == o)
- return true;
- if (o == null || getClass() != o.getClass())
- return false;
-
- SuperColumn sc = (SuperColumn)o;
-
- if (!name.equals(sc.name))
- return false;
- if (getMarkedForDeleteAt() != sc.getMarkedForDeleteAt())
- return false;
- if (getLocalDeletionTime() != sc.getLocalDeletionTime())
- return false;
- return Iterables.elementsEqual(columns, sc.columns);
- }
-
- @Override
- public int hashCode()
- {
- return Objects.hashCode(name, getMarkedForDeleteAt(), getLocalDeletionTime(), columns);
- }
-}
-
-class SuperColumnSerializer implements IColumnSerializer
-{
- private final AbstractType<?> comparator;
-
- public SuperColumnSerializer(AbstractType<?> comparator)
- {
- this.comparator = comparator;
- }
-
- public AbstractType<?> getComparator()
- {
- return comparator;
- }
-
- public void serialize(IColumn column, DataOutput dos) throws IOException
- {
- SuperColumn superColumn = (SuperColumn)column;
- ByteBufferUtil.writeWithShortLength(superColumn.name(), dos);
- DeletionInfo.serializer().serialize(superColumn.deletionInfo(), dos, MessagingService.VERSION_10);
- Collection<IColumn> columns = superColumn.getSubColumns();
- dos.writeInt(columns.size());
- for (IColumn subColumn : columns)
- {
- Column.serializer().serialize(subColumn, dos);
- }
- }
-
- public IColumn deserialize(DataInput dis) throws IOException
- {
- return deserialize(dis, IColumnSerializer.Flag.LOCAL);
- }
-
- public IColumn deserialize(DataInput dis, IColumnSerializer.Flag flag) throws IOException
- {
- return deserialize(dis, flag, (int)(System.currentTimeMillis() / 1000));
- }
-
- public IColumn deserialize(DataInput dis, IColumnSerializer.Flag flag, int expireBefore) throws IOException
- {
- ByteBuffer name = ByteBufferUtil.readWithShortLength(dis);
- DeletionInfo delInfo = DeletionInfo.serializer().deserialize(dis, MessagingService.VERSION_10, comparator);
-
- /* read the number of columns */
- int size = dis.readInt();
- ColumnSerializer serializer = Column.serializer();
- ColumnSortedMap preSortedMap = new ColumnSortedMap(comparator, serializer, dis, size, flag, expireBefore);
- SuperColumn superColumn = new SuperColumn(name, AtomicSortedColumns.factory().fromSorted(preSortedMap, false));
- superColumn.delete(delInfo);
- return superColumn;
- }
-
- public long serializedSize(IColumn object, TypeSizes typeSizes)
- {
- return object.serializedSize(typeSizes);
- }
-}