Posted to commits@cassandra.apache.org by ad...@apache.org on 2021/03/01 12:52:04 UTC

[cassandra] 01/01: Merge branch 'cassandra-3.11' into trunk

This is an automated email from the ASF dual-hosted git repository.

adelapena pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit d1f3d40afc5d20bab70c6200508baa3cd9409458
Merge: b063f30 0541c51
Author: Andrés de la Peña <a....@gmail.com>
AuthorDate: Mon Mar 1 12:48:32 2021 +0000

    Merge branch 'cassandra-3.11' into trunk

 CHANGES.txt                                        |   1 +
 .../cassandra/cql3/statements/CQL3CasRequest.java  |  27 +-
 .../org/apache/cassandra/db/SystemKeyspace.java    |   4 +-
 .../apache/cassandra/db/filter/ColumnFilter.java   | 205 ++++---
 src/java/org/apache/cassandra/gms/Gossiper.java    |  70 ++-
 .../repair/SystemDistributedKeyspace.java          |   2 +-
 .../apache/cassandra/tracing/TraceKeyspace.java    |   4 +-
 .../apache/cassandra/utils/CassandraVersion.java   |  14 +
 .../cassandra/utils/ExpiringMemoizingSupplier.java |   7 +
 .../test/ReadDigestConsistencyTest.java            | 146 +++++
 .../distributed/upgrade/MixedModeReadTest.java     |  72 +--
 test/unit/org/apache/cassandra/Util.java           |  17 +
 .../operations/InsertUpdateIfConditionTest.java    |  50 +-
 .../cassandra/db/filter/ColumnFilterTest.java      | 676 +++++++++++++--------
 .../org/apache/cassandra/gms/GossiperTest.java     |  29 +-
 15 files changed, 885 insertions(+), 439 deletions(-)

diff --cc CHANGES.txt
index 4dfa9d9,6bd40ad..95028a6
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,40 -1,8 +1,41 @@@
 -3.11.11
 +4.0-beta5
 + * Prevent parent repair sessions leak (CASSANDRA-16446)
 + * Fix timestamp issue in SinglePartitionSliceCommandTest testPartitionDeletionRowDeletionTie (CASSANDRA-16443)
 + * Promote protocol V5 out of beta (CASSANDRA-14973)
 + * Fix incorrect encoding for strings can be UTF8 (CASSANDRA-16429)
 + * Fix node unable to join when RF > N in multi-DC with added warning (CASSANDRA-16296)
 + * Add an option to nodetool tablestats to check sstable location correctness (CASSANDRA-16344) 
 + * Unable to ALTER KEYSPACE while decommissioned/assassinated nodes are in gossip (CASSANDRA-16422)
 + * Metrics backward compatibility restored after CASSANDRA-15066 (CASSANDRA-16083)
 + * Reduce new reserved keywords introduced since 3.0 (CASSANDRA-16439)
 + * Improve system tables handling in case of disk failures (CASSANDRA-14793)
 + * Add access and datacenters to unreserved keywords (CASSANDRA-16398)
 + * Fix nodetool ring, status output when DNS resolution or port printing are in use (CASSANDRA-16283)
 + * Upgrade Jacoco to 0.8.6 (for Java 11 support) (CASSANDRA-16365)
 + * Move excessive repair debug loggings to trace level (CASSANDRA-16406)
 + * Restore validation of each message's protocol version (CASSANDRA-16374)
 + * Upgrade netty and chronicle-queue dependencies to get Auditing and native library loading working on arm64 architectures (CASSANDRA-16384,CASSANDRA-16392)
 + * Release StreamingTombstoneHistogramBuilder spool when switching writers (CASSANDRA-14834)
 + * Correct memtable on-heap size calculations to match actual use (CASSANDRA-16318)
 + * Fix client notifications in CQL protocol v5 (CASSANDRA-16353)
 + * Too defensive check when picking sstables for preview repair (CASSANDRA-16284)
 + * Ensure pre-negotiation native protocol responses have correct stream id (CASSANDRA-16376)
 + * Fix check for -Xlog in cassandra-env.sh (CASSANDRA-16279)
 + * SSLFactory should initialize SSLContext before setting protocols (CASSANDRA-16362)
 + * Restore sasi dependencies jflex, snowball-stemmer, and concurrent-trees, in the cassandra-all pom (CASSANDRA-16303)
 + * Fix DecimalDeserializer#toString OOM (CASSANDRA-14925)
 +Merged from 3.11:
 + * Fix digest computation for queries with fetched but non queried columns (CASSANDRA-15962)
 + * Reduce amount of allocations during batch statement execution (CASSANDRA-16201)
 + * Update jflex-1.6.0.jar to match upstream (CASSANDRA-16393)
  Merged from 3.0:
+  * Fix ColumnFilter behaviour to prevent digest mismatches during upgrades (CASSANDRA-16415)
   * Update debian packaging for python3 (CASSANDRA-16396)
   * Avoid pushing schema mutations when setting up distributed system keyspaces locally (CASSANDRA-16387)
 + * Prevent unbounded number of pending flushing tasks (CASSANDRA-16261)
 + * Improve empty hint file handling during startup (CASSANDRA-16162)
 + * Fix skipping on pre-3.0 created compact storage sstables due to missing primary key liveness (CASSANDRA-16226)
 + * Allow empty string in collections with COPY FROM in cqlsh (CASSANDRA-16372)
  Merged from 2.2:
   * Make TokenMetadata's ring version increments atomic (CASSANDRA-16286)
  
diff --cc src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
index d61381d,47920a4..563a639
--- a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
@@@ -162,26 -161,21 +162,21 @@@ public class CQL3CasRequest implements 
          }
      }
  
 -    private PartitionColumns columnsToRead()
 +    private RegularAndStaticColumns columnsToRead()
      {
-         // If all our conditions are columns conditions (IF x = ?), then it's enough to query
-         // the columns from the conditions. If we have a IF EXISTS or IF NOT EXISTS however,
-         // we need to query all columns for the row since if the condition fails, we want to
-         // return everything to the user. Static columns make this a bit more complex, in that
-         // if an insert only static columns, then the existence condition applies only to the
-         // static columns themselves, and so we don't want to include regular columns in that
-         // case.
-         if (hasExists)
-         {
-             RegularAndStaticColumns allColumns = metadata.regularAndStaticColumns();
-             Columns statics = updatesStaticRow ? allColumns.statics : Columns.NONE;
-             Columns regulars = updatesRegularRows ? allColumns.regulars : Columns.NONE;
-             return new RegularAndStaticColumns(statics, regulars);
-         }
-         return conditionColumns;
 -        PartitionColumns allColumns = cfm.partitionColumns();
++        RegularAndStaticColumns allColumns = metadata.regularAndStaticColumns();
+ 
+         // If we update static row, we won't have any conditions on regular rows.
+         // If we update regular row, we have to fetch all regular rows (which would satisfy column condition) and
+         // static rows that take part in column condition.
+         // In both cases, we're fetching enough rows to distinguish between "all conditions are nulls" and "row does not exist".
+         // We have to do this as we can't rely on row marker for that (see #6623)
+         Columns statics = updatesStaticRow ? allColumns.statics : conditionColumns.statics;
+         Columns regulars = updatesRegularRows ? allColumns.regulars : conditionColumns.regulars;
 -        return new PartitionColumns(statics, regulars);
++        return new RegularAndStaticColumns(statics, regulars);
      }
  
 -    public SinglePartitionReadCommand readCommand(int nowInSec)
 +    public SinglePartitionReadQuery readCommand(int nowInSec)
      {
          assert staticConditions != null || !conditions.isEmpty();
  
diff --cc src/java/org/apache/cassandra/db/SystemKeyspace.java
index 278541d,196face..25078b8
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@@ -90,6 -91,6 +90,8 @@@ public final class SystemKeyspac
      // Cassandra was not previously installed and we're in the process of starting a fresh node.
      public static final CassandraVersion NULL_VERSION = new CassandraVersion("0.0.0-absent");
  
++    public static final CassandraVersion CURRENT_VERSION = new CassandraVersion(FBUtilities.getReleaseVersionString());
++
      public static final String BATCHES = "batches";
      public static final String PAXOS = "paxos";
      public static final String BUILT_INDEXES = "IndexInfo";
@@@ -924,12 -941,12 +926,12 @@@
      {
          try
          {
 -            if (FBUtilities.getBroadcastAddress().equals(ep))
 +            if (FBUtilities.getBroadcastAddressAndPort().equals(ep))
              {
--                return new CassandraVersion(FBUtilities.getReleaseVersionString());
++                return CURRENT_VERSION;
              }
 -            String req = "SELECT release_version FROM system.%s WHERE peer=?";
 -            UntypedResultSet result = executeInternal(String.format(req, PEERS), ep);
 +            String req = "SELECT release_version FROM system.%s WHERE peer=? AND peer_port=?";
 +            UntypedResultSet result = executeInternal(String.format(req, PEERS_V2), ep.address, ep.port);
              if (result != null && result.one().has("release_version"))
              {
                  return new CassandraVersion(result.one().getString("release_version"));
diff --cc src/java/org/apache/cassandra/db/filter/ColumnFilter.java
index b2ffa52,f405431..8ca2ccc
--- a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
@@@ -20,22 -20,21 +20,25 @@@ package org.apache.cassandra.db.filter
  import java.io.IOException;
  import java.util.*;
  
 +import com.google.common.annotations.VisibleForTesting;
- import com.google.common.collect.Iterables;
 +import com.google.common.collect.Iterators;
  import com.google.common.collect.SortedSetMultimap;
  import com.google.common.collect.TreeMultimap;
  
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
 -import org.apache.cassandra.config.CFMetaData;
  import org.apache.cassandra.cql3.ColumnIdentifier;
  import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.rows.Cell;
  import org.apache.cassandra.db.rows.CellPath;
 -import org.apache.cassandra.config.ColumnDefinition;
  import org.apache.cassandra.gms.Gossiper;
  import org.apache.cassandra.io.util.DataInputPlus;
  import org.apache.cassandra.io.util.DataOutputPlus;
  import org.apache.cassandra.net.MessagingService;
 +import org.apache.cassandra.schema.ColumnMetadata;
 +import org.apache.cassandra.schema.TableMetadata;
++import org.apache.cassandra.utils.CassandraVersion;
  
  /**
   * Represents which (non-PK) columns (and optionally which sub-part of a column for complex columns) are selected
@@@ -66,67 -65,37 +69,127 @@@
   */
  public class ColumnFilter
  {
+     private final static Logger logger = LoggerFactory.getLogger(ColumnFilter.class);
+ 
 +    public static final ColumnFilter NONE = selection(RegularAndStaticColumns.NONE);
 +
      public static final Serializer serializer = new Serializer();
  
 -    // True if _fetched_ is all the columns, in which case metadata must not be null. If false,
 -    // then _fetched_ == _queried_ and we only store _queried_.
 -    private final boolean isFetchAll;
 +    // True if _fetched_ includes all regular columns (and any static in _queried_), in which case metadata must not be
 +    // null. If false, then _fetched_ == _queried_ and we only store _queried_.
++    @VisibleForTesting
 +    final boolean fetchAllRegulars;
 +
++    // This flag can be only set when fetchAllRegulars is set. When fetchAllRegulars is set and queried==null then
++    // it is implied to be true. The flag when set allows for interpreting the column filter in the same way as it was
++    // interpreted by pre 4.0 Cassandra versions (3.4 ~ 4.0), that is, we fetch all columns (both regulars and static)
++    // but we query only some of them. This allows for proper behaviour during upgrades.
++    private final boolean fetchAllStatics;
++
++    @VisibleForTesting
 +    final RegularAndStaticColumns fetched;
-     final RegularAndStaticColumns queried; // can be null if fetchAllRegulars, to represent a wildcard query (all
-                                            // static and regular columns are both _fetched_ and _queried_).
+ 
 -    private final PartitionColumns fetched;
 -    private final PartitionColumns queried; // can be null if isFetchAll and _fetched_ == _queried_
++    private final RegularAndStaticColumns queried; // can be null if fetchAllRegulars, to represent a wildcard query (all
++
++    // static and regular columns are both _fetched_ and _queried_).
      private final SortedSetMultimap<ColumnIdentifier, ColumnSubselection> subSelections; // can be null
  
 -    private ColumnFilter(boolean isFetchAll,
 -                         PartitionColumns fetched,
 -                         PartitionColumns queried,
 +    private ColumnFilter(boolean fetchAllRegulars,
++                         boolean fetchAllStatics,
 +                         TableMetadata metadata,
 +                         RegularAndStaticColumns queried,
                           SortedSetMultimap<ColumnIdentifier, ColumnSubselection> subSelections)
      {
 -        assert !isFetchAll || fetched != null;
 -        assert isFetchAll || queried != null;
 -        this.isFetchAll = isFetchAll;
 -        this.fetched = isFetchAll ? fetched : queried;
 +        assert !fetchAllRegulars || metadata != null;
 +        assert fetchAllRegulars || queried != null;
++        assert !fetchAllStatics || fetchAllRegulars;
 +        this.fetchAllRegulars = fetchAllRegulars;
++        this.fetchAllStatics = fetchAllStatics || fetchAllRegulars && queried == null;
 +
 +        if (fetchAllRegulars)
 +        {
 +            RegularAndStaticColumns all = metadata.regularAndStaticColumns();
 +
-             this.fetched = (all.statics.isEmpty() || queried == null)
++            this.fetched = (all.statics.isEmpty() || queried == null || fetchAllStatics)
 +                           ? all
 +                           : new RegularAndStaticColumns(queried.statics, all.regulars);
 +        }
 +        else
 +        {
 +            this.fetched = queried;
 +        }
 +
          this.queried = queried;
          this.subSelections = subSelections;
      }
  
      /**
 +     * Used on replica for deserialisation
 +     */
 +    private ColumnFilter(boolean fetchAllRegulars,
++                         boolean fetchAllStatics,
 +                         RegularAndStaticColumns fetched,
 +                         RegularAndStaticColumns queried,
 +                         SortedSetMultimap<ColumnIdentifier, ColumnSubselection> subSelections)
 +    {
 +        assert !fetchAllRegulars || fetched != null;
 +        assert fetchAllRegulars || queried != null;
++        assert !fetchAllStatics || fetchAllRegulars;
 +        this.fetchAllRegulars = fetchAllRegulars;
++        this.fetchAllStatics = fetchAllStatics || fetchAllRegulars && queried == null;
 +        this.fetched = fetchAllRegulars ? fetched : queried;
 +        this.queried = queried;
 +        this.subSelections = subSelections;
 +    }
 +
 +    /**
++     * Returns true if all static columns should be fetched along with all regular columns (it only makes sense to call
++     * this method if fetchAllRegulars is going to be true and queried != null).
++     *
++     * We have to apply this conversion when there are pre-4.0 nodes in the cluster because they interpret
++     * the ColumnFilter with fetchAllRegulars (translated to fetchAll in pre 4.0) and queried != null so that all
++     * the columns are fetched (both regular and static) and just some of them are queried. In 4.0+ with the same
++     * scenario, all regulars are fetched and only those statics which are queried. We need to apply the conversion
++     * so that the retrieved data is the same (note that non-queried columns may have skipped values or may not be
++     * included at all).
++     */
++    private static boolean shouldFetchAllStatics()
++    {
++        if (Gossiper.instance.isUpgradingFromVersionLowerThan(CassandraVersion.CASSANDRA_4_0))
++        {
++            logger.trace("ColumnFilter conversion has been applied so that all static columns will be fetched because there are pre 4.0 nodes in the cluster");
++            return true;
++        }
++        return false;
++    }
++
++    /**
++     * Returns true if we want to consider all fetched columns as queried as well (it only makes sense to call
++     * this method if fetchAllRegulars is going to be true).
++     *
++     * We have to apply this conversion when there are pre-3.4 (in particular, pre CASSANDRA-10657) nodes in the cluster
++     * because they interpret the ColumnFilter with fetchAllRegulars (translated to fetchAll in pre 4.0) so that all
++     * fetched columns are queried. In 3.4+ with the same scenario, all the columns are fetched
++     * (though see {@link #shouldFetchAllStatics()}) but queried columns are taken into account in the way that we may
++     * skip values or whole cells when reading data. We need to apply the conversion so that the retrieved data is
++     * the same.
++     */
++    private static boolean shouldQueriedBeNull()
++    {
++        if (Gossiper.instance.isUpgradingFromVersionLowerThan(CassandraVersion.CASSANDRA_3_4))
++        {
++            logger.trace("ColumnFilter conversion has been applied so that all columns will be queried because there are pre 3.4 nodes in the cluster");
++            return true;
++        }
++        return false;
++    }
++
++    /**
       * A filter that includes all columns for the provided table.
       */
 -    public static ColumnFilter all(CFMetaData metadata)
 +    public static ColumnFilter all(TableMetadata metadata)
      {
-         return new ColumnFilter(true, metadata, null, null);
 -        return new ColumnFilter(true, metadata.partitionColumns(), null, null);
++        return new ColumnFilter(true, true, metadata, null, null);
      }
  
      /**
@@@ -136,18 -105,30 +199,18 @@@
       * preserve CQL semantic (see class javadoc). This is ok for some internal queries however (and
       * for #6588 if/when we implement it).
       */
 -    public static ColumnFilter selection(PartitionColumns columns)
 +    public static ColumnFilter selection(RegularAndStaticColumns columns)
      {
-         return new ColumnFilter(false, (TableMetadata) null, columns, null);
 -        return new ColumnFilter(false, null, columns, null);
++        return new ColumnFilter(false, false, (TableMetadata) null, columns, null);
      }
  
--	/**
++    /**
       * A filter that fetches all columns for the provided table, but returns
       * only the queried ones.
       */
 -    public static ColumnFilter selection(CFMetaData metadata, PartitionColumns queried)
 +    public static ColumnFilter selection(TableMetadata metadata, RegularAndStaticColumns queried)
      {
-         return new ColumnFilter(true, metadata, queried, null);
 -        // When fetchAll is enabled on pre CASSANDRA-10657 (3.4-), queried columns are not considered at all, and it
 -        // is assumed that all columns are queried. CASSANDRA-10657 (3.4+) brings back skipping values of columns
 -        // which are not in queried set when fetchAll is enabled. That makes exactly the same filter being
 -        // interpreted in a different way on 3.4- and 3.4+.
 -        //
 -        // Moreover, there is no way to convert the filter with fetchAll and queried != null so that it is
 -        // interpreted the same way on 3.4- because that Cassandra version does not support such filtering.
 -        //
 -        // In order to avoid inconsitencies in data read by 3.4- and 3.4+ we need to avoid creation of incompatible
 -        // filters when the cluster contains 3.4- nodes. We do that by forcibly setting queried to null.
 -        //
 -        // see CASSANDRA-10657, CASSANDRA-15833, CASSANDRA-16415
 -        return new ColumnFilter(true, metadata.partitionColumns(), Gossiper.instance.isAnyNodeOn30() ? null : queried, null);
++        return new ColumnFilter(true, shouldFetchAllStatics(), metadata, shouldQueriedBeNull() ? null : queried, null);
      }
  
      /**
@@@ -170,22 -151,9 +233,22 @@@
          return queried == null ? fetched : queried;
      }
  
 -    public boolean fetchesAllColumns()
 +    /**
 +     * Whether all the (regular or static) columns are fetched by this filter.
 +     * <p>
 +     * Note that this method is meant as an optimization but a negative return
 +     * shouldn't be relied upon strongly: this can return {@code false} but
 +     * still have all the columns fetched if those were manually selected by the
 +     * user. The goal here is to cheaply avoid filtering things on wildcard
 +     * queries, as those are common.
 +     *
 +     * @param isStatic whether to check for static columns or not. If {@code true},
 +     * the method returns if all static columns are fetched, otherwise it checks
 +     * regular columns.
 +     */
 +    public boolean fetchesAllColumns(boolean isStatic)
      {
-         return isStatic ? queried == null : fetchAllRegulars;
 -        return isFetchAll;
++        return isStatic ? queried == null || fetchAllStatics : fetchAllRegulars;
      }
  
      /**
@@@ -200,14 -168,9 +263,14 @@@
      /**
       * Whether the provided column is fetched by this filter.
       */
 -    public boolean fetches(ColumnDefinition column)
 +    public boolean fetches(ColumnMetadata column)
      {
 -        return isFetchAll || queried.contains(column);
 +        // For statics, it is included only if it's part of _queried_, or if _queried_ is null (wildcard query).
 +        if (column.isStatic())
-             return queried == null || queried.contains(column);
++            return fetchAllStatics || queried == null || queried.contains(column);
 +
 +        // For regulars, if 'fetchAllRegulars', then it's included automatically. Otherwise, it depends on _queried_.
 +        return fetchAllRegulars || queried.contains(column);
      }
  
      /**
@@@ -271,24 -233,7 +334,24 @@@
          if (s.isEmpty())
              return null;
  
-         return new Tester(!column.isStatic() && fetchAllRegulars, s.iterator());
 -        return new Tester(isFetchAll, s.iterator());
++        return new Tester(!column.isStatic() && fetchAllRegulars || column.isStatic() && fetchAllStatics, s.iterator());
 +    }
 +
 +    /**
 +     * Given an iterator on the cell of a complex column, returns an iterator that only include the cells selected by
 +     * this filter.
 +     *
 +     * @param column the (complex) column for which the cells are.
 +     * @param cells the cells to filter.
 +     * @return a filtered iterator that only include the cells from {@code cells} that are included by this filter.
 +     */
 +    public Iterator<Cell<?>> filterComplexCells(ColumnMetadata column, Iterator<Cell<?>> cells)
 +    {
 +        Tester tester = newTester(column);
 +        if (tester == null)
 +            return cells;
 +
 +        return Iterators.filter(cells, cell -> tester.fetchedCellIsQueried(cell.path()));
      }
  
      /**
@@@ -449,17 -371,17 +512,25 @@@
              {
                  s = TreeMultimap.create(Comparator.<ColumnIdentifier>naturalOrder(), Comparator.<ColumnSubselection>naturalOrder());
                  for (ColumnSubselection subSelection : subSelections)
 -                    s.put(subSelection.column().name, subSelection);
 -            }
 -
 -            // See the comment in {@link ColumnFilter#selection(CFMetaData, PartitionColumns)}
 -            if (isFetchAll && queried != null && Gossiper.instance.isAnyNodeOn30())
 -            {
 -                logger.trace("Column filter will be automatically converted to query all columns because 3.0 nodes are present in the cluster");
 -                queried = null;
 +                {
 +                    if (fullySelectedComplexColumns == null || !fullySelectedComplexColumns.contains(subSelection.column()))
 +                        s.put(subSelection.column().name, subSelection);
 +                }
              }
  
-             // see CASSANDRA-15833
-             if (isFetchAll && Gossiper.instance.haveMajorVersion3Nodes())
-                 queried = null;
- 
-             return new ColumnFilter(isFetchAll, metadata, queried, s);
 -            return new ColumnFilter(isFetchAll, isFetchAll ? metadata.partitionColumns() : null, queried, s);
++            // When fetchAll is enabled on pre CASSANDRA-10657 (3.4-), queried columns are not considered at all, and it
++            // is assumed that all columns are queried. CASSANDRA-10657 (3.4+) brings back skipping values of columns
++            // which are not in queried set when fetchAll is enabled. That makes exactly the same filter being
++            // interpreted in a different way on 3.4- and 3.4+.
++            //
++            // Moreover, there is no way to convert the filter with fetchAll and queried != null so that it is
++            // interpreted the same way on 3.4- because that Cassandra version does not support such filtering.
++            //
++            // In order to avoid inconsistencies in data read by 3.4- and 3.4+ we need to avoid creation of incompatible
++            // filters when the cluster contains 3.4- nodes. We do that by forcibly setting queried to null.
++            //
++            // see CASSANDRA-10657, CASSANDRA-15833, CASSANDRA-16415
++            return new ColumnFilter(isFetchAll, isFetchAll && shouldFetchAllStatics(), metadata, isFetchAll && shouldQueriedBeNull() ? null : queried, s);
          }
      }
  
@@@ -474,7 -396,7 +545,8 @@@
  
          ColumnFilter otherCf = (ColumnFilter) other;
  
 -        return otherCf.isFetchAll == this.isFetchAll &&
 +        return otherCf.fetchAllRegulars == this.fetchAllRegulars &&
++               otherCf.fetchAllStatics == this.fetchAllStatics &&
                 Objects.equals(otherCf.fetched, this.fetched) &&
                 Objects.equals(otherCf.queried, this.queried) &&
                 Objects.equals(otherCf.subSelections, this.subSelections);
@@@ -483,87 -405,49 +555,59 @@@
      @Override
      public String toString()
      {
-         if (fetchAllRegulars && queried == null)
-             return "*";
+         String prefix = "";
 -        if (isFetchAll && queried == null)
 +
-         if (queried.isEmpty())
-             return "";
++        if (fetchAllRegulars && queried == null)
+             return "*/*";
  
-         Iterator<ColumnMetadata> defs = queried.selectOrderIterator();
-         if (!defs.hasNext())
-             return "<none>";
 -        if (isFetchAll)
++        if (fetchAllRegulars && fetchAllStatics)
+             prefix = "*/";
  
-         StringBuilder sb = new StringBuilder();
-         while (defs.hasNext())
 -        if (queried.isEmpty())
 -            return prefix + "[]";
++        if (fetchAllRegulars && !fetchAllStatics)
 +        {
-             appendColumnDef(sb, defs.next());
-             if (defs.hasNext())
-                 sb.append(", ");
++            prefix = queried.statics.isEmpty()
++                     ? "<all regulars>/"
++                     : String.format("<all regulars>+%s/", columnsToString(queried.statics::selectOrderIterator));
 +        }
-         return sb.toString();
++
++        return prefix + columnsToString(queried::selectOrderIterator);
 +    }
  
-     private void appendColumnDef(StringBuilder sb, ColumnMetadata column)
++    private String columnsToString(Iterable<ColumnMetadata> columns)
 +    {
-         if (subSelections == null)
+         StringJoiner joiner = new StringJoiner(", ", "[", "]");
 -        Iterator<ColumnDefinition> it = queried.selectOrderIterator();
++        Iterator<ColumnMetadata> it = columns.iterator();
+         while (it.hasNext())
          {
-             sb.append(column.name);
-             return;
-         }
 -            ColumnDefinition column = it.next();
++            ColumnMetadata column = it.next();
+             SortedSet<ColumnSubselection> s = subSelections != null ? subSelections.get(column.name) : Collections.emptySortedSet();
  
-         SortedSet<ColumnSubselection> s = subSelections.get(column.name);
-         if (s.isEmpty())
-         {
-             sb.append(column.name);
-             return;
+             if (s.isEmpty())
+                 joiner.add(String.valueOf(column.name));
+             else
 -                s.forEach(subSel -> joiner.add(String.format("%s%s", column.name , subSel)));
++                s.forEach(subSel -> joiner.add(String.format("%s%s", column.name, subSel)));
          }
- 
-         int i = 0;
-         for (ColumnSubselection subSel : s)
-             sb.append(i++ == 0 ? "" : ", ").append(column.name).append(subSel);
 -        return prefix + joiner.toString();
++        return joiner.toString();
      }
  
      public static class Serializer
      {
-         private static final int FETCH_ALL_MASK          = 0x01;
-         private static final int HAS_QUERIED_MASK        = 0x02;
 -        private static final int IS_FETCH_ALL_MASK       = 0x01;
 -        private static final int HAS_QUERIED_MASK      = 0x02;
++        private static final int FETCH_ALL_MASK = 0x01;
++        private static final int HAS_QUERIED_MASK = 0x02;
          private static final int HAS_SUB_SELECTIONS_MASK = 0x04;
  
          private static int makeHeaderByte(ColumnFilter selection)
          {
 -            return (selection.isFetchAll ? IS_FETCH_ALL_MASK : 0)
 -                 | (selection.queried != null ? HAS_QUERIED_MASK : 0)
 -                 | (selection.subSelections != null ? HAS_SUB_SELECTIONS_MASK : 0);
 +            return (selection.fetchAllRegulars ? FETCH_ALL_MASK : 0)
-                  | (selection.queried != null ? HAS_QUERIED_MASK : 0)
-                  | (selection.subSelections != null ? HAS_SUB_SELECTIONS_MASK : 0);
-         }
- 
-         @VisibleForTesting
-         public static ColumnFilter maybeUpdateForBackwardCompatility(ColumnFilter selection, int version)
-         {
-             if (version > MessagingService.VERSION_3014 || !selection.fetchAllRegulars || selection.queried == null)
-                 return selection;
- 
-             // The meaning of fetchAllRegulars changed (at least when queried != null) due to CASSANDRA-12768: in
-             // pre-4.0 it means that *all* columns are fetched, not just the regular ones, and so 3.0/3.X nodes
-             // would send us more than we'd like. So instead recreating a filter that correspond to what we
-             // actually want (it's a tiny bit less efficient as we include all columns manually and will mark as
-             // queried some columns that are actually only fetched, but it's fine during upgrade).
-             // More concretely, we replace our filter by a non-fetch-all one that queries every columns that our
-             // current filter fetches.
-             Set<ColumnMetadata> queriedStatic = new HashSet<>();
-             Iterables.addAll(queriedStatic, Iterables.filter(selection.queried, ColumnMetadata::isStatic));
-             return new ColumnFilter(false,
-                                     (TableMetadata) null,
-                                     new RegularAndStaticColumns(Columns.from(queriedStatic), selection.fetched.regulars),
-                                     selection.subSelections);
++                   | (selection.queried != null ? HAS_QUERIED_MASK : 0)
++                   | (selection.subSelections != null ? HAS_SUB_SELECTIONS_MASK : 0);
          }
  
          public void serialize(ColumnFilter selection, DataOutputPlus out, int version) throws IOException
          {
-             selection = maybeUpdateForBackwardCompatility(selection, version);
- 
              out.writeByte(makeHeaderByte(selection));
  
 -            if (version >= MessagingService.VERSION_3014 && selection.isFetchAll)
 +            if (version >= MessagingService.VERSION_3014 && selection.fetchAllRegulars)
              {
                  Columns.serializer.serialize(selection.fetched.statics, out);
                  Columns.serializer.serialize(selection.fetched.regulars, out);
@@@ -618,7 -502,7 +662,7 @@@
              if (hasSubSelections)
              {
                  subSelections = TreeMultimap.create(Comparator.<ColumnIdentifier>naturalOrder(), Comparator.<ColumnSubselection>naturalOrder());
--                int size = (int)in.readUnsignedVInt();
++                int size = (int) in.readUnsignedVInt();
                  for (int i = 0; i < size; i++)
                  {
                      ColumnSubselection subSel = ColumnSubselection.serializer.deserialize(in, version, metadata);
@@@ -626,29 -510,25 +670,14 @@@
                  }
              }
  
-             // See CASSANDRA-15833
-             if (version <= MessagingService.VERSION_3014 && isFetchAll)
 -            // Since nodes with and without CASSANDRA-10657 are not distinguishable by messaging version, we need to
 -            // check whether there are any nodes running pre CASSANDRA-10657 Cassandra and apply conversion to the
 -            // column filter so that it is interpreted in the same way as on the nodes without CASSANDRA-10657.
 -            //
 -            // See the comment in {@link ColumnFilter#selection(CFMetaData, PartitionColumns)}
 -            if (isFetchAll && queried != null && Gossiper.instance.isAnyNodeOn30())
 -            {
 -                logger.trace("Deserialized column filter will be automatically converted to query all columns because 3.0 nodes are present in the cluster");
--                queried = null;
- 
-             // Same concern than in serialize/serializedSize: we should be wary of the change in meaning for isFetchAll.
-             // If we get a filter with isFetchAll from 3.0/3.x, it actually expects all static columns to be fetched,
-             // make sure we do that (note that if queried == null, that's already what we do).
-             // Note that here again this will make us do a bit more work that necessary, namely we'll _query_ all
-             // statics even though we only care about _fetching_ them all, but that's a minor inefficiency, so fine
-             // during upgrade.
-             if (version <= MessagingService.VERSION_30 && isFetchAll && queried != null)
-                 queried = new RegularAndStaticColumns(metadata.staticColumns(), queried.regulars);
 -            }
--
--            return new ColumnFilter(isFetchAll, fetched, queried, subSelections);
++            return new ColumnFilter(isFetchAll, isFetchAll && shouldFetchAllStatics(), fetched, isFetchAll && shouldQueriedBeNull() ? null : queried, subSelections);
          }
  
          public long serializedSize(ColumnFilter selection, int version)
          {
-             selection = maybeUpdateForBackwardCompatility(selection, version);
- 
              long size = 1; // header byte
  
 -            if (version >= MessagingService.VERSION_3014 && selection.isFetchAll)
 +            if (version >= MessagingService.VERSION_3014 && selection.fetchAllRegulars)
              {
                  size += Columns.serializer.serializedSize(selection.fetched.statics);
                  size += Columns.serializer.serializedSize(selection.fetched.regulars);
@@@ -671,4 -551,4 +700,4 @@@
              return size;
          }
      }
--}
++}
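
For context on the two conversions above (shouldFetchAllStatics / shouldQueriedBeNull), here is a
minimal, illustrative sketch of the same decision expressed as a pure function of the oldest release
present in the cluster. The helper name olderNodesPresent is made up for the example; the
CassandraVersion constants and familyLowerBound come from the CassandraVersion hunks later in this
commit, and the real code asks Gossiper.instance.isUpgradingFromVersionLowerThan() instead.

    import org.apache.cassandra.utils.CassandraVersion;

    public final class ColumnFilterUpgradeGatesSketch
    {
        // Hypothetical helper mirroring Gossiper#isUpgradingFromVersionLowerThan: is the oldest
        // version in the cluster strictly below the given threshold? A null minimum means every
        // node is already on the current family.
        static boolean olderNodesPresent(CassandraVersion minClusterVersion, CassandraVersion threshold)
        {
            return minClusterVersion != null && minClusterVersion.compareTo(threshold) < 0;
        }

        public static void main(String[] args)
        {
            CassandraVersion oldest = new CassandraVersion("3.11.10");

            // A 3.11 node is still in the ring: fetch all statics too (pre-4.0 interpretation) ...
            boolean fetchAllStatics = olderNodesPresent(oldest, CassandraVersion.CASSANDRA_4_0);    // true
            // ... but keep the queried set, because 3.11 (post CASSANDRA-10657) already understands it.
            boolean queriedBecomesNull = olderNodesPresent(oldest, CassandraVersion.CASSANDRA_3_4); // false

            System.out.println(fetchAllStatics + " " + queriedBecomesNull);
        }
    }
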
diff --cc src/java/org/apache/cassandra/gms/Gossiper.java
index 1129c29,819078d..7acaec9
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@@ -160,38 -157,7 +160,54 @@@ public class Gossiper implements IFailu
  
      private volatile long lastProcessedMessageAt = System.currentTimeMillis();
  
-     //This property and anything that checks it should be removed in 5.0
-     private boolean haveMajorVersion3Nodes = true;
 -    private static FastThreadLocal<Boolean> isGossipStage = new FastThreadLocal<>();
++    /**
++     * This property is initially set to {@code true} which means that we have no information about the other nodes.
++     * Once all nodes are on at least this node's version, it becomes {@code false}, which means that we are not
++     * upgrading from the previous version (major, minor).
++     *
++     * This property and anything that checks it should be removed in 5.0
++     */
++    private volatile boolean upgradeInProgressPossible = true;
 +
-     final Supplier<ExpiringMemoizingSupplier.ReturnValue<Boolean>> haveMajorVersion3NodesSupplier = () ->
++    final Supplier<ExpiringMemoizingSupplier.ReturnValue<CassandraVersion>> upgradeFromVersionSupplier = () ->
 +    {
-         //Once there are no prior version nodes we don't need to keep rechecking
-         if (!haveMajorVersion3Nodes)
-             return new ExpiringMemoizingSupplier.Memoized<>(false);
++        // Once there are no prior version nodes we don't need to keep rechecking
++        if (!upgradeInProgressPossible)
++            return new ExpiringMemoizingSupplier.Memoized<>(null);
 +
 +        Iterable<InetAddressAndPort> allHosts = Iterables.concat(Gossiper.instance.getLiveMembers(), Gossiper.instance.getUnreachableMembers());
-         CassandraVersion referenceVersion = null;
 +
++        CassandraVersion minVersion = SystemKeyspace.CURRENT_VERSION.familyLowerBound.get();
++        boolean allHostsHaveKnownVersion = true;
 +        for (InetAddressAndPort host : allHosts)
 +        {
 +            CassandraVersion version = getReleaseVersion(host);
 +
 +            //Raced with changes to gossip state, wait until next iteration
 +            if (version == null)
-                 return new ExpiringMemoizingSupplier.NotMemoized(true);
++                allHostsHaveKnownVersion = false;
++            else if (version.compareTo(minVersion) < 0)
++                minVersion = version;
++        }
 +
-             if (referenceVersion == null)
-                 referenceVersion = version;
++        if (minVersion.compareTo(SystemKeyspace.CURRENT_VERSION.familyLowerBound.get()) < 0)
++            return new ExpiringMemoizingSupplier.Memoized<>(minVersion);
 +
-             if (version.major < 4)
-                 return new ExpiringMemoizingSupplier.Memoized<>(true);
-         }
++        if (!allHostsHaveKnownVersion)
++            return new ExpiringMemoizingSupplier.NotMemoized<>(minVersion);
 +
-         haveMajorVersion3Nodes = false;
-         return new ExpiringMemoizingSupplier.Memoized(false);
++        upgradeInProgressPossible = false;
++        return new ExpiringMemoizingSupplier.Memoized<>(null);
 +    };
 +
-     private Supplier<Boolean> haveMajorVersion3NodesMemoized = ExpiringMemoizingSupplier.memoizeWithExpiration(haveMajorVersion3NodesSupplier, 1, TimeUnit.MINUTES);
++    private final Supplier<CassandraVersion> upgradeFromVersionMemoized = ExpiringMemoizingSupplier.memoizeWithExpiration(upgradeFromVersionSupplier, 1, TimeUnit.MINUTES);
++
++    @VisibleForTesting
++    public void expireUpgradeFromVersion()
++    {
++        upgradeInProgressPossible = true;
++        ((ExpiringMemoizingSupplier<CassandraVersion>) upgradeFromVersionMemoized).expire();
++    }
  
      private static final boolean disableThreadValidation = Boolean.getBoolean(Props.DISABLE_THREAD_VALIDATION);
  
@@@ -2119,56 -1827,11 +2135,74 @@@
              logger.info("No gossip backlog; proceeding");
      }
  
 -    @VisibleForTesting
 -    public void stopShutdownAndWait(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
 +    /**
 +     * Blockingly wait for all live nodes to agree on the current schema version.
 +     *
 +     * @param maxWait maximum time to wait for schema agreement
 +     * @param unit TimeUnit of maxWait
 +     * @return true if agreement was reached, false if not
 +     */
 +    public boolean waitForSchemaAgreement(long maxWait, TimeUnit unit, BooleanSupplier abortCondition)
      {
 -        stop();
 -        ExecutorUtils.shutdownAndWait(timeout, unit, executor);
 +        int waited = 0;
 +        int toWait = 50;
 +
 +        Set<InetAddressAndPort> members = getLiveTokenOwners();
 +
 +        while (true)
 +        {
 +            if (nodesAgreeOnSchema(members))
 +                return true;
 +
 +            if (waited >= unit.toMillis(maxWait) || abortCondition.getAsBoolean())
 +                return false;
 +
 +            Uninterruptibles.sleepUninterruptibly(toWait, TimeUnit.MILLISECONDS);
 +            waited += toWait;
 +            toWait = Math.min(1000, toWait * 2);
 +        }
 +    }
 +
-     public boolean haveMajorVersion3Nodes()
++    /**
++     * Returns {@code false} only if the information about the version of each node in the cluster is available and
++     * ALL the nodes are on 4.0+ (regardless of the patch version).
++     */
++    public boolean hasMajorVersion3Nodes()
 +    {
-         return haveMajorVersion3NodesMemoized.get();
++        return isUpgradingFromVersionLowerThan(CassandraVersion.CASSANDRA_4_0) || // this is quite obvious
++               // however if we discovered only nodes at the current version so far (in particular only this node),
++               // but still there are nodes with an unknown version, we also want to report that the cluster may have nodes at 3.x
++               upgradeInProgressPossible && !isUpgradingFromVersionLowerThan(SystemKeyspace.CURRENT_VERSION);
++    }
++
++    /**
++     * Returns {@code true} if there are nodes on version lower than the provided version (only major / minor counts)
++     */
++    public boolean isUpgradingFromVersionLowerThan(CassandraVersion referenceVersion) {
++        CassandraVersion v = upgradeFromVersionMemoized.get();
++        if (SystemKeyspace.NULL_VERSION.equals(v) && scheduledGossipTask == null)
++            return false;
++        else
++            return v != null && v.compareTo(referenceVersion.familyLowerBound.get()) < 0;
 +    }
 +
 +    private boolean nodesAgreeOnSchema(Collection<InetAddressAndPort> nodes)
 +    {
 +        UUID expectedVersion = null;
 +
 +        for (InetAddressAndPort node : nodes)
 +        {
 +            EndpointState state = getEndpointStateForEndpoint(node);
 +            UUID remoteVersion = state.getSchemaVersion();
 +
 +            if (null == expectedVersion)
 +                expectedVersion = remoteVersion;
 +
 +            if (null == expectedVersion || !expectedVersion.equals(remoteVersion))
 +                return false;
 +        }
 +
 +        return true;
      }
  
      @VisibleForTesting
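
A free-standing sketch (not the committed code) of the decision the upgradeFromVersionSupplier above
encodes, with the caching contract spelled out: the answer may only be memoized once every peer's
release_version is known, otherwise it is recomputed on the next call. A null entry in the list
stands for a peer whose version gossip has not reported yet.

    import java.util.List;

    import org.apache.cassandra.utils.CassandraVersion;
    import org.apache.cassandra.utils.ExpiringMemoizingSupplier;

    final class UpgradeFromVersionSketch
    {
        static ExpiringMemoizingSupplier.ReturnValue<CassandraVersion> compute(CassandraVersion current,
                                                                               List<CassandraVersion> peers)
        {
            CassandraVersion min = current.familyLowerBound.get();
            boolean allKnown = true;
            for (CassandraVersion v : peers)
            {
                if (v == null)
                    allKnown = false;                      // raced with gossip state: answer is provisional
                else if (v.compareTo(min) < 0)
                    min = v;
            }

            if (min.compareTo(current.familyLowerBound.get()) < 0)
                return new ExpiringMemoizingSupplier.Memoized<>(min);     // definitely upgrading from `min`

            return allKnown
                   ? new ExpiringMemoizingSupplier.Memoized<>(null)       // whole cluster on the current family
                   : new ExpiringMemoizingSupplier.NotMemoized<>(min);    // some versions unknown: do not cache
        }
    }
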
diff --cc src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java
index c62818a,eb2226b..7e8d8bc
--- a/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java
+++ b/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java
@@@ -210,28 -192,17 +210,28 @@@ public final class SystemDistributedKey
          processSilent(fmtQuery);
      }
  
 -    public static void startRepairs(UUID id, UUID parent_id, String keyspaceName, String[] cfnames, Collection<Range<Token>> ranges, Iterable<InetAddress> endpoints)
 +    public static void startRepairs(UUID id, UUID parent_id, String keyspaceName, String[] cfnames, CommonRange commonRange)
      {
 -        String coordinator = FBUtilities.getBroadcastAddress().getHostAddress();
 -        Set<String> participants = Sets.newHashSet(coordinator);
 +        //Don't record repair history if an upgrade is in progress as version 3 nodes generate errors
 +        //due to schema differences
-         boolean includeNewColumns = !Gossiper.instance.haveMajorVersion3Nodes();
++        boolean includeNewColumns = !Gossiper.instance.hasMajorVersion3Nodes();
 +
 +        InetAddressAndPort coordinator = FBUtilities.getBroadcastAddressAndPort();
 +        Set<String> participants = Sets.newHashSet();
 +        Set<String> participants_v2 = Sets.newHashSet();
  
 -        for (InetAddress endpoint : endpoints)
 -            participants.add(endpoint.getHostAddress());
 +        for (InetAddressAndPort endpoint : commonRange.endpoints)
 +        {
 +            participants.add(endpoint.getHostAddress(false));
 +            participants_v2.add(endpoint.getHostAddressAndPort());
 +        }
  
          String query =
 +                "INSERT INTO %s.%s (keyspace_name, columnfamily_name, id, parent_id, range_begin, range_end, coordinator, coordinator_port, participants, participants_v2, status, started_at) " +
 +                        "VALUES (   '%s',          '%s',              %s, %s,        '%s',        '%s',      '%s',        %d,               { '%s' },     { '%s' },        '%s',   toTimestamp(now()))";
 +        String queryWithoutNewColumns =
                  "INSERT INTO %s.%s (keyspace_name, columnfamily_name, id, parent_id, range_begin, range_end, coordinator, participants, status, started_at) " +
 -                        "VALUES (   '%s',          '%s',              %s, %s,        '%s',        '%s',      '%s',        { '%s' },     '%s',   toTimestamp(now()))";
 +                        "VALUES (   '%s',          '%s',              %s, %s,        '%s',        '%s',      '%s',               { '%s' },        '%s',   toTimestamp(now()))";
  
          for (String cfname : cfnames)
          {
diff --cc src/java/org/apache/cassandra/tracing/TraceKeyspace.java
index 8c6d8c8,0d7c4f1..c2e74d8
--- a/src/java/org/apache/cassandra/tracing/TraceKeyspace.java
+++ b/src/java/org/apache/cassandra/tracing/TraceKeyspace.java
@@@ -114,16 -105,14 +114,16 @@@ public final class TraceKeyspac
                                               int ttl)
      {
          PartitionUpdate.SimpleBuilder builder = PartitionUpdate.simpleBuilder(Sessions, sessionId);
 -        builder.row()
 -               .ttl(ttl)
 -               .add("client", client)
 -               .add("coordinator", FBUtilities.getBroadcastAddress())
 -               .add("request", request)
 -               .add("started_at", new Date(startedAt))
 -               .add("command", command)
 -               .appendAll("parameters", parameters);
 +        Row.SimpleBuilder rb = builder.row();
 +        rb.ttl(ttl)
 +          .add("client", client)
 +          .add("coordinator", FBUtilities.getBroadcastAddressAndPort().address);
-         if (!Gossiper.instance.haveMajorVersion3Nodes())
++        if (!Gossiper.instance.hasMajorVersion3Nodes())
 +            rb.add("coordinator_port", FBUtilities.getBroadcastAddressAndPort().port);
 +        rb.add("request", request)
 +          .add("started_at", new Date(startedAt))
 +          .add("command", command)
 +          .appendAll("parameters", parameters);
  
          return builder.buildAsMutation();
      }
@@@ -144,10 -133,8 +144,10 @@@
                                                .ttl(ttl);
  
          rowBuilder.add("activity", message)
 -                  .add("source", FBUtilities.getBroadcastAddress())
 -                  .add("thread", threadName);
 +                  .add("source", FBUtilities.getBroadcastAddressAndPort().address);
-         if (!Gossiper.instance.haveMajorVersion3Nodes())
++        if (!Gossiper.instance.hasMajorVersion3Nodes())
 +            rowBuilder.add("source_port", FBUtilities.getBroadcastAddressAndPort().port);
 +        rowBuilder.add("thread", threadName);
  
          if (elapsed >= 0)
              rowBuilder.add("source_elapsed", elapsed);
diff --cc src/java/org/apache/cassandra/utils/CassandraVersion.java
index b3dca96,bf9fe6a..8638f0e
--- a/src/java/org/apache/cassandra/utils/CassandraVersion.java
+++ b/src/java/org/apache/cassandra/utils/CassandraVersion.java
@@@ -18,13 -18,10 +18,15 @@@
  package org.apache.cassandra.utils;
  
  import java.util.Arrays;
 +import java.util.Collections;
 +import java.util.List;
 +import java.util.Objects;
++import java.util.function.Supplier;
  import java.util.regex.Matcher;
  import java.util.regex.Pattern;
  
 -import com.google.common.base.Objects;
 +import com.google.common.annotations.VisibleForTesting;
++import com.google.common.base.Suppliers;
  import org.apache.commons.lang3.StringUtils;
  
  /**
@@@ -36,21 -33,19 +38,26 @@@
  public class CassandraVersion implements Comparable<CassandraVersion>
  {
      /**
 -     * note: 3rd group matches to words but only allows number and checked after regexp test.
 +     * note: 3rd/4th groups matches to words but only allows number and checked after regexp test.
       * this is because 3rd and the last can be identical.
       **/
 -    private static final String VERSION_REGEXP = "(\\d+)\\.(\\d+)(?:\\.(\\w+))?(\\-[.\\w]+)?([.+][.\\w]+)?";
 -    private static final Pattern PATTERN_WHITESPACE = Pattern.compile("\\w+");
 +    private static final String VERSION_REGEXP = "(\\d+)\\.(\\d+)(?:\\.(\\w+))?(?:\\.(\\w+))?(\\-[-.\\w]+)?([.+][.\\w]+)?";
 +    private static final Pattern PATTERN_WORDS = Pattern.compile("\\w+");
 +    @VisibleForTesting
 +    static final int NO_HOTFIX = -1;
  
 -    private static final Pattern pattern = Pattern.compile(VERSION_REGEXP);
 -    private static final Pattern SNAPSHOT = Pattern.compile("-SNAPSHOT");
 +    private static final Pattern PATTERN = Pattern.compile(VERSION_REGEXP);
 +
++    public static final CassandraVersion CASSANDRA_4_0 = new CassandraVersion("4.0").familyLowerBound.get();
++    public static final CassandraVersion CASSANDRA_3_4 = new CassandraVersion("3.4").familyLowerBound.get();
+ 
      public final int major;
      public final int minor;
      public final int patch;
 +    public final int hotfix;
 +
++    public final Supplier<CassandraVersion> familyLowerBound = Suppliers.memoize(this::getFamilyLowerBound);
+ 
      private final String[] preRelease;
      private final String[] build;
  
@@@ -97,6 -81,6 +104,13 @@@
          }
      }
  
++    private CassandraVersion getFamilyLowerBound()
++    {
++        return patch == 0 && hotfix == NO_HOTFIX && preRelease != null && preRelease.length == 0 && build == null
++               ? this
++               : new CassandraVersion(major, minor, 0, NO_HOTFIX, new String[0], null);
++    }
++
      private static String[] parseIdentifiers(String version, String str)
      {
          // Drop initial - or +
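
A short, illustrative example of what familyLowerBound (added above) is for, assuming only the public
CassandraVersion API shown in this commit: it collapses a concrete release to the lowest conceivable
version of its major.minor family (patch 0, no hotfix, an empty pre-release), so that "is this older
than the 4.0 family?" reduces to a plain compareTo against CassandraVersion.CASSANDRA_4_0.

    import org.apache.cassandra.utils.CassandraVersion;

    public class FamilyLowerBoundExample
    {
        public static void main(String[] args)
        {
            CassandraVersion beta = new CassandraVersion("4.0-beta5");
            CassandraVersion bound = beta.familyLowerBound.get();    // behaves as the lowest possible 4.0

            System.out.println(bound.compareTo(beta) <= 0);                             // true
            System.out.println(new CassandraVersion("3.11.10").compareTo(bound) < 0);   // true: any 3.x is below
            System.out.println(new CassandraVersion("4.0.1").compareTo(bound) > 0);     // true: later 4.0.x is above
        }
    }
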
diff --cc src/java/org/apache/cassandra/utils/ExpiringMemoizingSupplier.java
index 0280541,0000000..1736ae2
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/utils/ExpiringMemoizingSupplier.java
+++ b/src/java/org/apache/cassandra/utils/ExpiringMemoizingSupplier.java
@@@ -1,130 -1,0 +1,137 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.utils;
 +
 +
 +import java.util.concurrent.TimeUnit;
 +import java.util.function.Supplier;
 +
++import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.base.Preconditions;
 +
 +/**
 + * An implementation similar to Guava's Suppliers.memoizeWithExpiration(Supplier)
 + * but allowing for memoization to be skipped.
 + *
 + * See CASSANDRA-16148
 + */
 +public class ExpiringMemoizingSupplier<T> implements Supplier<T>
 +{
 +    final Supplier<ReturnValue<T>> delegate;
 +    final long durationNanos;
 +    transient volatile T value;
 +    // The special value 0 means "not yet initialized".
 +    transient volatile long expirationNanos;
 +
 +    public static <T> Supplier<T> memoizeWithExpiration(Supplier<ReturnValue<T>> delegate, long duration, TimeUnit unit)
 +    {
 +        return new ExpiringMemoizingSupplier<>(delegate, duration, unit);
 +    }
 +
 +    ExpiringMemoizingSupplier(Supplier<ReturnValue<T>> delegate, long duration, TimeUnit unit) {
 +        this.delegate = Preconditions.checkNotNull(delegate);
 +        this.durationNanos = unit.toNanos(duration);
 +        Preconditions.checkArgument(duration > 0);
 +    }
 +
 +    @Override
 +    public T get() {
 +        // Another variant of Double Checked Locking.
 +        //
 +        // We use two volatile reads.  We could reduce this to one by
 +        // putting our fields into a holder class, but (at least on x86)
 +        // the extra memory consumption and indirection are more
 +        // expensive than the extra volatile reads.
 +        long nanos = this.expirationNanos;
 +        long now = System.nanoTime();
 +        if (nanos == 0L || now - nanos >= 0L) {
 +            synchronized(this) {
 +                if (nanos == this.expirationNanos) {
 +                    ReturnValue<T> t = this.delegate.get();
 +                    if (t.canMemoize())
 +                        this.value = t.value();
 +                    else
 +                        return t.value();
 +
 +                    nanos = now + this.durationNanos;
 +                    this.expirationNanos = nanos == 0L ? 1L : nanos;
 +                    return t.value();
 +                }
 +            }
 +        }
 +        return this.value;
 +    }
 +
++    @VisibleForTesting
++    public void expire()
++    {
++        this.expirationNanos = 0;
++    }
++
 +    @Override
 +    public String toString() {
 +        // This is a little strange if the unit the user provided was not NANOS,
 +        // but we don't want to store the unit just for toString
 +        return "Suppliers.memoizeWithExpiration(" + delegate + ", " + durationNanos + ", NANOS)";
 +    }
 +
 +    private static final long serialVersionUID = 0;
 +
 +    public static abstract class ReturnValue<T>
 +    {
 +        protected final T value;
 +
 +        ReturnValue(T value){
 +            this.value = value;
 +        }
 +
 +        abstract boolean canMemoize();
 +
 +        public T value()
 +        {
 +            return value;
 +        }
 +    }
 +
 +    public static class Memoized<T> extends ReturnValue<T>
 +    {
 +        public Memoized(T value)
 +        {
 +            super(value);
 +        }
 +
 +        public boolean canMemoize()
 +        {
 +            return true;
 +        }
 +    }
 +
 +    public static class NotMemoized<T> extends ReturnValue<T>
 +    {
 +        public NotMemoized(T value)
 +        {
 +            super(value);
 +        }
 +
 +        public boolean canMemoize()
 +        {
 +            return false;
 +        }
 +    }
 +}
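
A usage sketch for the class above (illustrative, not part of the commit): the delegate decides per
call whether its result may be cached, so an "unknown" answer is retried on every get() while a known
answer is held for the full expiration period. fetchFromRemote() is a hypothetical stand-in for the
expensive lookup (in Gossiper it is the scan over peer release versions).

    import java.util.concurrent.TimeUnit;
    import java.util.function.Supplier;

    import org.apache.cassandra.utils.ExpiringMemoizingSupplier;

    public class ExpiringMemoizingExample
    {
        static String fetchFromRemote()
        {
            return Math.random() < 0.5 ? null : "value";   // pretend the answer is sometimes unavailable
        }

        public static void main(String[] args)
        {
            Supplier<ExpiringMemoizingSupplier.ReturnValue<String>> delegate = () ->
            {
                String v = fetchFromRemote();
                return v == null
                       ? new ExpiringMemoizingSupplier.NotMemoized<>(null)   // unknown: do not cache, retry
                       : new ExpiringMemoizingSupplier.Memoized<>(v);        // known: cache for one minute
            };

            Supplier<String> cached = ExpiringMemoizingSupplier.memoizeWithExpiration(delegate, 1, TimeUnit.MINUTES);
            System.out.println(cached.get());
        }
    }
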
diff --cc test/distributed/org/apache/cassandra/distributed/test/ReadDigestConsistencyTest.java
index 0000000,8071eea..05e705b
mode 000000,100644..100644
--- a/test/distributed/org/apache/cassandra/distributed/test/ReadDigestConsistencyTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/ReadDigestConsistencyTest.java
@@@ -1,0 -1,113 +1,146 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.distributed.test;
+ 
+ import java.util.Arrays;
+ import java.util.UUID;
+ 
+ import org.junit.Assert;
+ import org.junit.Test;
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
+ 
+ import org.apache.cassandra.distributed.Cluster;
+ import org.apache.cassandra.distributed.api.ConsistencyLevel;
+ import org.apache.cassandra.distributed.api.ICoordinator;
++import org.apache.cassandra.exceptions.SyntaxException;
++import org.apache.cassandra.utils.Throwables;
+ 
+ public class ReadDigestConsistencyTest extends TestBaseImpl
+ {
++    private final static Logger logger = LoggerFactory.getLogger(ReadDigestConsistencyTest.class);
++
+     public static final String TABLE_NAME = "tbl";
+     public static final String CREATE_TABLE = String.format("CREATE TABLE %s.%s (" +
+                                                             "   k int, " +
+                                                             "   c int, " +
+                                                             "   s1 int static, " +
+                                                             "   s2 set<int> static, " +
+                                                             "   v1 int, " +
+                                                             "   v2 set<int>, " +
+                                                             "   PRIMARY KEY (k, c))", KEYSPACE, TABLE_NAME);
+ 
+     public static final String INSERT = String.format("INSERT INTO %s.%s (k, c, s1, s2, v1, v2) VALUES (?, ?, ?, ?, ?, ?)", KEYSPACE, TABLE_NAME);
+ 
+ 
+     public static final String SELECT_TRACE = "SELECT activity FROM system_traces.events where session_id = ? and source = ? ALLOW FILTERING;";
+ 
+     @Test
+     public void testDigestConsistency() throws Exception
+     {
+         try (Cluster cluster = init(builder().withNodes(2).start()))
+         {
+             cluster.schemaChange(CREATE_TABLE);
+             insertData(cluster.coordinator(1));
+             testDigestConsistency(cluster.coordinator(1));
+             testDigestConsistency(cluster.coordinator(2));
+         }
+     }
+ 
+     public static void checkTraceForDigestMismatch(ICoordinator coordinator, String query, Object... boundValues)
+     {
+         UUID sessionId = UUID.randomUUID();
 -        coordinator.executeWithTracing(sessionId, query, ConsistencyLevel.ALL, boundValues);
++        try
++        {
++            coordinator.executeWithTracing(sessionId, query, ConsistencyLevel.ALL, boundValues);
++        }
++        catch (RuntimeException ex)
++        {
++            if (Throwables.isCausedBy(ex, t -> t.getClass().getName().equals(SyntaxException.class.getName())))
++            {
++                if (coordinator.instance().getReleaseVersionString().startsWith("3.") && query.contains("["))
++                {
++                    logger.warn("Query {} is not supported on node {} version {}",
++                                query,
++                                coordinator.instance().broadcastAddress().getAddress().getHostAddress(),
++                                coordinator.instance().getReleaseVersionString());
++
++                    // we can forgive SyntaxException for C* < 4.0 if the query contains collection element selection
++                    return;
++                }
++            }
++            logger.error("Failing for coordinator {} and query {}", coordinator.instance().getReleaseVersionString(), query);
++            throw ex;
++        }
+         Object[][] results = coordinator.execute(SELECT_TRACE,
+                                                  ConsistencyLevel.ALL,
+                                                  sessionId,
+                                                  coordinator.instance().broadcastAddress().getAddress());
+         for (Object[] result : results)
+         {
+             String activity = (String) result[0];
+             Assert.assertFalse(String.format("Found Digest Mismatch while executing query: %s with bound values %s on %s/%s",
+                                              query,
+                                              Arrays.toString(boundValues),
+                                              coordinator.instance().broadcastAddress(),
+                                              coordinator.instance().getReleaseVersionString()),
+                                activity.toLowerCase().contains("mismatch for key"));
+         }
+     }
+ 
+     public static void insertData(ICoordinator coordinator)
+     {
+         coordinator.execute(String.format("INSERT INTO %s.%s (k, c, s1, s2, v1, v2) VALUES (1, 1, 2, {1, 2, 3, 4, 5}, 3, {6, 7, 8, 9, 10})", KEYSPACE, TABLE_NAME), ConsistencyLevel.ALL);
+         coordinator.execute(String.format("INSERT INTO %s.%s (k, c, s1, s2, v1, v2) VALUES (1, 2, 3, {2, 3, 4, 5, 6}, 4, {7, 8, 9, 10, 11})", KEYSPACE, TABLE_NAME), ConsistencyLevel.ALL);
+     }
+ 
+     public static void testDigestConsistency(ICoordinator coordinator)
+     {
+         String queryPattern = "SELECT %s FROM %s.%s WHERE %s";
+         String[] columnss1 = {
+         "k, c",
+         "*",
+         "v1",
+         "v2",
+         "v1, s1",
 -        "v1, s2"
++        "v1, s2",
++        "v2[3]",
++        "v2[2..4]",
++        "v1, s2[7]",
++        "v1, s2[6..8]"
+         };
+ 
+         String[] columnss2 = {
+         "s1",
 -        "s2"
++        "s2",
++        "s2[7]",
++        "s2[6..8]"
+         };
+ 
+         for (String columns : columnss1)
+         {
+             checkTraceForDigestMismatch(coordinator, String.format(queryPattern, columns, KEYSPACE, TABLE_NAME, "k = 1"));
+             checkTraceForDigestMismatch(coordinator, String.format(queryPattern, columns, KEYSPACE, TABLE_NAME, "k = 1 AND c = 2"));
+         }
+         for (String columns : columnss2)
+         {
+             checkTraceForDigestMismatch(coordinator, String.format(queryPattern, columns, KEYSPACE, TABLE_NAME, "k = 1"));
+         }
+     }
+ }
diff --cc test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeReadTest.java
index 6ee9b0a,d908cd5..f9a3542
--- a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeReadTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeReadTest.java
@@@ -18,15 -18,16 +18,16 @@@
  
  package org.apache.cassandra.distributed.upgrade;
  
- import java.util.UUID;
- 
- import org.junit.Assert;
  import org.junit.Test;
  
- import org.apache.cassandra.distributed.UpgradeableCluster;
- import org.apache.cassandra.distributed.api.ConsistencyLevel;
- import org.apache.cassandra.distributed.shared.DistributedTestBase;
 -import org.apache.cassandra.distributed.api.Feature;
+ import org.apache.cassandra.distributed.api.IInvokableInstance;
  import org.apache.cassandra.distributed.shared.Versions;
+ import org.apache.cassandra.gms.Gossiper;
++import org.apache.cassandra.utils.CassandraVersion;
+ 
+ import static org.apache.cassandra.distributed.test.ReadDigestConsistencyTest.CREATE_TABLE;
+ import static org.apache.cassandra.distributed.test.ReadDigestConsistencyTest.insertData;
+ import static org.apache.cassandra.distributed.test.ReadDigestConsistencyTest.testDigestConsistency;
  
  public class MixedModeReadTest extends UpgradeTestBase
  {
@@@ -51,27 -37,28 +37,29 @@@
          new TestCase()
          .nodes(2)
          .nodesToUpgrade(1)
 -        .upgrade(Versions.Major.v30, Versions.Major.v3X)
 -        .withConfig(config -> config.with(Feature.GOSSIP, Feature.NETWORK))
 +        .upgrade(Versions.Major.v30, Versions.Major.v4)
 +        .upgrade(Versions.Major.v3X, Versions.Major.v4)
          .setup(cluster -> {
              cluster.schemaChange(CREATE_TABLE);
-             cluster.coordinator(1).execute(INSERT, ConsistencyLevel.ALL, 1, "static", "foo", "bar", "baz");
-             cluster.coordinator(1).execute(INSERT, ConsistencyLevel.ALL, 1, "static", "fi", "biz", "baz");
-             cluster.coordinator(1).execute(INSERT, ConsistencyLevel.ALL, 1, "static", "fo", "boz", "baz");
- 
-             // baseline to show no digest mismatches before upgrade
-             checkTraceForDigestMismatch(cluster, 1, SELECT_C1, 1);
-             checkTraceForDigestMismatch(cluster, 2, SELECT_C1, 1);
+             insertData(cluster.coordinator(1));
+             testDigestConsistency(cluster.coordinator(1));
+             testDigestConsistency(cluster.coordinator(2));
          })
-         .runAfterNodeUpgrade((cluster, node) -> {
-             if (node != 1)
-                 return; // shouldn't happen but guard for future test changes
+         .runAfterClusterUpgrade(cluster -> {
+             // we need to let gossip settle or the test will fail
+             int attempts = 1;
+             //noinspection Convert2MethodRef
 -            while (!((IInvokableInstance) (cluster.get(1))).callOnInstance(() -> Gossiper.instance.isAnyNodeOn30()))
++            while (!((IInvokableInstance) (cluster.get(1))).callOnInstance(() -> Gossiper.instance.isUpgradingFromVersionLowerThan(CassandraVersion.CASSANDRA_4_0.familyLowerBound.get()) &&
++                                                                                 !Gossiper.instance.isUpgradingFromVersionLowerThan(new CassandraVersion(("3.0")).familyLowerBound.get())))
+             {
+                 if (attempts++ > 30)
 -                    throw new RuntimeException("Gossiper.instance.isAnyNodeOn30() continually returns false despite expecting to be true");
++                    throw new RuntimeException("Gossiper.instance.haveMajorVersion3Nodes() continually returns false despite expecting to be true");
+                 Thread.sleep(1000);
+             }
  
              // should not cause a digest mismatch in mixed mode
-             checkTraceForDigestMismatch(cluster, 1, SELECT_C1, 1);
-             checkTraceForDigestMismatch(cluster, 2, SELECT_C1, 1);
-             checkTraceForDigestMismatch(cluster, 1, SELECT_C1_S1_ROW, 1, "foo");
-             checkTraceForDigestMismatch(cluster, 2, SELECT_C1_S1_ROW, 1, "fi");
+             testDigestConsistency(cluster.coordinator(1));
+             testDigestConsistency(cluster.coordinator(2));
          })
          .run();
      }
diff --cc test/unit/org/apache/cassandra/Util.java
index ca5ec63,fa24167..8e487a0
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@@ -79,14 -67,11 +79,15 @@@ import org.apache.cassandra.service.Sto
  import org.apache.cassandra.service.pager.PagingState;
  import org.apache.cassandra.transport.ProtocolVersion;
  import org.apache.cassandra.utils.ByteBufferUtil;
++import org.apache.cassandra.utils.CassandraVersion;
  import org.apache.cassandra.utils.CounterId;
  import org.apache.cassandra.utils.FBUtilities;
 +import org.apache.cassandra.utils.FilterFactory;
  
  import static org.junit.Assert.assertEquals;
 +import static org.junit.Assert.assertFalse;
  import static org.junit.Assert.assertNotNull;
 +import static org.junit.Assert.assertThat;
  import static org.junit.Assert.assertTrue;
  
  public class Util
@@@ -747,59 -712,4 +748,75 @@@
          PagingState.RowMark mark = PagingState.RowMark.create(metadata, row, protocolVersion);
          return new PagingState(pk, mark, 10, remainingInPartition);
      }
 +
 +    public static void assertRCEquals(ReplicaCollection<?> a, ReplicaCollection<?> b)
 +    {
 +        assertTrue(a + " not equal to " + b, Iterables.elementsEqual(a, b));
 +    }
 +
 +    public static void assertNotRCEquals(ReplicaCollection<?> a, ReplicaCollection<?> b)
 +    {
 +        assertFalse(a + " equal to " + b, Iterables.elementsEqual(a, b));
 +    }
 +
 +    /**
 +     * Makes sure that the sstables on disk are the same ones as the cfs live sstables (that they have the same generation)
 +     */
 +    public static void assertOnDiskState(ColumnFamilyStore cfs, int expectedSSTableCount)
 +    {
 +        LifecycleTransaction.waitForDeletions();
 +        assertEquals(expectedSSTableCount, cfs.getLiveSSTables().size());
 +        Set<Integer> liveGenerations = cfs.getLiveSSTables().stream().map(sstable -> sstable.descriptor.generation).collect(Collectors.toSet());
 +        int fileCount = 0;
 +        for (File f : cfs.getDirectories().getCFDirectories())
 +        {
 +            for (File sst : f.listFiles())
 +            {
 +                if (sst.getName().contains("Data"))
 +                {
 +                    Descriptor d = Descriptor.fromFilename(sst.getAbsolutePath());
 +                    assertTrue(liveGenerations.contains(d.generation));
 +                    fileCount++;
 +                }
 +            }
 +        }
 +        assertEquals(expectedSSTableCount, fileCount);
 +    }
 +
 +    /**
 +     * Disable bloom filter on all sstables of given table
 +     */
 +    public static void disableBloomFilter(ColumnFamilyStore cfs)
 +    {
 +        Collection<SSTableReader> sstables = cfs.getLiveSSTables();
 +        try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN))
 +        {
 +            for (SSTableReader sstable : sstables)
 +            {
 +                sstable = sstable.cloneAndReplace(FilterFactory.AlwaysPresent);
 +                txn.update(sstable, true);
 +                txn.checkpoint();
 +            }
 +            txn.finish();
 +        }
 +
 +        for (SSTableReader reader : cfs.getLiveSSTables())
 +            assertEquals(FilterFactory.AlwaysPresent, reader.getBloomFilter());
 +    }
++
++    /**
++     * Sets up Gossiper to mimic the upgrade behaviour when {@link Gossiper#isUpgradingFromVersionLowerThan(CassandraVersion)}
++     * or {@link Gossiper#hasMajorVersion3Nodes()} is called.
++     */
++    public static void setUpgradeFromVersion(String version)
++    {
++        int v = Optional.ofNullable(Gossiper.instance.getEndpointStateForEndpoint(FBUtilities.getBroadcastAddressAndPort()))
++                        .map(ep -> ep.getApplicationState(ApplicationState.RELEASE_VERSION))
++                        .map(rv -> rv.version)
++                        .orElse(0);
++
++        Gossiper.instance.addLocalApplicationState(ApplicationState.RELEASE_VERSION,
++                                                   VersionedValue.unsafeMakeVersionedValue(version, v + 1));
++        Gossiper.instance.expireUpgradeFromVersion();
++    }
  }
diff --cc test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionTest.java
index edfe57a,e86071a..59770b8
--- a/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionTest.java
@@@ -19,23 -19,23 +19,69 @@@
  package org.apache.cassandra.cql3.validation.operations;
  
  import java.nio.ByteBuffer;
++import java.util.Arrays;
++import java.util.Collection;
  import java.util.List;
  
++import org.junit.Before;
++import org.junit.BeforeClass;
  import org.junit.Test;
++import org.junit.runner.RunWith;
++import org.junit.runners.Parameterized;
  
- import org.apache.cassandra.schema.SchemaConstants;
 -import org.apache.cassandra.config.SchemaConstants;
++import org.apache.cassandra.Util;
  import org.apache.cassandra.cql3.CQLTester;
  import org.apache.cassandra.cql3.Duration;
++import org.apache.cassandra.db.SystemKeyspace;
  import org.apache.cassandra.exceptions.InvalidRequestException;
  import org.apache.cassandra.exceptions.SyntaxException;
++import org.apache.cassandra.gms.Gossiper;
++import org.apache.cassandra.schema.SchemaConstants;
  import org.apache.cassandra.schema.SchemaKeyspace;
++import org.apache.cassandra.utils.CassandraVersion;
  
  import static java.lang.String.format;
  import static org.junit.Assert.assertEquals;
++import static org.junit.Assert.assertFalse;
  import static org.junit.Assert.assertTrue;
  
++@RunWith(Parameterized.class)
  public class InsertUpdateIfConditionTest extends CQLTester
  {
++    @Parameterized.Parameter(0)
++    public String clusterMinVersion;
++
++    @Parameterized.Parameter(1)
++    public Runnable assertion;
++
++    @Parameterized.Parameters(name = "{index}: clusterMinVersion={0}")
++    public static Collection<Object[]> data()
++    {
++        return Arrays.asList(new Object[]{ "3.0", (Runnable) () -> {
++                                 assertTrue(Gossiper.instance.isUpgradingFromVersionLowerThan(new CassandraVersion("3.11")));
++                             } },
++                             new Object[]{ "3.11", (Runnable) () -> {
++                                 assertTrue(Gossiper.instance.isUpgradingFromVersionLowerThan(new CassandraVersion("4.0")));
++                                 assertFalse(Gossiper.instance.isUpgradingFromVersionLowerThan(new CassandraVersion("3.11")));
++                             } },
++                             new Object[]{ SystemKeyspace.CURRENT_VERSION.toString(), (Runnable) () -> {
++                                 assertFalse(Gossiper.instance.isUpgradingFromVersionLowerThan(new CassandraVersion("4.0")));
++                             } });
++    }
++
++    @BeforeClass
++    public static void beforeClass()
++    {
++        Gossiper.instance.maybeInitializeLocalState(0);
++    }
++
++    @Before
++    public void before()
++    {
++        Util.setUpgradeFromVersion(clusterMinVersion);
++        assertion.run();
++    }
++
      /**
       * Migrated from cql_tests.py:TestCQL.cas_simple_test()
       */
@@@ -1020,7 -1032,7 +1066,7 @@@
      }
  
      /**
--     * Test expanded functionality from CASSANDRA-6839, 
++     * Test expanded functionality from CASSANDRA-6839,
       * migrated from cql_tests.py:TestCQL.expanded_list_item_conditional_test()
       */
      @Test
diff --cc test/unit/org/apache/cassandra/db/filter/ColumnFilterTest.java
index 396a2e9,1ddf039..a96aa57
--- a/test/unit/org/apache/cassandra/db/filter/ColumnFilterTest.java
+++ b/test/unit/org/apache/cassandra/db/filter/ColumnFilterTest.java
@@@ -18,12 -18,23 +18,20 @@@
  
  package org.apache.cassandra.db.filter;
  
- import static org.junit.Assert.assertEquals;
- import static org.junit.Assert.assertFalse;
- import static org.junit.Assert.assertNull;
- import static org.junit.Assert.assertTrue;
+ import java.io.IOException;
+ import java.util.Arrays;
+ import java.util.Collection;
+ import java.util.function.Consumer;
  
 -import com.google.common.base.Throwables;
+ import org.junit.Assert;
+ import org.junit.Before;
+ import org.junit.BeforeClass;
+ import org.junit.Test;
+ import org.junit.runner.RunWith;
+ import org.junit.runners.Parameterized;
+ 
 -import org.apache.cassandra.config.CFMetaData;
 -import org.apache.cassandra.config.ColumnDefinition;
 -import org.apache.cassandra.config.DatabaseDescriptor;
 -import org.apache.cassandra.db.PartitionColumns;
++import org.apache.cassandra.Util;
 +import org.apache.cassandra.db.RegularAndStaticColumns;
  import org.apache.cassandra.db.marshal.Int32Type;
  import org.apache.cassandra.db.marshal.SetType;
  import org.apache.cassandra.db.rows.CellPath;
@@@ -32,285 -44,404 +41,452 @@@ import org.apache.cassandra.io.util.Dat
  import org.apache.cassandra.io.util.DataInputPlus;
  import org.apache.cassandra.io.util.DataOutputBuffer;
  import org.apache.cassandra.net.MessagingService;
 +import org.apache.cassandra.schema.ColumnMetadata;
 +import org.apache.cassandra.schema.TableMetadata;
  import org.apache.cassandra.utils.ByteBufferUtil;
- import org.junit.Test;
++import org.apache.cassandra.utils.Throwables;
  
- import org.junit.Assert;
+ import static org.junit.Assert.assertEquals;
  
+ @RunWith(Parameterized.class)
  public class ColumnFilterTest
  {
      private static final ColumnFilter.Serializer serializer = new ColumnFilter.Serializer();
  
 -    private final CFMetaData metadata = CFMetaData.Builder.create("ks", "table")
 -                                                          .withPartitioner(Murmur3Partitioner.instance)
 -                                                          .addPartitionKey("pk", Int32Type.instance)
 -                                                          .addClusteringColumn("ck", Int32Type.instance)
 -                                                          .addStaticColumn("s1", Int32Type.instance)
 -                                                          .addStaticColumn("s2", SetType.getInstance(Int32Type.instance, true))
 -                                                          .addRegularColumn("v1", Int32Type.instance)
 -                                                          .addRegularColumn("v2", SetType.getInstance(Int32Type.instance, true))
 -                                                          .build();
 -
 -    private final ColumnDefinition s1 = metadata.getColumnDefinition(ByteBufferUtil.bytes("s1"));
 -    private final ColumnDefinition s2 = metadata.getColumnDefinition(ByteBufferUtil.bytes("s2"));
 -    private final ColumnDefinition v1 = metadata.getColumnDefinition(ByteBufferUtil.bytes("v1"));
 -    private final ColumnDefinition v2 = metadata.getColumnDefinition(ByteBufferUtil.bytes("v2"));
++    @Parameterized.Parameter
++    public String clusterMinVersion;
++
++    private final TableMetadata metadata = TableMetadata.builder("ks", "table")
++                                                        .partitioner(Murmur3Partitioner.instance)
++                                                        .addPartitionKeyColumn("pk", Int32Type.instance)
++                                                        .addClusteringColumn("ck", Int32Type.instance)
++                                                        .addStaticColumn("s1", Int32Type.instance)
++                                                        .addStaticColumn("s2", SetType.getInstance(Int32Type.instance, true))
++                                                        .addRegularColumn("v1", Int32Type.instance)
++                                                        .addRegularColumn("v2", SetType.getInstance(Int32Type.instance, true))
++                                                        .build();
++
++    private final ColumnMetadata s1 = metadata.getColumn(ByteBufferUtil.bytes("s1"));
++    private final ColumnMetadata s2 = metadata.getColumn(ByteBufferUtil.bytes("s2"));
++    private final ColumnMetadata v1 = metadata.getColumn(ByteBufferUtil.bytes("v1"));
++    private final ColumnMetadata v2 = metadata.getColumn(ByteBufferUtil.bytes("v2"));
+     private final CellPath path0 = CellPath.create(ByteBufferUtil.bytes(0));
+     private final CellPath path1 = CellPath.create(ByteBufferUtil.bytes(1));
+     private final CellPath path2 = CellPath.create(ByteBufferUtil.bytes(2));
+     private final CellPath path3 = CellPath.create(ByteBufferUtil.bytes(3));
+     private final CellPath path4 = CellPath.create(ByteBufferUtil.bytes(4));
+ 
 -    @Parameterized.Parameter
 -    public boolean anyNodeOn30;
+ 
 -    @Parameterized.Parameters(name = "{index}: anyNodeOn30={0}")
++    @Parameterized.Parameters(name = "{index}: clusterMinVersion={0}")
+     public static Collection<Object[]> data()
+     {
 -        return Arrays.asList(new Object[]{ true }, new Object[]{ false });
++        return Arrays.asList(new Object[]{ "3.0" }, new Object[]{ "3.11" }, new Object[]{ "4.0" });
+     }
+ 
+     @BeforeClass
+     public static void beforeClass()
+     {
 -        DatabaseDescriptor.clientInitialization();
++        Gossiper.instance.maybeInitializeLocalState(0);
+     }
+ 
+     @Before
+     public void before()
+     {
 -        Gossiper.instance.setAnyNodeOn30(anyNodeOn30);
++        Util.setUpgradeFromVersion(clusterMinVersion);
+     }
+ 
+     // Select all
+ 
+     @Test
+     public void testSelectAll()
+     {
+         Consumer<ColumnFilter> check = filter -> {
+             testRoundTrips(filter);
+             assertEquals("*/*", filter.toString());
+             assertFetchedQueried(true, true, filter, v1, v2, s1, s2);
+             assertCellFetchedQueried(true, true, filter, v2, path0, path1, path2, path3, path4);
+             assertCellFetchedQueried(true, true, filter, s2, path0, path1, path2, path3, path4);
+         };
+ 
+         check.accept(ColumnFilter.all(metadata));
 -        check.accept(ColumnFilter.allColumnsBuilder(metadata).build());
++        check.accept(ColumnFilter.allRegularColumnsBuilder(metadata).build());
+     }
+ 
+     // Selections
+ 
      @Test
-     public void testColumnFilterSerialisationRoundTrip() throws Exception
+     public void testSelectNothing()
      {
-         TableMetadata metadata = TableMetadata.builder("ks", "table")
-                                               .partitioner(Murmur3Partitioner.instance)
-                                               .addPartitionKeyColumn("pk", Int32Type.instance)
-                                               .addClusteringColumn("ck", Int32Type.instance)
-                                               .addRegularColumn("v1", Int32Type.instance)
-                                               .addRegularColumn("v2", Int32Type.instance)
-                                               .addRegularColumn("v3", Int32Type.instance)
-                                               .build();
- 
-         ColumnMetadata v1 = metadata.getColumn(ByteBufferUtil.bytes("v1"));
- 
-         ColumnFilter columnFilter;
- 
-         columnFilter = ColumnFilter.all(metadata);
-         testRoundTrip(columnFilter, ColumnFilter.Serializer.maybeUpdateForBackwardCompatility(columnFilter, MessagingService.VERSION_30), metadata, MessagingService.VERSION_30);
-         testRoundTrip(columnFilter, ColumnFilter.Serializer.maybeUpdateForBackwardCompatility(columnFilter, MessagingService.VERSION_3014), metadata, MessagingService.VERSION_3014);
-         testRoundTrip(columnFilter, metadata, MessagingService.VERSION_40);
- 
-         testRoundTrip(ColumnFilter.selection(metadata.regularAndStaticColumns().without(v1)), metadata, MessagingService.VERSION_30);
-         testRoundTrip(ColumnFilter.selection(metadata.regularAndStaticColumns().without(v1)), metadata, MessagingService.VERSION_3014);
-         testRoundTrip(ColumnFilter.selection(metadata.regularAndStaticColumns().without(v1)), metadata, MessagingService.VERSION_40);
- 
-         columnFilter = ColumnFilter.selection(metadata, metadata.regularAndStaticColumns().without(v1));
-         testRoundTrip(columnFilter, ColumnFilter.Serializer.maybeUpdateForBackwardCompatility(columnFilter, MessagingService.VERSION_30), metadata, MessagingService.VERSION_30);
-         testRoundTrip(columnFilter, ColumnFilter.Serializer.maybeUpdateForBackwardCompatility(columnFilter, MessagingService.VERSION_3014), metadata, MessagingService.VERSION_3014);
-         testRoundTrip(columnFilter, metadata, MessagingService.VERSION_40);
- 
-         // Table with static column
-         metadata = TableMetadata.builder("ks", "table")
-                                 .partitioner(Murmur3Partitioner.instance)
-                                 .addPartitionKeyColumn("pk", Int32Type.instance)
-                                 .addClusteringColumn("ck", Int32Type.instance)
-                                 .addStaticColumn("s1", Int32Type.instance)
-                                 .addRegularColumn("v1", Int32Type.instance)
-                                 .addRegularColumn("v2", Int32Type.instance)
-                                 .addRegularColumn("v3", Int32Type.instance)
-                                 .build();
- 
-         v1 = metadata.getColumn(ByteBufferUtil.bytes("v1"));
-         ColumnMetadata s1 = metadata.getColumn(ByteBufferUtil.bytes("s1"));
- 
-         columnFilter = ColumnFilter.all(metadata);
-         testRoundTrip(columnFilter, ColumnFilter.Serializer.maybeUpdateForBackwardCompatility(columnFilter, MessagingService.VERSION_30), metadata, MessagingService.VERSION_30);
-         testRoundTrip(columnFilter, ColumnFilter.Serializer.maybeUpdateForBackwardCompatility(columnFilter, MessagingService.VERSION_3014), metadata, MessagingService.VERSION_3014);
-         testRoundTrip(columnFilter, metadata, MessagingService.VERSION_40);
- 
-         testRoundTrip(ColumnFilter.selection(metadata.regularAndStaticColumns().without(v1)), metadata, MessagingService.VERSION_30);
-         testRoundTrip(ColumnFilter.selection(metadata.regularAndStaticColumns().without(v1)), metadata, MessagingService.VERSION_3014);
-         testRoundTrip(ColumnFilter.selection(metadata.regularAndStaticColumns().without(v1)), metadata, MessagingService.VERSION_40);
- 
-         testRoundTrip(ColumnFilter.selection(metadata.regularAndStaticColumns().without(v1).without(s1)), metadata, MessagingService.VERSION_30);
-         testRoundTrip(ColumnFilter.selection(metadata.regularAndStaticColumns().without(v1).without(s1)), metadata, MessagingService.VERSION_3014);
-         testRoundTrip(ColumnFilter.selection(metadata.regularAndStaticColumns().without(v1).without(s1)), metadata, MessagingService.VERSION_40);
- 
-         columnFilter = ColumnFilter.selection(metadata, metadata.regularAndStaticColumns().without(v1));
-         testRoundTrip(columnFilter, ColumnFilter.Serializer.maybeUpdateForBackwardCompatility(columnFilter, MessagingService.VERSION_30), metadata, MessagingService.VERSION_30);
-         testRoundTrip(columnFilter, ColumnFilter.Serializer.maybeUpdateForBackwardCompatility(columnFilter, MessagingService.VERSION_3014), metadata, MessagingService.VERSION_3014);
-         testRoundTrip(columnFilter, metadata, MessagingService.VERSION_40);
- 
-         columnFilter = ColumnFilter.selection(metadata, metadata.regularAndStaticColumns().without(v1).without(s1));
-         testRoundTrip(columnFilter, ColumnFilter.Serializer.maybeUpdateForBackwardCompatility(columnFilter, MessagingService.VERSION_30), metadata, MessagingService.VERSION_30);
-         testRoundTrip(columnFilter, ColumnFilter.Serializer.maybeUpdateForBackwardCompatility(columnFilter, MessagingService.VERSION_3014), metadata, MessagingService.VERSION_3014);
-         testRoundTrip(columnFilter, metadata, MessagingService.VERSION_40);
+         Consumer<ColumnFilter> check = filter -> {
+             testRoundTrips(filter);
+             assertEquals("[]", filter.toString());
+             assertFetchedQueried(false, false, filter, v1, v2, s1, s2);
+             assertCellFetchedQueried(false, false, filter, v2, path0, path1, path2, path3, path4);
+             assertCellFetchedQueried(false, false, filter, s2, path0, path1, path2, path3, path4);
+         };
+ 
 -        check.accept(ColumnFilter.selection(PartitionColumns.NONE));
++        check.accept(ColumnFilter.selection(RegularAndStaticColumns.NONE));
+         check.accept(ColumnFilter.selectionBuilder().build());
      }
  
      @Test
-     public void testColumnFilterConstruction()
+     public void testSelectSimpleColumn()
      {
-         // all regular column
-         TableMetadata metadata = TableMetadata.builder("ks", "table")
-                                               .partitioner(Murmur3Partitioner.instance)
-                                               .addPartitionKeyColumn("pk", Int32Type.instance)
-                                               .addClusteringColumn("ck", Int32Type.instance)
-                                               .addRegularColumn("v1", Int32Type.instance)
-                                               .addRegularColumn("v2", Int32Type.instance)
-                                               .addRegularColumn("v3", Int32Type.instance)
-                                               .build();
-         ColumnFilter columnFilter = ColumnFilter.all(metadata);
-         assertTrue(columnFilter.fetchAllRegulars);
-         assertEquals(metadata.regularAndStaticColumns(), columnFilter.fetched);
-         assertNull(columnFilter.queried);
-         assertEquals("*", columnFilter.toString());
- 
-         RegularAndStaticColumns queried = RegularAndStaticColumns.builder()
-                                                                  .add(metadata.getColumn(ByteBufferUtil.bytes("v1"))).build();
-         columnFilter = ColumnFilter.selection(queried);
-         assertFalse(columnFilter.fetchAllRegulars);
-         assertEquals(queried, columnFilter.fetched);
-         assertEquals(queried, columnFilter.queried);
-         assertEquals("v1", columnFilter.toString());
- 
-         // with static column
-         metadata = TableMetadata.builder("ks", "table")
-                                 .partitioner(Murmur3Partitioner.instance)
-                                 .addPartitionKeyColumn("pk", Int32Type.instance)
-                                 .addClusteringColumn("ck", Int32Type.instance)
-                                 .addStaticColumn("sc1", Int32Type.instance)
-                                 .addStaticColumn("sc2", Int32Type.instance)
-                                 .addRegularColumn("v1", Int32Type.instance)
-                                 .build();
- 
-         columnFilter = ColumnFilter.all(metadata);
-         assertTrue(columnFilter.fetchAllRegulars);
-         assertEquals(metadata.regularAndStaticColumns(), columnFilter.fetched);
-         assertNull(columnFilter.queried);
-         assertEquals("*", columnFilter.toString());
- 
-         queried = RegularAndStaticColumns.builder()
-                                          .add(metadata.getColumn(ByteBufferUtil.bytes("v1"))).build();
-         columnFilter = ColumnFilter.selection(metadata, queried);
-         assertEquals("v1", columnFilter.toString());
- 
-         // only static
-         metadata = TableMetadata.builder("ks", "table")
-                                 .partitioner(Murmur3Partitioner.instance)
-                                 .addPartitionKeyColumn("pk", Int32Type.instance)
-                                 .addClusteringColumn("ck", Int32Type.instance)
-                                 .addStaticColumn("sc", Int32Type.instance)
-                                 .build();
- 
-         columnFilter = ColumnFilter.all(metadata);
-         assertTrue(columnFilter.fetchAllRegulars);
-         assertEquals(metadata.regularAndStaticColumns(), columnFilter.fetched);
-         assertNull(columnFilter.queried);
-         assertEquals("*", columnFilter.toString());
- 
-         // with collection type
-         metadata = TableMetadata.builder("ks", "table")
-                                 .partitioner(Murmur3Partitioner.instance)
-                                 .addPartitionKeyColumn("pk", Int32Type.instance)
-                                 .addClusteringColumn("ck", Int32Type.instance)
-                                 .addRegularColumn("v1", Int32Type.instance)
-                                 .addRegularColumn("set", SetType.getInstance(Int32Type.instance, true))
-                                 .build();
- 
-         columnFilter = ColumnFilter.all(metadata);
-         assertTrue(columnFilter.fetchAllRegulars);
-         assertEquals(metadata.regularAndStaticColumns(), columnFilter.fetched);
-         assertNull(columnFilter.queried);
-         assertEquals("*", columnFilter.toString());
- 
-         columnFilter = ColumnFilter.selectionBuilder().add(metadata.getColumn(ByteBufferUtil.bytes("v1")))
-                                    .select(metadata.getColumn(ByteBufferUtil.bytes("set")), CellPath.create(ByteBufferUtil.bytes(1)))
-                                    .build();
-         assertEquals("set[1], v1", columnFilter.toString());
+         Consumer<ColumnFilter> check = filter -> {
+             testRoundTrips(filter);
+             assertEquals("[v1]", filter.toString());
+             assertFetchedQueried(true, true, filter, v1);
+             assertFetchedQueried(false, false, filter, v2, s1, s2);
+             assertCellFetchedQueried(false, false, filter, v2, path0, path1, path2, path3, path4);
+             assertCellFetchedQueried(false, false, filter, s2, path0, path1, path2, path3, path4);
+         };
+ 
 -        check.accept(ColumnFilter.selection(PartitionColumns.builder().add(v1).build()));
++        check.accept(ColumnFilter.selection(RegularAndStaticColumns.builder().add(v1).build()));
+         check.accept(ColumnFilter.selectionBuilder().add(v1).build());
      }
  
-     private static void testRoundTrip(ColumnFilter columnFilter, TableMetadata metadata, int version) throws Exception
+     @Test
+     public void testSelectComplexColumn()
      {
-         testRoundTrip(columnFilter, columnFilter, metadata, version);
+         Consumer<ColumnFilter> check = filter -> {
+             testRoundTrips(filter);
+             assertEquals("[v2]", filter.toString());
+             assertFetchedQueried(true, true, filter, v2);
+             assertFetchedQueried(false, false, filter, v1, s1, s2);
+             assertCellFetchedQueried(true, true, filter, v2, path0, path1, path2, path3, path4);
+             assertCellFetchedQueried(false, false, filter, s2, path0, path1, path2, path3, path4);
+         };
+ 
 -        check.accept(ColumnFilter.selection(PartitionColumns.builder().add(v2).build()));
++        check.accept(ColumnFilter.selection(RegularAndStaticColumns.builder().add(v2).build()));
+         check.accept(ColumnFilter.selectionBuilder().add(v2).build());
      }
  
-     private static void testRoundTrip(ColumnFilter columnFilter, ColumnFilter expected, TableMetadata metadata, int version) throws Exception
+     @Test
+     public void testSelectStaticColumn()
      {
-         DataOutputBuffer output = new DataOutputBuffer();
-         serializer.serialize(columnFilter, output, version);
-         Assert.assertEquals(serializer.serializedSize(columnFilter, version), output.position());
-         DataInputPlus input = new DataInputBuffer(output.buffer(), false);
-         ColumnFilter deserialized = serializer.deserialize(input, version, metadata);
-         Assert.assertEquals(deserialized, expected);
+         Consumer<ColumnFilter> check = filter -> {
+             testRoundTrips(filter);
+             assertEquals("[s1]", filter.toString());
+             assertFetchedQueried(true, true, filter, s1);
+             assertFetchedQueried(false, false, filter, v1, v2, s2);
+             assertCellFetchedQueried(false, false, filter, v2, path0, path1, path2, path3, path4);
+             assertCellFetchedQueried(false, false, filter, s2, path0, path1, path2, path3, path4);
+         };
+ 
 -        check.accept(ColumnFilter.selection(PartitionColumns.builder().add(s1).build()));
++        check.accept(ColumnFilter.selection(RegularAndStaticColumns.builder().add(s1).build()));
+         check.accept(ColumnFilter.selectionBuilder().add(s1).build());
      }
  
-     /**
-      * Tests whether a filter fetches and/or queries columns and cells.
-      */
      @Test
-     public void testFetchedQueried()
+     public void testSelectStaticComplexColumn()
      {
-         TableMetadata metadata = TableMetadata.builder("ks", "table")
-                                               .partitioner(Murmur3Partitioner.instance)
-                                               .addPartitionKeyColumn("k", Int32Type.instance)
-                                               .addRegularColumn("simple", Int32Type.instance)
-                                               .addRegularColumn("complex", SetType.getInstance(Int32Type.instance, true))
-                                               .build();
- 
-         ColumnMetadata simple = metadata.getColumn(ByteBufferUtil.bytes("simple"));
-         ColumnMetadata complex = metadata.getColumn(ByteBufferUtil.bytes("complex"));
-         CellPath path1 = CellPath.create(ByteBufferUtil.bytes(1));
-         CellPath path2 = CellPath.create(ByteBufferUtil.bytes(2));
-         ColumnFilter filter;
- 
-         // select only the simple column, without table metadata
-         filter = ColumnFilter.selection(RegularAndStaticColumns.builder().add(simple).build());
-         assertFetchedQueried(true, true, filter, simple);
-         assertFetchedQueried(false, false, filter, complex);
-         assertFetchedQueried(false, false, filter, complex, path1);
-         assertFetchedQueried(false, false, filter, complex, path2);
- 
-         // select only the complex column, without table metadata
-         filter = ColumnFilter.selection(RegularAndStaticColumns.builder().add(complex).build());
-         assertFetchedQueried(false, false, filter, simple);
-         assertFetchedQueried(true, true, filter, complex);
-         assertFetchedQueried(true, true, filter, complex, path1);
-         assertFetchedQueried(true, true, filter, complex, path2);
- 
-         // select both the simple and complex columns, without table metadata
-         filter = ColumnFilter.selection(RegularAndStaticColumns.builder().add(simple).add(complex).build());
-         assertFetchedQueried(true, true, filter, simple);
-         assertFetchedQueried(true, true, filter, complex);
-         assertFetchedQueried(true, true, filter, complex, path1);
-         assertFetchedQueried(true, true, filter, complex, path2);
- 
-         // select only the simple column, with table metadata
-         filter = ColumnFilter.selection(metadata, RegularAndStaticColumns.builder().add(simple).build());
-         assertFetchedQueried(true, true, filter, simple);
-         assertFetchedQueried(true, false, filter, complex);
-         assertFetchedQueried(true, false, filter, complex, path1);
-         assertFetchedQueried(true, false, filter, complex, path2);
- 
-         // select only the complex column, with table metadata
-         filter = ColumnFilter.selection(metadata, RegularAndStaticColumns.builder().add(complex).build());
-         assertFetchedQueried(true, false, filter, simple);
-         assertFetchedQueried(true, true, filter, complex);
-         assertFetchedQueried(true, true, filter, complex, path1);
-         assertFetchedQueried(true, true, filter, complex, path2);
- 
-         // select both the simple and complex columns, with table metadata
-         filter = ColumnFilter.selection(metadata, RegularAndStaticColumns.builder().add(simple).add(complex).build());
-         assertFetchedQueried(true, true, filter, simple);
-         assertFetchedQueried(true, true, filter, complex);
-         assertFetchedQueried(true, true, filter, complex, path1);
-         assertFetchedQueried(true, true, filter, complex, path2);
- 
-         // select only the simple column, with selection builder
-         filter = ColumnFilter.selectionBuilder().add(simple).build();
-         assertFetchedQueried(true, true, filter, simple);
-         assertFetchedQueried(false, false, filter, complex);
-         assertFetchedQueried(false, false, filter, complex, path1);
-         assertFetchedQueried(false, false, filter, complex, path2);
- 
-         // select only a cell of the complex column, with selection builder
-         filter = ColumnFilter.selectionBuilder().select(complex, path1).build();
-         assertFetchedQueried(false, false, filter, simple);
-         assertFetchedQueried(true, true, filter, complex);
-         assertFetchedQueried(true, true, filter, complex, path1);
-         assertFetchedQueried(true, false, filter, complex, path2);
- 
-         // select both the simple column and a cell of the complex column, with selection builder
-         filter = ColumnFilter.selectionBuilder().add(simple).select(complex, path1).build();
-         assertFetchedQueried(true, true, filter, simple);
-         assertFetchedQueried(true, true, filter, complex);
-         assertFetchedQueried(true, true, filter, complex, path1);
-         assertFetchedQueried(true, false, filter, complex, path2);
+         Consumer<ColumnFilter> check = filter -> {
+             testRoundTrips(filter);
+             assertEquals("[s2]", filter.toString());
+             assertFetchedQueried(true, true, filter, s2);
+             assertFetchedQueried(false, false, filter, v1, v2, s1);
+             assertCellFetchedQueried(false, false, filter, v2, path0, path1, path2, path3, path4);
+             assertCellFetchedQueried(true, true, filter, s2, path0, path1, path2, path3, path4);
+         };
+ 
 -        check.accept(ColumnFilter.selection(PartitionColumns.builder().add(s2).build()));
++        check.accept(ColumnFilter.selection(RegularAndStaticColumns.builder().add(s2).build()));
+         check.accept(ColumnFilter.selectionBuilder().add(s2).build());
      }
  
-     private static void assertFetchedQueried(boolean expectedFetched,
-                                              boolean expectedQueried,
-                                              ColumnFilter filter,
-                                              ColumnMetadata column)
+     @Test
+     public void testSelectColumns()
      {
-         assert !expectedQueried || expectedFetched;
-         boolean actualFetched = filter.fetches(column);
-         assertEquals(expectedFetched, actualFetched);
-         assertEquals(expectedQueried, actualFetched && filter.fetchedColumnIsQueried(column));
+         Consumer<ColumnFilter> check = filter -> {
+             testRoundTrips(filter);
+             assertEquals("[s1, s2, v1, v2]", filter.toString());
+             assertFetchedQueried(true, true, filter, v1, v2, s1, s2);
+             assertCellFetchedQueried(true, true, filter, v2, path0, path1, path2, path3, path4);
+             assertCellFetchedQueried(true, true, filter, s2, path0, path1, path2, path3, path4);
+         };
+ 
 -        check.accept(ColumnFilter.selection(PartitionColumns.builder().add(v1).add(v2).add(s1).add(s2).build()));
++        check.accept(ColumnFilter.selection(RegularAndStaticColumns.builder().add(v1).add(v2).add(s1).add(s2).build()));
+         check.accept(ColumnFilter.selectionBuilder().add(v1).add(v2).add(s1).add(s2).build());
      }
  
+     @Test
+     public void testSelectIndividualCells()
+     {
+         ColumnFilter filter = ColumnFilter.selectionBuilder().select(v2, path1).select(v2, path3).build();
+         testRoundTrips(filter);
+         assertEquals("[v2[1], v2[3]]", filter.toString());
+         assertFetchedQueried(true, true, filter, v2);
+         assertFetchedQueried(false, false, filter, v1, s1, s2);
+         assertCellFetchedQueried(true, true, filter, v2, path1, path3);
+         assertCellFetchedQueried(false, false, filter, v2, path0, path2, path4);
+         assertCellFetchedQueried(false, false, filter, s2, path0, path1, path2, path3, path4);
+     }
+ 
+     @Test
+     public void testSelectIndividualCellsFromStatic()
+     {
+         ColumnFilter filter = ColumnFilter.selectionBuilder().select(s2, path1).select(s2, path3).build();
+         testRoundTrips(filter);
+         assertEquals("[s2[1], s2[3]]", filter.toString());
+         assertFetchedQueried(true, true, filter, s2);
+         assertFetchedQueried(false, false, filter, v1, v2, s1);
+         assertCellFetchedQueried(false, false, filter, v2, path0, path1, path2, path3, path4);
+         assertCellFetchedQueried(true, true, filter, s2, path1, path3);
+         assertCellFetchedQueried(false, false, filter, s2, path0, path2, path4);
+     }
+ 
+     @Test
+     public void testSelectCellSlice()
+     {
+         ColumnFilter filter = ColumnFilter.selectionBuilder().slice(v2, path1, path3).build();
+         testRoundTrips(filter);
+         assertEquals("[v2[1:3]]", filter.toString());
+         assertFetchedQueried(true, true, filter, v2);
+         assertFetchedQueried(false, false, filter, v1, s1, s2);
+         assertCellFetchedQueried(true, true, filter, v2, path1, path2, path3);
+         assertCellFetchedQueried(false, false, filter, v2, path0, path4);
+         assertCellFetchedQueried(false, false, filter, s2, path0, path1, path2, path3, path4);
+     }
+ 
+     @Test
+     public void testSelectCellSliceFromStatic()
+     {
+         ColumnFilter filter = ColumnFilter.selectionBuilder().slice(s2, path1, path3).build();
+         testRoundTrips(filter);
+         assertEquals("[s2[1:3]]", filter.toString());
+         assertFetchedQueried(true, true, filter, s2);
+         assertFetchedQueried(false, false, filter, v1, v2, s1);
+         assertCellFetchedQueried(false, false, filter, v2, path0, path1, path2, path3, path4);
+         assertCellFetchedQueried(true, true, filter, s2, path1, path2, path3);
+         assertCellFetchedQueried(false, false, filter, s2, path0, path4);
+     }
+ 
+     @Test
+     public void testSelectColumnsWithCellsAndSlices()
+     {
+         ColumnFilter filter = ColumnFilter.selectionBuilder()
+                                           .add(v1)
+                                           .add(s1)
+                                           .slice(v2, path0, path2)
+                                           .select(v2, path4)
+                                           .select(s2, path0)
+                                           .slice(s2, path2, path4)
+                                           .build();
+         testRoundTrips(filter);
+         assertEquals("[s1, s2[0], s2[2:4], v1, v2[0:2], v2[4]]", filter.toString());
+         assertFetchedQueried(true, true, filter, v1, v2, s1, s2);
+         assertCellFetchedQueried(true, true, filter, v2, path0, path1, path2, path4);
+         assertCellFetchedQueried(false, false, filter, v2, path3);
+         assertCellFetchedQueried(true, true, filter, s2, path0, path2, path3, path4);
+         assertCellFetchedQueried(false, false, filter, s2, path1);
+     }
+ 
+     // select with metadata
+ 
+     @Test
+     public void testSelectSimpleColumnWithMetadata()
+     {
+         Consumer<ColumnFilter> check = filter -> {
+             testRoundTrips(filter);
+             assertFetchedQueried(true, true, filter, v1);
 -            if (anyNodeOn30)
++            if ("3.0".equals(clusterMinVersion))
+             {
+                 assertEquals("*/*", filter.toString());
+                 assertFetchedQueried(true, true, filter, s1, s2, v2);
+                 assertCellFetchedQueried(true, true, filter, v2, path0, path1, path2, path3, path4);
+                 assertCellFetchedQueried(true, true, filter, s2, path0, path1, path2, path3, path4);
+             }
 -            else
++            else if ("3.11".equals(clusterMinVersion))
+             {
+                 assertEquals("*/[v1]", filter.toString());
+                 assertFetchedQueried(true, false, filter, s1, s2, v2);
+                 assertCellFetchedQueried(true, false, filter, v2, path0, path1, path2, path3, path4);
+                 assertCellFetchedQueried(true, false, filter, s2, path0, path1, path2, path3, path4);
+             }
++            else
++            {
++                assertEquals("<all regulars>/[v1]", filter.toString());
++                assertFetchedQueried(true, false, filter, v2);
++                assertFetchedQueried(false, false, filter, s1, s2);
++                assertCellFetchedQueried(true, false, filter, v2, path0, path1, path2, path3, path4);
++                assertCellFetchedQueried(false, false, filter, s2, path0, path1, path2, path3, path4);
++            }
+         };
+ 
 -        check.accept(ColumnFilter.selection(metadata, PartitionColumns.builder().add(v1).build()));
 -        check.accept(ColumnFilter.allColumnsBuilder(metadata).add(v1).build());
++        check.accept(ColumnFilter.selection(metadata, RegularAndStaticColumns.builder().add(v1).build()));
++        check.accept(ColumnFilter.allRegularColumnsBuilder(metadata).add(v1).build());
+     }
+ 
+     @Test
+     public void testSelectStaticColumnWithMetadata()
+     {
+         Consumer<ColumnFilter> check = filter -> {
+             testRoundTrips(filter);
+             assertFetchedQueried(true, true, filter, s1);
 -            if (anyNodeOn30)
++            if ("3.0".equals(clusterMinVersion))
+             {
+                 assertEquals("*/*", filter.toString());
+                 assertFetchedQueried(true, true, filter, v1, v2, s2);
+                 assertCellFetchedQueried(true, true, filter, v2, path0, path1, path2, path3, path4);
+                 assertCellFetchedQueried(true, true, filter, s2, path0, path1, path2, path3, path4);
+             }
 -            else
++            else if ("3.11".equals(clusterMinVersion))
+             {
+                 assertEquals("*/[s1]", filter.toString());
+                 assertFetchedQueried(true, false, filter, v1, v2, s2);
+                 assertCellFetchedQueried(true, false, filter, v2, path0, path1, path2, path3, path4);
+                 assertCellFetchedQueried(false, false, filter, s2, path0, path1, path2, path3, path4);
+             }
++            else
++            {
++                assertEquals("<all regulars>+[s1]/[s1]", filter.toString());
++                assertFetchedQueried(true, false, filter, v1, v2);
++                assertFetchedQueried(false, false, filter, s2);
++                assertCellFetchedQueried(true, false, filter, v2, path0, path1, path2, path3, path4);
++                assertCellFetchedQueried(false, false, filter, s2, path0, path1, path2, path3, path4);
++            }
+         };
+ 
 -        check.accept(ColumnFilter.selection(metadata, PartitionColumns.builder().add(s1).build()));
 -        check.accept(ColumnFilter.allColumnsBuilder(metadata).add(s1).build());
++        check.accept(ColumnFilter.selection(metadata, RegularAndStaticColumns.builder().add(s1).build()));
++        check.accept(ColumnFilter.allRegularColumnsBuilder(metadata).add(s1).build());
+     }
+ 
+     @Test
+     public void testSelectCellWithMetadata()
+     {
 -        ColumnFilter filter = ColumnFilter.allColumnsBuilder(metadata).select(v2, path1).build();
++        ColumnFilter filter = ColumnFilter.allRegularColumnsBuilder(metadata).select(v2, path1).build();
+         testRoundTrips(filter);
+         assertFetchedQueried(true, true, filter, v2);
 -        if (anyNodeOn30)
++        if ("3.0".equals(clusterMinVersion))
+         {
+             assertEquals("*/*", filter.toString());
+             assertFetchedQueried(true, true, filter, s1, s2, v1);
+             assertCellFetchedQueried(true, true, filter, v2, path1);
+             assertCellFetchedQueried(true, false, filter, v2, path0, path2, path3, path4);
+             assertCellFetchedQueried(true, true, filter, s2, path0, path1, path2, path3, path4);
+         }
 -        else
++        else if ("3.11".equals(clusterMinVersion))
+         {
+             assertEquals("*/[v2[1]]", filter.toString());
+             assertFetchedQueried(true, false, filter, s1, s2, v1);
+             assertCellFetchedQueried(true, true, filter, v2, path1);
+             assertCellFetchedQueried(true, false, filter, v2, path0, path2, path3, path4);
+             assertCellFetchedQueried(true, false, filter, s2, path0, path1, path2, path3, path4);
+         }
++        else
++        {
++            assertEquals("<all regulars>/[v2[1]]", filter.toString());
++            assertFetchedQueried(true, false, filter, v1);
++            assertFetchedQueried(false, false, filter, s1, s2);
++            assertCellFetchedQueried(true, true, filter, v2, path1);
++            assertCellFetchedQueried(true, false, filter, v2, path0, path2, path3, path4);
++            assertCellFetchedQueried(false, false, filter, s2, path0, path1, path2, path3, path4);
++        }
+     }
+ 
+     @Test
+     public void testSelectStaticColumnCellWithMetadata()
+     {
 -        ColumnFilter filter = ColumnFilter.allColumnsBuilder(metadata).select(s2, path1).build();
++        ColumnFilter filter = ColumnFilter.allRegularColumnsBuilder(metadata).select(s2, path1).build();
+         testRoundTrips(filter);
+         assertFetchedQueried(true, true, filter, s2);
 -        if (anyNodeOn30)
++        if ("3.0".equals(clusterMinVersion))
+         {
+             assertEquals("*/*", filter.toString());
+             assertFetchedQueried(true, true, filter, v1, v2, s1);
+             assertCellFetchedQueried(true, true, filter, v2, path0, path1, path2, path3, path4);
+             assertCellFetchedQueried(true, true, filter, s2, path1);
 -            assertCellFetchedQueried(true, false, filter, s2, path0, path2, path3, path4);
++            assertCellFetchedQueried(true, false, filter, s2, path0, path2, path3, path4);  // TODO ???
+         }
 -        else
++        else if ("3.11".equals(clusterMinVersion))
+         {
+             assertEquals("*/[s2[1]]", filter.toString());
+             assertFetchedQueried(true, false, filter, v1, v2, s1);
+             assertCellFetchedQueried(true, false, filter, v2, path0, path1, path2, path3, path4);
+             assertCellFetchedQueried(true, true, filter, s2, path1);
+             assertCellFetchedQueried(true, false, filter, s2, path0, path2, path3, path4);
+         }
++        else
++        {
++            assertEquals("<all regulars>+[s2[1]]/[s2[1]]", filter.toString());
++            assertFetchedQueried(true, false, filter, v1, v2);
++            assertFetchedQueried(false, false, filter, s1);
++            assertCellFetchedQueried(false, false, filter, v2, path0, path1, path2, path3, path4);
++            assertCellFetchedQueried(true, true, filter, s2, path1);
++            assertCellFetchedQueried(false, false, filter, s2, path0, path2, path3, path4);
++        }
+     }
+ 
+     private void testRoundTrips(ColumnFilter cf)
+     {
+         testRoundTrip(cf, MessagingService.VERSION_30);
+         testRoundTrip(cf, MessagingService.VERSION_3014);
++        testRoundTrip(cf, MessagingService.VERSION_40);
+     }
+ 
+     private void testRoundTrip(ColumnFilter columnFilter, int version)
+     {
+         try
+         {
+             DataOutputBuffer output = new DataOutputBuffer();
+             serializer.serialize(columnFilter, output, version);
+             Assert.assertEquals(serializer.serializedSize(columnFilter, version), output.position());
+             DataInputPlus input = new DataInputBuffer(output.buffer(), false);
+             ColumnFilter deserialized = serializer.deserialize(input, version, metadata);
 -            Assert.assertEquals(deserialized, columnFilter);
++
++            if (!clusterMinVersion.equals("4.0") || version != MessagingService.VERSION_30 || !columnFilter.fetchAllRegulars)
++            {
++                Assert.assertEquals(deserialized, columnFilter);
++            }
++            else
++            {
++                Assert.assertEquals(deserialized.fetched, metadata.regularAndStaticColumns());
++            }
+         }
+         catch (IOException e)
+         {
 -            throw Throwables.propagate(e);
++            throw Throwables.cleaned(e);
+         }
+     }
+ 
++
      private static void assertFetchedQueried(boolean expectedFetched,
                                               boolean expectedQueried,
                                               ColumnFilter filter,
-                                              ColumnMetadata column,
-                                              CellPath path)
 -                                             ColumnDefinition... columns)
++                                             ColumnMetadata... columns)
+     {
 -        for (ColumnDefinition column : columns)
++        for (ColumnMetadata column : columns)
+         {
 -            assertEquals(String.format("Expected fetches(%s) to be %s", column.name, expectedFetched),
++            assertEquals(String.format("Expected fetches(%s) to be %s", column, expectedFetched),
+                          expectedFetched, filter.fetches(column));
+             if (expectedFetched)
 -                assertEquals(String.format("Expected fetchedColumnIsQueried(%s) to be %s", column.name, expectedQueried),
++                assertEquals(String.format("Expected fetchedColumnIsQueried(%s) to be %s", column, expectedQueried),
+                              expectedQueried, filter.fetchedColumnIsQueried(column));
+         }
+     }
+ 
+     private static void assertCellFetchedQueried(boolean expectedFetched,
+                                                  boolean expectedQueried,
+                                                  ColumnFilter filter,
 -                                                 ColumnDefinition column,
++                                                 ColumnMetadata column,
+                                                  CellPath... paths)
      {
-         assert !expectedQueried || expectedFetched;
-         boolean actualFetched = filter.fetches(column);
-         assertEquals(expectedFetched, actualFetched);
-         assertEquals(expectedQueried, actualFetched && filter.fetchedCellIsQueried(column, path));
+         ColumnFilter.Tester tester = filter.newTester(column);
+ 
+         for (CellPath path : paths)
+         {
+             int p = ByteBufferUtil.toInt(path.get(0));
+             if (expectedFetched)
 -                assertEquals(String.format("Expected fetchedCellIsQueried(%s:%s) to be %s", column.name, p, expectedQueried),
++                assertEquals(String.format("Expected fetchedCellIsQueried(%s:%s) to be %s", column, p, expectedQueried),
+                              expectedQueried, filter.fetchedCellIsQueried(column, path));
+ 
+             if (tester != null)
+             {
 -                assertEquals(String.format("Expected tester.fetches(%s:%s) to be %s", column.name, p, expectedFetched),
++                assertEquals(String.format("Expected tester.fetches(%s:%s) to be %s", column, p, expectedFetched),
+                              expectedFetched, tester.fetches(path));
+                 if (expectedFetched)
 -                    assertEquals(String.format("Expected tester.fetchedCellIsQueried(%s:%s) to be %s", column.name, p, expectedQueried),
++                    assertEquals(String.format("Expected tester.fetchedCellIsQueried(%s:%s) to be %s", column, p, expectedQueried),
+                                  expectedQueried, tester.fetchedCellIsQueried(path));
+             }
+         }
      }
 -}
 +}
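
A note for reviewers of the ColumnFilter changes above: the assertions distinguish columns that are fetched (read internally, e.g. so replicas stay consistent) from columns that are queried (actually returned to the caller). Below is a minimal sketch of that pattern, assuming the metadata, v2, path0 and path1 fixtures that ColumnFilterTest defines earlier in the file; the expected values mirror the final else branch of testSelectColumnCellWithMetadata (no pre-4.0 nodes present) and are illustrative, not normative:

    // Fetch all regular columns, but only query the single cell v2[1].
    ColumnFilter filter = ColumnFilter.allRegularColumnsBuilder(metadata)
                                      .select(v2, path1)
                                      .build();

    filter.toString();                      // "<all regulars>/[v2[1]]" when no pre-4.0 nodes are present
    filter.fetches(v2);                     // true  - v2 is fetched
    filter.fetchedCellIsQueried(v2, path1); // true  - the selected cell is returned
    filter.fetchedCellIsQueried(v2, path0); // false - fetched for internal consistency, not returned
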
diff --cc test/unit/org/apache/cassandra/gms/GossiperTest.java
index 1b17a27,b6b3ffb..74f3dbb
--- a/test/unit/org/apache/cassandra/gms/GossiperTest.java
+++ b/test/unit/org/apache/cassandra/gms/GossiperTest.java
@@@ -38,13 -37,13 +38,15 @@@ import org.apache.cassandra.db.commitlo
  import org.apache.cassandra.dht.IPartitioner;
  import org.apache.cassandra.dht.RandomPartitioner;
  import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.locator.InetAddressAndPort;
 +import org.apache.cassandra.locator.SeedProvider;
  import org.apache.cassandra.locator.TokenMetadata;
 -import org.apache.cassandra.net.MessagingService;
 -import org.apache.cassandra.schema.KeyspaceParams;
  import org.apache.cassandra.service.StorageService;
++import org.apache.cassandra.utils.CassandraVersion;
  
  import static org.junit.Assert.assertEquals;
  import static org.junit.Assert.assertFalse;
++import static org.junit.Assert.assertNull;
  import static org.junit.Assert.assertTrue;
  
  public class GossiperTest
@@@ -80,44 -74,7 +82,47 @@@
      }
  
      @Test
-     public void testHaveVersion3Nodes() throws Exception
 -    public void testLargeGenerationJump() throws UnknownHostException
++    public void testHasVersion3Nodes() throws Exception
 +    {
++        Gossiper.instance.expireUpgradeFromVersion();
++
 +        VersionedValue.VersionedValueFactory factory = new VersionedValue.VersionedValueFactory(null);
 +        EndpointState es = new EndpointState((HeartBeatState) null);
 +        es.addApplicationState(ApplicationState.RELEASE_VERSION, factory.releaseVersion("4.0-SNAPSHOT"));
 +        Gossiper.instance.endpointStateMap.put(InetAddressAndPort.getByName("127.0.0.1"), es);
 +        Gossiper.instance.liveEndpoints.add(InetAddressAndPort.getByName("127.0.0.1"));
 +
 +
 +        es = new EndpointState((HeartBeatState) null);
 +        es.addApplicationState(ApplicationState.RELEASE_VERSION, factory.releaseVersion("3.11.3"));
 +        Gossiper.instance.endpointStateMap.put(InetAddressAndPort.getByName("127.0.0.2"), es);
 +        Gossiper.instance.liveEndpoints.add(InetAddressAndPort.getByName("127.0.0.2"));
 +
- 
 +        es = new EndpointState((HeartBeatState) null);
 +        es.addApplicationState(ApplicationState.RELEASE_VERSION, factory.releaseVersion("3.0.0"));
 +        Gossiper.instance.endpointStateMap.put(InetAddressAndPort.getByName("127.0.0.3"), es);
 +        Gossiper.instance.liveEndpoints.add(InetAddressAndPort.getByName("127.0.0.3"));
 +
- 
-         assertTrue(Gossiper.instance.haveMajorVersion3NodesSupplier.get().value());
- 
-         Gossiper.instance.endpointStateMap.remove(InetAddressAndPort.getByName("127.0.0.2"));
-         Gossiper.instance.liveEndpoints.remove(InetAddressAndPort.getByName("127.0.0.2"));
- 
- 
-         assertTrue(Gossiper.instance.haveMajorVersion3NodesSupplier.get().value());
++        assertFalse(Gossiper.instance.upgradeFromVersionSupplier.get().value().compareTo(new CassandraVersion("3.0")) < 0);
++        assertTrue(Gossiper.instance.upgradeFromVersionSupplier.get().value().compareTo(new CassandraVersion("3.1")) < 0);
++        assertTrue(Gossiper.instance.hasMajorVersion3Nodes());
 +
 +        Gossiper.instance.endpointStateMap.remove(InetAddressAndPort.getByName("127.0.0.3"));
 +        Gossiper.instance.liveEndpoints.remove(InetAddressAndPort.getByName("127.0.0.3"));
 +
-         assertFalse(Gossiper.instance.haveMajorVersion3NodesSupplier.get().value());
++        assertFalse(Gossiper.instance.upgradeFromVersionSupplier.get().value().compareTo(new CassandraVersion("3.0")) < 0);
++        assertFalse(Gossiper.instance.upgradeFromVersionSupplier.get().value().compareTo(new CassandraVersion("3.1")) < 0);
++        assertTrue(Gossiper.instance.upgradeFromVersionSupplier.get().value().compareTo(new CassandraVersion("3.12")) < 0);
++        assertTrue(Gossiper.instance.hasMajorVersion3Nodes());
++
++        Gossiper.instance.endpointStateMap.remove(InetAddressAndPort.getByName("127.0.0.2"));
++        Gossiper.instance.liveEndpoints.remove(InetAddressAndPort.getByName("127.0.0.2"));
 +
++        assertNull(Gossiper.instance.upgradeFromVersionSupplier.get().value());
 +    }
 +
 +    @Test
 +    public void testLargeGenerationJump() throws UnknownHostException, InterruptedException
      {
          Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, 2);
          try
@@@ -235,126 -192,53 +240,126 @@@
          }
      }
  
 +    // Note: This test might fail if for some reason the node broadcast address is in 127.99.0.0/16
      @Test
 -    public void testSchemaVersionUpdate() throws UnknownHostException, InterruptedException
 +    public void testReloadSeeds() throws UnknownHostException
      {
 -        Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, 2);
 -        MessagingService.instance().listen();
 -        Gossiper.instance.start(1);
 -        InetAddress remoteHostAddress = hosts.get(1);
 -
 -        EndpointState initialRemoteState = Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress);
 -        // Set to any 3.0 version
 -        Gossiper.instance.injectApplicationState(remoteHostAddress, ApplicationState.RELEASE_VERSION, StorageService.instance.valueFactory.releaseVersion("3.0.14"));
 +        Gossiper gossiper = new Gossiper(false);
 +        List<String> loadedList;
 +
 +        // Initialize the seed list directly to a known set to start with
 +        gossiper.seeds.clear();
 +        InetAddressAndPort addr = InetAddressAndPort.getByAddress(InetAddress.getByName("127.99.1.1"));
 +        int nextSize = 4;
 +        List<InetAddressAndPort> nextSeeds = new ArrayList<>(nextSize);
 +        for (int i = 0; i < nextSize; i ++)
 +        {
 +            gossiper.seeds.add(addr);
 +            nextSeeds.add(addr);
 +            addr = InetAddressAndPort.getByAddress(InetAddresses.increment(addr.address));
 +        }
 +        Assert.assertEquals(nextSize, gossiper.seeds.size());
 +
 +        // Add another unique address to the list
 +        addr = InetAddressAndPort.getByAddress(InetAddresses.increment(addr.address));
 +        nextSeeds.add(addr);
 +        nextSize++;
 +        DatabaseDescriptor.setSeedProvider(new TestSeedProvider(nextSeeds));
 +        loadedList = gossiper.reloadSeeds();
 +
 +        // Check that the new entry was added
 +        Assert.assertEquals(nextSize, loadedList.size());
 +        for (InetAddressAndPort a : nextSeeds)
 +            assertTrue(loadedList.contains(a.toString()));
 +
 +        // Check that the return value of the reloadSeeds matches the content of the getSeeds call
 +        // and that they both match the internal contents of the Gossiper seeds list
 +        Assert.assertEquals(loadedList.size(), gossiper.getSeeds().size());
 +        for (InetAddressAndPort a : gossiper.seeds)
 +        {
 +            assertTrue(loadedList.contains(a.toString()));
 +            assertTrue(gossiper.getSeeds().contains(a.toString()));
 +        }
  
 -        Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, initialRemoteState));
 +        // Add a duplicate of the last address to the seed provider list
 +        int uniqueSize = nextSize;
 +        nextSeeds.add(addr);
 +        nextSize++;
 +        DatabaseDescriptor.setSeedProvider(new TestSeedProvider(nextSeeds));
 +        loadedList = gossiper.reloadSeeds();
 +
 +        // Check that the number of seed nodes reported hasn't increased
 +        Assert.assertEquals(uniqueSize, loadedList.size());
 +        for (InetAddressAndPort a : nextSeeds)
 +            assertTrue(loadedList.contains(a.toString()));
 +
 +        // Create a new list that has no overlaps with the previous list
 +        addr = InetAddressAndPort.getByAddress(InetAddress.getByName("127.99.2.1"));
 +        int disjointSize = 3;
 +        List<InetAddressAndPort> disjointSeeds = new ArrayList<>(disjointSize);
 +        for (int i = 0; i < disjointSize; i ++)
 +        {
 +            disjointSeeds.add(addr);
 +            addr = InetAddressAndPort.getByAddress(InetAddresses.increment(addr.address));
 +        }
 +        DatabaseDescriptor.setSeedProvider(new TestSeedProvider(disjointSeeds));
 +        loadedList = gossiper.reloadSeeds();
  
 -        // wait until the schema is set
 -        VersionedValue schema = null;
 -        for (int i = 0; i < 10; i++)
 +        // Check that the seed list now contains exactly the new, disjoint list.
 +        Assert.assertEquals(disjointSize, gossiper.getSeeds().size());
 +        Assert.assertEquals(disjointSize, loadedList.size());
 +        for (InetAddressAndPort a : disjointSeeds)
          {
 -            EndpointState localState = Gossiper.instance.getEndpointStateForEndpoint(hosts.get(0));
 -            schema = localState.getApplicationState(ApplicationState.SCHEMA);
 -            if (schema != null)
 -                break;
 -            Thread.sleep(1000);
 +            assertTrue(gossiper.getSeeds().contains(a.toString()));
 +            assertTrue(loadedList.contains(a.toString()));
          }
  
 -        // schema is set and equals to "alternative" version
 -        assertTrue(schema != null);
 -        assertEquals(schema.value, Schema.instance.getAltVersion().toString());
 +        // Set the seed node provider to return an empty list
 +        DatabaseDescriptor.setSeedProvider(new TestSeedProvider(new ArrayList<InetAddressAndPort>()));
 +        loadedList = gossiper.reloadSeeds();
 +
 +        // Check that the in memory seed node list was not modified
 +        Assert.assertEquals(disjointSize, loadedList.size());
 +        for (InetAddressAndPort a : disjointSeeds)
 +            assertTrue(loadedList.contains(a.toString()));
 +
 +        // Change the seed provider to one that throws an unchecked exception
 +        DatabaseDescriptor.setSeedProvider(new ErrorSeedProvider());
 +        loadedList = gossiper.reloadSeeds();
 +
 +        // Check for the expected null response from a reload error
-         Assert.assertNull(loadedList);
++        assertNull(loadedList);
  
 -        // Upgrade remote host version to the latest one (3.11)
 -        Gossiper.instance.injectApplicationState(remoteHostAddress, ApplicationState.RELEASE_VERSION, StorageService.instance.valueFactory.releaseVersion());
 +        // Check that the in memory seed node list was not modified and the exception was caught
 +        Assert.assertEquals(disjointSize, gossiper.getSeeds().size());
 +        for (InetAddressAndPort a : disjointSeeds)
 +            assertTrue(gossiper.getSeeds().contains(a.toString()));
 +    }
  
 -        Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, initialRemoteState));
 +    static class TestSeedProvider implements SeedProvider
 +    {
 +        private List<InetAddressAndPort> seeds;
  
 -        // wait until the schema change
 -        VersionedValue newSchema = null;
 -        for (int i = 0; i < 10; i++)
 +        TestSeedProvider(List<InetAddressAndPort> seeds)
          {
 -            EndpointState localState = Gossiper.instance.getEndpointStateForEndpoint(hosts.get(0));
 -            newSchema = localState.getApplicationState(ApplicationState.SCHEMA);
 -            if (!schema.value.equals(newSchema.value))
 -                break;
 -            Thread.sleep(1000);
 +            this.seeds = seeds;
          }
  
 -        // schema is changed and equals to real version
 -        assertFalse(schema.value.equals(newSchema.value));
 -        assertEquals(newSchema.value, Schema.instance.getRealVersion().toString());
 +        @Override
 +        public List<InetAddressAndPort> getSeeds()
 +        {
 +            return seeds;
 +        }
 +    }
 +
 +    // A seed provider for testing which throws assertion errors when queried
 +    static class ErrorSeedProvider implements SeedProvider
 +    {
 +        @Override
 +        public List<InetAddressAndPort> getSeeds()
 +        {
 +            assert(false);
 +            return new ArrayList<>();
 +        }
      }
  }
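
For context on the new testHasVersion3Nodes assertions: they rely on CassandraVersion being comparable, and on the gossip-derived upgrade-from version appearing to track the lowest release version still present in the cluster. A standalone sketch of the orderings the test depends on (illustrative only; VersionOrderingSketch is not part of the patch):

    import org.apache.cassandra.utils.CassandraVersion;

    public class VersionOrderingSketch
    {
        public static void main(String[] args)
        {
            // While a 3.0.0 node is live: at least 3.0, but below 3.1.
            CassandraVersion lowest = new CassandraVersion("3.0.0");
            System.out.println(lowest.compareTo(new CassandraVersion("3.0")) >= 0); // true
            System.out.println(lowest.compareTo(new CassandraVersion("3.1")) < 0);  // true

            // With only the 3.11.3 node left: no longer below 3.1, still below 3.12,
            // so hasMajorVersion3Nodes() keeps returning true until it too is gone.
            CassandraVersion next = new CassandraVersion("3.11.3");
            System.out.println(next.compareTo(new CassandraVersion("3.1")) < 0);   // false
            System.out.println(next.compareTo(new CassandraVersion("3.12")) < 0);  // true
        }
    }
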

