You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by cl...@apache.org on 2019/05/15 13:57:27 UTC

[cassandra] branch trunk updated: Reduce heap pressure during compactions Patch by Chris Lohfink; Reviewed by Dinesh Joshi and Benedict for CASSANDRA-14654

This is an automated email from the ASF dual-hosted git repository.

clohfink pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7df67ef  Reduce heap pressure during compactions Patch by Chris Lohfink; Reviewed by Dinesh Joshi and Benedict for CASSANDRA-14654
7df67ef is described below

commit 7df67eff2d66dba4bed2b4f6aeabf05144d9b057
Author: Chris Lohfink <cl...@apple.com>
AuthorDate: Wed May 15 08:55:31 2019 -0500

    Reduce heap pressure during compactions
    Patch by Chris Lohfink; Reviewed by Dinesh Joshi and Benedict for CASSANDRA-14654
---
 CHANGES.txt                                        |   1 +
 src/java/org/apache/cassandra/config/Config.java   |   4 +-
 .../cassandra/config/DatabaseDescriptor.java       |  14 +-
 .../apache/cassandra/db/rows/EncodingStats.java    |  39 ++++-
 .../cassandra/db/rows/UnfilteredRowIterators.java  |  10 +-
 .../io/sstable/SSTableIdentityIterator.java        |   2 +-
 .../cassandra/io/sstable/SSTableRewriter.java      |  20 +--
 .../cassandra/io/sstable/format/SSTableReader.java |  24 +--
 .../io/sstable/format/big/BigTableReader.java      |   6 +-
 .../io/sstable/metadata/StatsMetadata.java         |   4 +
 .../apache/cassandra/service/StorageService.java   |  21 ++-
 .../cassandra/service/StorageServiceMBean.java     |   6 +
 .../unit/org/apache/cassandra/db/KeyCacheTest.java |   2 +-
 .../apache/cassandra/db/lifecycle/TrackerTest.java |  13 +-
 .../cassandra/db/rows/EncodingStatsTest.java       | 170 +++++++++++++++++++++
 .../db/streaming/CassandraStreamManagerTest.java   |   6 +-
 .../CompressedSequentialWriterReopenTest.java      |   3 +-
 .../org/apache/cassandra/schema/MockSchema.java    |  39 ++++-
 18 files changed, 320 insertions(+), 64 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 960ed64..3a98fa5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Reduce heap pressure during compactions (CASSANDRA-14654)
  * Support building Cassandra with JDK 11 (CASSANDRA-15108)
  * Use quilt to patch cassandra.in.sh in Debian packaging (CASSANDRA-14710)
  * Take sstable references before calculating approximate key count (CASSANDRA-14647)
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 04ac608..a6050be 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -253,12 +253,14 @@ public class Config
     public int hints_flush_period_in_ms = 10000;
     public int max_hints_file_size_in_mb = 128;
     public ParameterizedClass hints_compression;
-    public int sstable_preemptive_open_interval_in_mb = 50;
 
     public volatile boolean incremental_backups = false;
     public boolean trickle_fsync = false;
     public int trickle_fsync_interval_in_kb = 10240;
 
+    public volatile int sstable_preemptive_open_interval_in_mb = 50;
+
+    public volatile boolean key_cache_migrate_during_compaction = true;
     public Long key_cache_size_in_mb = null;
     public volatile int key_cache_save_period = 14400;
     public volatile int key_cache_keys_to_save = Integer.MAX_VALUE;
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index e2c2ace..b3ab054 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -2241,11 +2241,21 @@ public class DatabaseDescriptor
         return conf.commitlog_total_space_in_mb;
     }
 
-    public static int getSSTablePreempiveOpenIntervalInMB()
+    public static boolean shouldMigrateKeycacheOnCompaction()
+    {
+        return conf.key_cache_migrate_during_compaction;
+    }
+
+    public static void setMigrateKeycacheOnCompaction(boolean migrateCacheEntry)
+    {
+        conf.key_cache_migrate_during_compaction = migrateCacheEntry;
+    }
+
+    public static int getSSTablePreemptiveOpenIntervalInMB()
     {
         return FBUtilities.isWindows ? -1 : conf.sstable_preemptive_open_interval_in_mb;
     }
-    public static void setSSTablePreempiveOpenIntervalInMB(int mb)
+    public static void setSSTablePreemptiveOpenIntervalInMB(int mb)
     {
         conf.sstable_preemptive_open_interval_in_mb = mb;
     }
diff --git a/src/java/org/apache/cassandra/db/rows/EncodingStats.java b/src/java/org/apache/cassandra/db/rows/EncodingStats.java
index 955ffc7..4a7bb19 100644
--- a/src/java/org/apache/cassandra/db/rows/EncodingStats.java
+++ b/src/java/org/apache/cassandra/db/rows/EncodingStats.java
@@ -19,6 +19,9 @@ package org.apache.cassandra.db.rows;
 
 import java.io.IOException;
 import java.util.*;
+import java.util.function.Function;
+
+import com.google.common.collect.Iterables;
 
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.partitions.PartitionStatisticsCollector;
@@ -41,7 +44,7 @@ public class EncodingStats
 {
     // Default values for the timestamp, deletion time and ttl. We use this both for NO_STATS, but also to serialize
     // an EncodingStats. Basically, we encode the diff of each value of to these epoch, which give values with better vint encoding.
-    private static final long TIMESTAMP_EPOCH;
+    public static final long TIMESTAMP_EPOCH;
     private static final int DELETION_TIME_EPOCH;
     private static final int TTL_EPOCH = 0;
     static
@@ -93,20 +96,42 @@ public class EncodingStats
     public EncodingStats mergeWith(EncodingStats that)
     {
         long minTimestamp = this.minTimestamp == TIMESTAMP_EPOCH
-                          ? that.minTimestamp
-                          : (that.minTimestamp == TIMESTAMP_EPOCH ? this.minTimestamp : Math.min(this.minTimestamp, that.minTimestamp));
+                            ? that.minTimestamp
+                            : (that.minTimestamp == TIMESTAMP_EPOCH ? this.minTimestamp : Math.min(this.minTimestamp, that.minTimestamp));
 
         int minDelTime = this.minLocalDeletionTime == DELETION_TIME_EPOCH
-                       ? that.minLocalDeletionTime
-                       : (that.minLocalDeletionTime == DELETION_TIME_EPOCH ? this.minLocalDeletionTime : Math.min(this.minLocalDeletionTime, that.minLocalDeletionTime));
+                         ? that.minLocalDeletionTime
+                         : (that.minLocalDeletionTime == DELETION_TIME_EPOCH ? this.minLocalDeletionTime : Math.min(this.minLocalDeletionTime, that.minLocalDeletionTime));
 
         int minTTL = this.minTTL == TTL_EPOCH
-                   ? that.minTTL
-                   : (that.minTTL == TTL_EPOCH ? this.minTTL : Math.min(this.minTTL, that.minTTL));
+                     ? that.minTTL
+                     : (that.minTTL == TTL_EPOCH ? this.minTTL : Math.min(this.minTTL, that.minTTL));
 
         return new EncodingStats(minTimestamp, minDelTime, minTTL);
     }
 
+    /**
+     * Merge one or more EncodingStats that are lazily materialized from a list of arbitrary type by the provided function.
+     */
+    public static <V, F extends Function<V, EncodingStats>> EncodingStats merge(List<V> values, F function)
+    {
+        if (values.size() == 1)
+            return function.apply(values.get(0));
+
+        Collector collector = new Collector();
+        for (V v : values)
+        {
+            EncodingStats stats = function.apply(v);
+            if (stats.minTimestamp != TIMESTAMP_EPOCH)
+                collector.updateTimestamp(stats.minTimestamp);
+            if(stats.minLocalDeletionTime != DELETION_TIME_EPOCH)
+                collector.updateLocalDeletionTime(stats.minLocalDeletionTime);
+            if(stats.minTTL != TTL_EPOCH)
+                collector.updateTTL(stats.minTTL);
+        }
+        return collector.get();
+    }
+
     @Override
     public boolean equals(Object o)
     {
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
index 42807a2..21e1954 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
@@ -403,7 +403,7 @@ public abstract class UnfilteredRowIterators
                   columns,
                   mergeStaticRows(iterators, columns.statics, listener, partitionDeletion),
                   reversed,
-                  mergeStats(iterators));
+                  EncodingStats.merge(iterators, UnfilteredRowIterator::stats));
 
             this.mergeIterator = MergeIterator.get(iterators,
                                                    reversed ? metadata.comparator.reversed() : metadata.comparator,
@@ -512,14 +512,6 @@ public abstract class UnfilteredRowIterators
                  : new RegularAndStaticColumns(statics, regulars);
         }
 
-        private static EncodingStats mergeStats(List<UnfilteredRowIterator> iterators)
-        {
-            EncodingStats stats = EncodingStats.NO_STATS;
-            for (UnfilteredRowIterator iter : iterators)
-                stats = stats.mergeWith(iter.stats());
-            return stats;
-        }
-
         protected Unfiltered computeNext()
         {
             while (mergeIterator.hasNext())
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
index a49e7b4..1846aa5 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
@@ -190,7 +190,7 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat
     {
         // We could return sstable.header.stats(), but this may not be as accurate than the actual sstable stats (see
         // SerializationHeader.make() for details) so we use the latter instead.
-        return new EncodingStats(sstable.getMinTimestamp(), sstable.getMinLocalDeletionTime(), sstable.getMinTTL());
+        return sstable.stats();
     }
 
     public int compareTo(SSTableIdentityIterator o)
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
index a71d1af..fb3aa2d 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
@@ -32,7 +32,6 @@ import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.format.SSTableWriter;
-import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.utils.NativeLibrary;
 import org.apache.cassandra.utils.concurrent.Transactional;
 
@@ -117,7 +116,7 @@ public class SSTableRewriter extends Transactional.AbstractTransactional impleme
 
     private static long calculateOpenInterval(boolean shouldOpenEarly)
     {
-        long interval = DatabaseDescriptor.getSSTablePreempiveOpenIntervalInMB() * (1L << 20);
+        long interval = DatabaseDescriptor.getSSTablePreemptiveOpenIntervalInMB() * (1L << 20);
         if (disableEarlyOpeningForTests || !shouldOpenEarly || interval < 0)
             interval = Long.MAX_VALUE;
         return interval;
@@ -134,14 +133,17 @@ public class SSTableRewriter extends Transactional.AbstractTransactional impleme
         DecoratedKey key = partition.partitionKey();
         maybeReopenEarly(key);
         RowIndexEntry index = writer.append(partition);
-        if (!transaction.isOffline() && index != null)
+        if (DatabaseDescriptor.shouldMigrateKeycacheOnCompaction())
         {
-            for (SSTableReader reader : transaction.originals())
+            if (!transaction.isOffline() && index != null)
             {
-                if (reader.getCachedPosition(key, false) != null)
+                for (SSTableReader reader : transaction.originals())
                 {
-                    cachedKeys.put(key, index);
-                    break;
+                    if (reader.getCachedPosition(key, false) != null)
+                    {
+                        cachedKeys.put(key, index);
+                        break;
+                    }
                 }
             }
         }
@@ -223,9 +225,7 @@ public class SSTableRewriter extends Transactional.AbstractTransactional impleme
      */
     private void moveStarts(SSTableReader newReader, DecoratedKey lowerbound)
     {
-        if (transaction.isOffline())
-            return;
-        if (preemptiveOpenInterval == Long.MAX_VALUE)
+        if (transaction.isOffline() || preemptiveOpenInterval == Long.MAX_VALUE)
             return;
 
         newReader.setupOnline();
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index ca6eb85..e3059c8 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -699,17 +699,15 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
         // under normal operation we can do this at any time, but SSTR is also used outside C* proper,
         // e.g. by BulkLoader, which does not initialize the cache.  As a kludge, we set up the cache
         // here when we know we're being wired into the rest of the server infrastructure.
-        keyCache = CacheService.instance.keyCache;
+        InstrumentingCache<KeyCacheKey, RowIndexEntry> maybeKeyCache = CacheService.instance.keyCache;
+        if (maybeKeyCache.getCapacity() > 0)
+            keyCache = maybeKeyCache;
+
         final ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(metadata().id);
         if (cfs != null)
             setCrcCheckChance(cfs.getCrcCheckChance());
     }
 
-    public boolean isKeyCacheSetup()
-    {
-        return keyCache != null;
-    }
-
     /**
      * See {@link #load(boolean, boolean)}
      * @param validation Metadata for SSTable being loaded
@@ -1534,12 +1532,14 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
 
     public RowIndexEntry getCachedPosition(DecoratedKey key, boolean updateStats)
     {
-        return getCachedPosition(new KeyCacheKey(metadata(), descriptor, key.getKey()), updateStats);
+        if (isKeyCacheEnabled())
+            return getCachedPosition(new KeyCacheKey(metadata(), descriptor, key.getKey()), updateStats);
+        return null;
     }
 
     protected RowIndexEntry getCachedPosition(KeyCacheKey unifiedKey, boolean updateStats)
     {
-        if (keyCacheEnabled())
+        if (isKeyCacheEnabled())
         {
             if (updateStats)
             {
@@ -1560,9 +1560,9 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
         return null;
     }
 
-    private boolean keyCacheEnabled()
+    public boolean isKeyCacheEnabled()
     {
-        return keyCache != null && keyCache.getCapacity() > 0 && metadata().params.caching.cacheKeys();
+        return keyCache != null && metadata().params.caching.cacheKeys();
     }
 
     /**
@@ -1830,7 +1830,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
             // hint read path about key location if caching is enabled
             // this saves index summary lookup and index file iteration which whould be pretty costly
             // especially in presence of promoted column indexes
-            if (isKeyCacheSetup())
+            if (isKeyCacheEnabled())
                 cacheKey(key, rowIndexEntrySerializer.deserialize(in, in.getFilePointer()));
         }
 
@@ -2101,7 +2101,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
     {
         // We could return sstable.header.stats(), but this may not be as accurate than the actual sstable stats (see
         // SerializationHeader.make() for details) so we use the latter instead.
-        return new EncodingStats(getMinTimestamp(), getMinLocalDeletionTime(), getMinTTL());
+        return sstableMetadata.encodingStats;
     }
 
     public Ref<SSTableReader> tryRef()
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
index a4787a2..c9ae431 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
@@ -24,7 +24,6 @@ import java.util.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.cache.KeyCacheKey;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.columniterator.SSTableIterator;
 import org.apache.cassandra.db.columniterator.SSTableReversedIterator;
@@ -155,9 +154,8 @@ public class BigTableReader extends SSTableReader
         // next, the key cache (only make sense for valid row key)
         if ((op == Operator.EQ || op == Operator.GE) && (key instanceof DecoratedKey))
         {
-            DecoratedKey decoratedKey = (DecoratedKey)key;
-            KeyCacheKey cacheKey = new KeyCacheKey(metadata(), descriptor, decoratedKey.getKey());
-            RowIndexEntry cachedPosition = getCachedPosition(cacheKey, updateCacheAndStats);
+            DecoratedKey decoratedKey = (DecoratedKey) key;
+            RowIndexEntry cachedPosition = getCachedPosition(decoratedKey, updateCacheAndStats);
             if (cachedPosition != null)
             {
                 listener.onSSTableSelected(this, cachedPosition, SelectionReason.KEY_CACHE_HIT);
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java b/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
index 5b06ada..5d464fe 100755
--- a/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 
+import org.apache.cassandra.db.rows.EncodingStats;
 import org.apache.cassandra.io.ISerializer;
 import org.apache.cassandra.io.sstable.format.Version;
 import org.apache.commons.lang3.builder.EqualsBuilder;
@@ -65,6 +66,8 @@ public class StatsMetadata extends MetadataComponent
     public final long totalRows;
     public final UUID pendingRepair;
     public final boolean isTransient;
+    // EncodingStats built once from minTimestamp, minLocalDeletionTime and minTTL so callers need not allocate a new one - it is not serialized
+    public final EncodingStats encodingStats;
 
     public StatsMetadata(EstimatedHistogram estimatedPartitionSize,
                          EstimatedHistogram estimatedColumnCount,
@@ -107,6 +110,7 @@ public class StatsMetadata extends MetadataComponent
         this.totalRows = totalRows;
         this.pendingRepair = pendingRepair;
         this.isTransient = isTransient;
+        this.encodingStats = new EncodingStats(minTimestamp, minLocalDeletionTime, minTTL);
     }
 
     public MetadataType getType()
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 2707b85..eade7dd 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.service;
 
 import java.io.*;
-import java.lang.management.ManagementFactory;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
@@ -5347,6 +5346,26 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         return DatabaseDescriptor.getPartitionerName();
     }
 
+    public void setSSTablePreemptiveOpenIntervalInMB(int intervalInMB)
+    {
+        DatabaseDescriptor.setSSTablePreemptiveOpenIntervalInMB(intervalInMB);
+    }
+
+    public int getSSTablePreemptiveOpenIntervalInMB()
+    {
+        return DatabaseDescriptor.getSSTablePreemptiveOpenIntervalInMB();
+    }
+
+    public boolean getMigrateKeycacheOnCompaction()
+    {
+        return DatabaseDescriptor.shouldMigrateKeycacheOnCompaction();
+    }
+
+    public void setMigrateKeycacheOnCompaction(boolean invalidateKeyCacheOnCompaction)
+    {
+        DatabaseDescriptor.setMigrateKeycacheOnCompaction(invalidateKeyCacheOnCompaction);
+    }
+
     public int getTombstoneWarnThreshold()
     {
         return DatabaseDescriptor.getTombstoneWarnThreshold();
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index e74f002..3fef7f6 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -577,6 +577,12 @@ public interface StorageServiceMBean extends NotificationEmitter
     public int getConcurrentValidators();
     public void setConcurrentValidators(int value);
 
+    public int getSSTablePreemptiveOpenIntervalInMB();
+    public void setSSTablePreemptiveOpenIntervalInMB(int intervalInMB);
+
+    public boolean getMigrateKeycacheOnCompaction();
+    public void setMigrateKeycacheOnCompaction(boolean invalidateKeyCacheOnCompaction);
+
     public int getConcurrentViewBuilders();
     public void setConcurrentViewBuilders(int value);
 
diff --git a/test/unit/org/apache/cassandra/db/KeyCacheTest.java b/test/unit/org/apache/cassandra/db/KeyCacheTest.java
index d7a31d6..1819b18 100644
--- a/test/unit/org/apache/cassandra/db/KeyCacheTest.java
+++ b/test/unit/org/apache/cassandra/db/KeyCacheTest.java
@@ -287,7 +287,7 @@ public class KeyCacheTest
             throw new IllegalStateException();
 
         Util.compactAll(cfs, Integer.MAX_VALUE).get();
-        boolean noEarlyOpen = DatabaseDescriptor.getSSTablePreempiveOpenIntervalInMB() < 0;
+        boolean noEarlyOpen = DatabaseDescriptor.getSSTablePreemptiveOpenIntervalInMB() < 0;
 
         // after compaction cache should have entries for new SSTables,
         // but since we have kept a reference to the old sstables,
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java b/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
index 2891126..522e59a 100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
@@ -43,6 +43,7 @@ import org.apache.cassandra.db.commitlog.CommitLogPosition;
 import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.notifications.*;
+import org.apache.cassandra.schema.CachingParams;
 import org.apache.cassandra.schema.MockSchema;
 import org.apache.cassandra.utils.concurrent.OpOrder;
 
@@ -146,7 +147,7 @@ public class TrackerTest
     @Test
     public void testAddInitialSSTables()
     {
-        ColumnFamilyStore cfs = MockSchema.newCFS();
+        ColumnFamilyStore cfs = MockSchema.newCFS(metadata -> metadata.caching(CachingParams.CACHE_KEYS));
         Tracker tracker = cfs.getTracker();
         List<SSTableReader> readers = ImmutableList.of(MockSchema.sstable(0, 17, cfs),
                                                        MockSchema.sstable(1, 121, cfs),
@@ -156,7 +157,7 @@ public class TrackerTest
         Assert.assertEquals(3, tracker.view.get().sstables.size());
 
         for (SSTableReader reader : readers)
-            Assert.assertTrue(reader.isKeyCacheSetup());
+            Assert.assertTrue(reader.isKeyCacheEnabled());
 
         Assert.assertEquals(17 + 121 + 9, cfs.metric.liveDiskSpaceUsed.getCount());
     }
@@ -166,7 +167,7 @@ public class TrackerTest
     {
         boolean backups = DatabaseDescriptor.isIncrementalBackupsEnabled();
         DatabaseDescriptor.setIncrementalBackupsEnabled(false);
-        ColumnFamilyStore cfs = MockSchema.newCFS();
+        ColumnFamilyStore cfs = MockSchema.newCFS(metadata -> metadata.caching(CachingParams.CACHE_KEYS));
         Tracker tracker = cfs.getTracker();
         MockListener listener = new MockListener(false);
         tracker.subscribe(listener);
@@ -178,7 +179,7 @@ public class TrackerTest
         Assert.assertEquals(3, tracker.view.get().sstables.size());
 
         for (SSTableReader reader : readers)
-            Assert.assertTrue(reader.isKeyCacheSetup());
+            Assert.assertTrue(reader.isKeyCacheEnabled());
 
         Assert.assertEquals(17 + 121 + 9, cfs.metric.liveDiskSpaceUsed.getCount());
         Assert.assertEquals(1, listener.senders.size());
@@ -263,7 +264,7 @@ public class TrackerTest
     {
         boolean backups = DatabaseDescriptor.isIncrementalBackupsEnabled();
         DatabaseDescriptor.setIncrementalBackupsEnabled(false);
-        ColumnFamilyStore cfs = MockSchema.newCFS();
+        ColumnFamilyStore cfs = MockSchema.newCFS(metadata -> metadata.caching(CachingParams.CACHE_KEYS));
         MockListener listener = new MockListener(false);
         Tracker tracker = cfs.getTracker();
         tracker.subscribe(listener);
@@ -308,7 +309,7 @@ public class TrackerTest
         Assert.assertEquals(singleton(reader), ((SSTableAddedNotification) listener.received.get(1)).added);
         Assert.assertEquals(Optional.of(prev2), ((SSTableAddedNotification) listener.received.get(1)).memtable());
         listener.received.clear();
-        Assert.assertTrue(reader.isKeyCacheSetup());
+        Assert.assertTrue(reader.isKeyCacheEnabled());
         Assert.assertEquals(10, cfs.metric.liveDiskSpaceUsed.getCount());
 
         // test invalidated CFS
diff --git a/test/unit/org/apache/cassandra/db/rows/EncodingStatsTest.java b/test/unit/org/apache/cassandra/db/rows/EncodingStatsTest.java
new file mode 100644
index 0000000..1ac092a
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/rows/EncodingStatsTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.rows;
+
+import java.util.function.Function;
+
+import com.google.common.collect.ImmutableList;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.db.LivenessInfo;
+
+import static org.quicktheories.QuickTheory.qt;
+import static org.quicktheories.generators.SourceDSL.integers;
+import static org.quicktheories.generators.SourceDSL.longs;
+
+public class EncodingStatsTest
+{
+    @Test
+    public void testCollectWithNoStats()
+    {
+        EncodingStats none = EncodingStats.merge(ImmutableList.of(
+            EncodingStats.NO_STATS,
+            EncodingStats.NO_STATS,
+            EncodingStats.NO_STATS
+        ), Function.identity());
+        Assert.assertEquals(none, EncodingStats.NO_STATS);
+    }
+
+    @Test
+    public void testCollectWithNoStatsWithEmpty()
+    {
+        EncodingStats none = EncodingStats.merge(ImmutableList.of(
+            EncodingStats.NO_STATS,
+            EncodingStats.NO_STATS,
+            new EncodingStats(LivenessInfo.NO_TIMESTAMP, LivenessInfo.NO_EXPIRATION_TIME, 0)
+        ), Function.identity());
+        Assert.assertEquals(none, EncodingStats.NO_STATS);
+    }
+
+    @Test
+    public void testCollectWithNoStatsWithTimestamp()
+    {
+        EncodingStats single = new EncodingStats(1, LivenessInfo.NO_EXPIRATION_TIME, 0);
+        EncodingStats result = EncodingStats.merge(ImmutableList.of(
+            EncodingStats.NO_STATS,
+            EncodingStats.NO_STATS,
+            single,
+            EncodingStats.NO_STATS
+        ), Function.identity());
+        Assert.assertEquals(single, result);
+    }
+
+    @Test
+    public void testCollectWithNoStatsWithExpires()
+    {
+        EncodingStats single = new EncodingStats(LivenessInfo.NO_TIMESTAMP, 1, 0);
+        EncodingStats result = EncodingStats.merge(ImmutableList.of(
+        EncodingStats.NO_STATS,
+        single,
+        EncodingStats.NO_STATS
+        ), Function.identity());
+        Assert.assertEquals(single, result);
+    }
+
+    @Test
+    public void testCollectWithNoStatsWithTTL()
+    {
+        EncodingStats single = new EncodingStats(LivenessInfo.NO_TIMESTAMP, LivenessInfo.NO_EXPIRATION_TIME, 1);
+        EncodingStats result = EncodingStats.merge(ImmutableList.of(
+            EncodingStats.NO_STATS,
+            single,
+            EncodingStats.NO_STATS
+        ), Function.identity());
+        Assert.assertEquals(single, result);
+    }
+
+    @Test
+    public void testCollectOneEach()
+    {
+        EncodingStats tsp = new EncodingStats(1, LivenessInfo.NO_EXPIRATION_TIME, 0);
+        EncodingStats exp = new EncodingStats(LivenessInfo.NO_TIMESTAMP, 1, 0);
+        EncodingStats ttl = new EncodingStats(LivenessInfo.NO_TIMESTAMP, LivenessInfo.NO_EXPIRATION_TIME, 1);
+        EncodingStats result = EncodingStats.merge(ImmutableList.of(
+            tsp,
+            exp,
+            ttl
+        ), Function.identity());
+        Assert.assertEquals(new EncodingStats(1, 1, 1), result);
+    }
+
+    @Test
+    public void testTimestamp()
+    {
+        EncodingStats one = new EncodingStats(1, LivenessInfo.NO_EXPIRATION_TIME, 0);
+        EncodingStats two = new EncodingStats(2, LivenessInfo.NO_EXPIRATION_TIME, 0);
+        EncodingStats thr = new EncodingStats(3, LivenessInfo.NO_EXPIRATION_TIME, 0);
+        EncodingStats result = EncodingStats.merge(ImmutableList.of(
+            one,
+            two,
+            thr
+        ), Function.identity());
+        Assert.assertEquals(one, result);
+    }
+
+    @Test
+    public void testExpires()
+    {
+        EncodingStats one = new EncodingStats(LivenessInfo.NO_TIMESTAMP,1, 0);
+        EncodingStats two = new EncodingStats(LivenessInfo.NO_TIMESTAMP,2, 0);
+        EncodingStats thr = new EncodingStats(LivenessInfo.NO_TIMESTAMP,3, 0);
+        EncodingStats result = EncodingStats.merge(ImmutableList.of(
+            one,
+            two,
+            thr
+        ), Function.identity());
+        Assert.assertEquals(one, result);
+    }
+
+    @Test
+    public void testTTL()
+    {
+        EncodingStats one = new EncodingStats(LivenessInfo.NO_TIMESTAMP, LivenessInfo.NO_EXPIRATION_TIME,1);
+        EncodingStats two = new EncodingStats(LivenessInfo.NO_TIMESTAMP, LivenessInfo.NO_EXPIRATION_TIME,2);
+        EncodingStats thr = new EncodingStats(LivenessInfo.NO_TIMESTAMP, LivenessInfo.NO_EXPIRATION_TIME,3);
+        EncodingStats result = EncodingStats.merge(ImmutableList.of(
+            thr,
+            one,
+            two
+        ), Function.identity());
+        Assert.assertEquals(one, result);
+    }
+
+    @Test
+    public void testEncodingStatsCollectWithNone()
+    {
+        qt().forAll(longs().between(Long.MIN_VALUE+1, Long.MAX_VALUE),
+                    integers().between(0, Integer.MAX_VALUE-1),
+                    integers().allPositive())
+            .asWithPrecursor(EncodingStats::new)
+            .check((timestamp, expires, ttl, stats) ->
+                   {
+                       EncodingStats result = EncodingStats.merge(ImmutableList.of(
+                           EncodingStats.NO_STATS,
+                           stats,
+                           EncodingStats.NO_STATS
+                       ), Function.identity());
+                       return result.minTTL == ttl
+                              && result.minLocalDeletionTime == expires
+                              && result.minTimestamp == timestamp;
+                   });
+    }
+
+}
diff --git a/test/unit/org/apache/cassandra/db/streaming/CassandraStreamManagerTest.java b/test/unit/org/apache/cassandra/db/streaming/CassandraStreamManagerTest.java
index b597bfe..eb15e9a 100644
--- a/test/unit/org/apache/cassandra/db/streaming/CassandraStreamManagerTest.java
+++ b/test/unit/org/apache/cassandra/db/streaming/CassandraStreamManagerTest.java
@@ -33,7 +33,7 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.Uninterruptibles;
-import org.apache.cassandra.locator.EndpointsForRange;
+
 import org.apache.cassandra.locator.RangesAtEndpoint;
 import org.junit.Assert;
 import org.junit.Before;
@@ -195,7 +195,7 @@ public class CassandraStreamManagerTest
         Collection<SSTableReader> allSSTables = cfs.getLiveSSTables();
         Assert.assertEquals(1, allSSTables.size());
         final Token firstToken = allSSTables.iterator().next().first.getToken();
-        DatabaseDescriptor.setSSTablePreempiveOpenIntervalInMB(1);
+        DatabaseDescriptor.setSSTablePreemptiveOpenIntervalInMB(1);
 
         Set<SSTableReader> sstablesBeforeRewrite = getReadersForRange(new Range<>(firstToken, firstToken));
         Assert.assertEquals(1, sstablesBeforeRewrite.size());
@@ -227,7 +227,7 @@ public class CassandraStreamManagerTest
         }
         finally
         {
-            DatabaseDescriptor.setSSTablePreempiveOpenIntervalInMB(50);
+            DatabaseDescriptor.setSSTablePreemptiveOpenIntervalInMB(50);
             done.set(true);
             t.join(20);
         }
diff --git a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterReopenTest.java b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterReopenTest.java
index 1bc3454..461c13c 100644
--- a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterReopenTest.java
+++ b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterReopenTest.java
@@ -28,7 +28,6 @@ import org.junit.Test;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.CQLTester;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 import static org.junit.Assert.assertEquals;
@@ -85,7 +84,7 @@ public class CompressedSequentialWriterReopenTest extends CQLTester
             execute("insert into %s (id, t) values (?, ?)", i, ByteBuffer.wrap(blob));
         }
         getCurrentColumnFamilyStore().forceBlockingFlush();
-        DatabaseDescriptor.setSSTablePreempiveOpenIntervalInMB(1);
+        DatabaseDescriptor.setSSTablePreemptiveOpenIntervalInMB(1);
         getCurrentColumnFamilyStore().forceMajorCompaction();
     }
 
diff --git a/test/unit/org/apache/cassandra/schema/MockSchema.java b/test/unit/org/apache/cassandra/schema/MockSchema.java
index 9ca2d6e..dfa8731 100644
--- a/test/unit/org/apache/cassandra/schema/MockSchema.java
+++ b/test/unit/org/apache/cassandra/schema/MockSchema.java
@@ -23,6 +23,8 @@ import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.util.*;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+import java.util.function.Function;
 
 import com.google.common.collect.ImmutableSet;
 
@@ -148,20 +150,47 @@ public class MockSchema
 
     public static ColumnFamilyStore newCFS(String ksname)
     {
-        String cfname = "mockcf" + (id.incrementAndGet());
-        TableMetadata metadata = newTableMetadata(ksname, cfname);
-        return new ColumnFamilyStore(ks, cfname, 0, new TableMetadataRef(metadata), new Directories(metadata), false, false, false);
+        return newCFS(newTableMetadata(ksname)); // name generation and store construction are delegated to the overloads
+    }
+
+    public static ColumnFamilyStore newCFS(Function<TableMetadata.Builder, TableMetadata.Builder> options) // delegates to the (ksname, options) overload
+    {
+        return newCFS(ks.getName(), options); // keyspace name is taken from the class-level ks
+    }
+
+    public static ColumnFamilyStore newCFS(String ksname, Function<TableMetadata.Builder, TableMetadata.Builder> options) // creates a store from a builder that options has been applied to
+    {
+        return newCFS(options.apply(newTableMetadataBuilder(ksname)).build()); // options receives the default builder; whatever builder it returns is the one built
+    }
+
+    public static ColumnFamilyStore newCFS(TableMetadata metadata) // wraps the given metadata in a new ColumnFamilyStore
+    {
+        return new ColumnFamilyStore(ks, metadata.name, 0, new TableMetadataRef(metadata), new Directories(metadata), false, false, false); // NOTE(review): always passes the class-level ks, independent of the keyspace name the metadata was built with - confirm intended
+    }
+
+    public static TableMetadata newTableMetadata(String ksname) // overload that generates the table name itself
+    {
+        return newTableMetadata(ksname, "mockcf" + (id.incrementAndGet())); // table name is "mockcf" followed by the incremented id counter
     }
 
     public static TableMetadata newTableMetadata(String ksname, String cfname)
     {
+        return newTableMetadataBuilder(ksname, cfname).build(); // builds the default mock builder without further changes
+    }
+
+    public static TableMetadata.Builder newTableMetadataBuilder(String ksname) // overload that generates the table name itself
+    {
+        return newTableMetadataBuilder(ksname, "mockcf" + (id.incrementAndGet())); // table name is "mockcf" followed by the incremented id counter
+    }
+
+    public static TableMetadata.Builder newTableMetadataBuilder(String ksname, String cfname) // returns the builder itself, not a built TableMetadata
+    {
         return TableMetadata.builder(ksname, cfname)
                             .partitioner(Murmur3Partitioner.instance)
                             .addPartitionKeyColumn("key", UTF8Type.instance)
                             .addClusteringColumn("col", UTF8Type.instance)
                             .addRegularColumn("value", UTF8Type.instance)
-                            .caching(CachingParams.CACHE_NOTHING)
-                            .build();
+                            .caching(CachingParams.CACHE_NOTHING); // build() is left to the caller so more options can be chained first
     }
 
     public static BufferDecoratedKey readerBounds(long generation)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org