Posted to commits@cassandra.apache.org by sl...@apache.org on 2015/06/30 12:48:06 UTC
[42/51] [partial] cassandra git commit: Storage engine refactor,
a.k.a CASSANDRA-8099
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index bed5d39..8fb83af 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -47,19 +47,14 @@ import org.apache.cassandra.cache.*;
import org.apache.cassandra.concurrent.*;
import org.apache.cassandra.config.*;
import org.apache.cassandra.config.CFMetaData.SpeculativeRetry;
+import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.commitlog.ReplayPosition;
import org.apache.cassandra.db.compaction.*;
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.db.composites.CellNameType;
-import org.apache.cassandra.db.composites.Composite;
-import org.apache.cassandra.db.filter.ColumnSlice;
-import org.apache.cassandra.db.filter.ExtendedFilter;
-import org.apache.cassandra.db.filter.IDiskAtomFilter;
-import org.apache.cassandra.db.filter.QueryFilter;
-import org.apache.cassandra.db.filter.SliceQueryFilter;
+import org.apache.cassandra.db.filter.*;
import org.apache.cassandra.db.index.SecondaryIndex;
import org.apache.cassandra.db.index.SecondaryIndexManager;
+import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.dht.*;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.exceptions.ConfigurationException;
@@ -76,7 +71,6 @@ import org.apache.cassandra.metrics.ColumnFamilyMetrics.Sampler;
import org.apache.cassandra.service.CacheService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.streaming.StreamLockfile;
-import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.*;
import org.apache.cassandra.utils.concurrent.*;
import org.apache.cassandra.utils.TopKSampler.SamplerResult;
@@ -592,12 +586,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
{
if (def.isIndexed())
{
- CellNameType indexComparator = SecondaryIndex.getIndexComparator(metadata, def);
- if (indexComparator != null)
- {
- CFMetaData indexMetadata = CFMetaData.newIndexMetadata(metadata, def, indexComparator);
+ CFMetaData indexMetadata = SecondaryIndex.newIndexMetadata(metadata, def);
+ if (indexMetadata != null)
scrubDataDirectories(indexMetadata);
- }
}
}
}
@@ -1220,7 +1211,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
return;
RowCacheKey cacheKey = new RowCacheKey(metadata.cfId, key);
- invalidateCachedRow(cacheKey);
+ invalidateCachedPartition(cacheKey);
}
/**
@@ -1230,11 +1221,12 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
* param @ key - key for update/insert
* param @ columnFamily - columnFamily changes
*/
- public void apply(DecoratedKey key, ColumnFamily columnFamily, SecondaryIndexManager.Updater indexer, OpOrder.Group opGroup, ReplayPosition replayPosition)
+ public void apply(PartitionUpdate update, SecondaryIndexManager.Updater indexer, OpOrder.Group opGroup, ReplayPosition replayPosition)
{
long start = System.nanoTime();
Memtable mt = data.getMemtableFor(opGroup, replayPosition);
- final long timeDelta = mt.put(key, columnFamily, indexer, opGroup);
+ long timeDelta = mt.put(update, indexer, opGroup);
+ DecoratedKey key = update.partitionKey();
maybeUpdateRowCache(key);
metric.samplers.get(Sampler.WRITES).addSample(key.getKey(), key.hashCode(), 1);
metric.writeLatency.addNano(System.nanoTime() - start);
@@ -1243,93 +1235,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
}
/**
- * Purges gc-able top-level and range tombstones, returning `cf` if there are any columns or tombstones left,
- * null otherwise.
- * @param gcBefore a timestamp (in seconds); tombstones with a localDeletionTime before this will be purged
- */
- public static ColumnFamily removeDeletedCF(ColumnFamily cf, int gcBefore)
- {
- // purge old top-level and range tombstones
- cf.purgeTombstones(gcBefore);
-
- // if there are no columns or tombstones left, return null
- return !cf.hasColumns() && !cf.isMarkedForDelete() ? null : cf;
- }
-
- /**
- * Removes deleted columns and purges gc-able tombstones.
- * @return an updated `cf` if any columns or tombstones remain, null otherwise
- */
- public static ColumnFamily removeDeleted(ColumnFamily cf, int gcBefore)
- {
- return removeDeleted(cf, gcBefore, SecondaryIndexManager.nullUpdater);
- }
-
- /*
- This is complicated because we need to preserve deleted columns and columnfamilies
- until they have been deleted for at least GC_GRACE_IN_SECONDS. But, we do not need to preserve
- their contents; just the object itself as a "tombstone" that can be used to repair other
- replicas that do not know about the deletion.
- */
- public static ColumnFamily removeDeleted(ColumnFamily cf, int gcBefore, SecondaryIndexManager.Updater indexer)
- {
- if (cf == null)
- {
- return null;
- }
-
- return removeDeletedCF(removeDeletedColumnsOnly(cf, gcBefore, indexer), gcBefore);
- }
-
- /**
- * Removes only per-cell tombstones, cells that are shadowed by a row-level or range tombstone, or
- * columns that have been dropped from the schema (for CQL3 tables only).
- * @return the updated ColumnFamily
- */
- public static ColumnFamily removeDeletedColumnsOnly(ColumnFamily cf, int gcBefore, SecondaryIndexManager.Updater indexer)
- {
- BatchRemoveIterator<Cell> iter = cf.batchRemoveIterator();
- DeletionInfo.InOrderTester tester = cf.inOrderDeletionTester();
- boolean hasDroppedColumns = !cf.metadata.getDroppedColumns().isEmpty();
- while (iter.hasNext())
- {
- Cell c = iter.next();
- // remove columns if
- // (a) the column itself is gcable or
- // (b) the column is shadowed by a CF tombstone
- // (c) the column has been dropped from the CF schema (CQL3 tables only)
- if (c.getLocalDeletionTime() < gcBefore || tester.isDeleted(c) || (hasDroppedColumns && isDroppedColumn(c, cf.metadata())))
- {
- iter.remove();
- indexer.remove(c);
- }
- }
- iter.commit();
- return cf;
- }
-
- // returns true if
- // 1. this column has been dropped from schema and
- // 2. if it has been re-added since then, this particular column was inserted before the last drop
- private static boolean isDroppedColumn(Cell c, CFMetaData meta)
- {
- Long droppedAt = meta.getDroppedColumns().get(c.name().cql3ColumnName(meta));
- return droppedAt != null && c.timestamp() <= droppedAt;
- }
-
- private void removeDroppedColumns(ColumnFamily cf)
- {
- if (cf == null || cf.metadata.getDroppedColumns().isEmpty())
- return;
-
- BatchRemoveIterator<Cell> iter = cf.batchRemoveIterator();
- while (iter.hasNext())
- if (isDroppedColumn(iter.next(), metadata))
- iter.remove();
- iter.commit();
- }
-
- /**
* @param sstables
* @return sstables whose key range overlaps with that of the given sstables, not including itself.
* (The given sstables may or may not overlap with each other.)
@@ -1348,7 +1253,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
Set<SSTableReader> results = null;
for (SSTableReader sstable : sstables)
{
- Set<SSTableReader> overlaps = ImmutableSet.copyOf(tree.search(Interval.<RowPosition, SSTableReader>create(sstable.first, sstable.last)));
+ Set<SSTableReader> overlaps = ImmutableSet.copyOf(tree.search(Interval.<PartitionPosition, SSTableReader>create(sstable.first, sstable.last)));
results = results == null ? overlaps : Sets.union(results, overlaps).immutableCopy();
}
results = Sets.difference(results, ImmutableSet.copyOf(sstables));
@@ -1532,9 +1437,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
return valid;
}
-
-
-
/**
* Package protected for access from the CompactionManager.
*/
@@ -1553,249 +1455,30 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
return data.getUncompacting();
}
- public ColumnFamily getColumnFamily(DecoratedKey key,
- Composite start,
- Composite finish,
- boolean reversed,
- int limit,
- long timestamp)
- {
- return getColumnFamily(QueryFilter.getSliceFilter(key, name, start, finish, reversed, limit, timestamp));
- }
-
- /**
- * Fetch the row and columns given by filter.key if it is in the cache; if not, read it from disk and cache it
- *
- * If row is cached, and the filter given is within its bounds, we return from cache, otherwise from disk
- *
- * If row is not cached, we figure out what filter is "biggest", read that from disk, then
- * filter the result and either cache that or return it.
- *
- * @param cfId the column family to read the row from
- * @param filter the columns being queried.
- * @return the requested data for the filter provided
- */
- private ColumnFamily getThroughCache(UUID cfId, QueryFilter filter)
- {
- assert isRowCacheEnabled()
- : String.format("Row cache is not enabled on table [" + name + "]");
-
- RowCacheKey key = new RowCacheKey(cfId, filter.key);
-
- // attempt a sentinel-read-cache sequence. if a write invalidates our sentinel, we'll return our
- // (now potentially obsolete) data, but won't cache it. see CASSANDRA-3862
- // TODO: don't evict entire rows on writes (#2864)
- IRowCacheEntry cached = CacheService.instance.rowCache.get(key);
- if (cached != null)
- {
- if (cached instanceof RowCacheSentinel)
- {
- // Some other read is trying to cache the value, just do a normal non-caching read
- Tracing.trace("Row cache miss (race)");
- metric.rowCacheMiss.inc();
- return getTopLevelColumns(filter, Integer.MIN_VALUE);
- }
-
- ColumnFamily cachedCf = (ColumnFamily)cached;
- if (isFilterFullyCoveredBy(filter.filter, cachedCf, filter.timestamp))
- {
- metric.rowCacheHit.inc();
- Tracing.trace("Row cache hit");
- return filterColumnFamily(cachedCf, filter);
- }
-
- metric.rowCacheHitOutOfRange.inc();
- Tracing.trace("Ignoring row cache as cached value could not satisfy query");
- return getTopLevelColumns(filter, Integer.MIN_VALUE);
- }
-
- metric.rowCacheMiss.inc();
- Tracing.trace("Row cache miss");
- RowCacheSentinel sentinel = new RowCacheSentinel();
- boolean sentinelSuccess = CacheService.instance.rowCache.putIfAbsent(key, sentinel);
- ColumnFamily data = null;
- ColumnFamily toCache = null;
- try
- {
- // If we are explicitely asked to fill the cache with full partitions, we go ahead and query the whole thing
- if (metadata.getCaching().rowCache.cacheFullPartitions())
- {
- data = getTopLevelColumns(QueryFilter.getIdentityFilter(filter.key, name, filter.timestamp), Integer.MIN_VALUE);
- toCache = data;
- Tracing.trace("Populating row cache with the whole partition");
- if (sentinelSuccess && toCache != null)
- CacheService.instance.rowCache.replace(key, sentinel, toCache);
- return filterColumnFamily(data, filter);
- }
-
- // Otherwise, if we want to cache the result of the query we're about to do, we must make sure this query
- // covers what needs to be cached. And if the user filter does not satisfy that, we sometimes extend said
- // filter so we can populate the cache but only if:
- // 1) we can guarantee it is a strict extension, i.e. that we will still fetch the data asked by the user.
- // 2) the extension does not make us query more than getRowsPerPartitionToCache() (as a mean to limit the
- // amount of extra work we'll do on a user query for the purpose of populating the cache).
- //
- // In practice, we can only guarantee those 2 points if the filter is one that queries the head of the
- // partition (and if that filter actually counts CQL3 rows since that's what we cache and it would be
- // bogus to compare the filter count to the 'rows to cache' otherwise).
- if (filter.filter.isHeadFilter() && filter.filter.countCQL3Rows(metadata.comparator))
- {
- SliceQueryFilter sliceFilter = (SliceQueryFilter)filter.filter;
- int rowsToCache = metadata.getCaching().rowCache.rowsToCache;
-
- SliceQueryFilter cacheSlice = readFilterForCache();
- QueryFilter cacheFilter = new QueryFilter(filter.key, name, cacheSlice, filter.timestamp);
-
- // If the filter count is less than the number of rows cached, we simply extend it to make sure we do cover the
- // number of rows to cache, and if that count is greater than the number of rows to cache, we simply filter what
- // needs to be cached afterwards.
- if (sliceFilter.count < rowsToCache)
- {
- toCache = getTopLevelColumns(cacheFilter, Integer.MIN_VALUE);
- if (toCache != null)
- {
- Tracing.trace("Populating row cache ({} rows cached)", cacheSlice.lastCounted());
- data = filterColumnFamily(toCache, filter);
- }
- }
- else
- {
- data = getTopLevelColumns(filter, Integer.MIN_VALUE);
- if (data != null)
- {
- // The filter limit was greater than the number of rows to cache. But, if the filter had a non-empty
- // finish bound, we may have gotten less than what needs to be cached, in which case we shouldn't cache it
- // (otherwise a cache hit would assume the whole partition is cached which is not the case).
- if (sliceFilter.finish().isEmpty() || sliceFilter.lastCounted() >= rowsToCache)
- {
- toCache = filterColumnFamily(data, cacheFilter);
- Tracing.trace("Caching {} rows (out of {} requested)", cacheSlice.lastCounted(), sliceFilter.count);
- }
- else
- {
- Tracing.trace("Not populating row cache, not enough rows fetched ({} fetched but {} required for the cache)", sliceFilter.lastCounted(), rowsToCache);
- }
- }
- }
-
- if (sentinelSuccess && toCache != null)
- CacheService.instance.rowCache.replace(key, sentinel, toCache);
- return data;
- }
- else
- {
- Tracing.trace("Fetching data but not populating cache as query does not query from the start of the partition");
- return getTopLevelColumns(filter, Integer.MIN_VALUE);
- }
- }
- finally
- {
- if (sentinelSuccess && toCache == null)
- invalidateCachedRow(key);
- }
- }
-
- public SliceQueryFilter readFilterForCache()
- {
- // We create a new filter everytime before for now SliceQueryFilter is unfortunatly mutable.
- return new SliceQueryFilter(ColumnSlice.ALL_COLUMNS_ARRAY, false, metadata.getCaching().rowCache.rowsToCache, metadata.clusteringColumns().size());
- }
-
- public boolean isFilterFullyCoveredBy(IDiskAtomFilter filter, ColumnFamily cachedCf, long now)
+ public boolean isFilterFullyCoveredBy(ClusteringIndexFilter filter, DataLimits limits, CachedPartition cached, int nowInSec)
{
// We can use the cached value only if we know that no data it doesn't contain could be covered
// by the query filter, that is if:
// 1) either the whole partition is cached
- // 2) or we can ensure than any data the filter selects are in the cached partition
-
- // When counting rows to decide if the whole row is cached, we should be careful with expiring
- // columns: if we use a timestamp newer than the one that was used when populating the cache, we might
- // end up deciding the whole partition is cached when it's really not (just some rows expired since the
- // cf was cached). This is the reason for Integer.MIN_VALUE below.
- boolean wholePartitionCached = cachedCf.liveCQL3RowCount(Integer.MIN_VALUE) < metadata.getCaching().rowCache.rowsToCache;
-
- // Contrarily to the "wholePartitionCached" check above, we do want isFullyCoveredBy to take the
- // timestamp of the query into account when dealing with expired columns. Otherwise, we could think
- // the cached partition has enough live rows to satisfy the filter when it doesn't because some
- // are now expired.
- return wholePartitionCached || filter.isFullyCoveredBy(cachedCf, now);
- }
-
- public int gcBefore(long now)
- {
- return (int) (now / 1000) - metadata.getGcGraceSeconds();
- }
-
- /**
- * get a list of columns starting from a given column, in a specified order.
- * only the latest version of a column is returned.
- * @return null if there is no data and no tombstones; otherwise a ColumnFamily
- */
- public ColumnFamily getColumnFamily(QueryFilter filter)
- {
- assert name.equals(filter.getColumnFamilyName()) : filter.getColumnFamilyName();
-
- ColumnFamily result = null;
-
- long start = System.nanoTime();
- try
- {
- int gcBefore = gcBefore(filter.timestamp);
- if (isRowCacheEnabled())
- {
- assert !isIndex(); // CASSANDRA-5732
- UUID cfId = metadata.cfId;
-
- ColumnFamily cached = getThroughCache(cfId, filter);
- if (cached == null)
- {
- logger.trace("cached row is empty");
- return null;
- }
-
- result = cached;
- }
- else
- {
- ColumnFamily cf = getTopLevelColumns(filter, gcBefore);
+ // 2) or we can ensure that any data the filter selects is in the cached partition
- if (cf == null)
- return null;
-
- result = removeDeletedCF(cf, gcBefore);
- }
-
- removeDroppedColumns(result);
-
- if (filter.filter instanceof SliceQueryFilter)
- {
- // Log the number of tombstones scanned on single key queries
- metric.tombstoneScannedHistogram.update(((SliceQueryFilter) filter.filter).lastTombstones());
- metric.liveScannedHistogram.update(((SliceQueryFilter) filter.filter).lastLive());
- }
- }
- finally
- {
- metric.readLatency.addNano(System.nanoTime() - start);
- }
+ // We can guarantee that a partition is fully cached if the number of rows it contains is less than
+ // what we're caching. When doing that, we should be careful with expiring cells: we should still count
+ // a row that has expired since the partition was cached, or we could decide that the whole
+ // partition is cached when it's not. This is why we use CachedPartition#cachedLiveRows.
+ if (cached.cachedLiveRows() < metadata.getCaching().rowCache.rowsToCache)
+ return true;
- return result;
+ // If the whole partition isn't cached, then we must guarantee that the filter cannot select data that
+ // is not in the cache. We can guarantee that if either the filter is a "head filter" and the cached
+ // partition has more live rows than queried (where live rows refer to the rows that are live now),
+ // or if we can prove that everything the filter selects is in the cached partition based on its content.
+ return (filter.isHeadFilter() && limits.hasEnoughLiveData(cached, nowInSec)) || filter.isFullyCoveredBy(cached);
}
- /**
- * Filter a cached row, which will not be modified by the filter, but may be modified by throwing out
- * tombstones that are no longer relevant.
- * The returned column family won't be thread safe.
- */
- ColumnFamily filterColumnFamily(ColumnFamily cached, QueryFilter filter)
+ public int gcBefore(int nowInSec)
{
- if (cached == null)
- return null;
-
- ColumnFamily cf = cached.cloneMeShallow(ArrayBackedSortedColumns.factory, filter.filter.isReversed());
- int gcBefore = gcBefore(filter.timestamp);
- filter.collateOnDiskAtom(cf, filter.getIterator(cached), gcBefore);
- return removeDeletedCF(cf, gcBefore);
+ return nowInSec - metadata.getGcGraceSeconds();
}
public Set<SSTableReader> getUnrepairedSSTables()
@@ -1881,7 +1564,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
* @return a ViewFragment containing the sstables and memtables that may need to be merged
* for rows within @param rowBounds, inclusive, according to the interval tree.
*/
- public Function<View, List<SSTableReader>> viewFilter(final AbstractBounds<RowPosition> rowBounds)
+ public Function<View, List<SSTableReader>> viewFilter(final AbstractBounds<PartitionPosition> rowBounds)
{
return new Function<View, List<SSTableReader>>()
{
@@ -1896,14 +1579,14 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
* @return a ViewFragment containing the sstables and memtables that may need to be merged
* for rows for all of @param rowBoundsCollection, inclusive, according to the interval tree.
*/
- public Function<View, List<SSTableReader>> viewFilter(final Collection<AbstractBounds<RowPosition>> rowBoundsCollection, final boolean includeRepaired)
+ public Function<View, List<SSTableReader>> viewFilter(final Collection<AbstractBounds<PartitionPosition>> rowBoundsCollection, final boolean includeRepaired)
{
return new Function<View, List<SSTableReader>>()
{
public List<SSTableReader> apply(View view)
{
Set<SSTableReader> sstables = Sets.newHashSet();
- for (AbstractBounds<RowPosition> rowBounds : rowBoundsCollection)
+ for (AbstractBounds<PartitionPosition> rowBounds : rowBoundsCollection)
{
for (SSTableReader sstable : view.sstablesInBounds(rowBounds))
{
@@ -1934,20 +1617,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
}
}
- public ColumnFamily getTopLevelColumns(QueryFilter filter, int gcBefore)
- {
- Tracing.trace("Executing single-partition query on {}", name);
- CollationController controller = new CollationController(this, filter, gcBefore);
- ColumnFamily columns;
- try (OpOrder.Group op = readOrdering.start())
- {
- columns = controller.getTopLevelColumns(Memtable.MEMORY_POOL.needToCopyOnHeap());
- }
- if (columns != null)
- metric.samplers.get(Sampler.READS).addSample(filter.key.getKey(), filter.key.hashCode(), 1);
- metric.updateSSTableIterated(controller.getSstablesIterated());
- return columns;
- }
public void beginLocalSampling(String sampler, int capacity)
{
@@ -1982,7 +1651,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
RowCacheKey key = keyIter.next();
DecoratedKey dk = partitioner.decorateKey(ByteBuffer.wrap(key.key));
if (key.cfId.equals(metadata.cfId) && !Range.isInRanges(dk.getToken(), ranges))
- invalidateCachedRow(dk);
+ invalidateCachedPartition(dk);
}
if (metadata.isCounter())
@@ -1998,247 +1667,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
}
}
- public static abstract class AbstractScanIterator extends AbstractIterator<Row> implements CloseableIterator<Row>
- {
- public boolean needsFiltering()
- {
- return true;
- }
- }
-
- /**
- * Iterate over a range of rows and columns from memtables/sstables.
- *
- * @param range The range of keys and columns within those keys to fetch
- */
- @SuppressWarnings("resource")
- private AbstractScanIterator getSequentialIterator(final DataRange range, long now)
- {
- assert !(range.keyRange() instanceof Range) || !((Range<?>)range.keyRange()).isWrapAround() || range.keyRange().right.isMinimum() : range.keyRange();
-
- final ViewFragment view = select(viewFilter(range.keyRange()));
- Tracing.trace("Executing seq scan across {} sstables for {}", view.sstables.size(), range.keyRange().getString(metadata.getKeyValidator()));
-
- final CloseableIterator<Row> iterator = RowIteratorFactory.getIterator(view.memtables, view.sstables, range, this, now);
-
- // todo this could be pushed into SSTableScanner
- return new AbstractScanIterator()
- {
- protected Row computeNext()
- {
- while (true)
- {
- // pull a row out of the iterator
- if (!iterator.hasNext())
- return endOfData();
-
- Row current = iterator.next();
- DecoratedKey key = current.key;
-
- if (!range.stopKey().isMinimum() && range.stopKey().compareTo(key) < 0)
- return endOfData();
-
- // skipping outside of assigned range
- if (!range.contains(key))
- continue;
-
- if (logger.isTraceEnabled())
- logger.trace("scanned {}", metadata.getKeyValidator().getString(key.getKey()));
-
- return current;
- }
- }
-
- public void close() throws IOException
- {
- iterator.close();
- }
- };
- }
-
- @VisibleForTesting
- public List<Row> getRangeSlice(final AbstractBounds<RowPosition> range,
- List<IndexExpression> rowFilter,
- IDiskAtomFilter columnFilter,
- int maxResults)
- {
- return getRangeSlice(range, rowFilter, columnFilter, maxResults, System.currentTimeMillis());
- }
-
- public List<Row> getRangeSlice(final AbstractBounds<RowPosition> range,
- List<IndexExpression> rowFilter,
- IDiskAtomFilter columnFilter,
- int maxResults,
- long now)
- {
- return getRangeSlice(makeExtendedFilter(range, columnFilter, rowFilter, maxResults, false, false, now));
- }
-
- /**
- * Allows generic range paging with the slice column filter.
- * Typically, suppose we have rows A, B, C ... Z having each some columns in [1, 100].
- * And suppose we want to page through the query that for all rows returns the columns
- * within [25, 75]. For that, we need to be able to do a range slice starting at (row r, column c)
- * and ending at (row Z, column 75), *but* that only return columns in [25, 75].
- * That is what this method allows. The columnRange is the "window" of columns we are interested
- * in each row, and columnStart (resp. columnEnd) is the start (resp. end) for the first
- * (resp. last) requested row.
- */
- public ExtendedFilter makeExtendedFilter(AbstractBounds<RowPosition> keyRange,
- SliceQueryFilter columnRange,
- Composite columnStart,
- Composite columnStop,
- List<IndexExpression> rowFilter,
- int maxResults,
- boolean countCQL3Rows,
- long now)
- {
- DataRange dataRange = new DataRange.Paging(keyRange, columnRange, columnStart, columnStop, metadata);
- return ExtendedFilter.create(this, dataRange, rowFilter, maxResults, countCQL3Rows, now);
- }
-
- public List<Row> getRangeSlice(AbstractBounds<RowPosition> range,
- List<IndexExpression> rowFilter,
- IDiskAtomFilter columnFilter,
- int maxResults,
- long now,
- boolean countCQL3Rows,
- boolean isPaging)
- {
- return getRangeSlice(makeExtendedFilter(range, columnFilter, rowFilter, maxResults, countCQL3Rows, isPaging, now));
- }
-
- public ExtendedFilter makeExtendedFilter(AbstractBounds<RowPosition> range,
- IDiskAtomFilter columnFilter,
- List<IndexExpression> rowFilter,
- int maxResults,
- boolean countCQL3Rows,
- boolean isPaging,
- long timestamp)
- {
- DataRange dataRange;
- if (isPaging)
- {
- assert columnFilter instanceof SliceQueryFilter;
- SliceQueryFilter sfilter = (SliceQueryFilter)columnFilter;
- assert sfilter.slices.length == 1;
- // create a new SliceQueryFilter that selects all cells, but pass the original slice start and finish
- // through to DataRange.Paging to be used on the first and last partitions
- SliceQueryFilter newFilter = new SliceQueryFilter(ColumnSlice.ALL_COLUMNS_ARRAY, sfilter.isReversed(), sfilter.count);
- dataRange = new DataRange.Paging(range, newFilter, sfilter.start(), sfilter.finish(), metadata);
- }
- else
- {
- dataRange = new DataRange(range, columnFilter);
- }
- return ExtendedFilter.create(this, dataRange, rowFilter, maxResults, countCQL3Rows, timestamp);
- }
-
- public List<Row> getRangeSlice(ExtendedFilter filter)
- {
- long start = System.nanoTime();
- try (OpOrder.Group op = readOrdering.start())
- {
- return filter(getSequentialIterator(filter.dataRange, filter.timestamp), filter);
- }
- finally
- {
- metric.rangeLatency.addNano(System.nanoTime() - start);
- }
- }
-
- @VisibleForTesting
- public List<Row> search(AbstractBounds<RowPosition> range,
- List<IndexExpression> clause,
- IDiskAtomFilter dataFilter,
- int maxResults)
- {
- return search(range, clause, dataFilter, maxResults, System.currentTimeMillis());
- }
-
- public List<Row> search(AbstractBounds<RowPosition> range,
- List<IndexExpression> clause,
- IDiskAtomFilter dataFilter,
- int maxResults,
- long now)
- {
- return search(makeExtendedFilter(range, dataFilter, clause, maxResults, false, false, now));
- }
-
- public List<Row> search(ExtendedFilter filter)
- {
- Tracing.trace("Executing indexed scan for {}", filter.dataRange.keyRange().getString(metadata.getKeyValidator()));
- return indexManager.search(filter);
- }
-
- public List<Row> filter(AbstractScanIterator rowIterator, ExtendedFilter filter)
- {
- logger.trace("Filtering {} for rows matching {}", rowIterator, filter);
- List<Row> rows = new ArrayList<Row>();
- int columnsCount = 0;
- int total = 0, matched = 0;
- boolean ignoreTombstonedPartitions = filter.ignoreTombstonedPartitions();
-
- try
- {
- while (rowIterator.hasNext() && matched < filter.maxRows() && columnsCount < filter.maxColumns())
- {
- // get the raw columns requested, and additional columns for the expressions if necessary
- Row rawRow = rowIterator.next();
- total++;
- ColumnFamily data = rawRow.cf;
-
- if (rowIterator.needsFiltering())
- {
- IDiskAtomFilter extraFilter = filter.getExtraFilter(rawRow.key, data);
- if (extraFilter != null)
- {
- ColumnFamily cf = filter.cfs.getColumnFamily(new QueryFilter(rawRow.key, name, extraFilter, filter.timestamp));
- if (cf != null)
- data.addAll(cf);
- }
-
- removeDroppedColumns(data);
-
- if (!filter.isSatisfiedBy(rawRow.key, data, null, null))
- continue;
-
- logger.trace("{} satisfies all filter expressions", data);
- // cut the resultset back to what was requested, if necessary
- data = filter.prune(rawRow.key, data);
- }
- else
- {
- removeDroppedColumns(data);
- }
-
- rows.add(new Row(rawRow.key, data));
- if (!ignoreTombstonedPartitions || !data.hasOnlyTombstones(filter.timestamp))
- matched++;
-
- if (data != null)
- columnsCount += filter.lastCounted(data);
- // Update the underlying filter to avoid querying more columns per slice than necessary and to handle paging
- filter.updateFilter(columnsCount);
- }
-
- return rows;
- }
- finally
- {
- try
- {
- rowIterator.close();
- Tracing.trace("Scanned {} rows and matched {}", total, matched);
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- }
- }
-
- public CellNameType getComparator()
+ public ClusteringComparator getComparator()
{
return metadata.comparator;
}
@@ -2388,20 +1817,20 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
}
/**
- * @return the cached row for @param key if it is already present in the cache.
- * That is, unlike getThroughCache, it will not readAndCache the row if it is not present, nor
+ * @return the cached partition for @param key if it is already present in the cache.
+ * Note that this will not readAndCache the partition if it is not present, nor
* are these calls counted in cache statistics.
*
- * Note that this WILL cause deserialization of a SerializingCache row, so if all you
- * need to know is whether a row is present or not, use containsCachedRow instead.
+ * Note that this WILL cause deserialization of a SerializingCache partition, so if all you
+ * need to know is whether a partition is present or not, use containsCachedParition instead.
*/
- public ColumnFamily getRawCachedRow(DecoratedKey key)
+ public CachedPartition getRawCachedPartition(DecoratedKey key)
{
if (!isRowCacheEnabled())
return null;
IRowCacheEntry cached = CacheService.instance.rowCache.getInternal(new RowCacheKey(metadata.cfId, key));
- return cached == null || cached instanceof RowCacheSentinel ? null : (ColumnFamily)cached;
+ return cached == null || cached instanceof RowCacheSentinel ? null : (CachedPartition)cached;
}
private void invalidateCaches()
@@ -2415,37 +1844,37 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
/**
* @return true if @param key is contained in the row cache
*/
- public boolean containsCachedRow(DecoratedKey key)
+ public boolean containsCachedParition(DecoratedKey key)
{
return CacheService.instance.rowCache.getCapacity() != 0 && CacheService.instance.rowCache.containsKey(new RowCacheKey(metadata.cfId, key));
}
- public void invalidateCachedRow(RowCacheKey key)
+ public void invalidateCachedPartition(RowCacheKey key)
{
CacheService.instance.rowCache.remove(key);
}
- public void invalidateCachedRow(DecoratedKey key)
+ public void invalidateCachedPartition(DecoratedKey key)
{
UUID cfId = Schema.instance.getId(keyspace.getName(), this.name);
if (cfId == null)
return; // secondary index
- invalidateCachedRow(new RowCacheKey(cfId, key));
+ invalidateCachedPartition(new RowCacheKey(cfId, key));
}
- public ClockAndCount getCachedCounter(ByteBuffer partitionKey, CellName cellName)
+ public ClockAndCount getCachedCounter(ByteBuffer partitionKey, Clustering clustering, ColumnDefinition column, CellPath path)
{
if (CacheService.instance.counterCache.getCapacity() == 0L) // counter cache disabled.
return null;
- return CacheService.instance.counterCache.get(CounterCacheKey.create(metadata.cfId, partitionKey, cellName));
+ return CacheService.instance.counterCache.get(CounterCacheKey.create(metadata.cfId, partitionKey, clustering, column, path));
}
- public void putCachedCounter(ByteBuffer partitionKey, CellName cellName, ClockAndCount clockAndCount)
+ public void putCachedCounter(ByteBuffer partitionKey, Clustering clustering, ColumnDefinition column, CellPath path, ClockAndCount clockAndCount)
{
if (CacheService.instance.counterCache.getCapacity() == 0L) // counter cache disabled.
return;
- CacheService.instance.counterCache.put(CounterCacheKey.create(metadata.cfId, partitionKey, cellName), clockAndCount);
+ CacheService.instance.counterCache.put(CounterCacheKey.create(metadata.cfId, partitionKey, clustering, column, path), clockAndCount);
}
public void forceMajorCompaction() throws InterruptedException, ExecutionException
@@ -2830,7 +2259,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
return view.sstables.isEmpty() && view.getCurrentMemtable().getOperations() == 0 && view.liveMemtables.size() <= 1 && view.flushingMemtables.size() == 0;
}
- private boolean isRowCacheEnabled()
+ public boolean isRowCacheEnabled()
{
return metadata.getCaching().rowCache.isEnabled() && CacheService.instance.rowCache.getCapacity() > 0;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/ColumnFamilyType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyType.java b/src/java/org/apache/cassandra/db/ColumnFamilyType.java
deleted file mode 100644
index 51e8b63..0000000
--- a/src/java/org/apache/cassandra/db/ColumnFamilyType.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-/**
- * column family type enum
- */
-public enum ColumnFamilyType
-{
- Standard,
- Super;
-
- public static ColumnFamilyType create(String name)
- {
- try
- {
- // TODO thrift optional parameter in CfDef is leaking down here which it shouldn't
- return name == null ? ColumnFamilyType.Standard : ColumnFamilyType.valueOf(name);
- }
- catch (IllegalArgumentException e)
- {
- return null;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/ColumnIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnIndex.java b/src/java/org/apache/cassandra/db/ColumnIndex.java
index d9d6a9c..1a9b92d 100644
--- a/src/java/org/apache/cassandra/db/ColumnIndex.java
+++ b/src/java/org/apache/cassandra/db/ColumnIndex.java
@@ -18,15 +18,15 @@
package org.apache.cassandra.db;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.*;
import com.google.common.annotations.VisibleForTesting;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.composites.Composite;
+import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.io.sstable.IndexHelper;
-import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.io.sstable.format.Version;
+import org.apache.cassandra.io.util.SequentialWriter;
import org.apache.cassandra.utils.ByteBufferUtil;
public class ColumnIndex
@@ -42,6 +42,14 @@ public class ColumnIndex
this.columnsIndex = columnsIndex;
}
+ public static ColumnIndex writeAndBuildIndex(UnfilteredRowIterator iterator, SequentialWriter output, SerializationHeader header, Version version) throws IOException
+ {
+ assert !iterator.isEmpty() && version.storeRows();
+
+ Builder builder = new Builder(iterator, output, header, version.correspondingMessagingVersion());
+ return builder.build();
+ }
+
@VisibleForTesting
public static ColumnIndex nothing()
{
@@ -52,192 +60,114 @@ public class ColumnIndex
* Help to create an index for a column family based on size of columns,
* and write said columns to disk.
*/
- public static class Builder
+ private static class Builder
{
+ private final UnfilteredRowIterator iterator;
+ private final SequentialWriter writer;
+ private final SerializationHeader header;
+ private final int version;
+
private final ColumnIndex result;
- private final long indexOffset;
+ private final long initialPosition;
private long startPosition = -1;
- private long endPosition = 0;
- private long blockSize;
- private OnDiskAtom firstColumn;
- private OnDiskAtom lastColumn;
- private OnDiskAtom lastBlockClosing;
- private final DataOutputPlus output;
- private final RangeTombstone.Tracker tombstoneTracker;
- private int atomCount;
- private final ByteBuffer key;
- private final DeletionInfo deletionInfo; // only used for serializing and calculating row header size
-
- private final OnDiskAtom.Serializer atomSerializer;
-
- public Builder(ColumnFamily cf,
- ByteBuffer key,
- DataOutputPlus output)
- {
- assert cf != null;
- assert key != null;
- assert output != null;
- this.key = key;
- deletionInfo = cf.deletionInfo();
- this.indexOffset = rowHeaderSize(key, deletionInfo);
- this.result = new ColumnIndex(new ArrayList<IndexHelper.IndexInfo>());
- this.output = output;
- this.tombstoneTracker = new RangeTombstone.Tracker(cf.getComparator());
- this.atomSerializer = cf.getComparator().onDiskAtomSerializer();
- }
+ private int written;
- /**
- * Returns the number of bytes between the beginning of the row and the
- * first serialized column.
- */
- private static long rowHeaderSize(ByteBuffer key, DeletionInfo delInfo)
- {
- TypeSizes typeSizes = TypeSizes.NATIVE;
- // TODO fix constantSize when changing the nativeconststs.
- int keysize = key.remaining();
- return typeSizes.sizeof((short) keysize) + keysize // Row key
- + DeletionTime.serializer.serializedSize(delInfo.getTopLevelDeletion(), typeSizes);
- }
+ private ClusteringPrefix firstClustering;
+ private final ReusableClusteringPrefix lastClustering;
+
+ private DeletionTime openMarker;
- public RangeTombstone.Tracker tombstoneTracker()
+ public Builder(UnfilteredRowIterator iterator,
+ SequentialWriter writer,
+ SerializationHeader header,
+ int version)
{
- return tombstoneTracker;
+ this.iterator = iterator;
+ this.writer = writer;
+ this.header = header;
+ this.version = version;
+
+ this.result = new ColumnIndex(new ArrayList<IndexHelper.IndexInfo>());
+ this.initialPosition = writer.getFilePointer();
+ this.lastClustering = new ReusableClusteringPrefix(iterator.metadata().clusteringColumns().size());
}
- public int writtenAtomCount()
+ private void writePartitionHeader(UnfilteredRowIterator iterator) throws IOException
{
- return atomCount + tombstoneTracker.writtenAtom();
+ ByteBufferUtil.writeWithShortLength(iterator.partitionKey().getKey(), writer.stream);
+ DeletionTime.serializer.serialize(iterator.partitionLevelDeletion(), writer.stream);
+ if (header.hasStatic())
+ UnfilteredSerializer.serializer.serialize(iterator.staticRow(), header, writer.stream, version);
}
- /**
- * Serializes the index into in-memory structure with all required components
- * such as Bloom Filter, index block size, IndexInfo list
- *
- * @param cf Column family to create index for
- *
- * @return information about index - it's Bloom Filter, block size and IndexInfo list
- */
- public ColumnIndex build(ColumnFamily cf) throws IOException
+ public ColumnIndex build() throws IOException
{
- // cf has disentangled the columns and range tombstones, we need to re-interleave them in comparator order
- Comparator<Composite> comparator = cf.getComparator();
- DeletionInfo.InOrderTester tester = cf.deletionInfo().inOrderTester();
- Iterator<RangeTombstone> rangeIter = cf.deletionInfo().rangeIterator();
- RangeTombstone tombstone = rangeIter.hasNext() ? rangeIter.next() : null;
-
- for (Cell c : cf)
- {
- while (tombstone != null && comparator.compare(c.name(), tombstone.min) >= 0)
- {
- // skip range tombstones that are shadowed by partition tombstones
- if (!cf.deletionInfo().getTopLevelDeletion().isDeleted(tombstone))
- add(tombstone);
- tombstone = rangeIter.hasNext() ? rangeIter.next() : null;
- }
-
- // We can skip any cell if it's shadowed by a tombstone already. This is a more
- // general case than was handled by CASSANDRA-2589.
- if (!tester.isDeleted(c))
- add(c);
- }
-
- while (tombstone != null)
- {
- add(tombstone);
- tombstone = rangeIter.hasNext() ? rangeIter.next() : null;
- }
- ColumnIndex index = build();
+ writePartitionHeader(iterator);
- maybeWriteEmptyRowHeader();
+ while (iterator.hasNext())
+ add(iterator.next());
- return index;
+ return close();
}
- /**
- * The important distinction wrt build() is that we may be building for a row that ends up
- * being compacted away entirely, i.e., the input consists only of expired tombstones (or
- * columns shadowed by expired tombstone). Thus, it is the caller's responsibility
- * to decide whether to write the header for an empty row.
- */
- public ColumnIndex buildForCompaction(Iterator<OnDiskAtom> columns) throws IOException
+ private long currentPosition()
{
- while (columns.hasNext())
- {
- OnDiskAtom c = columns.next();
- add(c);
- }
+ return writer.getFilePointer() - initialPosition;
+ }
- return build();
+ private void addIndexBlock()
+ {
+ IndexHelper.IndexInfo cIndexInfo = new IndexHelper.IndexInfo(firstClustering,
+ lastClustering.get().takeAlias(),
+ startPosition,
+ currentPosition() - startPosition,
+ openMarker);
+ result.columnsIndex.add(cIndexInfo);
+ firstClustering = null;
}
- public void add(OnDiskAtom column) throws IOException
+ private void add(Unfiltered unfiltered) throws IOException
{
- atomCount++;
+ lastClustering.copy(unfiltered.clustering());
+ boolean isMarker = unfiltered.kind() == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER;
- if (firstColumn == null)
+ if (firstClustering == null)
{
- firstColumn = column;
- startPosition = endPosition;
- // TODO: have that use the firstColumn as min + make sure we optimize that on read
- endPosition += tombstoneTracker.writeOpenedMarker(firstColumn, output, atomSerializer);
- blockSize = 0; // We don't count repeated tombstone marker in the block size, to avoid a situation
- // where we wouldn't make any progress because a block is filled by said marker
+ // Beginning of an index block. Remember the start and position
+ firstClustering = lastClustering.get().takeAlias();
+ startPosition = currentPosition();
}
- long size = atomSerializer.serializedSizeForSSTable(column);
- endPosition += size;
- blockSize += size;
+ UnfilteredSerializer.serializer.serialize(unfiltered, header, writer.stream, version);
+ ++written;
- // if we hit the column index size that we have to index after, go ahead and index it.
- if (blockSize >= DatabaseDescriptor.getColumnIndexSize())
+ if (isMarker)
{
- IndexHelper.IndexInfo cIndexInfo = new IndexHelper.IndexInfo(firstColumn.name(), column.name(), indexOffset + startPosition, endPosition - startPosition);
- result.columnsIndex.add(cIndexInfo);
- firstColumn = null;
- lastBlockClosing = column;
+ RangeTombstoneMarker marker = (RangeTombstoneMarker) unfiltered;
+ openMarker = marker.isOpen(false) ? marker.openDeletionTime(false) : null;
}
- maybeWriteRowHeader();
- atomSerializer.serializeForSSTable(column, output);
-
- // TODO: Should deal with removing unneeded tombstones
- tombstoneTracker.update(column, false);
-
- lastColumn = column;
+ // if we hit the column index size that we have to index after, go ahead and index it.
+ if (currentPosition() - startPosition >= DatabaseDescriptor.getColumnIndexSize())
+ addIndexBlock();
}
- private void maybeWriteRowHeader() throws IOException
+ private ColumnIndex close() throws IOException
{
- if (lastColumn == null)
- {
- ByteBufferUtil.writeWithShortLength(key, output);
- DeletionTime.serializer.serialize(deletionInfo.getTopLevelDeletion(), output);
- }
- }
+ UnfilteredSerializer.serializer.writeEndOfPartition(writer.stream);
- public ColumnIndex build()
- {
- // all columns were GC'd after all
- if (lastColumn == null)
+ // It's possible we add no rows, just a top level deletion
+ if (written == 0)
return ColumnIndex.EMPTY;
// the last column may have fallen on an index boundary already. if not, index it explicitly.
- if (result.columnsIndex.isEmpty() || lastBlockClosing != lastColumn)
- {
- IndexHelper.IndexInfo cIndexInfo = new IndexHelper.IndexInfo(firstColumn.name(), lastColumn.name(), indexOffset + startPosition, endPosition - startPosition);
- result.columnsIndex.add(cIndexInfo);
- }
+ if (firstClustering != null)
+ addIndexBlock();
// we should always have at least one computed index block, but we only write it out if there is more than that.
assert result.columnsIndex.size() > 0;
return result;
}
-
- public void maybeWriteEmptyRowHeader() throws IOException
- {
- if (!deletionInfo.isLive())
- maybeWriteRowHeader();
- }
}
}
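
For reference, a minimal sketch of the index-block accounting done by the new ColumnIndex.Builder: an IndexInfo entry is cut whenever at least the configured column index size worth of rows has been serialized since the current block started, and any trailing rows are indexed on close. The types below are simplified stand-ins, not the classes from this commit (clusterings, the open range tombstone marker and the end-of-partition write are omitted).

    import java.util.ArrayList;
    import java.util.List;

    final class IndexBlockSketch
    {
        static final class IndexInfo
        {
            final long offset; // start of the block, relative to the partition start
            final long width;  // serialized size of the block
            IndexInfo(long offset, long width) { this.offset = offset; this.width = width; }
        }

        private final long blockSizeBytes;
        private final List<IndexInfo> blocks = new ArrayList<>();
        private long position = 0;    // bytes written for this partition so far
        private long blockStart = -1; // -1 means "no block currently open"

        IndexBlockSketch(long blockSizeBytes) { this.blockSizeBytes = blockSizeBytes; }

        /** Account for one serialized row/marker of the given size. */
        void add(long serializedSize)
        {
            if (blockStart < 0)
                blockStart = position; // beginning of a new index block
            position += serializedSize;
            if (position - blockStart >= blockSizeBytes)
            {
                blocks.add(new IndexInfo(blockStart, position - blockStart));
                blockStart = -1;
            }
        }

        /** Close the partition: index the trailing rows if the last block wasn't cut yet. */
        List<IndexInfo> close()
        {
            if (blockStart >= 0)
                blocks.add(new IndexInfo(blockStart, position - blockStart));
            return blocks;
        }

        public static void main(String[] args)
        {
            IndexBlockSketch b = new IndexBlockSketch(64 * 1024); // e.g. a 64 KiB index block size
            for (int i = 0; i < 1000; i++)
                b.add(200);                       // pretend each serialized row is ~200 bytes
            System.out.println(b.close().size()); // number of index blocks cut (incl. the trailing one)
        }
    }
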
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/ColumnSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnSerializer.java b/src/java/org/apache/cassandra/db/ColumnSerializer.java
deleted file mode 100644
index 8e7026c..0000000
--- a/src/java/org/apache/cassandra/db/ColumnSerializer.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.io.DataInput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.db.composites.CellNameType;
-import org.apache.cassandra.io.ISerializer;
-import org.apache.cassandra.io.FSReadError;
-import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.io.util.FileDataInput;
-import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-public class ColumnSerializer implements ISerializer<Cell>
-{
- public final static int DELETION_MASK = 0x01;
- public final static int EXPIRATION_MASK = 0x02;
- public final static int COUNTER_MASK = 0x04;
- public final static int COUNTER_UPDATE_MASK = 0x08;
- public final static int RANGE_TOMBSTONE_MASK = 0x10;
-
- /**
- * Flag affecting deserialization behavior.
- * - LOCAL: for deserialization of local data (Expired columns are
- * converted to tombstones (to gain disk space)).
- * - FROM_REMOTE: for deserialization of data received from remote hosts
- * (Expired columns are converted to tombstone and counters have
- * their delta cleared)
- * - PRESERVE_SIZE: used when no transformation must be performed, i.e,
- * when we must ensure that deserializing and reserializing the
- * result yield the exact same bytes. Streaming uses this.
- */
- public static enum Flag
- {
- LOCAL, FROM_REMOTE, PRESERVE_SIZE;
- }
-
- private final CellNameType type;
-
- public ColumnSerializer(CellNameType type)
- {
- this.type = type;
- }
-
- public void serialize(Cell cell, DataOutputPlus out) throws IOException
- {
- assert !cell.name().isEmpty();
- type.cellSerializer().serialize(cell.name(), out);
- try
- {
- out.writeByte(cell.serializationFlags());
- if (cell instanceof CounterCell)
- {
- out.writeLong(((CounterCell) cell).timestampOfLastDelete());
- }
- else if (cell instanceof ExpiringCell)
- {
- out.writeInt(((ExpiringCell) cell).getTimeToLive());
- out.writeInt(cell.getLocalDeletionTime());
- }
- out.writeLong(cell.timestamp());
- ByteBufferUtil.writeWithLength(cell.value(), out);
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- }
-
- public Cell deserialize(DataInput in) throws IOException
- {
- return deserialize(in, Flag.LOCAL);
- }
-
- /*
- * For counter columns, we must know when we deserialize them if what we
- * deserialize comes from a remote host. If it does, then we must clear
- * the delta.
- */
- public Cell deserialize(DataInput in, ColumnSerializer.Flag flag) throws IOException
- {
- return deserialize(in, flag, Integer.MIN_VALUE);
- }
-
- public Cell deserialize(DataInput in, ColumnSerializer.Flag flag, int expireBefore) throws IOException
- {
- CellName name = type.cellSerializer().deserialize(in);
-
- int b = in.readUnsignedByte();
- return deserializeColumnBody(in, name, b, flag, expireBefore);
- }
-
- Cell deserializeColumnBody(DataInput in, CellName name, int mask, ColumnSerializer.Flag flag, int expireBefore) throws IOException
- {
- if ((mask & COUNTER_MASK) != 0)
- {
- long timestampOfLastDelete = in.readLong();
- long ts = in.readLong();
- ByteBuffer value = ByteBufferUtil.readWithLength(in);
- return BufferCounterCell.create(name, value, ts, timestampOfLastDelete, flag);
- }
- else if ((mask & EXPIRATION_MASK) != 0)
- {
- int ttl = in.readInt();
- int expiration = in.readInt();
- long ts = in.readLong();
- ByteBuffer value = ByteBufferUtil.readWithLength(in);
- return BufferExpiringCell.create(name, value, ts, ttl, expiration, expireBefore, flag);
- }
- else
- {
- long ts = in.readLong();
- ByteBuffer value = ByteBufferUtil.readWithLength(in);
- return (mask & COUNTER_UPDATE_MASK) != 0
- ? new BufferCounterUpdateCell(name, value, ts)
- : ((mask & DELETION_MASK) == 0
- ? new BufferCell(name, value, ts)
- : new BufferDeletedCell(name, value, ts));
- }
- }
-
- void skipColumnBody(DataInput in, int mask) throws IOException
- {
- if ((mask & COUNTER_MASK) != 0)
- FileUtils.skipBytesFully(in, 16);
- else if ((mask & EXPIRATION_MASK) != 0)
- FileUtils.skipBytesFully(in, 16);
- else
- FileUtils.skipBytesFully(in, 8);
-
- int length = in.readInt();
- FileUtils.skipBytesFully(in, length);
- }
-
- public long serializedSize(Cell cell, TypeSizes typeSizes)
- {
- return cell.serializedSize(type, typeSizes);
- }
-
- public static class CorruptColumnException extends IOException
- {
- public CorruptColumnException(String s)
- {
- super(s);
- }
-
- public static CorruptColumnException create(DataInput in, ByteBuffer name)
- {
- assert name.remaining() <= 0;
- String format = "invalid column name length %d%s";
- String details = "";
- if (in instanceof FileDataInput)
- {
- FileDataInput fdis = (FileDataInput)in;
- long remaining;
- try
- {
- remaining = fdis.bytesRemaining();
- }
- catch (IOException e)
- {
- throw new FSReadError(e, fdis.getPath());
- }
- details = String.format(" (%s, %d bytes remaining)", fdis.getPath(), remaining);
- }
- return new CorruptColumnException(String.format(format, name.remaining(), details));
- }
- }
-}
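
For reference, a small sketch of how the serialization flag bits of the deleted ColumnSerializer discriminated legacy cell kinds. The mask values are copied from the removed file above; the method itself is illustrative, and the RANGE_TOMBSTONE_MASK bit was checked before cell deserialization, at the atom level.

    // Sketch only: mirrors the precedence of the deleted deserializeColumnBody().
    final class LegacyCellFlagsSketch
    {
        static final int DELETION_MASK        = 0x01;
        static final int EXPIRATION_MASK      = 0x02;
        static final int COUNTER_MASK         = 0x04;
        static final int COUNTER_UPDATE_MASK  = 0x08;
        static final int RANGE_TOMBSTONE_MASK = 0x10; // handled above the cell serializer

        static String kind(int flags)
        {
            if ((flags & COUNTER_MASK) != 0)
                return "counter cell";
            if ((flags & EXPIRATION_MASK) != 0)
                return "expiring cell";
            if ((flags & COUNTER_UPDATE_MASK) != 0)
                return "counter update cell";
            if ((flags & DELETION_MASK) != 0)
                return "deleted cell (tombstone)";
            return "regular cell";
        }

        public static void main(String[] args)
        {
            System.out.println(kind(EXPIRATION_MASK)); // expiring cell
            System.out.println(kind(0));               // regular cell
        }
    }
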
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/Columns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Columns.java b/src/java/org/apache/cassandra/db/Columns.java
new file mode 100644
index 0000000..83d39db
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/Columns.java
@@ -0,0 +1,535 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.util.*;
+import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+
+import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.Iterators;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.db.marshal.MapType;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+/**
+ * An immutable and sorted list of (non-PK) columns for a given table.
+ * <p>
+ * Note that in practice, it will either store only static columns, or only regular ones. When
+ * we need both types of columns, we use a {@link PartitionColumns} object.
+ */
+public class Columns implements Iterable<ColumnDefinition>
+{
+ public static final Serializer serializer = new Serializer();
+ public static final Columns NONE = new Columns(new ColumnDefinition[0], 0);
+
+ public final ColumnDefinition[] columns;
+ public final int complexIdx; // Index of the first complex column
+
+ private Columns(ColumnDefinition[] columns, int complexIdx)
+ {
+ assert complexIdx <= columns.length;
+ this.columns = columns;
+ this.complexIdx = complexIdx;
+ }
+
+ private Columns(ColumnDefinition[] columns)
+ {
+ this(columns, findFirstComplexIdx(columns));
+ }
+
+ /**
+ * Creates a {@code Columns} holding only the one column provided.
+ *
+ * @param c the column for which to create a {@code Columns} object.
+ *
+ * @return the newly created {@code Columns} containing only {@code c}.
+ */
+ public static Columns of(ColumnDefinition c)
+ {
+ ColumnDefinition[] columns = new ColumnDefinition[]{ c };
+ return new Columns(columns, c.isComplex() ? 0 : 1);
+ }
+
+ /**
+ * Returns a new {@code Columns} object holding the same columns as the provided set.
+ *
+ * @param s the set from which to create the new {@code Columns}.
+ *
+ * @return the newly created {@code Columns} containing the columns from {@code s}.
+ */
+ public static Columns from(Set<ColumnDefinition> s)
+ {
+ ColumnDefinition[] columns = s.toArray(new ColumnDefinition[s.size()]);
+ Arrays.sort(columns);
+ return new Columns(columns, findFirstComplexIdx(columns));
+ }
+
+ private static int findFirstComplexIdx(ColumnDefinition[] columns)
+ {
+ for (int i = 0; i < columns.length; i++)
+ if (columns[i].isComplex())
+ return i;
+ return columns.length;
+ }
+
+ /**
+ * Whether this {@code Columns} object is empty.
+ *
+ * @return whether this {@code Columns} object is empty.
+ */
+ public boolean isEmpty()
+ {
+ return columns.length == 0;
+ }
+
+ /**
+ * The number of simple columns in this object.
+ *
+ * @return the number of simple columns in this object.
+ */
+ public int simpleColumnCount()
+ {
+ return complexIdx;
+ }
+
+ /**
+ * The number of complex columns (non-frozen collections, udts, ...) in this object.
+ *
+ * @return the number of complex columns in this object.
+ */
+ public int complexColumnCount()
+ {
+ return columns.length - complexIdx;
+ }
+
+ /**
+ * The total number of columns in this object.
+ *
+ * @return the total number of columns in this object.
+ */
+ public int columnCount()
+ {
+ return columns.length;
+ }
+
+ /**
+ * Whether this object contains simple columns.
+ *
+ * @return whether this object contains simple columns.
+ */
+ public boolean hasSimple()
+ {
+ return complexIdx > 0;
+ }
+
+ /**
+ * Whether this object contains complex columns.
+ *
+ * @return whether this object contains complex columns.
+ */
+ public boolean hasComplex()
+ {
+ return complexIdx < columns.length;
+ }
+
+ /**
+ * Returns the ith simple column of this object.
+ *
+ * @param i the index for the simple column to fetch. This must
+ * satisfy {@code 0 <= i < simpleColumnCount()}.
+ *
+ * @return the {@code i}th simple column in this object.
+ */
+ public ColumnDefinition getSimple(int i)
+ {
+ return columns[i];
+ }
+
+ /**
+ * Returns the ith complex column of this object.
+ *
+ * @param i the index of the complex column to fetch. This must
+ * satisfy {@code 0 <= i < complexColumnCount()}.
+ *
+ * @return the {@code i}th complex column in this object.
+ */
+ public ColumnDefinition getComplex(int i)
+ {
+ return columns[complexIdx + i];
+ }
+
+ /**
+ * The index of the provided simple column in this object (if it contains
+ * the provided column).
+ *
+ * @param c the simple column to return the index of.
+ * @param from the index to start the search from.
+ *
+ * @return the index of simple column {@code c} if it is contained in this
+ * object (starting the search from index {@code from}), {@code -1} otherwise.
+ */
+ public int simpleIdx(ColumnDefinition c, int from)
+ {
+ assert !c.isComplex();
+ for (int i = from; i < complexIdx; i++)
+ // We know we only use "interned" ColumnIdentifier so == is ok.
+ if (columns[i].name == c.name)
+ return i;
+ return -1;
+ }
+
+ /**
+ * The index of the provided complex column in this object (if it contains
+ * the provided column).
+ *
+ * @param c the complex column to return the index of.
+ * @param from the index (among the complex columns) to start the search from.
+ *
+ * @return the index of complex column {@code c} if it is contained in this
+ * object (starting the search from index {@code from}), {@code -1} otherwise. The returned
+ * index is relative to the complex columns only, i.e. it is suitable for {@link #getComplex}.
+ */
+ public int complexIdx(ColumnDefinition c, int from)
+ {
+ assert c.isComplex();
+ for (int i = complexIdx + from; i < columns.length; i++)
+ // We know we only use "interned" ColumnIdentifier so == is ok.
+ if (columns[i].name == c.name)
+ return i - complexIdx;
+ return -1;
+ }
+
+ /**
+ * Whether the provided column is contained by this object.
+ *
+ * @param c the column to check presence of.
+ *
+ * @return whether {@code c} is contained by this object.
+ */
+ public boolean contains(ColumnDefinition c)
+ {
+ return c.isComplex() ? complexIdx(c, 0) >= 0 : simpleIdx(c, 0) >= 0;
+ }
+
+ /**
+ * Whether or not there are counter columns among these columns.
+ *
+ * @return whether or not there are counter columns among these columns.
+ */
+ public boolean hasCounters()
+ {
+ for (int i = 0; i < complexIdx; i++)
+ {
+ if (columns[i].type.isCounter())
+ return true;
+ }
+
+ for (int i = complexIdx; i < columns.length; i++)
+ {
+ // We only support counters in maps because that's all we need for now (and we need it for the sake of thrift super columns of counters)
+ if (columns[i].type instanceof MapType && (((MapType)columns[i].type).valueComparator().isCounter()))
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+ * Returns the result of merging this {@code Columns} object with the
+ * provided one.
+ *
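+ * For instance, assuming simple columns named {@code a}, {@code b} and {@code c} (in sorted order),
+ * merging {@code a, c} with {@code b, c} yields {@code a, b, c}, while merging {@code a, b, c}
+ * with {@code a, b} returns the {@code a, b, c} operand itself.
+ *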
+ * @param other the other {@code Columns} to merge this object with.
+ *
+ * @return the result of merging/taking the union of {@code this} and
+ * {@code other}. The returned object may be one of the two operands, when
+ * the other operand is a subset of it.
+ */
+ public Columns mergeTo(Columns other)
+ {
+ if (this == other || other == NONE)
+ return this;
+ if (this == NONE)
+ return other;
+
+ int i = 0, j = 0;
+ int size = 0;
+ while (i < columns.length && j < other.columns.length)
+ {
+ ++size;
+ int cmp = columns[i].compareTo(other.columns[j]);
+ if (cmp == 0)
+ {
+ ++i;
+ ++j;
+ }
+ else if (cmp < 0)
+ {
+ ++i;
+ }
+ else
+ {
+ ++j;
+ }
+ }
+
+ // If i == size and j == size, every comparison in the loop above was a match, meaning the
+ // shorter array is a prefix of the longer one (in sorted order) and the union is the longer array.
+ if (i == size && j == size)
+ {
+ // We exited the loop because this or other (or both) was exhausted; the exhausted one is
+ // a subset of the other, so the union is simply the other one.
+ return i == columns.length ? other : this;
+ }
+
+ size += i == columns.length ? other.columns.length - j : columns.length - i;
+ ColumnDefinition[] result = new ColumnDefinition[size];
+ i = 0;
+ j = 0;
+ for (int k = 0; k < size; k++)
+ {
+ int cmp = i >= columns.length ? 1
+ : (j >= other.columns.length ? -1 : columns[i].compareTo(other.columns[j]));
+ if (cmp == 0)
+ {
+ result[k] = columns[i];
+ ++i;
+ ++j;
+ }
+ else if (cmp < 0)
+ {
+ result[k] = columns[i++];
+ }
+ else
+ {
+ result[k] = other.columns[j++];
+ }
+ }
+ return new Columns(result, findFirstComplexIdx(result));
+ }
+
+ /**
+ * Whether this object is a superset of the provided other {@code Columns} object.
+ *
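+ * For instance, assuming columns named {@code a}, {@code b}, {@code c} and {@code d}, a {@code Columns}
+ * holding {@code a, b, d} contains one holding {@code b, d}, but not one holding {@code b, c}.
+ *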
+ * @param other the other object to test for inclusion in this object.
+ *
+ * @return whether all the columns of {@code other} are contained by this object.
+ */
+ public boolean contains(Columns other)
+ {
+ if (other.columns.length > columns.length)
+ return false;
+
+ int j = 0;
+ int cmp = 0;
+ for (ColumnDefinition def : other.columns)
+ {
+ while (j < columns.length && (cmp = columns[j].compareTo(def)) < 0)
+ j++;
+
+ if (j >= columns.length || cmp > 0)
+ return false;
+
+ // cmp == 0, we've found the definition. We can bump j once more since
+ // we know we won't need to compare that element again
+ j++;
+ }
+ return true;
+ }
+
+ /**
+ * Iterator over the simple columns of this object.
+ *
+ * @return an iterator over the simple columns of this object.
+ */
+ public Iterator<ColumnDefinition> simpleColumns()
+ {
+ return new ColumnIterator(0, complexIdx);
+ }
+
+ /**
+ * Iterator over the complex columns of this object.
+ *
+ * @return an iterator over the complex columns of this object.
+ */
+ public Iterator<ColumnDefinition> complexColumns()
+ {
+ return new ColumnIterator(complexIdx, columns.length);
+ }
+
+ /**
+ * Iterator over all the columns of this object.
+ *
+ * @return an iterator over all the columns of this object.
+ */
+ public Iterator<ColumnDefinition> iterator()
+ {
+ return Iterators.forArray(columns);
+ }
+
+ /**
+ * An iterator that returns the columns of this object in "select" order (that
+ * is, in global alphabetical order, whereas the "normal" iterator returns simple
+ * columns first and the complex ones second).
+ *
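+ * For instance, with simple columns named {@code a} and {@code d} and complex columns named
+ * {@code b} and {@code c}, {@link #iterator} returns {@code a, d, b, c} while this iterator
+ * returns {@code a, b, c, d}.
+ *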
+ * @return an iterator returning columns in alphabetical order.
+ */
+ public Iterator<ColumnDefinition> selectOrderIterator()
+ {
+ // In wildcard selection, we want to return all columns in alphabetical order,
+ // regardless of whether they are complex or not
+ return new AbstractIterator<ColumnDefinition>()
+ {
+ private int regular;
+ private int complex = complexIdx;
+
+ protected ColumnDefinition computeNext()
+ {
+ if (complex >= columns.length)
+ return regular >= complexIdx ? endOfData() : columns[regular++];
+ if (regular >= complexIdx)
+ return columns[complex++];
+
+ return ByteBufferUtil.compareUnsigned(columns[regular].name.bytes, columns[complex].name.bytes) < 0
+ ? columns[regular++]
+ : columns[complex++];
+ }
+ };
+ }
+
+ /**
+ * Returns the equivalent of those columns but with the provided column removed.
+ *
+ * @param column the column to remove.
+ *
+ * @return a newly allocated {@code Columns} containing all the columns of {@code this} except
+ * {@code column}, or {@code this} itself if it does not contain {@code column}.
+ */
+ public Columns without(ColumnDefinition column)
+ {
+ int idx = column.isComplex() ? complexIdx(column, 0) : simpleIdx(column, 0);
+ if (idx < 0)
+ return this;
+
+ int realIdx = column.isComplex() ? complexIdx + idx : idx;
+
+ ColumnDefinition[] newColumns = new ColumnDefinition[columns.length - 1];
+ System.arraycopy(columns, 0, newColumns, 0, realIdx);
+ System.arraycopy(columns, realIdx + 1, newColumns, realIdx, newColumns.length - realIdx);
+ return new Columns(newColumns);
+ }
+
+ public void digest(MessageDigest digest)
+ {
+ for (ColumnDefinition c : this)
+ digest.update(c.name.bytes.duplicate());
+ }
+
+ @Override
+ public boolean equals(Object other)
+ {
+ if (!(other instanceof Columns))
+ return false;
+
+ Columns that = (Columns)other;
+ return this.complexIdx == that.complexIdx && Arrays.equals(this.columns, that.columns);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(complexIdx, Arrays.hashCode(columns));
+ }
+
+ @Override
+ public String toString()
+ {
+ StringBuilder sb = new StringBuilder();
+ boolean first = true;
+ for (ColumnDefinition def : this)
+ {
+ if (first) first = false; else sb.append(" ");
+ sb.append(def.name);
+ }
+ return sb.toString();
+ }
+
+ private class ColumnIterator extends AbstractIterator<ColumnDefinition>
+ {
+ private final int to;
+ private int idx;
+
+ private ColumnIterator(int from, int to)
+ {
+ this.idx = from;
+ this.to = to;
+ }
+
+ protected ColumnDefinition computeNext()
+ {
+ if (idx >= to)
+ return endOfData();
+ return columns[idx++];
+ }
+ }
+
+ public static class Serializer
+ {
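+ // Serialized format: an unsigned short column count, followed by each column name written with
+ // a short length prefix (ByteBufferUtil.writeWithShortLength). On deserialization, names are
+ // resolved back to ColumnDefinitions against the table metadata (falling back to dropped columns).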
+ public void serialize(Columns columns, DataOutputPlus out) throws IOException
+ {
+ out.writeShort(columns.columnCount());
+ for (ColumnDefinition column : columns)
+ ByteBufferUtil.writeWithShortLength(column.name.bytes, out);
+ }
+
+ public long serializedSize(Columns columns, TypeSizes sizes)
+ {
+ long size = sizes.sizeof((short)columns.columnCount());
+ for (ColumnDefinition column : columns)
+ size += sizes.sizeofWithShortLength(column.name.bytes);
+ return size;
+ }
+
+ public Columns deserialize(DataInput in, CFMetaData metadata) throws IOException
+ {
+ int length = in.readUnsignedShort();
+ ColumnDefinition[] columns = new ColumnDefinition[length];
+ for (int i = 0; i < length; i++)
+ {
+ ByteBuffer name = ByteBufferUtil.readWithShortLength(in);
+ ColumnDefinition column = metadata.getColumnDefinition(name);
+ if (column == null)
+ {
+ // If we don't find the definition, it could be we have data for a dropped column, and we shouldn't
+ // fail deserialization because of that. So we grab a "fake" ColumnDefinition that ensures proper
+ // deserialization. The column will be ignored later on anyway.
+ column = metadata.getDroppedColumnDefinition(name);
+ if (column == null)
+ throw new RuntimeException("Unknown column " + UTF8Type.instance.getString(name) + " during deserialization");
+ }
+ columns[i] = column;
+ }
+ return new Columns(columns);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/CompactTables.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CompactTables.java b/src/java/org/apache/cassandra/db/CompactTables.java
new file mode 100644
index 0000000..a72e7f2
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/CompactTables.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+/**
+ * Small utility methods pertaining to the encoding of COMPACT STORAGE tables.
+ *
+ * COMPACT STORAGE tables exist mainly for the sake of internally encoding thrift tables (as well as
+ * exposing those tables through CQL). Note that due to these constraints, the internal representation
+ * of compact tables does *not* correspond exactly to their CQL definition.
+ *
+ * The internal layout of such tables is such that it can encode any thrift table. That layout is as follow:
+ * CREATE TABLE compact (
+ * key [key_validation_class],
+ * [column_metadata_1] [type1] static,
+ * ...,
+ * [column_metadata_n] [type1] static,
+ * column [comparator],
+ * value [default_validation_class]
+ * PRIMARY KEY (key, column)
+ * )
+ * More specifically, the table:
+ * - always has a clustering column and a regular value, which are used to store the "dynamic" thrift columns name and value.
+ * Those are always present because we have no way to know in advance if "dynamic" columns will be inserted or not. Note
+ * that when declared from CQL, compact tables may not have any clustering: in that case, we still have a clustering
+ * defined internally, it is just ignored as far as interacting from CQL is concerned.
+ * - has a static column for every "static" column defined in the thrift "column_metadata". Note that when declaring a compact
+ * table from CQL without any clustering (but with some non-PK columns), those columns end up static internally even though
+ * they are not declared static.
+ *
+ * One variation is that if the table comparator is a CompositeType, then the underlying table will have one clustering column
+ * per element of the CompositeType, but the rest of the layout is as above.
+ *
+ * As far as thrift is concerned, one exception to this is super column families, which have a different layout. Namely, a super
+ * column family is encoded with:
+ * CREATE TABLE super (
+ * key [key_validation_class],
+ * super_column_name [comparator],
+ * [column_metadata_1] [type1],
+ * ...,
+ * [column_metadata_n] [type1],
+ * "" map<[sub_comparator], [default_validation_class]>
+ * PRIMARY KEY (key, super_column_name)
+ * )
+ * In other words, every super column is encoded by a row. That row has one column for each defined "column_metadata", but it also
+ * has a special map column (whose name is the empty string as this is guaranteed to never conflict with a user-defined
+ * "column_metadata") which stores the super column "dynamic" sub-columns.
+ */
+public abstract class CompactTables
+{
+ // We use an empty name because 1) it can't conflict with a user-defined column and 2) it actually
+ // validates with any comparator, which makes it convenient for columnDefinitionComparator().
+ public static final ByteBuffer SUPER_COLUMN_MAP_COLUMN = ByteBufferUtil.EMPTY_BYTE_BUFFER;
+ public static final String SUPER_COLUMN_MAP_COLUMN_STR = UTF8Type.instance.compose(SUPER_COLUMN_MAP_COLUMN);
+
+ private CompactTables() {}
+
+ public static ColumnDefinition getCompactValueColumn(PartitionColumns columns, boolean isSuper)
+ {
+ if (isSuper)
+ {
+ for (ColumnDefinition column : columns.regulars)
+ if (column.name.bytes.equals(SUPER_COLUMN_MAP_COLUMN))
+ return column;
+ throw new AssertionError("Invalid super column table definition, no 'dynamic' map column");
+ }
+ assert columns.regulars.simpleColumnCount() == 1 && columns.regulars.complexColumnCount() == 0;
+ return columns.regulars.getSimple(0);
+ }
+
+ public static AbstractType<?> columnDefinitionComparator(ColumnDefinition.Kind kind, boolean isSuper, AbstractType<?> rawComparator, AbstractType<?> rawSubComparator)
+ {
+ if (isSuper)
+ return kind == ColumnDefinition.Kind.REGULAR ? rawSubComparator : UTF8Type.instance;
+ else
+ return kind == ColumnDefinition.Kind.STATIC ? rawComparator : UTF8Type.instance;
+ }
+
+ public static boolean hasEmptyCompactValue(CFMetaData metadata)
+ {
+ return metadata.compactValueColumn().type instanceof EmptyType;
+ }
+
+ public static boolean isSuperColumnMapColumn(ColumnDefinition column)
+ {
+ return column.kind == ColumnDefinition.Kind.REGULAR && column.name.bytes.equals(SUPER_COLUMN_MAP_COLUMN);
+ }
+
+ public static DefaultNames defaultNameGenerator(Set<String> usedNames)
+ {
+ return new DefaultNames(new HashSet<String>(usedNames));
+ }
+
+ public static DefaultNames defaultNameGenerator(Iterable<ColumnDefinition> defs)
+ {
+ Set<String> usedNames = new HashSet<>();
+ for (ColumnDefinition def : defs)
+ usedNames.add(def.name.toString());
+ return new DefaultNames(usedNames);
+ }
+
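+ // As an illustration of the naming scheme below: starting from usedNames = {"key", "value"},
+ // defaultPartitionKeyName() yields "key2" (as "key" is taken), defaultClusteringName() yields
+ // "column1", and defaultCompactValueName() yields "value1".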
+ public static class DefaultNames
+ {
+ private static final String DEFAULT_PARTITION_KEY_NAME = "key";
+ private static final String DEFAULT_CLUSTERING_NAME = "column";
+ private static final String DEFAULT_COMPACT_VALUE_NAME = "value";
+
+ private final Set<String> usedNames;
+ private int partitionIndex = 0;
+ private int clusteringIndex = 1;
+ private int compactIndex = 0;
+
+ private DefaultNames(Set<String> usedNames)
+ {
+ this.usedNames = usedNames;
+ }
+
+ public String defaultPartitionKeyName()
+ {
+ while (true)
+ {
+ // For compatibility's sake, we call the first alias 'key' rather than 'key1'. This
+ // is inconsistent with the column aliases, but it's probably not worth risking breaking compatibility now.
+ String candidate = partitionIndex == 0 ? DEFAULT_PARTITION_KEY_NAME : DEFAULT_PARTITION_KEY_NAME + (partitionIndex + 1);
+ ++partitionIndex;
+ if (usedNames.add(candidate))
+ return candidate;
+ }
+ }
+
+ public String defaultClusteringName()
+ {
+ while (true)
+ {
+ String candidate = DEFAULT_CLUSTERING_NAME + clusteringIndex;
+ ++clusteringIndex;
+ if (usedNames.add(candidate))
+ return candidate;
+ }
+ }
+
+ public String defaultCompactValueName()
+ {
+ while (true)
+ {
+ String candidate = compactIndex == 0 ? DEFAULT_COMPACT_VALUE_NAME : DEFAULT_COMPACT_VALUE_NAME + compactIndex;
+ ++compactIndex;
+ if (usedNames.add(candidate))
+ return candidate;
+ }
+ }
+ }
+}