You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2017/01/27 22:18:31 UTC
[25/37] cassandra git commit: Make TableMetadata immutable,
optimize Schema
http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
index d917884..0716d47 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
@@ -37,6 +37,9 @@ import org.apache.cassandra.db.commitlog.CommitLog.Configuration;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.utils.CLibrary;
import org.apache.cassandra.utils.IntegerInterval;
import org.apache.cassandra.utils.concurrent.OpOrder;
@@ -100,10 +103,10 @@ public abstract class CommitLogSegment
private final WaitQueue syncComplete = new WaitQueue();
// a map of Cf->dirty interval in this segment; if interval is not covered by the clean set, the log contains unflushed data
- private final NonBlockingHashMap<UUID, IntegerInterval> cfDirty = new NonBlockingHashMap<>(1024);
+ private final NonBlockingHashMap<TableId, IntegerInterval> tableDirty = new NonBlockingHashMap<>(1024);
// a map of Cf->clean intervals; separate map from above to permit marking Cfs clean whilst the log is still in use
- private final ConcurrentHashMap<UUID, IntegerInterval.Set> cfClean = new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<TableId, IntegerInterval.Set> tableClean = new ConcurrentHashMap<>();
public final long id;
@@ -475,27 +478,27 @@ public abstract class CommitLogSegment
void markDirty(Mutation mutation, int allocatedPosition)
{
for (PartitionUpdate update : mutation.getPartitionUpdates())
- coverInMap(cfDirty, update.metadata().cfId, allocatedPosition);
+ coverInMap(tableDirty, update.metadata().id, allocatedPosition);
}
/**
- * Marks the ColumnFamily specified by cfId as clean for this log segment. If the
+ * Marks the ColumnFamily specified by id as clean for this log segment. If the
* given context argument is contained in this file, it will only mark the CF as
* clean if no newer writes have taken place.
*
- * @param cfId the column family ID that is now clean
+ * @param tableId the table that is now clean
* @param startPosition the start of the range that is clean
* @param endPosition the end of the range that is clean
*/
- public synchronized void markClean(UUID cfId, CommitLogPosition startPosition, CommitLogPosition endPosition)
+ public synchronized void markClean(TableId tableId, CommitLogPosition startPosition, CommitLogPosition endPosition)
{
if (startPosition.segmentId > id || endPosition.segmentId < id)
return;
- if (!cfDirty.containsKey(cfId))
+ if (!tableDirty.containsKey(tableId))
return;
int start = startPosition.segmentId == id ? startPosition.position : 0;
int end = endPosition.segmentId == id ? endPosition.position : Integer.MAX_VALUE;
- cfClean.computeIfAbsent(cfId, k -> new IntegerInterval.Set()).add(start, end);
+ tableClean.computeIfAbsent(tableId, k -> new IntegerInterval.Set()).add(start, end);
removeCleanFromDirty();
}
@@ -505,16 +508,16 @@ public abstract class CommitLogSegment
if (isStillAllocating())
return;
- Iterator<Map.Entry<UUID, IntegerInterval.Set>> iter = cfClean.entrySet().iterator();
+ Iterator<Map.Entry<TableId, IntegerInterval.Set>> iter = tableClean.entrySet().iterator();
while (iter.hasNext())
{
- Map.Entry<UUID, IntegerInterval.Set> clean = iter.next();
- UUID cfId = clean.getKey();
+ Map.Entry<TableId, IntegerInterval.Set> clean = iter.next();
+ TableId tableId = clean.getKey();
IntegerInterval.Set cleanSet = clean.getValue();
- IntegerInterval dirtyInterval = cfDirty.get(cfId);
+ IntegerInterval dirtyInterval = tableDirty.get(tableId);
if (dirtyInterval != null && cleanSet.covers(dirtyInterval))
{
- cfDirty.remove(cfId);
+ tableDirty.remove(tableId);
iter.remove();
}
}
@@ -523,17 +526,17 @@ public abstract class CommitLogSegment
/**
* @return a collection of dirty CFIDs for this segment file.
*/
- public synchronized Collection<UUID> getDirtyCFIDs()
+ public synchronized Collection<TableId> getDirtyTableIds()
{
- if (cfClean.isEmpty() || cfDirty.isEmpty())
- return cfDirty.keySet();
+ if (tableClean.isEmpty() || tableDirty.isEmpty())
+ return tableDirty.keySet();
- List<UUID> r = new ArrayList<>(cfDirty.size());
- for (Map.Entry<UUID, IntegerInterval> dirty : cfDirty.entrySet())
+ List<TableId> r = new ArrayList<>(tableDirty.size());
+ for (Map.Entry<TableId, IntegerInterval> dirty : tableDirty.entrySet())
{
- UUID cfId = dirty.getKey();
+ TableId tableId = dirty.getKey();
IntegerInterval dirtyInterval = dirty.getValue();
- IntegerInterval.Set cleanSet = cfClean.get(cfId);
+ IntegerInterval.Set cleanSet = tableClean.get(tableId);
if (cleanSet == null || !cleanSet.covers(dirtyInterval))
r.add(dirty.getKey());
}
@@ -546,12 +549,12 @@ public abstract class CommitLogSegment
public synchronized boolean isUnused()
{
// if room to allocate, we're still in use as the active allocatingFrom,
- // so we don't want to race with updates to cfClean with removeCleanFromDirty
+ // so we don't want to race with updates to tableClean with removeCleanFromDirty
if (isStillAllocating())
return false;
removeCleanFromDirty();
- return cfDirty.isEmpty();
+ return tableDirty.isEmpty();
}
/**
@@ -569,12 +572,12 @@ public abstract class CommitLogSegment
public String dirtyString()
{
StringBuilder sb = new StringBuilder();
- for (UUID cfId : getDirtyCFIDs())
+ for (TableId tableId : getDirtyTableIds())
{
- CFMetaData m = Schema.instance.getCFMetaData(cfId);
- sb.append(m == null ? "<deleted>" : m.cfName).append(" (").append(cfId)
- .append(", dirty: ").append(cfDirty.get(cfId))
- .append(", clean: ").append(cfClean.get(cfId))
+ TableMetadata m = Schema.instance.getTableMetadata(tableId);
+ sb.append(m == null ? "<deleted>" : m.name).append(" (").append(tableId)
+ .append(", dirty: ").append(tableDirty.get(tableId))
+ .append(", clean: ").append(tableClean.get(tableId))
.append("), ");
}
return sb.toString();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java b/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
index 344fa58..d3235bc 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
@@ -22,22 +22,22 @@ import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
-import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.schema.TableMetadata;
/** Implements serializable to allow structured info to be returned via JMX. */
public final class CompactionInfo implements Serializable
{
private static final long serialVersionUID = 3695381572726744816L;
- private final CFMetaData cfm;
+ private final TableMetadata metadata;
private final OperationType tasktype;
private final long completed;
private final long total;
private final String unit;
private final UUID compactionId;
- public CompactionInfo(CFMetaData cfm, OperationType tasktype, long bytesComplete, long totalBytes, UUID compactionId)
+ public CompactionInfo(TableMetadata metadata, OperationType tasktype, long bytesComplete, long totalBytes, UUID compactionId)
{
- this(cfm, tasktype, bytesComplete, totalBytes, "bytes", compactionId);
+ this(metadata, tasktype, bytesComplete, totalBytes, "bytes", compactionId);
}
public CompactionInfo(OperationType tasktype, long completed, long total, String unit, UUID compactionId)
@@ -45,12 +45,12 @@ public final class CompactionInfo implements Serializable
this(null, tasktype, completed, total, unit, compactionId);
}
- public CompactionInfo(CFMetaData cfm, OperationType tasktype, long completed, long total, String unit, UUID compactionId)
+ public CompactionInfo(TableMetadata metadata, OperationType tasktype, long completed, long total, String unit, UUID compactionId)
{
this.tasktype = tasktype;
this.completed = completed;
this.total = total;
- this.cfm = cfm;
+ this.metadata = metadata;
this.unit = unit;
this.compactionId = compactionId;
}
@@ -58,27 +58,27 @@ public final class CompactionInfo implements Serializable
/** @return A copy of this CompactionInfo with updated progress. */
public CompactionInfo forProgress(long complete, long total)
{
- return new CompactionInfo(cfm, tasktype, complete, total, unit, compactionId);
+ return new CompactionInfo(metadata, tasktype, complete, total, unit, compactionId);
}
public UUID getId()
{
- return cfm != null ? cfm.cfId : null;
+ return metadata != null ? metadata.id.asUUID() : null;
}
public String getKeyspace()
{
- return cfm != null ? cfm.ksName : null;
+ return metadata != null ? metadata.keyspace : null;
}
public String getColumnFamily()
{
- return cfm != null ? cfm.cfName : null;
+ return metadata != null ? metadata.name : null;
}
- public CFMetaData getCFMetaData()
+ public TableMetadata getTableMetadata()
{
- return cfm;
+ return metadata;
}
public long getCompleted()
@@ -105,7 +105,7 @@ public final class CompactionInfo implements Serializable
{
StringBuilder buff = new StringBuilder();
buff.append(getTaskType());
- if (cfm != null)
+ if (metadata != null)
{
buff.append('@').append(getId()).append('(');
buff.append(getKeyspace()).append(", ").append(getColumnFamily()).append(", ");
http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
index 19f4801..8e94fd9 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
@@ -22,7 +22,7 @@ import java.util.function.Predicate;
import com.google.common.collect.Ordering;
-import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.partitions.PurgeFunction;
@@ -101,20 +101,20 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
metrics.beginCompaction(this);
UnfilteredPartitionIterator merged = scanners.isEmpty()
- ? EmptyIterators.unfilteredPartition(controller.cfs.metadata)
- : UnfilteredPartitionIterators.merge(scanners, nowInSec, listener());
+ ? EmptyIterators.unfilteredPartition(controller.cfs.metadata())
+ : UnfilteredPartitionIterators.merge(scanners, nowInSec, listener());
merged = Transformation.apply(merged, new GarbageSkipper(controller, nowInSec));
this.compacted = Transformation.apply(merged, new Purger(controller, nowInSec));
}
- public CFMetaData metadata()
+ public TableMetadata metadata()
{
- return controller.cfs.metadata;
+ return controller.cfs.metadata();
}
public CompactionInfo getCompactionInfo()
{
- return new CompactionInfo(controller.cfs.metadata,
+ return new CompactionInfo(controller.cfs.metadata(),
type,
bytesRead,
totalBytes,
@@ -167,7 +167,7 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
regulars = regulars.mergeTo(iter.columns().regulars);
}
}
- final PartitionColumns partitionColumns = new PartitionColumns(statics, regulars);
+ final RegularAndStaticColumns regularAndStaticColumns = new RegularAndStaticColumns(statics, regulars);
// If we have a 2ndary index, we must update it with deleted/shadowed cells.
// we can reuse a single CleanupTransaction for the duration of a partition.
@@ -181,7 +181,7 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
// TODO: this should probably be done asynchronously and batched.
final CompactionTransaction indexTransaction =
controller.cfs.indexManager.newCompactionTransaction(partitionKey,
- partitionColumns,
+ regularAndStaticColumns,
versions.size(),
nowInSec);
@@ -320,7 +320,7 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
final Row staticRow;
final ColumnFilter cf;
final int nowInSec;
- final CFMetaData metadata;
+ final TableMetadata metadata;
final boolean cellLevelGC;
DeletionTime tombOpenDeletionTime = DeletionTime.LIVE;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 9c74f62..91263a7 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -31,7 +31,6 @@ import javax.management.openmbean.TabularData;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.*;
-import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,9 +40,9 @@ import org.apache.cassandra.cache.AutoSavingCache;
import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
import org.apache.cassandra.concurrent.NamedThreadFactory;
-import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.compaction.CompactionInfo.Holder;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
@@ -794,7 +793,7 @@ public class CompactionManager implements CompactionManagerMBean
{
// extract keyspace and columnfamily name from filename
Descriptor desc = Descriptor.fromFilename(filename.trim());
- if (Schema.instance.getCFMetaData(desc) == null)
+ if (Schema.instance.getTableMetadataRef(desc) == null)
{
logger.warn("Schema does not exist for file {}. Skipping.", filename);
continue;
@@ -820,7 +819,7 @@ public class CompactionManager implements CompactionManagerMBean
{
// extract keyspace and columnfamily name from filename
Descriptor desc = Descriptor.fromFilename(filename.trim());
- if (Schema.instance.getCFMetaData(desc) == null)
+ if (Schema.instance.getTableMetadataRef(desc) == null)
{
logger.warn("Schema does not exist for file {}. Skipping.", filename);
continue;
@@ -1072,7 +1071,7 @@ public class CompactionManager implements CompactionManagerMBean
long totalkeysWritten = 0;
- long expectedBloomFilterSize = Math.max(cfs.metadata.params.minIndexInterval,
+ long expectedBloomFilterSize = Math.max(cfs.metadata().params.minIndexInterval,
SSTableReader.getApproximateKeyCount(txn.originals()));
if (logger.isTraceEnabled())
logger.trace("Expected bloom filter size : {}", expectedBloomFilterSize);
@@ -1241,16 +1240,13 @@ public class CompactionManager implements CompactionManagerMBean
LifecycleTransaction txn)
{
FileUtils.createDirectory(compactionFileLocation);
- SerializationHeader header = sstable.header;
- if (header == null)
- header = SerializationHeader.make(sstable.metadata, Collections.singleton(sstable));
return SSTableWriter.create(cfs.metadata,
cfs.newSSTableDescriptor(compactionFileLocation),
expectedBloomFilterSize,
repairedAt,
sstable.getSSTableLevel(),
- header,
+ sstable.header,
cfs.indexManager.listIndexes(),
txn);
}
@@ -1282,8 +1278,8 @@ public class CompactionManager implements CompactionManagerMBean
(long) expectedBloomFilterSize,
repairedAt,
cfs.metadata,
- new MetadataCollector(sstables, cfs.metadata.comparator, minLevel),
- SerializationHeader.make(cfs.metadata, sstables),
+ new MetadataCollector(sstables, cfs.metadata().comparator, minLevel),
+ SerializationHeader.make(cfs.metadata(), sstables),
cfs.indexManager.listIndexes(),
txn);
}
@@ -1435,7 +1431,7 @@ public class CompactionManager implements CompactionManagerMBean
return null;
Set<SSTableReader> sstablesToValidate = new HashSet<>();
if (prs.isGlobal)
- prs.markSSTablesRepairing(cfs.metadata.cfId, validator.desc.parentSessionId);
+ prs.markSSTablesRepairing(cfs.metadata.id, validator.desc.parentSessionId);
// note that we always grab all existing sstables for this - if we were to just grab the ones that
// were marked as repairing, we would miss any ranges that were compacted away and this would cause us to overstream
try (ColumnFamilyStore.RefViewFragment sstableCandidates = cfs.selectAndReference(View.select(SSTableSet.CANONICAL, (s) -> !prs.isIncremental || !s.isRepaired())))
@@ -1522,7 +1518,7 @@ public class CompactionManager implements CompactionManagerMBean
CompactionController controller = new CompactionController(cfs, sstableAsSet, getDefaultGcBefore(cfs, nowInSec));
CompactionIterator ci = new CompactionIterator(OperationType.ANTICOMPACTION, scanners.scanners, controller, nowInSec, UUIDGen.getTimeUUID(), metrics))
{
- int expectedBloomFilterSize = Math.max(cfs.metadata.params.minIndexInterval, (int)(SSTableReader.getApproximateKeyCount(sstableAsSet)));
+ int expectedBloomFilterSize = Math.max(cfs.metadata().params.minIndexInterval, (int)(SSTableReader.getApproximateKeyCount(sstableAsSet)));
repairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, repairedAt, sstableAsSet, anticompactionGroup));
unRepairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, sstableAsSet, anticompactionGroup));
@@ -1990,7 +1986,7 @@ public class CompactionManager implements CompactionManagerMBean
* @param interruptValidation true if validation operations for repair should also be interrupted
*
*/
- public void interruptCompactionFor(Iterable<CFMetaData> columnFamilies, boolean interruptValidation)
+ public void interruptCompactionFor(Iterable<TableMetadata> columnFamilies, boolean interruptValidation)
{
assert columnFamilies != null;
@@ -2001,16 +1997,16 @@ public class CompactionManager implements CompactionManagerMBean
if ((info.getTaskType() == OperationType.VALIDATION) && !interruptValidation)
continue;
- if (Iterables.contains(columnFamilies, info.getCFMetaData()))
+ if (Iterables.contains(columnFamilies, info.getTableMetadata()))
compactionHolder.stop(); // signal compaction to stop
}
}
public void interruptCompactionForCFs(Iterable<ColumnFamilyStore> cfss, boolean interruptValidation)
{
- List<CFMetaData> metadata = new ArrayList<>();
+ List<TableMetadata> metadata = new ArrayList<>();
for (ColumnFamilyStore cfs : cfss)
- metadata.add(cfs.metadata);
+ metadata.add(cfs.metadata());
interruptCompactionFor(metadata, interruptValidation);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index 5679338..71b160a 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -25,13 +25,14 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import com.google.common.collect.Iterables;
+
+import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.index.Index;
import com.google.common.primitives.Ints;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.Memtable;
@@ -88,8 +89,8 @@ public class CompactionStrategyManager implements INotificationConsumer
logger.trace("{} subscribed to the data tracker.", this);
this.cfs = cfs;
this.compactionLogger = new CompactionLogger(cfs, this);
- reload(cfs.metadata);
- params = cfs.metadata.params.compaction;
+ reload(cfs.metadata());
+ params = cfs.metadata().params.compaction;
locations = getDirectories().getWriteableLocations();
enabled = params.isEnabled();
}
@@ -108,7 +109,7 @@ public class CompactionStrategyManager implements INotificationConsumer
if (!isEnabled())
return null;
- maybeReload(cfs.metadata);
+ maybeReload(cfs.metadata());
List<AbstractCompactionStrategy> strategies = new ArrayList<>();
strategies.addAll(repaired);
@@ -270,7 +271,7 @@ public class CompactionStrategyManager implements INotificationConsumer
}
}
- public void maybeReload(CFMetaData metadata)
+ public void maybeReload(TableMetadata metadata)
{
// compare the old schema configuration to the new one, ignore any locally set changes.
if (metadata.params.compaction.equals(schemaCompactionParams) &&
@@ -294,7 +295,7 @@ public class CompactionStrategyManager implements INotificationConsumer
* Called after changing configuration and at startup.
* @param metadata
*/
- private void reload(CFMetaData metadata)
+ private void reload(TableMetadata metadata)
{
boolean disabledWithJMX = !enabled && shouldBeEnabled();
if (!metadata.params.compaction.equals(schemaCompactionParams))
@@ -544,7 +545,7 @@ public class CompactionStrategyManager implements INotificationConsumer
public void handleNotification(INotification notification, Object sender)
{
- maybeReload(cfs.metadata);
+ maybeReload(cfs.metadata());
if (notification instanceof SSTableAddedNotification)
{
handleFlushNotification(((SSTableAddedNotification) notification).added);
@@ -691,7 +692,7 @@ public class CompactionStrategyManager implements INotificationConsumer
public AbstractCompactionTask getCompactionTask(LifecycleTransaction txn, int gcBefore, long maxSSTableBytes)
{
- maybeReload(cfs.metadata);
+ maybeReload(cfs.metadata());
validateForCompaction(txn.originals(), cfs, getDirectories());
return getCompactionStrategyFor(txn.originals().iterator().next()).getCompactionTask(txn, gcBefore, maxSSTableBytes);
}
@@ -713,7 +714,7 @@ public class CompactionStrategyManager implements INotificationConsumer
public Collection<AbstractCompactionTask> getMaximalTasks(final int gcBefore, final boolean splitOutput)
{
- maybeReload(cfs.metadata);
+ maybeReload(cfs.metadata());
// runWithCompactionsDisabled cancels active compactions and disables them, then we are able
// to make the repaired/unrepaired strategies mark their own sstables as compacting. Once the
// sstables are marked the compactions are re-enabled
@@ -761,7 +762,7 @@ public class CompactionStrategyManager implements INotificationConsumer
*/
public List<AbstractCompactionTask> getUserDefinedTasks(Collection<SSTableReader> sstables, int gcBefore)
{
- maybeReload(cfs.metadata);
+ maybeReload(cfs.metadata());
List<AbstractCompactionTask> ret = new ArrayList<>();
readLock.lock();
try
@@ -882,14 +883,14 @@ public class CompactionStrategyManager implements INotificationConsumer
locations = cfs.getDirectories().getWriteableLocations();
for (int i = 0; i < locations.length; i++)
{
- repaired.add(CFMetaData.createCompactionStrategyInstance(cfs, params));
- unrepaired.add(CFMetaData.createCompactionStrategyInstance(cfs, params));
+ repaired.add(cfs.createCompactionStrategyInstance(params));
+ unrepaired.add(cfs.createCompactionStrategyInstance(params));
}
}
else
{
- repaired.add(CFMetaData.createCompactionStrategyInstance(cfs, params));
- unrepaired.add(CFMetaData.createCompactionStrategyInstance(cfs, params));
+ repaired.add(cfs.createCompactionStrategyInstance(params));
+ unrepaired.add(cfs.createCompactionStrategyInstance(params));
}
this.params = params;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index f29e6cb..b3395d0 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -28,7 +28,7 @@ import com.google.common.primitives.Doubles;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
@@ -299,7 +299,7 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy
if (!intersecting.isEmpty())
{
@SuppressWarnings("resource") // The ScannerList will be in charge of closing (and we close properly on errors)
- ISSTableScanner scanner = new LeveledScanner(intersecting, ranges);
+ ISSTableScanner scanner = new LeveledScanner(cfs.metadata(), intersecting, ranges);
scanners.add(scanner);
}
}
@@ -343,6 +343,7 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy
// same level (e.g. non overlapping) - see #4142
private static class LeveledScanner extends AbstractIterator<UnfilteredRowIterator> implements ISSTableScanner
{
+ private final TableMetadata metadata;
private final Collection<Range<Token>> ranges;
private final List<SSTableReader> sstables;
private final Iterator<SSTableReader> sstableIterator;
@@ -353,8 +354,9 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy
private long positionOffset;
private long totalBytesScanned = 0;
- public LeveledScanner(Collection<SSTableReader> sstables, Collection<Range<Token>> ranges)
+ public LeveledScanner(TableMetadata metadata, Collection<SSTableReader> sstables, Collection<Range<Token>> ranges)
{
+ this.metadata = metadata;
this.ranges = ranges;
// add only sstables that intersect our range, and estimate how much data that involves
@@ -402,9 +404,9 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy
return filtered;
}
- public CFMetaData metadata()
+ public TableMetadata metadata()
{
- return sstables.get(0).metadata; // The ctor checks we have at least one sstable
+ return metadata;
}
protected UnfilteredRowIterator computeNext()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index 0007e30..e8eee9a 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -24,7 +24,7 @@ import java.util.*;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
-import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.db.rows.*;
@@ -92,7 +92,7 @@ public class Scrubber implements Closeable
this.sstable = transaction.onlyOne();
this.outputHandler = outputHandler;
this.skipCorrupted = skipCorrupted;
- this.rowIndexEntrySerializer = sstable.descriptor.version.getSSTableFormat().getIndexSerializer(sstable.metadata,
+ this.rowIndexEntrySerializer = sstable.descriptor.version.getSSTableFormat().getIndexSerializer(cfs.metadata(),
sstable.descriptor.version,
sstable.header);
@@ -100,7 +100,7 @@ public class Scrubber implements Closeable
int locIndex = CompactionStrategyManager.getCompactionStrategyIndex(cfs, cfs.getDirectories(), sstable);
this.destination = cfs.getDirectories().getLocationForDisk(cfs.getDirectories().getWriteableLocations()[locIndex]);
- this.isCommutative = cfs.metadata.isCounter();
+ this.isCommutative = cfs.metadata().isCounter();
boolean hasIndexFile = (new File(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX))).exists();
this.isIndex = cfs.isIndex();
@@ -111,7 +111,7 @@ public class Scrubber implements Closeable
}
this.checkData = checkData && !this.isIndex; //LocalByPartitionerType does not support validation
this.expectedBloomFilterSize = Math.max(
- cfs.metadata.params.minIndexInterval,
+ cfs.metadata().params.minIndexInterval,
hasIndexFile ? SSTableReader.getApproximateKeyCount(toScrub) : 0);
// loop through each row, deserializing to check for damage.
@@ -306,7 +306,7 @@ public class Scrubber implements Closeable
// that one row is out of order, it will stop returning them. The remaining rows will be sorted and added
// to the outOfOrder set that will be later written to a new SSTable.
OrderCheckerIterator sstableIterator = new OrderCheckerIterator(new RowMergingSSTableIterator(SSTableIdentityIterator.create(sstable, dataFile, key)),
- cfs.metadata.comparator);
+ cfs.metadata().comparator);
try (UnfilteredRowIterator iterator = withValidation(sstableIterator, dataFile.getPath()))
{
@@ -438,7 +438,7 @@ public class Scrubber implements Closeable
{
try
{
- return new CompactionInfo(sstable.metadata,
+ return new CompactionInfo(sstable.metadata(),
OperationType.SCRUB,
dataFile.getFilePointer(),
dataFile.length(),
@@ -545,7 +545,7 @@ public class Scrubber implements Closeable
this.comparator = comparator;
}
- public CFMetaData metadata()
+ public TableMetadata metadata()
{
return iterator.metadata();
}
@@ -555,7 +555,7 @@ public class Scrubber implements Closeable
return iterator.isReverseOrder();
}
- public PartitionColumns columns()
+ public RegularAndStaticColumns columns()
{
return iterator.columns();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/compaction/Upgrader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Upgrader.java b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
index aedb208..34ec1dd 100644
--- a/src/java/org/apache/cassandra/db/compaction/Upgrader.java
+++ b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
@@ -61,7 +61,7 @@ public class Upgrader
this.controller = new UpgradeController(cfs);
this.strategyManager = cfs.getCompactionStrategyManager();
- long estimatedTotalKeys = Math.max(cfs.metadata.params.minIndexInterval, SSTableReader.getApproximateKeyCount(Arrays.asList(this.sstable)));
+ long estimatedTotalKeys = Math.max(cfs.metadata().params.minIndexInterval, SSTableReader.getApproximateKeyCount(Arrays.asList(this.sstable)));
long estimatedSSTables = Math.max(1, SSTableReader.getTotalBytes(Arrays.asList(this.sstable)) / strategyManager.getMaxSSTableBytes());
this.estimatedRows = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
}
@@ -75,7 +75,7 @@ public class Upgrader
repairedAt,
cfs.metadata,
sstableMetadataCollector,
- SerializationHeader.make(cfs.metadata, Sets.newHashSet(sstable)),
+ SerializationHeader.make(cfs.metadata(), Sets.newHashSet(sstable)),
cfs.indexManager.listIndexes(),
transaction);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/compaction/Verifier.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Verifier.java b/src/java/org/apache/cassandra/db/compaction/Verifier.java
index a52dd82..467d50d 100644
--- a/src/java/org/apache/cassandra/db/compaction/Verifier.java
+++ b/src/java/org/apache/cassandra/db/compaction/Verifier.java
@@ -73,7 +73,7 @@ public class Verifier implements Closeable
this.cfs = cfs;
this.sstable = sstable;
this.outputHandler = outputHandler;
- this.rowIndexEntrySerializer = sstable.descriptor.version.getSSTableFormat().getIndexSerializer(sstable.metadata, sstable.descriptor.version, sstable.header);
+ this.rowIndexEntrySerializer = sstable.descriptor.version.getSSTableFormat().getIndexSerializer(cfs.metadata(), sstable.descriptor.version, sstable.header);
this.controller = new VerifyController(cfs);
@@ -260,7 +260,7 @@ public class Verifier implements Closeable
{
try
{
- return new CompactionInfo(sstable.metadata,
+ return new CompactionInfo(sstable.metadata(),
OperationType.VERIFY,
dataFile.getFilePointer(),
dataFile.length(),
http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
index d279321..6f2586b 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
@@ -73,8 +73,8 @@ public class DefaultCompactionWriter extends CompactionAwareWriter
estimatedTotalKeys,
minRepairedAt,
cfs.metadata,
- new MetadataCollector(txn.originals(), cfs.metadata.comparator, sstableLevel),
- SerializationHeader.make(cfs.metadata, nonExpiredSSTables),
+ new MetadataCollector(txn.originals(), cfs.metadata().comparator, sstableLevel),
+ SerializationHeader.make(cfs.metadata(), nonExpiredSSTables),
cfs.indexManager.listIndexes(),
txn);
sstableWriter.switchWriter(writer);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
index a3d8c98..5eac658 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
@@ -109,8 +109,8 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter
keysPerSSTable,
minRepairedAt,
cfs.metadata,
- new MetadataCollector(txn.originals(), cfs.metadata.comparator, currentLevel),
- SerializationHeader.make(cfs.metadata, txn.originals()),
+ new MetadataCollector(txn.originals(), cfs.metadata().comparator, currentLevel),
+ SerializationHeader.make(cfs.metadata(), txn.originals()),
cfs.indexManager.listIndexes(),
txn));
partitionsWritten = 0;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
index 7acb870..862e68b 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
@@ -112,8 +112,8 @@ public class MaxSSTableSizeWriter extends CompactionAwareWriter
estimatedTotalKeys / estimatedSSTables,
minRepairedAt,
cfs.metadata,
- new MetadataCollector(allSSTables, cfs.metadata.comparator, level),
- SerializationHeader.make(cfs.metadata, nonExpiredSSTables),
+ new MetadataCollector(allSSTables, cfs.metadata().comparator, level),
+ SerializationHeader.make(cfs.metadata(), nonExpiredSSTables),
cfs.indexManager.listIndexes(),
txn);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
index a01672e..79f9d1a 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
@@ -108,8 +108,8 @@ public class SplittingSizeTieredCompactionWriter extends CompactionAwareWriter
currentPartitionsToWrite,
minRepairedAt,
cfs.metadata,
- new MetadataCollector(allSSTables, cfs.metadata.comparator, 0),
- SerializationHeader.make(cfs.metadata, nonExpiredSSTables),
+ new MetadataCollector(allSSTables, cfs.metadata().comparator, 0),
+ SerializationHeader.make(cfs.metadata(), nonExpiredSSTables),
cfs.indexManager.listIndexes(),
txn);
logger.trace("Switching writer, currentPartitionsToWrite = {}", currentPartitionsToWrite);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/filter/AbstractClusteringIndexFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/AbstractClusteringIndexFilter.java b/src/java/org/apache/cassandra/db/filter/AbstractClusteringIndexFilter.java
index 51e9d8e..c28117c 100644
--- a/src/java/org/apache/cassandra/db/filter/AbstractClusteringIndexFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/AbstractClusteringIndexFilter.java
@@ -19,8 +19,8 @@ package org.apache.cassandra.db.filter;
import java.io.IOException;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.marshal.ReversedType;
import org.apache.cassandra.io.util.DataInputPlus;
@@ -45,13 +45,13 @@ public abstract class AbstractClusteringIndexFilter implements ClusteringIndexFi
protected abstract void serializeInternal(DataOutputPlus out, int version) throws IOException;
protected abstract long serializedSizeInternal(int version);
- protected void appendOrderByToCQLString(CFMetaData metadata, StringBuilder sb)
+ protected void appendOrderByToCQLString(TableMetadata metadata, StringBuilder sb)
{
if (reversed)
{
sb.append(" ORDER BY (");
int i = 0;
- for (ColumnDefinition column : metadata.clusteringColumns())
+ for (ColumnMetadata column : metadata.clusteringColumns())
sb.append(i++ == 0 ? "" : ", ").append(column.name).append(column.type instanceof ReversedType ? " ASC" : " DESC");
sb.append(')');
}
@@ -69,7 +69,7 @@ public abstract class AbstractClusteringIndexFilter implements ClusteringIndexFi
filter.serializeInternal(out, version);
}
- public ClusteringIndexFilter deserialize(DataInputPlus in, int version, CFMetaData metadata) throws IOException
+ public ClusteringIndexFilter deserialize(DataInputPlus in, int version, TableMetadata metadata) throws IOException
{
Kind kind = Kind.values()[in.readUnsignedByte()];
boolean reversed = in.readBoolean();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/filter/ClusteringIndexFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ClusteringIndexFilter.java b/src/java/org/apache/cassandra/db/filter/ClusteringIndexFilter.java
index f184035..cdb61c9 100644
--- a/src/java/org/apache/cassandra/db/filter/ClusteringIndexFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ClusteringIndexFilter.java
@@ -19,14 +19,14 @@ package org.apache.cassandra.db.filter;
import java.io.IOException;
-import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.partitions.CachedPartition;
import org.apache.cassandra.db.partitions.Partition;
+import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.schema.TableMetadata;
/**
* A filter that selects a subset of the rows of a given partition by using the "clustering index".
@@ -54,7 +54,7 @@ public interface ClusteringIndexFilter
static interface InternalDeserializer
{
- public ClusteringIndexFilter deserialize(DataInputPlus in, int version, CFMetaData metadata, boolean reversed) throws IOException;
+ public ClusteringIndexFilter deserialize(DataInputPlus in, int version, TableMetadata metadata, boolean reversed) throws IOException;
}
/**
@@ -114,10 +114,10 @@ public interface ClusteringIndexFilter
/**
* Returns an iterator that only returns the rows of the provided iterator that this filter selects.
* <p>
- * This method is the "dumb" counterpart to {@link #getSlices(CFMetaData)} in that it has no way to quickly get
+ * This method is the "dumb" counterpart to {@link #getSlices(TableMetadata)} in that it has no way to quickly get
* to what is actually selected, so it simply iterates over it all and filters out what shouldn't be returned. This should
* be avoided in general.
- * Another difference with {@link #getSlices(CFMetaData)} is that this method also filter the queried
+ * Another difference with {@link #getSlices(TableMetadata)} is that this method also filters the queried
* columns in the returned result, while the former assumes that the provided iterator has already done it.
*
* @param columnFilter the columns to include in the rows of the result iterator.
@@ -127,7 +127,7 @@ public interface ClusteringIndexFilter
*/
public UnfilteredRowIterator filterNotIndexed(ColumnFilter columnFilter, UnfilteredRowIterator iterator);
- public Slices getSlices(CFMetaData metadata);
+ public Slices getSlices(TableMetadata metadata);
/**
* Given a partition, returns a row iterator for the rows of this partition that are selected by this filter.
@@ -150,13 +150,13 @@ public interface ClusteringIndexFilter
public Kind kind();
- public String toString(CFMetaData metadata);
- public String toCQLString(CFMetaData metadata);
+ public String toString(TableMetadata metadata);
+ public String toCQLString(TableMetadata metadata);
public interface Serializer
{
public void serialize(ClusteringIndexFilter filter, DataOutputPlus out, int version) throws IOException;
- public ClusteringIndexFilter deserialize(DataInputPlus in, int version, CFMetaData metadata) throws IOException;
+ public ClusteringIndexFilter deserialize(DataInputPlus in, int version, TableMetadata metadata) throws IOException;
public long serializedSize(ClusteringIndexFilter filter, int version);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java b/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java
index 6a010d9..2f7c13a 100644
--- a/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java
@@ -21,15 +21,15 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.transform.Transformation;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.utils.SearchIterator;
import org.apache.cassandra.utils.btree.BTreeSet;
@@ -126,7 +126,7 @@ public class ClusteringIndexNamesFilter extends AbstractClusteringIndexFilter
return Transformation.apply(iterator, new FilterNotIndexed());
}
- public Slices getSlices(CFMetaData metadata)
+ public Slices getSlices(TableMetadata metadata)
{
Slices.Builder builder = new Slices.Builder(metadata.comparator, clusteringsInQueryOrder.size());
for (Clustering clustering : clusteringsInQueryOrder)
@@ -138,12 +138,12 @@ public class ClusteringIndexNamesFilter extends AbstractClusteringIndexFilter
{
final SearchIterator<Clustering, Row> searcher = partition.searchIterator(columnFilter, reversed);
return new AbstractUnfilteredRowIterator(partition.metadata(),
- partition.partitionKey(),
- partition.partitionLevelDeletion(),
- columnFilter.fetchedColumns(),
- searcher.next(Clustering.STATIC_CLUSTERING),
- reversed,
- partition.stats())
+ partition.partitionKey(),
+ partition.partitionLevelDeletion(),
+ columnFilter.fetchedColumns(),
+ searcher.next(Clustering.STATIC_CLUSTERING),
+ reversed,
+ partition.stats())
{
private final Iterator<Clustering> clusteringIter = clusteringsInQueryOrder.iterator();
@@ -162,7 +162,7 @@ public class ClusteringIndexNamesFilter extends AbstractClusteringIndexFilter
public boolean shouldInclude(SSTableReader sstable)
{
- ClusteringComparator comparator = sstable.metadata.comparator;
+ ClusteringComparator comparator = sstable.metadata().comparator;
List<ByteBuffer> minClusteringValues = sstable.getSSTableMetadata().minClusteringValues;
List<ByteBuffer> maxClusteringValues = sstable.getSSTableMetadata().maxClusteringValues;
@@ -175,7 +175,7 @@ public class ClusteringIndexNamesFilter extends AbstractClusteringIndexFilter
return false;
}
- public String toString(CFMetaData metadata)
+ public String toString(TableMetadata metadata)
{
StringBuilder sb = new StringBuilder();
sb.append("names(");
@@ -187,13 +187,13 @@ public class ClusteringIndexNamesFilter extends AbstractClusteringIndexFilter
return sb.append(')').toString();
}
- public String toCQLString(CFMetaData metadata)
+ public String toCQLString(TableMetadata metadata)
{
if (metadata.clusteringColumns().isEmpty() || clusterings.size() <= 1)
return "";
StringBuilder sb = new StringBuilder();
- sb.append('(').append(ColumnDefinition.toCQLString(metadata.clusteringColumns())).append(')');
+ sb.append('(').append(ColumnMetadata.toCQLString(metadata.clusteringColumns())).append(')');
sb.append(clusterings.size() == 1 ? " = " : " IN (");
int i = 0;
for (Clustering clustering : clusterings)
@@ -228,7 +228,7 @@ public class ClusteringIndexNamesFilter extends AbstractClusteringIndexFilter
private static class NamesDeserializer implements InternalDeserializer
{
- public ClusteringIndexFilter deserialize(DataInputPlus in, int version, CFMetaData metadata, boolean reversed) throws IOException
+ public ClusteringIndexFilter deserialize(DataInputPlus in, int version, TableMetadata metadata, boolean reversed) throws IOException
{
ClusteringComparator comparator = metadata.comparator;
BTreeSet.Builder<Clustering> clusterings = BTreeSet.builder(comparator);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/filter/ClusteringIndexSliceFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ClusteringIndexSliceFilter.java b/src/java/org/apache/cassandra/db/filter/ClusteringIndexSliceFilter.java
index 02a44d7..9490adf 100644
--- a/src/java/org/apache/cassandra/db/filter/ClusteringIndexSliceFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ClusteringIndexSliceFilter.java
@@ -21,7 +21,7 @@ import java.io.IOException;
import java.util.List;
import java.nio.ByteBuffer;
-import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.partitions.CachedPartition;
@@ -109,7 +109,7 @@ public class ClusteringIndexSliceFilter extends AbstractClusteringIndexFilter
return Transformation.apply(iterator, new FilterNotIndexed());
}
- public Slices getSlices(CFMetaData metadata)
+ public Slices getSlices(TableMetadata metadata)
{
return slices;
}
@@ -130,12 +130,12 @@ public class ClusteringIndexSliceFilter extends AbstractClusteringIndexFilter
return slices.intersects(minClusteringValues, maxClusteringValues);
}
- public String toString(CFMetaData metadata)
+ public String toString(TableMetadata metadata)
{
return String.format("slice(slices=%s, reversed=%b)", slices, reversed);
}
- public String toCQLString(CFMetaData metadata)
+ public String toCQLString(TableMetadata metadata)
{
StringBuilder sb = new StringBuilder();
@@ -164,7 +164,7 @@ public class ClusteringIndexSliceFilter extends AbstractClusteringIndexFilter
private static class SliceDeserializer implements InternalDeserializer
{
- public ClusteringIndexFilter deserialize(DataInputPlus in, int version, CFMetaData metadata, boolean reversed) throws IOException
+ public ClusteringIndexFilter deserialize(DataInputPlus in, int version, TableMetadata metadata, boolean reversed) throws IOException
{
Slices slices = Slices.serializer.deserialize(in, version, metadata);
return new ClusteringIndexSliceFilter(slices, reversed);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
index b3ae505..6f6fc08 100644
--- a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
@@ -24,14 +24,14 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.SortedSetMultimap;
import com.google.common.collect.TreeMultimap;
-import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.cql3.ColumnIdentifier;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.rows.CellPath;
-import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableMetadata;
/**
* Represents which (non-PK) columns (and optionally which sub-part of a column for complex columns) are selected
@@ -68,15 +68,15 @@ public class ColumnFilter
// null. If false, then _fetched_ == _queried_ and we only store _queried_.
private final boolean fetchAllRegulars;
- private final CFMetaData metadata; // can be null if !fetchAllRegulars
+ private final TableMetadata metadata; // can be null if !fetchAllRegulars
- private final PartitionColumns queried; // can be null if fetchAllRegulars, to represent a wildcard query (all
+ private final RegularAndStaticColumns queried; // can be null if fetchAllRegulars, to represent a wildcard query (all
// static and regular columns are both _fetched_ and _queried_).
private final SortedSetMultimap<ColumnIdentifier, ColumnSubselection> subSelections; // can be null
private ColumnFilter(boolean fetchAllRegulars,
- CFMetaData metadata,
- PartitionColumns queried,
+ TableMetadata metadata,
+ RegularAndStaticColumns queried,
SortedSetMultimap<ColumnIdentifier, ColumnSubselection> subSelections)
{
assert !fetchAllRegulars || metadata != null;
@@ -90,7 +90,7 @@ public class ColumnFilter
/**
* A filter that includes all columns for the provided table.
*/
- public static ColumnFilter all(CFMetaData metadata)
+ public static ColumnFilter all(TableMetadata metadata)
{
return new ColumnFilter(true, metadata, null, null);
}
@@ -102,7 +102,7 @@ public class ColumnFilter
* preserve CQL semantic (see class javadoc). This is ok for some internal queries however (and
* for #6588 if/when we implement it).
*/
- public static ColumnFilter selection(PartitionColumns columns)
+ public static ColumnFilter selection(RegularAndStaticColumns columns)
{
return new ColumnFilter(false, null, columns, null);
}
@@ -111,7 +111,7 @@ public class ColumnFilter
* A filter that fetches all columns for the provided table, but returns
* only the queried ones.
*/
- public static ColumnFilter selection(CFMetaData metadata, PartitionColumns queried)
+ public static ColumnFilter selection(TableMetadata metadata, RegularAndStaticColumns queried)
{
return new ColumnFilter(true, metadata, queried, null);
}
@@ -121,17 +121,17 @@ public class ColumnFilter
*
* @return the columns to fetch for this filter.
*/
- public PartitionColumns fetchedColumns()
+ public RegularAndStaticColumns fetchedColumns()
{
if (!fetchAllRegulars)
return queried;
// We always fetch all regulars, but only fetch the statics in queried. Unless queried == null, in which
// case it's a wildcard and we fetch everything.
- PartitionColumns all = metadata.partitionColumns();
+ RegularAndStaticColumns all = metadata.regularAndStaticColumns();
return queried == null || all.statics.isEmpty()
? all
- : new PartitionColumns(queried.statics, all.regulars);
+ : new RegularAndStaticColumns(queried.statics, all.regulars);
}
/**
@@ -139,10 +139,10 @@ public class ColumnFilter
* <p>
* Note that this is in general not all the columns that are fetched internally (see {@link #fetchedColumns}).
*/
- public PartitionColumns queriedColumns()
+ public RegularAndStaticColumns queriedColumns()
{
assert queried != null || fetchAllRegulars;
- return queried == null ? metadata.partitionColumns() : queried;
+ return queried == null ? metadata.regularAndStaticColumns() : queried;
}
/**
@@ -175,7 +175,7 @@ public class ColumnFilter
/**
* Whether the provided column is fetched by this filter.
*/
- public boolean fetches(ColumnDefinition column)
+ public boolean fetches(ColumnMetadata column)
{
// For statics, it is included only if it's part of _queried_, or if _queried_ is null (wildcard query).
if (column.isStatic())
@@ -193,7 +193,7 @@ public class ColumnFilter
* columns that this class made before using this method. If unsure, you probably want
* to use the {@link #fetches} method.
*/
- public boolean fetchedColumnIsQueried(ColumnDefinition column)
+ public boolean fetchedColumnIsQueried(ColumnMetadata column)
{
return !fetchAllRegulars || queried == null || queried.contains(column);
}
@@ -206,7 +206,7 @@ public class ColumnFilter
* columns that this class made before using this method. If unsure, you probably want
* to use the {@link #fetches} method.
*/
- public boolean fetchedCellIsQueried(ColumnDefinition column, CellPath path)
+ public boolean fetchedCellIsQueried(ColumnMetadata column, CellPath path)
{
assert path != null;
if (!fetchAllRegulars || subSelections == null)
@@ -232,7 +232,7 @@ public class ColumnFilter
* @return the created tester or {@code null} if all the cells from the provided column
* are queried.
*/
- public Tester newTester(ColumnDefinition column)
+ public Tester newTester(ColumnMetadata column)
{
if (subSelections == null || !column.isComplex())
return null;
@@ -248,7 +248,7 @@ public class ColumnFilter
* Returns a {@code ColumnFilter}} builder that fetches all regular columns (and queries the columns
* added to the builder, or everything if no column is added).
*/
- public static Builder allRegularColumnsBuilder(CFMetaData metadata)
+ public static Builder allRegularColumnsBuilder(TableMetadata metadata)
{
return new Builder(metadata);
}
@@ -315,32 +315,32 @@ public class ColumnFilter
*
* Note that for a allColumnsBuilder, if no queried columns are added, this is interpreted as querying
* all columns, not querying none (but if you know you want to query all columns, prefer
- * {@link ColumnFilter#all(CFMetaData)}. For selectionBuilder, adding no queried columns means no column will be
+ * {@link ColumnFilter#all(TableMetadata)}. For selectionBuilder, adding no queried columns means no column will be
* fetched (so the builder will return {@code RegularAndStaticColumns.NONE}).
*/
public static class Builder
{
- private final CFMetaData metadata; // null if we don't fetch all columns
- private PartitionColumns.Builder queriedBuilder;
+ private final TableMetadata metadata; // null if we don't fetch all columns
+ private RegularAndStaticColumns.Builder queriedBuilder;
private List<ColumnSubselection> subSelections;
- private Builder(CFMetaData metadata)
+ private Builder(TableMetadata metadata)
{
this.metadata = metadata;
}
- public Builder add(ColumnDefinition c)
+ public Builder add(ColumnMetadata c)
{
if (queriedBuilder == null)
- queriedBuilder = PartitionColumns.builder();
+ queriedBuilder = RegularAndStaticColumns.builder();
queriedBuilder.add(c);
return this;
}
- public Builder addAll(Iterable<ColumnDefinition> columns)
+ public Builder addAll(Iterable<ColumnMetadata> columns)
{
if (queriedBuilder == null)
- queriedBuilder = PartitionColumns.builder();
+ queriedBuilder = RegularAndStaticColumns.builder();
queriedBuilder.addAll(columns);
return this;
}
@@ -354,12 +354,12 @@ public class ColumnFilter
return this;
}
- public Builder slice(ColumnDefinition c, CellPath from, CellPath to)
+ public Builder slice(ColumnMetadata c, CellPath from, CellPath to)
{
return addSubSelection(ColumnSubselection.slice(c, from, to));
}
- public Builder select(ColumnDefinition c, CellPath elt)
+ public Builder select(ColumnMetadata c, CellPath elt)
{
return addSubSelection(ColumnSubselection.element(c, elt));
}
@@ -368,11 +368,11 @@ public class ColumnFilter
{
boolean isFetchAll = metadata != null;
- PartitionColumns queried = queriedBuilder == null ? null : queriedBuilder.build();
+ RegularAndStaticColumns queried = queriedBuilder == null ? null : queriedBuilder.build();
// It's only ok to have queried == null in ColumnFilter if isFetchAll. So deal with the case of a selectionBuilder
// with nothing selected (we can at least happen on some backward compatible queries - CASSANDRA-10471).
if (!isFetchAll && queried == null)
- queried = PartitionColumns.NONE;
+ queried = RegularAndStaticColumns.NONE;
SortedSetMultimap<ColumnIdentifier, ColumnSubselection> s = null;
if (subSelections != null)
@@ -395,7 +395,7 @@ public class ColumnFilter
if (queried.isEmpty())
return "";
- Iterator<ColumnDefinition> defs = queried.selectOrderIterator();
+ Iterator<ColumnMetadata> defs = queried.selectOrderIterator();
if (!defs.hasNext())
return "<none>";
@@ -409,7 +409,7 @@ public class ColumnFilter
return sb.toString();
}
- private void appendColumnDef(StringBuilder sb, ColumnDefinition column)
+ private void appendColumnDef(StringBuilder sb, ColumnMetadata column)
{
if (subSelections == null)
{
@@ -454,12 +454,12 @@ public class ColumnFilter
// queried some columns that are actually only fetched, but it's fine during upgrade).
// More concretely, we replace our filter by a non-fetch-all one that queries every columns that our
// current filter fetches.
- Columns allRegulars = selection.metadata.partitionColumns().regulars;
- Set<ColumnDefinition> queriedStatic = new HashSet<>();
- Iterables.addAll(queriedStatic, Iterables.filter(selection.queried, ColumnDefinition::isStatic));
+ Columns allRegulars = selection.metadata.regularColumns();
+ Set<ColumnMetadata> queriedStatic = new HashSet<>();
+ Iterables.addAll(queriedStatic, Iterables.filter(selection.queried, ColumnMetadata::isStatic));
return new ColumnFilter(false,
null,
- new PartitionColumns(Columns.from(queriedStatic), allRegulars),
+ new RegularAndStaticColumns(Columns.from(queriedStatic), allRegulars),
selection.subSelections);
}
@@ -483,19 +483,19 @@ public class ColumnFilter
}
}
- public ColumnFilter deserialize(DataInputPlus in, int version, CFMetaData metadata) throws IOException
+ public ColumnFilter deserialize(DataInputPlus in, int version, TableMetadata metadata) throws IOException
{
int header = in.readUnsignedByte();
boolean isFetchAll = (header & FETCH_ALL_MASK) != 0;
boolean hasQueried = (header & HAS_QUERIED_MASK) != 0;
boolean hasSubSelections = (header & HAS_SUB_SELECTIONS_MASK) != 0;
- PartitionColumns queried = null;
+ RegularAndStaticColumns queried = null;
if (hasQueried)
{
Columns statics = Columns.serializer.deserialize(in, metadata);
Columns regulars = Columns.serializer.deserialize(in, metadata);
- queried = new PartitionColumns(statics, regulars);
+ queried = new RegularAndStaticColumns(statics, regulars);
}
SortedSetMultimap<ColumnIdentifier, ColumnSubselection> subSelections = null;
@@ -517,7 +517,7 @@ public class ColumnFilter
// statics even though we only care about _fetching_ them all, but that's a minor inefficiency, so fine
// during upgrade.
if (version <= MessagingService.VERSION_30 && isFetchAll && queried != null)
- queried = new PartitionColumns(metadata.partitionColumns().statics, queried.regulars);
+ queried = new RegularAndStaticColumns(metadata.staticColumns(), queried.regulars);
return new ColumnFilter(isFetchAll, isFetchAll ? metadata : null, queried, subSelections);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/filter/ColumnSubselection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ColumnSubselection.java b/src/java/org/apache/cassandra/db/filter/ColumnSubselection.java
index b762fa5..ddc7b1c 100644
--- a/src/java/org/apache/cassandra/db/filter/ColumnSubselection.java
+++ b/src/java/org/apache/cassandra/db/filter/ColumnSubselection.java
@@ -21,15 +21,15 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Comparator;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.TypeSizes;
-import org.apache.cassandra.db.rows.CellPath;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.CollectionType;
import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.db.rows.CellPath;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.utils.ByteBufferUtil;
/**
@@ -44,28 +44,28 @@ public abstract class ColumnSubselection implements Comparable<ColumnSubselectio
private enum Kind { SLICE, ELEMENT }
- protected final ColumnDefinition column;
+ protected final ColumnMetadata column;
- protected ColumnSubselection(ColumnDefinition column)
+ protected ColumnSubselection(ColumnMetadata column)
{
this.column = column;
}
- public static ColumnSubselection slice(ColumnDefinition column, CellPath from, CellPath to)
+ public static ColumnSubselection slice(ColumnMetadata column, CellPath from, CellPath to)
{
assert column.isComplex() && column.type instanceof CollectionType;
assert from.size() <= 1 && to.size() <= 1;
return new Slice(column, from, to);
}
- public static ColumnSubselection element(ColumnDefinition column, CellPath elt)
+ public static ColumnSubselection element(ColumnMetadata column, CellPath elt)
{
assert column.isComplex() && column.type instanceof CollectionType;
assert elt.size() == 1;
return new Element(column, elt);
}
- public ColumnDefinition column()
+ public ColumnMetadata column()
{
return column;
}
@@ -91,7 +91,7 @@ public abstract class ColumnSubselection implements Comparable<ColumnSubselectio
private final CellPath from;
private final CellPath to;
- private Slice(ColumnDefinition column, CellPath from, CellPath to)
+ private Slice(ColumnMetadata column, CellPath from, CellPath to)
{
super(column);
this.from = from;
@@ -132,7 +132,7 @@ public abstract class ColumnSubselection implements Comparable<ColumnSubselectio
{
private final CellPath element;
- private Element(ColumnDefinition column, CellPath elt)
+ private Element(ColumnMetadata column, CellPath elt)
{
super(column);
this.element = elt;
@@ -166,7 +166,7 @@ public abstract class ColumnSubselection implements Comparable<ColumnSubselectio
{
public void serialize(ColumnSubselection subSel, DataOutputPlus out, int version) throws IOException
{
- ColumnDefinition column = subSel.column();
+ ColumnMetadata column = subSel.column();
ByteBufferUtil.writeWithShortLength(column.name.bytes, out);
out.writeByte(subSel.kind().ordinal());
switch (subSel.kind())
@@ -185,16 +185,16 @@ public abstract class ColumnSubselection implements Comparable<ColumnSubselectio
}
}
- public ColumnSubselection deserialize(DataInputPlus in, int version, CFMetaData metadata) throws IOException
+ public ColumnSubselection deserialize(DataInputPlus in, int version, TableMetadata metadata) throws IOException
{
ByteBuffer name = ByteBufferUtil.readWithShortLength(in);
- ColumnDefinition column = metadata.getColumnDefinition(name);
+ ColumnMetadata column = metadata.getColumn(name);
if (column == null)
{
// If we don't find the definition, it could be we have data for a dropped column, and we shouldn't
- // fail deserialization because of that. So we grab a "fake" ColumnDefinition that ensure proper
+ // fail deserialization because of that. So we grab a "fake" ColumnMetadata that ensures proper
// deserialization. The column will be ignored later on anyway.
- column = metadata.getDroppedColumnDefinition(name);
+ column = metadata.getDroppedColumn(name);
if (column == null)
throw new RuntimeException("Unknown column " + UTF8Type.instance.getString(name) + " during deserialization");
}
@@ -217,7 +217,7 @@ public abstract class ColumnSubselection implements Comparable<ColumnSubselectio
{
long size = 0;
- ColumnDefinition column = subSel.column();
+ ColumnMetadata column = subSel.column();
size += TypeSizes.sizeofWithShortLength(column.name.bytes);
size += 1; // kind
switch (subSel.kind())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/filter/DataLimits.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/DataLimits.java b/src/java/org/apache/cassandra/db/filter/DataLimits.java
index 442b5f8..410845c 100644
--- a/src/java/org/apache/cassandra/db/filter/DataLimits.java
+++ b/src/java/org/apache/cassandra/db/filter/DataLimits.java
@@ -412,7 +412,7 @@ public abstract class DataLimits
{
// TODO: we should start storing stats on the number of rows (instead of the number of cells, which
// is what getMeanColumns returns)
- float rowsPerPartition = ((float) cfs.getMeanColumns()) / cfs.metadata.partitionColumns().regulars.size();
+ float rowsPerPartition = ((float) cfs.getMeanColumns()) / cfs.metadata().regularColumns().size();
return rowsPerPartition * (cfs.estimateKeys());
}