You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by cl...@apache.org on 2019/05/15 13:57:27 UTC
[cassandra] branch trunk updated: Reduce heap pressure during
compactions Patch by Chris Lohfink;
Reviewed by Dinesh Joshi and Benedict for CASSANDRA-14654
This is an automated email from the ASF dual-hosted git repository.
clohfink pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7df67ef Reduce heap pressure during compactions Patch by Chris Lohfink; Reviewed by Dinesh Joshi and Benedict for CASSANDRA-14654
7df67ef is described below
commit 7df67eff2d66dba4bed2b4f6aeabf05144d9b057
Author: Chris Lohfink <cl...@apple.com>
AuthorDate: Wed May 15 08:55:31 2019 -0500
Reduce heap pressure during compactions
Patch by Chris Lohfink; Reviewed by Dinesh Joshi and Benedict for CASSANDRA-14654
---
CHANGES.txt | 1 +
src/java/org/apache/cassandra/config/Config.java | 4 +-
.../cassandra/config/DatabaseDescriptor.java | 14 +-
.../apache/cassandra/db/rows/EncodingStats.java | 39 ++++-
.../cassandra/db/rows/UnfilteredRowIterators.java | 10 +-
.../io/sstable/SSTableIdentityIterator.java | 2 +-
.../cassandra/io/sstable/SSTableRewriter.java | 20 +--
.../cassandra/io/sstable/format/SSTableReader.java | 24 +--
.../io/sstable/format/big/BigTableReader.java | 6 +-
.../io/sstable/metadata/StatsMetadata.java | 4 +
.../apache/cassandra/service/StorageService.java | 21 ++-
.../cassandra/service/StorageServiceMBean.java | 6 +
.../unit/org/apache/cassandra/db/KeyCacheTest.java | 2 +-
.../apache/cassandra/db/lifecycle/TrackerTest.java | 13 +-
.../cassandra/db/rows/EncodingStatsTest.java | 170 +++++++++++++++++++++
.../db/streaming/CassandraStreamManagerTest.java | 6 +-
.../CompressedSequentialWriterReopenTest.java | 3 +-
.../org/apache/cassandra/schema/MockSchema.java | 39 ++++-
18 files changed, 320 insertions(+), 64 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 960ed64..3a98fa5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0
+ * Reduce heap pressure during compactions (CASSANDRA-14654)
* Support building Cassandra with JDK 11 (CASSANDRA-15108)
* Use quilt to patch cassandra.in.sh in Debian packaging (CASSANDRA-14710)
* Take sstable references before calculating approximate key count (CASSANDRA-14647)
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 04ac608..a6050be 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -253,12 +253,14 @@ public class Config
public int hints_flush_period_in_ms = 10000;
public int max_hints_file_size_in_mb = 128;
public ParameterizedClass hints_compression;
- public int sstable_preemptive_open_interval_in_mb = 50;
public volatile boolean incremental_backups = false;
public boolean trickle_fsync = false;
public int trickle_fsync_interval_in_kb = 10240;
+ public volatile int sstable_preemptive_open_interval_in_mb = 50;
+
+ public volatile boolean key_cache_migrate_during_compaction = true;
public Long key_cache_size_in_mb = null;
public volatile int key_cache_save_period = 14400;
public volatile int key_cache_keys_to_save = Integer.MAX_VALUE;
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index e2c2ace..b3ab054 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -2241,11 +2241,21 @@ public class DatabaseDescriptor
return conf.commitlog_total_space_in_mb;
}
- public static int getSSTablePreempiveOpenIntervalInMB()
+ public static boolean shouldMigrateKeycacheOnCompaction()
+ {
+ return conf.key_cache_migrate_during_compaction;
+ }
+
+ public static void setMigrateKeycacheOnCompaction(boolean migrateCacheEntry)
+ {
+ conf.key_cache_migrate_during_compaction = migrateCacheEntry;
+ }
+
+ public static int getSSTablePreemptiveOpenIntervalInMB()
{
return FBUtilities.isWindows ? -1 : conf.sstable_preemptive_open_interval_in_mb;
}
- public static void setSSTablePreempiveOpenIntervalInMB(int mb)
+ public static void setSSTablePreemptiveOpenIntervalInMB(int mb)
{
conf.sstable_preemptive_open_interval_in_mb = mb;
}
diff --git a/src/java/org/apache/cassandra/db/rows/EncodingStats.java b/src/java/org/apache/cassandra/db/rows/EncodingStats.java
index 955ffc7..4a7bb19 100644
--- a/src/java/org/apache/cassandra/db/rows/EncodingStats.java
+++ b/src/java/org/apache/cassandra/db/rows/EncodingStats.java
@@ -19,6 +19,9 @@ package org.apache.cassandra.db.rows;
import java.io.IOException;
import java.util.*;
+import java.util.function.Function;
+
+import com.google.common.collect.Iterables;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.partitions.PartitionStatisticsCollector;
@@ -41,7 +44,7 @@ public class EncodingStats
{
// Default values for the timestamp, deletion time and ttl. We use this both for NO_STATS, but also to serialize
// an EncodingStats. Basically, we encode the diff of each value of to these epoch, which give values with better vint encoding.
- private static final long TIMESTAMP_EPOCH;
+ public static final long TIMESTAMP_EPOCH;
private static final int DELETION_TIME_EPOCH;
private static final int TTL_EPOCH = 0;
static
@@ -93,20 +96,42 @@ public class EncodingStats
public EncodingStats mergeWith(EncodingStats that)
{
long minTimestamp = this.minTimestamp == TIMESTAMP_EPOCH
- ? that.minTimestamp
- : (that.minTimestamp == TIMESTAMP_EPOCH ? this.minTimestamp : Math.min(this.minTimestamp, that.minTimestamp));
+ ? that.minTimestamp
+ : (that.minTimestamp == TIMESTAMP_EPOCH ? this.minTimestamp : Math.min(this.minTimestamp, that.minTimestamp));
int minDelTime = this.minLocalDeletionTime == DELETION_TIME_EPOCH
- ? that.minLocalDeletionTime
- : (that.minLocalDeletionTime == DELETION_TIME_EPOCH ? this.minLocalDeletionTime : Math.min(this.minLocalDeletionTime, that.minLocalDeletionTime));
+ ? that.minLocalDeletionTime
+ : (that.minLocalDeletionTime == DELETION_TIME_EPOCH ? this.minLocalDeletionTime : Math.min(this.minLocalDeletionTime, that.minLocalDeletionTime));
int minTTL = this.minTTL == TTL_EPOCH
- ? that.minTTL
- : (that.minTTL == TTL_EPOCH ? this.minTTL : Math.min(this.minTTL, that.minTTL));
+ ? that.minTTL
+ : (that.minTTL == TTL_EPOCH ? this.minTTL : Math.min(this.minTTL, that.minTTL));
return new EncodingStats(minTimestamp, minDelTime, minTTL);
}
+ /**
+ * Merge one or more EncodingStats, that are lazily materialized from some list of arbitrary type by the provided function
+ */
+ public static <V, F extends Function<V, EncodingStats>> EncodingStats merge(List<V> values, F function)
+ {
+ if (values.size() == 1)
+ return function.apply(values.get(0));
+
+ Collector collector = new Collector();
+ for (V v : values)
+ {
+ EncodingStats stats = function.apply(v);
+ if (stats.minTimestamp != TIMESTAMP_EPOCH)
+ collector.updateTimestamp(stats.minTimestamp);
+ if(stats.minLocalDeletionTime != DELETION_TIME_EPOCH)
+ collector.updateLocalDeletionTime(stats.minLocalDeletionTime);
+ if(stats.minTTL != TTL_EPOCH)
+ collector.updateTTL(stats.minTTL);
+ }
+ return collector.get();
+ }
+
@Override
public boolean equals(Object o)
{
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
index 42807a2..21e1954 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
@@ -403,7 +403,7 @@ public abstract class UnfilteredRowIterators
columns,
mergeStaticRows(iterators, columns.statics, listener, partitionDeletion),
reversed,
- mergeStats(iterators));
+ EncodingStats.merge(iterators, UnfilteredRowIterator::stats));
this.mergeIterator = MergeIterator.get(iterators,
reversed ? metadata.comparator.reversed() : metadata.comparator,
@@ -512,14 +512,6 @@ public abstract class UnfilteredRowIterators
: new RegularAndStaticColumns(statics, regulars);
}
- private static EncodingStats mergeStats(List<UnfilteredRowIterator> iterators)
- {
- EncodingStats stats = EncodingStats.NO_STATS;
- for (UnfilteredRowIterator iter : iterators)
- stats = stats.mergeWith(iter.stats());
- return stats;
- }
-
protected Unfiltered computeNext()
{
while (mergeIterator.hasNext())
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
index a49e7b4..1846aa5 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
@@ -190,7 +190,7 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat
{
// We could return sstable.header.stats(), but this may not be as accurate than the actual sstable stats (see
// SerializationHeader.make() for details) so we use the latter instead.
- return new EncodingStats(sstable.getMinTimestamp(), sstable.getMinLocalDeletionTime(), sstable.getMinTTL());
+ return sstable.stats();
}
public int compareTo(SSTableIdentityIterator o)
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
index a71d1af..fb3aa2d 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
@@ -32,7 +32,6 @@ import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.format.SSTableWriter;
-import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.utils.NativeLibrary;
import org.apache.cassandra.utils.concurrent.Transactional;
@@ -117,7 +116,7 @@ public class SSTableRewriter extends Transactional.AbstractTransactional impleme
private static long calculateOpenInterval(boolean shouldOpenEarly)
{
- long interval = DatabaseDescriptor.getSSTablePreempiveOpenIntervalInMB() * (1L << 20);
+ long interval = DatabaseDescriptor.getSSTablePreemptiveOpenIntervalInMB() * (1L << 20);
if (disableEarlyOpeningForTests || !shouldOpenEarly || interval < 0)
interval = Long.MAX_VALUE;
return interval;
@@ -134,14 +133,17 @@ public class SSTableRewriter extends Transactional.AbstractTransactional impleme
DecoratedKey key = partition.partitionKey();
maybeReopenEarly(key);
RowIndexEntry index = writer.append(partition);
- if (!transaction.isOffline() && index != null)
+ if (DatabaseDescriptor.shouldMigrateKeycacheOnCompaction())
{
- for (SSTableReader reader : transaction.originals())
+ if (!transaction.isOffline() && index != null)
{
- if (reader.getCachedPosition(key, false) != null)
+ for (SSTableReader reader : transaction.originals())
{
- cachedKeys.put(key, index);
- break;
+ if (reader.getCachedPosition(key, false) != null)
+ {
+ cachedKeys.put(key, index);
+ break;
+ }
}
}
}
@@ -223,9 +225,7 @@ public class SSTableRewriter extends Transactional.AbstractTransactional impleme
*/
private void moveStarts(SSTableReader newReader, DecoratedKey lowerbound)
{
- if (transaction.isOffline())
- return;
- if (preemptiveOpenInterval == Long.MAX_VALUE)
+ if (transaction.isOffline() || preemptiveOpenInterval == Long.MAX_VALUE)
return;
newReader.setupOnline();
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index ca6eb85..e3059c8 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -699,17 +699,15 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
// under normal operation we can do this at any time, but SSTR is also used outside C* proper,
// e.g. by BulkLoader, which does not initialize the cache. As a kludge, we set up the cache
// here when we know we're being wired into the rest of the server infrastructure.
- keyCache = CacheService.instance.keyCache;
+ InstrumentingCache<KeyCacheKey, RowIndexEntry> maybeKeyCache = CacheService.instance.keyCache;
+ if (maybeKeyCache.getCapacity() > 0)
+ keyCache = maybeKeyCache;
+
final ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(metadata().id);
if (cfs != null)
setCrcCheckChance(cfs.getCrcCheckChance());
}
- public boolean isKeyCacheSetup()
- {
- return keyCache != null;
- }
-
/**
* See {@link #load(boolean, boolean)}
* @param validation Metadata for SSTable being loaded
@@ -1534,12 +1532,14 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
public RowIndexEntry getCachedPosition(DecoratedKey key, boolean updateStats)
{
- return getCachedPosition(new KeyCacheKey(metadata(), descriptor, key.getKey()), updateStats);
+ if (isKeyCacheEnabled())
+ return getCachedPosition(new KeyCacheKey(metadata(), descriptor, key.getKey()), updateStats);
+ return null;
}
protected RowIndexEntry getCachedPosition(KeyCacheKey unifiedKey, boolean updateStats)
{
- if (keyCacheEnabled())
+ if (isKeyCacheEnabled())
{
if (updateStats)
{
@@ -1560,9 +1560,9 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
return null;
}
- private boolean keyCacheEnabled()
+ public boolean isKeyCacheEnabled()
{
- return keyCache != null && keyCache.getCapacity() > 0 && metadata().params.caching.cacheKeys();
+ return keyCache != null && metadata().params.caching.cacheKeys();
}
/**
@@ -1830,7 +1830,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
// hint read path about key location if caching is enabled
// this saves index summary lookup and index file iteration which whould be pretty costly
// especially in presence of promoted column indexes
- if (isKeyCacheSetup())
+ if (isKeyCacheEnabled())
cacheKey(key, rowIndexEntrySerializer.deserialize(in, in.getFilePointer()));
}
@@ -2101,7 +2101,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
{
// We could return sstable.header.stats(), but this may not be as accurate than the actual sstable stats (see
// SerializationHeader.make() for details) so we use the latter instead.
- return new EncodingStats(getMinTimestamp(), getMinLocalDeletionTime(), getMinTTL());
+ return sstableMetadata.encodingStats;
}
public Ref<SSTableReader> tryRef()
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
index a4787a2..c9ae431 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
@@ -24,7 +24,6 @@ import java.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.cache.KeyCacheKey;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.columniterator.SSTableIterator;
import org.apache.cassandra.db.columniterator.SSTableReversedIterator;
@@ -155,9 +154,8 @@ public class BigTableReader extends SSTableReader
// next, the key cache (only make sense for valid row key)
if ((op == Operator.EQ || op == Operator.GE) && (key instanceof DecoratedKey))
{
- DecoratedKey decoratedKey = (DecoratedKey)key;
- KeyCacheKey cacheKey = new KeyCacheKey(metadata(), descriptor, decoratedKey.getKey());
- RowIndexEntry cachedPosition = getCachedPosition(cacheKey, updateCacheAndStats);
+ DecoratedKey decoratedKey = (DecoratedKey) key;
+ RowIndexEntry cachedPosition = getCachedPosition(decoratedKey, updateCacheAndStats);
if (cachedPosition != null)
{
listener.onSSTableSelected(this, cachedPosition, SelectionReason.KEY_CACHE_HIT);
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java b/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
index 5b06ada..5d464fe 100755
--- a/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
+import org.apache.cassandra.db.rows.EncodingStats;
import org.apache.cassandra.io.ISerializer;
import org.apache.cassandra.io.sstable.format.Version;
import org.apache.commons.lang3.builder.EqualsBuilder;
@@ -65,6 +66,8 @@ public class StatsMetadata extends MetadataComponent
public final long totalRows;
public final UUID pendingRepair;
public final boolean isTransient;
+ // just holds the current encoding stats to avoid allocating - it is not serialized
+ public final EncodingStats encodingStats;
public StatsMetadata(EstimatedHistogram estimatedPartitionSize,
EstimatedHistogram estimatedColumnCount,
@@ -107,6 +110,7 @@ public class StatsMetadata extends MetadataComponent
this.totalRows = totalRows;
this.pendingRepair = pendingRepair;
this.isTransient = isTransient;
+ this.encodingStats = new EncodingStats(minTimestamp, minLocalDeletionTime, minTTL);
}
public MetadataType getType()
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 2707b85..eade7dd 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.service;
import java.io.*;
-import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
@@ -5347,6 +5346,26 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
return DatabaseDescriptor.getPartitionerName();
}
+ public void setSSTablePreemptiveOpenIntervalInMB(int intervalInMB)
+ {
+ DatabaseDescriptor.setSSTablePreemptiveOpenIntervalInMB(intervalInMB);
+ }
+
+ public int getSSTablePreemptiveOpenIntervalInMB()
+ {
+ return DatabaseDescriptor.getSSTablePreemptiveOpenIntervalInMB();
+ }
+
+ public boolean getMigrateKeycacheOnCompaction()
+ {
+ return DatabaseDescriptor.shouldMigrateKeycacheOnCompaction();
+ }
+
+ public void setMigrateKeycacheOnCompaction(boolean invalidateKeyCacheOnCompaction)
+ {
+ DatabaseDescriptor.setMigrateKeycacheOnCompaction(invalidateKeyCacheOnCompaction);
+ }
+
public int getTombstoneWarnThreshold()
{
return DatabaseDescriptor.getTombstoneWarnThreshold();
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index e74f002..3fef7f6 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -577,6 +577,12 @@ public interface StorageServiceMBean extends NotificationEmitter
public int getConcurrentValidators();
public void setConcurrentValidators(int value);
+ public int getSSTablePreemptiveOpenIntervalInMB();
+ public void setSSTablePreemptiveOpenIntervalInMB(int intervalInMB);
+
+ public boolean getMigrateKeycacheOnCompaction();
+ public void setMigrateKeycacheOnCompaction(boolean invalidateKeyCacheOnCompaction);
+
public int getConcurrentViewBuilders();
public void setConcurrentViewBuilders(int value);
diff --git a/test/unit/org/apache/cassandra/db/KeyCacheTest.java b/test/unit/org/apache/cassandra/db/KeyCacheTest.java
index d7a31d6..1819b18 100644
--- a/test/unit/org/apache/cassandra/db/KeyCacheTest.java
+++ b/test/unit/org/apache/cassandra/db/KeyCacheTest.java
@@ -287,7 +287,7 @@ public class KeyCacheTest
throw new IllegalStateException();
Util.compactAll(cfs, Integer.MAX_VALUE).get();
- boolean noEarlyOpen = DatabaseDescriptor.getSSTablePreempiveOpenIntervalInMB() < 0;
+ boolean noEarlyOpen = DatabaseDescriptor.getSSTablePreemptiveOpenIntervalInMB() < 0;
// after compaction cache should have entries for new SSTables,
// but since we have kept a reference to the old sstables,
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java b/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
index 2891126..522e59a 100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
@@ -43,6 +43,7 @@ import org.apache.cassandra.db.commitlog.CommitLogPosition;
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.notifications.*;
+import org.apache.cassandra.schema.CachingParams;
import org.apache.cassandra.schema.MockSchema;
import org.apache.cassandra.utils.concurrent.OpOrder;
@@ -146,7 +147,7 @@ public class TrackerTest
@Test
public void testAddInitialSSTables()
{
- ColumnFamilyStore cfs = MockSchema.newCFS();
+ ColumnFamilyStore cfs = MockSchema.newCFS(metadata -> metadata.caching(CachingParams.CACHE_KEYS));
Tracker tracker = cfs.getTracker();
List<SSTableReader> readers = ImmutableList.of(MockSchema.sstable(0, 17, cfs),
MockSchema.sstable(1, 121, cfs),
@@ -156,7 +157,7 @@ public class TrackerTest
Assert.assertEquals(3, tracker.view.get().sstables.size());
for (SSTableReader reader : readers)
- Assert.assertTrue(reader.isKeyCacheSetup());
+ Assert.assertTrue(reader.isKeyCacheEnabled());
Assert.assertEquals(17 + 121 + 9, cfs.metric.liveDiskSpaceUsed.getCount());
}
@@ -166,7 +167,7 @@ public class TrackerTest
{
boolean backups = DatabaseDescriptor.isIncrementalBackupsEnabled();
DatabaseDescriptor.setIncrementalBackupsEnabled(false);
- ColumnFamilyStore cfs = MockSchema.newCFS();
+ ColumnFamilyStore cfs = MockSchema.newCFS(metadata -> metadata.caching(CachingParams.CACHE_KEYS));
Tracker tracker = cfs.getTracker();
MockListener listener = new MockListener(false);
tracker.subscribe(listener);
@@ -178,7 +179,7 @@ public class TrackerTest
Assert.assertEquals(3, tracker.view.get().sstables.size());
for (SSTableReader reader : readers)
- Assert.assertTrue(reader.isKeyCacheSetup());
+ Assert.assertTrue(reader.isKeyCacheEnabled());
Assert.assertEquals(17 + 121 + 9, cfs.metric.liveDiskSpaceUsed.getCount());
Assert.assertEquals(1, listener.senders.size());
@@ -263,7 +264,7 @@ public class TrackerTest
{
boolean backups = DatabaseDescriptor.isIncrementalBackupsEnabled();
DatabaseDescriptor.setIncrementalBackupsEnabled(false);
- ColumnFamilyStore cfs = MockSchema.newCFS();
+ ColumnFamilyStore cfs = MockSchema.newCFS(metadata -> metadata.caching(CachingParams.CACHE_KEYS));
MockListener listener = new MockListener(false);
Tracker tracker = cfs.getTracker();
tracker.subscribe(listener);
@@ -308,7 +309,7 @@ public class TrackerTest
Assert.assertEquals(singleton(reader), ((SSTableAddedNotification) listener.received.get(1)).added);
Assert.assertEquals(Optional.of(prev2), ((SSTableAddedNotification) listener.received.get(1)).memtable());
listener.received.clear();
- Assert.assertTrue(reader.isKeyCacheSetup());
+ Assert.assertTrue(reader.isKeyCacheEnabled());
Assert.assertEquals(10, cfs.metric.liveDiskSpaceUsed.getCount());
// test invalidated CFS
diff --git a/test/unit/org/apache/cassandra/db/rows/EncodingStatsTest.java b/test/unit/org/apache/cassandra/db/rows/EncodingStatsTest.java
new file mode 100644
index 0000000..1ac092a
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/rows/EncodingStatsTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.rows;
+
+import java.util.function.Function;
+
+import com.google.common.collect.ImmutableList;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.db.LivenessInfo;
+
+import static org.quicktheories.QuickTheory.qt;
+import static org.quicktheories.generators.SourceDSL.integers;
+import static org.quicktheories.generators.SourceDSL.longs;
+
+public class EncodingStatsTest
+{
+ @Test
+ public void testCollectWithNoStats()
+ {
+ EncodingStats none = EncodingStats.merge(ImmutableList.of(
+ EncodingStats.NO_STATS,
+ EncodingStats.NO_STATS,
+ EncodingStats.NO_STATS
+ ), Function.identity());
+ Assert.assertEquals(none, EncodingStats.NO_STATS);
+ }
+
+ @Test
+ public void testCollectWithNoStatsWithEmpty()
+ {
+ EncodingStats none = EncodingStats.merge(ImmutableList.of(
+ EncodingStats.NO_STATS,
+ EncodingStats.NO_STATS,
+ new EncodingStats(LivenessInfo.NO_TIMESTAMP, LivenessInfo.NO_EXPIRATION_TIME, 0)
+ ), Function.identity());
+ Assert.assertEquals(none, EncodingStats.NO_STATS);
+ }
+
+ @Test
+ public void testCollectWithNoStatsWithTimestamp()
+ {
+ EncodingStats single = new EncodingStats(1, LivenessInfo.NO_EXPIRATION_TIME, 0);
+ EncodingStats result = EncodingStats.merge(ImmutableList.of(
+ EncodingStats.NO_STATS,
+ EncodingStats.NO_STATS,
+ single,
+ EncodingStats.NO_STATS
+ ), Function.identity());
+ Assert.assertEquals(single, result);
+ }
+
+ @Test
+ public void testCollectWithNoStatsWithExpires()
+ {
+ EncodingStats single = new EncodingStats(LivenessInfo.NO_TIMESTAMP, 1, 0);
+ EncodingStats result = EncodingStats.merge(ImmutableList.of(
+ EncodingStats.NO_STATS,
+ single,
+ EncodingStats.NO_STATS
+ ), Function.identity());
+ Assert.assertEquals(single, result);
+ }
+
+ @Test
+ public void testCollectWithNoStatsWithTTL()
+ {
+ EncodingStats single = new EncodingStats(LivenessInfo.NO_TIMESTAMP, LivenessInfo.NO_EXPIRATION_TIME, 1);
+ EncodingStats result = EncodingStats.merge(ImmutableList.of(
+ EncodingStats.NO_STATS,
+ single,
+ EncodingStats.NO_STATS
+ ), Function.identity());
+ Assert.assertEquals(single, result);
+ }
+
+ @Test
+ public void testCollectOneEach()
+ {
+ EncodingStats tsp = new EncodingStats(1, LivenessInfo.NO_EXPIRATION_TIME, 0);
+ EncodingStats exp = new EncodingStats(LivenessInfo.NO_TIMESTAMP, 1, 0);
+ EncodingStats ttl = new EncodingStats(LivenessInfo.NO_TIMESTAMP, LivenessInfo.NO_EXPIRATION_TIME, 1);
+ EncodingStats result = EncodingStats.merge(ImmutableList.of(
+ tsp,
+ exp,
+ ttl
+ ), Function.identity());
+ Assert.assertEquals(new EncodingStats(1, 1, 1), result);
+ }
+
+ @Test
+ public void testTimestamp()
+ {
+ EncodingStats one = new EncodingStats(1, LivenessInfo.NO_EXPIRATION_TIME, 0);
+ EncodingStats two = new EncodingStats(2, LivenessInfo.NO_EXPIRATION_TIME, 0);
+ EncodingStats thr = new EncodingStats(3, LivenessInfo.NO_EXPIRATION_TIME, 0);
+ EncodingStats result = EncodingStats.merge(ImmutableList.of(
+ one,
+ two,
+ thr
+ ), Function.identity());
+ Assert.assertEquals(one, result);
+ }
+
+ @Test
+ public void testExpires()
+ {
+ EncodingStats one = new EncodingStats(LivenessInfo.NO_TIMESTAMP,1, 0);
+ EncodingStats two = new EncodingStats(LivenessInfo.NO_TIMESTAMP,2, 0);
+ EncodingStats thr = new EncodingStats(LivenessInfo.NO_TIMESTAMP,3, 0);
+ EncodingStats result = EncodingStats.merge(ImmutableList.of(
+ one,
+ two,
+ thr
+ ), Function.identity());
+ Assert.assertEquals(one, result);
+ }
+
+ @Test
+ public void testTTL()
+ {
+ EncodingStats one = new EncodingStats(LivenessInfo.NO_TIMESTAMP, LivenessInfo.NO_EXPIRATION_TIME,1);
+ EncodingStats two = new EncodingStats(LivenessInfo.NO_TIMESTAMP, LivenessInfo.NO_EXPIRATION_TIME,2);
+ EncodingStats thr = new EncodingStats(LivenessInfo.NO_TIMESTAMP, LivenessInfo.NO_EXPIRATION_TIME,3);
+ EncodingStats result = EncodingStats.merge(ImmutableList.of(
+ thr,
+ one,
+ two
+ ), Function.identity());
+ Assert.assertEquals(one, result);
+ }
+
+ @Test
+ public void testEncodingStatsCollectWithNone()
+ {
+ qt().forAll(longs().between(Long.MIN_VALUE+1, Long.MAX_VALUE),
+ integers().between(0, Integer.MAX_VALUE-1),
+ integers().allPositive())
+ .asWithPrecursor(EncodingStats::new)
+ .check((timestamp, expires, ttl, stats) ->
+ {
+ EncodingStats result = EncodingStats.merge(ImmutableList.of(
+ EncodingStats.NO_STATS,
+ stats,
+ EncodingStats.NO_STATS
+ ), Function.identity());
+ return result.minTTL == ttl
+ && result.minLocalDeletionTime == expires
+ && result.minTimestamp == timestamp;
+ });
+ }
+
+}
diff --git a/test/unit/org/apache/cassandra/db/streaming/CassandraStreamManagerTest.java b/test/unit/org/apache/cassandra/db/streaming/CassandraStreamManagerTest.java
index b597bfe..eb15e9a 100644
--- a/test/unit/org/apache/cassandra/db/streaming/CassandraStreamManagerTest.java
+++ b/test/unit/org/apache/cassandra/db/streaming/CassandraStreamManagerTest.java
@@ -33,7 +33,7 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Uninterruptibles;
-import org.apache.cassandra.locator.EndpointsForRange;
+
import org.apache.cassandra.locator.RangesAtEndpoint;
import org.junit.Assert;
import org.junit.Before;
@@ -195,7 +195,7 @@ public class CassandraStreamManagerTest
Collection<SSTableReader> allSSTables = cfs.getLiveSSTables();
Assert.assertEquals(1, allSSTables.size());
final Token firstToken = allSSTables.iterator().next().first.getToken();
- DatabaseDescriptor.setSSTablePreempiveOpenIntervalInMB(1);
+ DatabaseDescriptor.setSSTablePreemptiveOpenIntervalInMB(1);
Set<SSTableReader> sstablesBeforeRewrite = getReadersForRange(new Range<>(firstToken, firstToken));
Assert.assertEquals(1, sstablesBeforeRewrite.size());
@@ -227,7 +227,7 @@ public class CassandraStreamManagerTest
}
finally
{
- DatabaseDescriptor.setSSTablePreempiveOpenIntervalInMB(50);
+ DatabaseDescriptor.setSSTablePreemptiveOpenIntervalInMB(50);
done.set(true);
t.join(20);
}
diff --git a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterReopenTest.java b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterReopenTest.java
index 1bc3454..461c13c 100644
--- a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterReopenTest.java
+++ b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterReopenTest.java
@@ -28,7 +28,6 @@ import org.junit.Test;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.CQLTester;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.utils.ByteBufferUtil;
import static org.junit.Assert.assertEquals;
@@ -85,7 +84,7 @@ public class CompressedSequentialWriterReopenTest extends CQLTester
execute("insert into %s (id, t) values (?, ?)", i, ByteBuffer.wrap(blob));
}
getCurrentColumnFamilyStore().forceBlockingFlush();
- DatabaseDescriptor.setSSTablePreempiveOpenIntervalInMB(1);
+ DatabaseDescriptor.setSSTablePreemptiveOpenIntervalInMB(1);
getCurrentColumnFamilyStore().forceMajorCompaction();
}
diff --git a/test/unit/org/apache/cassandra/schema/MockSchema.java b/test/unit/org/apache/cassandra/schema/MockSchema.java
index 9ca2d6e..dfa8731 100644
--- a/test/unit/org/apache/cassandra/schema/MockSchema.java
+++ b/test/unit/org/apache/cassandra/schema/MockSchema.java
@@ -23,6 +23,8 @@ import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+import java.util.function.Function;
import com.google.common.collect.ImmutableSet;
@@ -148,20 +150,47 @@ public class MockSchema
public static ColumnFamilyStore newCFS(String ksname)
{
- String cfname = "mockcf" + (id.incrementAndGet());
- TableMetadata metadata = newTableMetadata(ksname, cfname);
- return new ColumnFamilyStore(ks, cfname, 0, new TableMetadataRef(metadata), new Directories(metadata), false, false, false);
+ return newCFS(newTableMetadata(ksname));
+ }
+
+ public static ColumnFamilyStore newCFS(Function<TableMetadata.Builder, TableMetadata.Builder> options)
+ {
+ return newCFS(ks.getName(), options);
+ }
+
+ public static ColumnFamilyStore newCFS(String ksname, Function<TableMetadata.Builder, TableMetadata.Builder> options)
+ {
+ return newCFS(options.apply(newTableMetadataBuilder(ksname)).build());
+ }
+
+ public static ColumnFamilyStore newCFS(TableMetadata metadata)
+ {
+ return new ColumnFamilyStore(ks, metadata.name, 0, new TableMetadataRef(metadata), new Directories(metadata), false, false, false);
+ }
+
+ public static TableMetadata newTableMetadata(String ksname)
+ {
+ return newTableMetadata(ksname, "mockcf" + (id.incrementAndGet()));
}
public static TableMetadata newTableMetadata(String ksname, String cfname)
{
+ return newTableMetadataBuilder(ksname, cfname).build();
+ }
+
+ public static TableMetadata.Builder newTableMetadataBuilder(String ksname)
+ {
+ return newTableMetadataBuilder(ksname, "mockcf" + (id.incrementAndGet()));
+ }
+
+ public static TableMetadata.Builder newTableMetadataBuilder(String ksname, String cfname)
+ {
return TableMetadata.builder(ksname, cfname)
.partitioner(Murmur3Partitioner.instance)
.addPartitionKeyColumn("key", UTF8Type.instance)
.addClusteringColumn("col", UTF8Type.instance)
.addRegularColumn("value", UTF8Type.instance)
- .caching(CachingParams.CACHE_NOTHING)
- .build();
+ .caching(CachingParams.CACHE_NOTHING);
}
public static BufferDecoratedKey readerBounds(long generation)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org