Posted to commits@cassandra.apache.org by bd...@apache.org on 2020/10/05 21:28:21 UTC

[cassandra] 01/01: Merge branch 'cassandra-3.11' into trunk

This is an automated email from the ASF dual-hosted git repository.

bdeggleston pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 7a63cc2c3319994dd1c6197a61553ab339c238d0
Merge: ba63fa3 9bf1ab1
Author: Blake Eggleston <bd...@gmail.com>
AuthorDate: Mon Oct 5 14:21:08 2020 -0700

    Merge branch 'cassandra-3.11' into trunk

 CHANGES.txt                                        |   1 +
 src/java/org/apache/cassandra/db/Columns.java      |  19 +++-
 .../apache/cassandra/db/SerializationHeader.java   |   4 +-
 .../apache/cassandra/db/filter/ColumnFilter.java   |   8 +-
 .../cassandra/db/partitions/PartitionUpdate.java   |   9 ++
 .../cassandra/db/rows/SerializationHelper.java     |  12 +++
 .../cassandra/db/rows/UnfilteredSerializer.java    |  23 ++--
 .../apache/cassandra/schema/ColumnMetadata.java    |  25 ++++-
 .../utils/btree/LeafBTreeSearchIterator.java       |   2 +-
 .../cassandra/distributed/test/SchemaTest.java     | 117 +++++++++++++++++++++
 .../distributed/test/SimpleReadWriteTest.java      |  91 +++++++++++++---
 test/unit/org/apache/cassandra/db/ColumnsTest.java |   2 +-
 12 files changed, 279 insertions(+), 34 deletions(-)

diff --cc CHANGES.txt
index f576dbf,99369fa..a990fb0
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,58 -1,18 +1,59 @@@
 -3.11.9
 +4.0-beta3
 + * Use unsigned short in ValueAccessor.sliceWithShortLength (CASSANDRA-16147)
 + * Abort repairs when getting a truncation request (CASSANDRA-15854)
 + * Remove bad assert when getting active compactions for an sstable (CASSANDRA-15457)
 + * Avoid failing compactions with very large partitions (CASSANDRA-15164)
 + * Prevent NPE in StreamMessage in type lookup (CASSANDRA-16131)
 + * Avoid invalid state transition exception during incremental repair (CASSANDRA-16067)
 + * Allow zero padding in timestamp serialization (CASSANDRA-16105)
 + * Add byte array backed cells (CASSANDRA-15393)
 + * Correctly handle pending ranges with adjacent range movements (CASSANDRA-14801)
 + * Avoid adding localhost when streaming trivial ranges (CASSANDRA-16099)
 + * Add nodetool getfullquerylog (CASSANDRA-15988)
 + * Fix yaml format and alignment in tpstats (CASSANDRA-11402)
 + * Avoid trying to keep track of RTs for endpoints we won't write to during read repair (CASSANDRA-16084)
 + * When compaction gets interrupted, the exception should include the compactionId (CASSANDRA-15954)
 + * Make Table/Keyspace Metric Names Consistent With Each Other (CASSANDRA-15909)
 + * Mutating sstable component may race with entire-sstable-streaming(ZCS) causing checksum validation failure (CASSANDRA-15861)
 + * NPE thrown while updating speculative execution time if keyspace is removed during task execution (CASSANDRA-15949)
 + * Show the progress of data streaming and index build (CASSANDRA-15406)
 + * Add flag to disable chunk cache and disable by default (CASSANDRA-16036)
 +Merged from 3.11:
   * Fix memory leak in CompressedChunkReader (CASSANDRA-15880)
   * Don't attempt value skipping with mixed version cluster (CASSANDRA-15833)
 - * Avoid failing compactions with very large partitions (CASSANDRA-15164)
 + * Use IF NOT EXISTS for index and UDT create statements in snapshot schema files (CASSANDRA-13935)
   * Make sure LCS handles duplicate sstable added/removed notifications correctly (CASSANDRA-14103)
  Merged from 3.0:
+  * Handle unexpected columns due to schema races (CASSANDRA-15899)
   * Add flag to ignore unreplicated keyspaces during repair (CASSANDRA-15160)
  
 -3.11.8
 +4.0-beta2
 + * Add additional incremental repair visibility to nodetool repair_admin (CASSANDRA-14939)
 + * Always access system properties and environment variables via the new CassandraRelevantProperties and CassandraRelevantEnv classes (CASSANDRA-15876)
 + * Remove deprecated HintedHandOffManager (CASSANDRA-15939)
 + * Prevent repair from overrunning compaction (CASSANDRA-15817)
 + * Fix cqlsh COPY functions in Python 3.8 on Mac (CASSANDRA-16053)
 + * Strip comment blocks from cqlsh input before processing statements (CASSANDRA-15802)
 + * Fix unicode chars error input (CASSANDRA-15990)
 + * Improved testability for CacheMetrics and ChunkCacheMetrics (CASSANDRA-15788)
 + * Handle errors in StreamSession#prepare (CASSANDRA-15852)
 + * FQL replay should have options to ignore DDL statements (CASSANDRA-16039)
 + * Remove COMPACT STORAGE internals (CASSANDRA-13994)
 + * Make TimestampSerializer accept fractional seconds of varying precision (CASSANDRA-15976)
 + * Improve cassandra-stress logging when using a profile file that doesn't exist (CASSANDRA-14425)
 + * Improve logging for socket connection/disconnection (CASSANDRA-15980)
 + * Throw FSWriteError upon write failures in order to apply DiskFailurePolicy (CASSANDRA-15928)
 + * Forbid altering UDTs used in partition keys (CASSANDRA-15933)
 + * Fix version parsing logic when upgrading from 3.0 (CASSANDRA-15973)
 + * Optimize NoSpamLogger use in hot paths (CASSANDRA-15766)
 + * Verify sstable components on startup (CASSANDRA-15945)
 + * Resolve JMX output inconsistencies from CASSANDRA-7544 storage-port-configurable-per-node (CASSANDRA-15937)
 +Merged from 3.11:
   * Correctly interpret SASI's `max_compaction_flush_memory_in_mb` setting in megabytes not bytes (CASSANDRA-16071)
   * Fix short read protection for GROUP BY queries (CASSANDRA-15459)
 + * stop_paranoid disk failure policy is ignored on CorruptSSTableException after node is up (CASSANDRA-15191)
   * Frozen RawTuple is not annotated with frozen in the toString method (CASSANDRA-15857)
  Merged from 3.0:
 - * Use IF NOT EXISTS for index and UDT create statements in snapshot schema files (CASSANDRA-13935)
   * Fix gossip shutdown order (CASSANDRA-15816)
   * Remove broken 'defrag-on-read' optimization (CASSANDRA-15432)
   * Check for endpoint collision with hibernating nodes (CASSANDRA-14599)
diff --cc src/java/org/apache/cassandra/db/Columns.java
index fe13919,512b695..aceb868
--- a/src/java/org/apache/cassandra/db/Columns.java
+++ b/src/java/org/apache/cassandra/db/Columns.java
@@@ -436,10 -444,10 +436,10 @@@ public class Columns extends AbstractCo
              return size;
          }
  
-         public Columns deserialize(DataInputPlus in, TableMetadata metadata) throws IOException
 -        public Columns deserialize(DataInputPlus in, CFMetaData metadata, boolean isStatic) throws IOException
++        private Columns deserialize(DataInputPlus in, TableMetadata metadata, boolean isStatic) throws IOException
          {
              int length = (int)in.readUnsignedVInt();
 -            BTree.Builder<ColumnDefinition> builder = BTree.builder(Comparator.naturalOrder());
 +            BTree.Builder<ColumnMetadata> builder = BTree.builder(Comparator.naturalOrder());
              builder.auto(false);
              for (int i = 0; i < length; i++)
              {
@@@ -448,16 -457,31 +448,31 @@@
                  if (column == null)
                  {
                      // If we don't find the definition, it could be we have data for a dropped column, and we shouldn't
 -                    // fail deserialization because of that. So we grab a "fake" ColumnDefinition that ensures proper
 +                    // fail deserialization because of that. So we grab a "fake" ColumnMetadata that ensures proper
                      // deserialization. The column will be ignored later on anyway.
 -                    column = metadata.getDroppedColumnDefinition(name);
 +                    column = metadata.getDroppedColumn(name);
+ 
+                     // If there's no dropped column, it may be for a column we haven't received a schema update for yet
+                     // so we create a placeholder column. If this is a read, the placeholder column will let the response
+                     // serializer know we're not serializing all requested columns when it writes the row flags, but it
+                     // will cause mutations that try to write values for this column to fail.
                      if (column == null)
-                         throw new UnknownColumnException("Unknown column " + UTF8Type.instance.getString(name) + " during deserialization");
 -                        column = ColumnDefinition.placeholder(metadata, name, isStatic);
++                        column = ColumnMetadata.placeholder(metadata, name, isStatic);
                  }
                  builder.add(column);
              }
              return new Columns(builder.build());
          }
+ 
 -        public Columns deserializeStatics(DataInputPlus in, CFMetaData metadata) throws IOException
++        public Columns deserializeStatics(DataInputPlus in, TableMetadata metadata) throws IOException
+         {
+             return deserialize(in, metadata, true);
+         }
+ 
 -        public Columns deserializeRegulars(DataInputPlus in, CFMetaData metadata) throws IOException
++        public Columns deserializeRegulars(DataInputPlus in, TableMetadata metadata) throws IOException
+         {
+             return deserialize(in, metadata, false);
+         }
  
          /**
           * If both ends have a pre-shared superset of the columns we are serializing, we can send them much
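
A note on the hunk above: the lookup order the trunk deserializer now follows is current schema, then dropped columns, then a synthetic placeholder. The sketch below is illustrative only, not the committed code; the initial getColumn(..) lookup sits just above the hunk and is assumed here, while getDroppedColumn(..) and ColumnMetadata.placeholder(..) appear in the added lines.

    import java.nio.ByteBuffer;

    import org.apache.cassandra.schema.ColumnMetadata;
    import org.apache.cassandra.schema.TableMetadata;

    // Illustrative sketch of the fallback order only, not the committed code.
    final class ColumnFallbackSketch
    {
        static ColumnMetadata resolve(TableMetadata metadata, ByteBuffer name, boolean isStatic)
        {
            ColumnMetadata column = metadata.getColumn(name);     // current schema (assumed; outside the hunk)
            if (column == null)
                column = metadata.getDroppedColumn(name);         // data for a column dropped from the schema
            if (column == null)
                // schema race (CASSANDRA-15899): column not yet known locally; the placeholder lets reads
                // report a partial column set, while mutations that reference it fail at deserialization
                column = ColumnMetadata.placeholder(metadata, name, isStatic);
            return column;
        }
    }
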
diff --cc src/java/org/apache/cassandra/db/filter/ColumnFilter.java
index c9d0a70,3c79539..5a98785
--- a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
@@@ -592,9 -471,9 +592,9 @@@ public class ColumnFilte
              {
                  if (version >= MessagingService.VERSION_3014)
                  {
-                     Columns statics = Columns.serializer.deserialize(in, metadata);
-                     Columns regulars = Columns.serializer.deserialize(in, metadata);
+                     Columns statics = Columns.serializer.deserializeStatics(in, metadata);
+                     Columns regulars = Columns.serializer.deserializeRegulars(in, metadata);
 -                    fetched = new PartitionColumns(statics, regulars);
 +                    fetched = new RegularAndStaticColumns(statics, regulars);
                  }
                  else
                  {
@@@ -604,9 -483,9 +604,9 @@@
  
              if (hasQueried)
              {
-                 Columns statics = Columns.serializer.deserialize(in, metadata);
-                 Columns regulars = Columns.serializer.deserialize(in, metadata);
+                 Columns statics = Columns.serializer.deserializeStatics(in, metadata);
+                 Columns regulars = Columns.serializer.deserializeRegulars(in, metadata);
 -                queried = new PartitionColumns(statics, regulars);
 +                queried = new RegularAndStaticColumns(statics, regulars);
              }
  
              SortedSetMultimap<ColumnIdentifier, ColumnSubselection> subSelections = null;
diff --cc src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
index 0bf45b0,f2fe154..ce1a850
--- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
@@@ -30,12 -33,12 +31,13 @@@ import org.slf4j.LoggerFactory
  import org.apache.cassandra.db.*;
  import org.apache.cassandra.db.filter.ColumnFilter;
  import org.apache.cassandra.db.rows.*;
++import org.apache.cassandra.exceptions.IncompatibleSchemaException;
 +import org.apache.cassandra.index.IndexRegistry;
  import org.apache.cassandra.io.util.*;
 -import org.apache.cassandra.net.MessagingService;
 -import org.apache.cassandra.utils.FBUtilities;
 -import org.apache.cassandra.utils.NoSpamLogger;
 +import org.apache.cassandra.schema.ColumnMetadata;
 +import org.apache.cassandra.schema.Schema;
 +import org.apache.cassandra.schema.TableId;
 +import org.apache.cassandra.schema.TableMetadata;
  import org.apache.cassandra.utils.btree.BTree;
  import org.apache.cassandra.utils.btree.UpdateFunction;
  
@@@ -658,6 -920,12 +660,12 @@@ public class PartitionUpdate extends Ab
                          deletionBuilder.add((RangeTombstoneMarker)unfiltered);
                  }
              }
+             catch (IOError e)
+             {
 -                if (e.getCause() != null && e.getCause() instanceof UnknownColumnException)
 -                    throw (UnknownColumnException) e.getCause();
++                if (e.getCause() != null && e.getCause() instanceof IncompatibleSchemaException)
++                    throw (IncompatibleSchemaException) e.getCause();
+                 throw e;
+             }
  
              MutableDeletionInfo deletionInfo = deletionBuilder.build();
              return new PartitionUpdate(metadata,
@@@ -725,205 -1005,4 +733,206 @@@
              ((BTreeRow)row).setValue(column, path, value);
          }
      }
 +
 +    /**
 +     * Builder for PartitionUpdates
 +     *
 +     * This class is not thread safe, but the PartitionUpdate it produces is (since it is immutable).
 +     */
 +    public static class Builder
 +    {
 +        private final TableMetadata metadata;
 +        private final DecoratedKey key;
 +        private final MutableDeletionInfo deletionInfo;
 +        private final boolean canHaveShadowedData;
 +        private Object[] tree = BTree.empty();
 +        private final BTree.Builder<Row> rowBuilder;
 +        private Row staticRow = Rows.EMPTY_STATIC_ROW;
 +        private final RegularAndStaticColumns columns;
 +        private boolean isBuilt = false;
 +
 +        public Builder(TableMetadata metadata,
 +                       DecoratedKey key,
 +                       RegularAndStaticColumns columns,
 +                       int initialRowCapacity,
 +                       boolean canHaveShadowedData)
 +        {
 +            this(metadata, key, columns, initialRowCapacity, canHaveShadowedData, Rows.EMPTY_STATIC_ROW, MutableDeletionInfo.live(), BTree.empty());
 +        }
 +
 +        private Builder(TableMetadata metadata,
 +                       DecoratedKey key,
 +                       RegularAndStaticColumns columns,
 +                       int initialRowCapacity,
 +                       boolean canHaveShadowedData,
 +                       Holder holder)
 +        {
 +            this(metadata, key, columns, initialRowCapacity, canHaveShadowedData, holder.staticRow, holder.deletionInfo, holder.tree);
 +        }
 +
 +        private Builder(TableMetadata metadata,
 +                        DecoratedKey key,
 +                        RegularAndStaticColumns columns,
 +                        int initialRowCapacity,
 +                        boolean canHaveShadowedData,
 +                        Row staticRow,
 +                        DeletionInfo deletionInfo,
 +                        Object[] tree)
 +        {
 +            this.metadata = metadata;
 +            this.key = key;
 +            this.columns = columns;
 +            this.rowBuilder = rowBuilder(initialRowCapacity);
 +            this.canHaveShadowedData = canHaveShadowedData;
 +            this.deletionInfo = deletionInfo.mutableCopy();
 +            this.staticRow = staticRow;
 +            this.tree = tree;
 +        }
 +
 +        public Builder(TableMetadata metadata, DecoratedKey key, RegularAndStaticColumns columnDefinitions, int size)
 +        {
 +            this(metadata, key, columnDefinitions, size, true);
 +        }
 +
 +        public Builder(PartitionUpdate base, int initialRowCapacity)
 +        {
 +            this(base.metadata, base.partitionKey, base.columns(), initialRowCapacity, base.canHaveShadowedData, base.holder);
 +        }
 +
 +        public Builder(TableMetadata metadata,
 +                        ByteBuffer key,
 +                        RegularAndStaticColumns columns,
 +                        int initialRowCapacity)
 +        {
 +            this(metadata, metadata.partitioner.decorateKey(key), columns, initialRowCapacity, true);
 +        }
 +
 +        /**
 +         * Adds a row to this update.
 +         *
 +         * There is no particular assumption made on the order of rows added to a partition update. It is further
 +         * allowed to add the same row (more precisely, multiple row objects for the same clustering).
 +         *
 +         * Note however that the columns contained in the added row must be a subset of the columns used when
 +         * creating this update.
 +         *
 +         * @param row the row to add.
 +         */
 +        public void add(Row row)
 +        {
 +            if (row.isEmpty())
 +                return;
 +
 +            if (row.isStatic())
 +            {
 +                // this assert is expensive, and possibly of limited value; we should consider removing it
 +                // or introducing a new class of assertions for test purposes
 +                assert columns().statics.containsAll(row.columns()) : columns().statics + " is not superset of " + row.columns();
 +                staticRow = staticRow.isEmpty()
 +                            ? row
 +                            : Rows.merge(staticRow, row);
 +            }
 +            else
 +            {
 +                // this assert is expensive, and possibly of limited value; we should consider removing it
 +                // or introducing a new class of assertions for test purposes
 +                assert columns().regulars.containsAll(row.columns()) : columns().regulars + " is not superset of " + row.columns();
 +                rowBuilder.add(row);
 +            }
 +        }
 +
 +        public void addPartitionDeletion(DeletionTime deletionTime)
 +        {
 +            deletionInfo.add(deletionTime);
 +        }
 +
 +        public void add(RangeTombstone range)
 +        {
 +            deletionInfo.add(range, metadata.comparator);
 +        }
 +
 +        public DecoratedKey partitionKey()
 +        {
 +            return key;
 +        }
 +
 +        public TableMetadata metadata()
 +        {
 +            return metadata;
 +        }
 +
 +        public PartitionUpdate build()
 +        {
 +            // assert that we are not calling build() several times
 +            assert !isBuilt : "A PartitionUpdate.Builder should only get built once";
 +            Object[] add = rowBuilder.build();
 +            Object[] merged = BTree.<Row>merge(tree, add, metadata.comparator,
 +                                               UpdateFunction.Simple.of(Rows::merge));
 +
 +            EncodingStats newStats = EncodingStats.Collector.collect(staticRow, BTree.iterator(merged), deletionInfo);
 +
 +            isBuilt = true;
 +            return new PartitionUpdate(metadata,
 +                                       partitionKey(),
 +                                       new Holder(columns,
 +                                                  merged,
 +                                                  deletionInfo,
 +                                                  staticRow,
 +                                                  newStats),
 +                                       deletionInfo,
 +                                       canHaveShadowedData);
 +        }
 +
 +        public RegularAndStaticColumns columns()
 +        {
 +            return columns;
 +        }
 +
 +        public DeletionTime partitionLevelDeletion()
 +        {
 +            return deletionInfo.getPartitionDeletion();
 +        }
 +
 +        private BTree.Builder<Row> rowBuilder(int initialCapacity)
 +        {
 +            return BTree.<Row>builder(metadata.comparator, initialCapacity)
 +                   .setQuickResolver(Rows::merge);
 +        }
 +        /**
 +         * Modify this update to set every timestamp for live data to {@code newTimestamp} and
 +         * every deletion timestamp to {@code newTimestamp - 1}.
 +         *
 +         * There is no reason to use this except on the Paxos code path, where we need to ensure that
 +         * anything inserted uses the ballot timestamp (to respect the order of updates decided by
 +         * the Paxos algorithm). We use {@code newTimestamp - 1} for deletions because tombstones
 +         * always win on timestamp equality and we don't want to delete our own insertions
 +         * (typically, when we overwrite a collection, we first set a complex deletion to delete the
 +         * previous collection before adding new elements. If we were to set that complex deletion
 +         * to the same timestamp as the new elements, it would delete those elements). And since
 +         * tombstones always win on timestamp equality, using -1 guarantees our deletion will still
 +         * delete anything from a previous update.
 +         */
 +        public Builder updateAllTimestamp(long newTimestamp)
 +        {
 +            deletionInfo.updateAllTimestamp(newTimestamp - 1);
 +            tree = BTree.<Row>transformAndFilter(tree, (x) -> x.updateAllTimestamp(newTimestamp));
 +            staticRow = this.staticRow.updateAllTimestamp(newTimestamp);
 +            return this;
 +        }
 +
 +        @Override
 +        public String toString()
 +        {
 +            return "Builder{" +
 +                   "metadata=" + metadata +
 +                   ", key=" + key +
 +                   ", deletionInfo=" + deletionInfo +
 +                   ", canHaveShadowedData=" + canHaveShadowedData +
 +                   ", staticRow=" + staticRow +
 +                   ", columns=" + columns +
 +                   ", isBuilt=" + isBuilt +
 +                   '}';
 +        }
++
 +    }
  }
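
The updateAllTimestamp javadoc above carries the reasoning for the newTimestamp - 1 convention. The toy below (plain longs, not Cassandra code) checks the two properties the javadoc relies on, using its stated rule that tombstones win on timestamp equality.

    // Toy check of the "newTimestamp - 1 for deletions" convention described in the javadoc above.
    final class UpdateAllTimestampSketch
    {
        // reconciliation rule from the javadoc: a tombstone shadows a cell when its timestamp is >= the cell's
        static boolean shadowed(long cellTimestamp, long tombstoneTimestamp)
        {
            return tombstoneTimestamp >= cellTimestamp;
        }

        public static void main(String[] args)
        {
            long newTimestamp = 100;                   // ballot timestamp applied to live data
            long deletionTimestamp = newTimestamp - 1; // timestamp applied to deletions

            System.out.println(shadowed(newTimestamp, deletionTimestamp)); // false: our own insertions survive
            System.out.println(shadowed(98, deletionTimestamp));           // true: data from an earlier update is deleted
        }
    }
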
diff --cc src/java/org/apache/cassandra/db/rows/SerializationHelper.java
index dca4240,e40a1e1..77f19b5
--- a/src/java/org/apache/cassandra/db/rows/SerializationHelper.java
+++ b/src/java/org/apache/cassandra/db/rows/SerializationHelper.java
@@@ -25,33 -28,116 +25,45 @@@ import org.apache.cassandra.utils.btree
  
  public class SerializationHelper
  {
 -    /**
 -     * Flag affecting deserialization behavior (this only affect counters in practice).
 -     *  - LOCAL: for deserialization of local data (Expired columns are
 -     *      converted to tombstones (to gain disk space)).
 -     *  - FROM_REMOTE: for deserialization of data received from remote hosts
 -     *      (Expired columns are converted to tombstone and counters have
 -     *      their delta cleared)
 -     *  - PRESERVE_SIZE: used when no transformation must be performed, i.e,
 -     *      when we must ensure that deserializing and reserializing the
 -     *      result yield the exact same bytes. Streaming uses this.
 -     */
 -    public enum Flag
 +    public final SerializationHeader header;
 +    private BTreeSearchIterator<ColumnMetadata, ColumnMetadata> statics = null;
 +    private BTreeSearchIterator<ColumnMetadata, ColumnMetadata> regulars = null;
 +
 +    public SerializationHelper(SerializationHeader header)
      {
 -        LOCAL, FROM_REMOTE, PRESERVE_SIZE
 +        this.header = header;
      }
  
 -    private final Flag flag;
 -    public final int version;
 -
 -    private final ColumnFilter columnsToFetch;
 -    private ColumnFilter.Tester tester;
 -
 -    private final Map<ByteBuffer, CFMetaData.DroppedColumn> droppedColumns;
 -    private CFMetaData.DroppedColumn currentDroppedComplex;
 -
 -
 -    public SerializationHelper(CFMetaData metadata, int version, Flag flag, ColumnFilter columnsToFetch)
 +    private BTreeSearchIterator<ColumnMetadata, ColumnMetadata> statics()
      {
 -        this.flag = flag;
 -        this.version = version;
 -        this.columnsToFetch = columnsToFetch;
 -        this.droppedColumns = metadata.getDroppedColumns();
 +        if (statics == null)
 +            statics = header.columns().statics.iterator();
 +        return statics;
      }
  
 -    public SerializationHelper(CFMetaData metadata, int version, Flag flag)
 +    private BTreeSearchIterator<ColumnMetadata, ColumnMetadata> regulars()
      {
 -        this(metadata, version, flag, null);
 +        if (regulars == null)
 +            regulars = header.columns().regulars.iterator();
 +        return regulars;
      }
  
 -    public boolean includes(ColumnDefinition column)
 +    public SearchIterator<ColumnMetadata, ColumnMetadata> iterator(boolean isStatic)
      {
 -        return columnsToFetch == null || columnsToFetch.fetches(column);
 +        BTreeSearchIterator<ColumnMetadata, ColumnMetadata> iterator = isStatic ? statics() : regulars();
 +        iterator.rewind();
 +        return iterator;
      }
+ 
 -    public boolean includes(Cell cell, LivenessInfo rowLiveness)
++    public boolean hasAllColumns(Row row, boolean isStatic)
+     {
 -        if (columnsToFetch == null)
 -            return true;
 -
 -        // During queries, some columns are included even though they are not queried by the user because
 -        // we always need to distinguish between having a row (with potentially only null values) and not
 -        // having a row at all (see #CASSANDRA-7085 for background). In the case where the column is not
 -        // actually requested by the user however (canSkipValue), we can skip the full cell if the cell
 -        // timestamp is lower than the row one, because in that case, the row timestamp is enough proof
 -        // of the liveness of the row. Otherwise, we'll only be able to skip the values of those cells.
 -        ColumnDefinition column = cell.column();
 -        if (column.isComplex())
++        SearchIterator<ColumnMetadata, ColumnData> rowIter = row.searchIterator();
++        Iterable<ColumnMetadata> columns = isStatic ? header.columns().statics : header.columns().regulars;
++        for (ColumnMetadata column : columns)
+         {
 -            if (!includes(cell.path()))
++            if (rowIter.next(column) == null)
+                 return false;
 -
 -            return !canSkipValue(cell.path()) || cell.timestamp() >= rowLiveness.timestamp();
+         }
 -        else
 -        {
 -            return columnsToFetch.fetchedColumnIsQueried(column) || cell.timestamp() >= rowLiveness.timestamp();
 -        }
 -    }
 -
 -    public boolean includes(CellPath path)
 -    {
 -        return path == null || tester == null || tester.fetches(path);
 -    }
 -
 -    public boolean canSkipValue(ColumnDefinition column)
 -    {
 -        return columnsToFetch != null && !columnsToFetch.fetchedColumnIsQueried(column);
 -    }
 -
 -    public boolean canSkipValue(CellPath path)
 -    {
 -        return path != null && tester != null && !tester.fetchedCellIsQueried(path);
 -    }
 -
 -    public void startOfComplexColumn(ColumnDefinition column)
 -    {
 -        this.tester = columnsToFetch == null ? null : columnsToFetch.newTester(column);
 -        this.currentDroppedComplex = droppedColumns.get(column.name.bytes);
 -    }
 -
 -    public void endOfComplexColumn()
 -    {
 -        this.tester = null;
 -    }
 -
 -    public boolean isDropped(Cell cell, boolean isComplex)
 -    {
 -        CFMetaData.DroppedColumn dropped = isComplex ? currentDroppedComplex : droppedColumns.get(cell.column().name.bytes);
 -        return dropped != null && cell.timestamp() <= dropped.droppedTime;
 -    }
 -
 -    public boolean isDroppedComplexDeletion(DeletionTime complexDeletion)
 -    {
 -        return currentDroppedComplex != null && complexDeletion.markedForDeleteAt() <= currentDroppedComplex.droppedTime;
 -    }
 -
 -    public ByteBuffer maybeClearCounterValue(ByteBuffer value)
 -    {
 -        return flag == Flag.FROM_REMOTE || (flag == Flag.LOCAL && CounterContext.instance().shouldClearLocal(value))
 -             ? CounterContext.instance().clearAllLocal(value)
 -             : value;
++        return true;
+     }
  }
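
hasAllColumns above replaces the row.columnCount() == headerColumns.size() shortcut that UnfilteredSerializer used previously (visible in the next file's hunks). The toy below, with hypothetical string column names standing in for ColumnMetadata, illustrates why a size comparison is no longer safe once a row may carry a column the header's schema does not include.

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    // Toy illustration only: every header column must be present in the row.
    final class HasAllColumnsSketch
    {
        static boolean hasAllColumns(List<String> headerColumns, Set<String> rowColumns)
        {
            for (String column : headerColumns)
                if (!rowColumns.contains(column))
                    return false;
            return true;
        }

        public static void main(String[] args)
        {
            List<String> header = Arrays.asList("a", "b");
            Set<String> row = new HashSet<>(Arrays.asList("a", "c")); // "c" unknown to the header's schema
            System.out.println(header.size() == row.size());          // true  -> the old count check misleads
            System.out.println(hasAllColumns(header, row));           // false -> "b" is missing from the row
        }
    }
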
diff --cc src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
index 9313eda,0890611..7d009b7
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
@@@ -19,9 -19,10 +19,11 @@@ package org.apache.cassandra.db.rows
  
  import java.io.IOException;
  
 -
  import net.nicoulaj.compilecommand.annotations.Inline;
 -import org.apache.cassandra.config.ColumnDefinition;
 +import org.apache.cassandra.db.marshal.ByteArrayAccessor;
+ import org.apache.cassandra.db.marshal.UTF8Type;
++import org.apache.cassandra.exceptions.UnknownColumnException;
 +import org.apache.cassandra.schema.ColumnMetadata;
  import org.apache.cassandra.db.*;
  import org.apache.cassandra.db.rows.Row.Deletion;
  import org.apache.cassandra.io.util.DataInputPlus;
@@@ -151,12 -152,11 +153,11 @@@ public class UnfilteredSerialize
          int extendedFlags = 0;
  
          boolean isStatic = row.isStatic();
 -        Columns headerColumns = header.columns(isStatic);
 +        SerializationHeader header = helper.header;
-         Columns headerColumns = header.columns(isStatic);
          LivenessInfo pkLiveness = row.primaryKeyLivenessInfo();
          Row.Deletion deletion = row.deletion();
          boolean hasComplexDeletion = row.hasComplexDeletion();
-         boolean hasAllColumns = (row.columnCount() == headerColumns.size());
 -        boolean hasAllColumns = header.hasAllColumns(row, isStatic);
++        boolean hasAllColumns = helper.hasAllColumns(row, isStatic);
          boolean hasExtendedFlags = hasExtendedFlags(row);
  
          if (isStatic)
@@@ -238,10 -237,15 +239,15 @@@
                  // We can obtain the column for data directly from data.column(). However, if the cell/complex data
                  // originates from a sstable, the column we'll get will have the type used when the sstable was serialized,
                 // and if that type has been recently altered, that may not be the type we want to serialize the column
 -                // with. So we use the ColumnDefinition from the "header" which is "current". Also see #11810 for what
 +                // with. So we use the ColumnMetadata from the "header" which is "current". Also see #11810 for what
                  // happens if we don't do that.
 -                ColumnDefinition column = si.next(cd.column());
 +                ColumnMetadata column = si.next(cd.column());
-                 assert column != null : cd.column.toString();
+ 
+                 // we may have columns that the remote node isn't aware of due to in-flight schema changes.
+                 // In cases where it tries to fetch all columns, it will set the `all columns` flag, but will only
+                 // expect a subset of columns (from this node's perspective). See CASSANDRA-15899
+                 if (column == null)
+                     return;
  
                  try
                  {
@@@ -336,11 -338,11 +342,10 @@@
              size += TypeSizes.sizeofUnsignedVInt(previousUnfilteredSize);
  
          boolean isStatic = row.isStatic();
--        Columns headerColumns = header.columns(isStatic);
          LivenessInfo pkLiveness = row.primaryKeyLivenessInfo();
          Row.Deletion deletion = row.deletion();
          boolean hasComplexDeletion = row.hasComplexDeletion();
-         boolean hasAllColumns = (row.columnCount() == headerColumns.size());
 -        boolean hasAllColumns = header.hasAllColumns(row, isStatic);
++        boolean hasAllColumns = helper.hasAllColumns(row, isStatic);
  
          if (!pkLiveness.isEmpty())
              size += header.timestampSerializedSize(pkLiveness.timestamp());
@@@ -355,19 -357,23 +360,21 @@@
          if (!hasAllColumns)
              size += Columns.serializer.serializedSubsetSize(row.columns(), header.columns(isStatic));
  
 -        SearchIterator<ColumnDefinition, ColumnDefinition> si = headerColumns.iterator();
 -        for (ColumnData data : row)
 -        {
 -            ColumnDefinition column = si.next(data.column());
 +        SearchIterator<ColumnMetadata, ColumnMetadata> si = helper.iterator(isStatic);
 +        return row.accumulate((data, v) -> {
 +            ColumnMetadata column = si.next(data.column());
-             assert column != null;
++
+             if (column == null)
 -                continue;
++                return v;
  
              if (data.column.isSimple())
 -                size += Cell.serializer.serializedSize((Cell) data, column, pkLiveness, header);
 +                return v + Cell.serializer.serializedSize((Cell<?>) data, column, pkLiveness, header);
              else
 -                size += sizeOfComplexColumn((ComplexColumnData) data, column, hasComplexDeletion, pkLiveness, header);
 -        }
 -
 -        return size;
 +                return v + sizeOfComplexColumn((ComplexColumnData) data, column, hasComplexDeletion, pkLiveness, header);
 +        }, size);
      }
  
 -    private long sizeOfComplexColumn(ComplexColumnData data, ColumnDefinition column, boolean hasComplexDeletion, LivenessInfo rowLiveness, SerializationHeader header)
 +    private long sizeOfComplexColumn(ComplexColumnData data, ColumnMetadata column, boolean hasComplexDeletion, LivenessInfo rowLiveness, SerializationHeader header)
      {
          long size = 0;
  
@@@ -605,6 -611,10 +612,10 @@@
                  columns.apply(column -> {
                      try
                      {
+                         // if the column is a placeholder, then it's not part of our schema, and we can't deserialize it
+                         if (column.isPlaceholder())
 -                            throw new UnknownColumnException(column.ksName, column.cfName, column.name.bytes);
++                            throw new UnknownColumnException("Unknown column " + UTF8Type.instance.getString(column.name.bytes) + " during deserialization");
+ 
                          if (column.isSimple())
                              readSimpleColumn(column, in, header, helper, builder, livenessInfo);
                          else
diff --cc src/java/org/apache/cassandra/schema/ColumnMetadata.java
index d48ca06,0000000..37fbfa2
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/schema/ColumnMetadata.java
+++ b/src/java/org/apache/cassandra/schema/ColumnMetadata.java
@@@ -1,499 -1,0 +1,522 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.schema;
 +
 +import java.nio.ByteBuffer;
 +import java.util.*;
 +import java.util.function.Predicate;
 +
 +import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.base.MoreObjects;
 +import com.google.common.collect.Collections2;
 +
 +import org.apache.cassandra.cql3.*;
 +import org.apache.cassandra.cql3.selection.Selectable;
 +import org.apache.cassandra.cql3.selection.Selector;
 +import org.apache.cassandra.cql3.selection.SimpleSelector;
 +import org.apache.cassandra.db.rows.*;
 +import org.apache.cassandra.db.marshal.*;
 +import org.apache.cassandra.exceptions.InvalidRequestException;
 +import org.apache.cassandra.serializers.MarshalException;
 +import org.apache.cassandra.utils.ByteBufferUtil;
 +import org.github.jamm.Unmetered;
 +
 +@Unmetered
- public final class ColumnMetadata extends ColumnSpecification implements Selectable, Comparable<ColumnMetadata>
++public class ColumnMetadata extends ColumnSpecification implements Selectable, Comparable<ColumnMetadata>
 +{
 +    public static final Comparator<Object> asymmetricColumnDataComparator =
 +        (a, b) -> ((ColumnData) a).column().compareTo((ColumnMetadata) b);
 +
 +    public static final int NO_POSITION = -1;
 +
 +    public enum ClusteringOrder
 +    {
 +        ASC, DESC, NONE
 +    }
 +
 +    /**
 +     * The type of CQL3 column this definition represents.
 +     * There are 4 main types of CQL3 columns: those that are part of the partition key,
 +     * those that are part of the clustering columns and, amongst the others, regular and
 +     * static ones.
 +     *
 +     * IMPORTANT: this enum is serialized as toString() and deserialized by calling
 +     * Kind.valueOf(), so do not override toString() or rename existing values.
 +     */
 +    public enum Kind
 +    {
 +        // NOTE: if adding a new type, must modify comparisonOrder
 +        PARTITION_KEY,
 +        CLUSTERING,
 +        REGULAR,
 +        STATIC;
 +
 +        public boolean isPrimaryKeyKind()
 +        {
 +            return this == PARTITION_KEY || this == CLUSTERING;
 +        }
 +
 +    }
 +
 +    public final Kind kind;
 +
 +    /*
 +     * If the column is a partition key or clustering column, its position relative to
 +     * other columns of the same kind. Otherwise, NO_POSITION (-1).
 +     *
 +     * Note that partition key and clustering columns are numbered separately so
 +     * the first clustering column is 0.
 +     */
 +    private final int position;
 +
 +    private final Comparator<CellPath> cellPathComparator;
 +    private final Comparator<Object> asymmetricCellPathComparator;
 +    private final Comparator<? super Cell<?>> cellComparator;
 +
 +    private int hash;
 +
 +    /**
 +     * These objects are compared frequently, so we encode several of their comparison components
 +     * into a single long value so that this can be done efficiently
 +     */
 +    private final long comparisonOrder;
 +
 +    private static long comparisonOrder(Kind kind, boolean isComplex, long position, ColumnIdentifier name)
 +    {
 +        assert position >= 0 && position < 1 << 12;
 +        return   (((long) kind.ordinal()) << 61)
 +               | (isComplex ? 1L << 60 : 0)
 +               | (position << 48)
 +               | (name.prefixComparison >>> 16);
 +    }
 +
 +    public static ColumnMetadata partitionKeyColumn(TableMetadata table, ByteBuffer name, AbstractType<?> type, int position)
 +    {
 +        return new ColumnMetadata(table, name, type, position, Kind.PARTITION_KEY);
 +    }
 +
 +    public static ColumnMetadata partitionKeyColumn(String keyspace, String table, String name, AbstractType<?> type, int position)
 +    {
 +        return new ColumnMetadata(keyspace, table, ColumnIdentifier.getInterned(name, true), type, position, Kind.PARTITION_KEY);
 +    }
 +
 +    public static ColumnMetadata clusteringColumn(TableMetadata table, ByteBuffer name, AbstractType<?> type, int position)
 +    {
 +        return new ColumnMetadata(table, name, type, position, Kind.CLUSTERING);
 +    }
 +
 +    public static ColumnMetadata clusteringColumn(String keyspace, String table, String name, AbstractType<?> type, int position)
 +    {
 +        return new ColumnMetadata(keyspace, table, ColumnIdentifier.getInterned(name, true), type, position, Kind.CLUSTERING);
 +    }
 +
 +    public static ColumnMetadata regularColumn(TableMetadata table, ByteBuffer name, AbstractType<?> type)
 +    {
 +        return new ColumnMetadata(table, name, type, NO_POSITION, Kind.REGULAR);
 +    }
 +
 +    public static ColumnMetadata regularColumn(String keyspace, String table, String name, AbstractType<?> type)
 +    {
 +        return new ColumnMetadata(keyspace, table, ColumnIdentifier.getInterned(name, true), type, NO_POSITION, Kind.REGULAR);
 +    }
 +
 +    public static ColumnMetadata staticColumn(TableMetadata table, ByteBuffer name, AbstractType<?> type)
 +    {
 +        return new ColumnMetadata(table, name, type, NO_POSITION, Kind.STATIC);
 +    }
 +
 +    public static ColumnMetadata staticColumn(String keyspace, String table, String name, AbstractType<?> type)
 +    {
 +        return new ColumnMetadata(keyspace, table, ColumnIdentifier.getInterned(name, true), type, NO_POSITION, Kind.STATIC);
 +    }
 +
 +    public ColumnMetadata(TableMetadata table, ByteBuffer name, AbstractType<?> type, int position, Kind kind)
 +    {
 +        this(table.keyspace,
 +             table.name,
 +             ColumnIdentifier.getInterned(name, UTF8Type.instance),
 +             type,
 +             position,
 +             kind);
 +    }
 +
 +    @VisibleForTesting
 +    public ColumnMetadata(String ksName,
 +                          String cfName,
 +                          ColumnIdentifier name,
 +                          AbstractType<?> type,
 +                          int position,
 +                          Kind kind)
 +    {
 +        super(ksName, cfName, name, type);
 +        assert name != null && type != null && kind != null;
 +        assert (position == NO_POSITION) == !kind.isPrimaryKeyKind(); // The position really only makes sense for partition and clustering columns (and those must have one),
 +                                                                      // so make sure we don't sneak it in for something else since it'd break equals()
 +        this.kind = kind;
 +        this.position = position;
 +        this.cellPathComparator = makeCellPathComparator(kind, type);
 +        this.cellComparator = cellPathComparator == null ? ColumnData.comparator : (a, b) -> cellPathComparator.compare(a.path(), b.path());
 +        this.asymmetricCellPathComparator = cellPathComparator == null ? null : (a, b) -> cellPathComparator.compare(((Cell<?>)a).path(), (CellPath) b);
 +        this.comparisonOrder = comparisonOrder(kind, isComplex(), Math.max(0, position), name);
 +    }
 +
 +    private static Comparator<CellPath> makeCellPathComparator(Kind kind, AbstractType<?> type)
 +    {
 +        if (kind.isPrimaryKeyKind() || !type.isMultiCell())
 +            return null;
 +
 +        AbstractType<?> nameComparator = type.isCollection()
 +                                       ? ((CollectionType) type).nameComparator()
 +                                       : ((UserType) type).nameComparator();
 +
 +
 +        return (path1, path2) ->
 +        {
 +            if (path1.size() == 0 || path2.size() == 0)
 +            {
 +                if (path1 == CellPath.BOTTOM)
 +                    return path2 == CellPath.BOTTOM ? 0 : -1;
 +                if (path1 == CellPath.TOP)
 +                    return path2 == CellPath.TOP ? 0 : 1;
 +                return path2 == CellPath.BOTTOM ? 1 : -1;
 +            }
 +
 +            // This will get more complicated once we have non-frozen UDT and nested collections
 +            assert path1.size() == 1 && path2.size() == 1;
 +            return nameComparator.compare(path1.get(0), path2.get(0));
 +        };
 +    }
 +
++    private static class Placeholder extends ColumnMetadata
++    {
++        Placeholder(TableMetadata table, ByteBuffer name, AbstractType<?> type, int position, Kind kind)
++        {
++            super(table, name, type, position, kind);
++        }
++
++        public boolean isPlaceholder()
++        {
++            return true;
++        }
++    }
++
++    public static ColumnMetadata placeholder(TableMetadata table, ByteBuffer name, boolean isStatic)
++    {
++        return new Placeholder(table, name, EmptyType.instance, NO_POSITION, isStatic ? Kind.STATIC : Kind.REGULAR);
++    }
++
++    public boolean isPlaceholder()
++    {
++        return false;
++    }
++
 +    public ColumnMetadata copy()
 +    {
 +        return new ColumnMetadata(ksName, cfName, name, type, position, kind);
 +    }
 +
 +    public ColumnMetadata withNewName(ColumnIdentifier newName)
 +    {
 +        return new ColumnMetadata(ksName, cfName, newName, type, position, kind);
 +    }
 +
 +    public ColumnMetadata withNewType(AbstractType<?> newType)
 +    {
 +        return new ColumnMetadata(ksName, cfName, name, newType, position, kind);
 +    }
 +
 +    public boolean isPartitionKey()
 +    {
 +        return kind == Kind.PARTITION_KEY;
 +    }
 +
 +    public boolean isClusteringColumn()
 +    {
 +        return kind == Kind.CLUSTERING;
 +    }
 +
 +    public boolean isStatic()
 +    {
 +        return kind == Kind.STATIC;
 +    }
 +
 +    public boolean isRegular()
 +    {
 +        return kind == Kind.REGULAR;
 +    }
 +
 +    public ClusteringOrder clusteringOrder()
 +    {
 +        if (!isClusteringColumn())
 +            return ClusteringOrder.NONE;
 +
 +        return type.isReversed() ? ClusteringOrder.DESC : ClusteringOrder.ASC;
 +    }
 +
 +    public int position()
 +    {
 +        return position;
 +    }
 +
 +    @Override
 +    public boolean equals(Object o)
 +    {
 +        if (this == o)
 +            return true;
 +
 +        if (!(o instanceof ColumnMetadata))
 +            return false;
 +
 +        ColumnMetadata cd = (ColumnMetadata) o;
 +
 +        return equalsWithoutType(cd) && type.equals(cd.type);
 +    }
 +
 +    private boolean equalsWithoutType(ColumnMetadata other)
 +    {
 +        return name.equals(other.name)
 +            && kind == other.kind
 +            && position == other.position
 +            && ksName.equals(other.ksName)
 +            && cfName.equals(other.cfName);
 +    }
 +
 +    Optional<Difference> compare(ColumnMetadata other)
 +    {
 +        if (!equalsWithoutType(other))
 +            return Optional.of(Difference.SHALLOW);
 +
 +        if (type.equals(other.type))
 +            return Optional.empty();
 +
 +        return type.asCQL3Type().toString().equals(other.type.asCQL3Type().toString())
 +             ? Optional.of(Difference.DEEP)
 +             : Optional.of(Difference.SHALLOW);
 +    }
 +
 +    @Override
 +    public int hashCode()
 +    {
 +        // This achieves the same as Objects.hashcode, but avoids the object array allocation
 +        // which features significantly in the allocation profile and caches the result.
 +        int result = hash;
 +        if (result == 0)
 +        {
 +            result = 31 + (ksName == null ? 0 : ksName.hashCode());
 +            result = 31 * result + (cfName == null ? 0 : cfName.hashCode());
 +            result = 31 * result + (name == null ? 0 : name.hashCode());
 +            result = 31 * result + (type == null ? 0 : type.hashCode());
 +            result = 31 * result + (kind == null ? 0 : kind.hashCode());
 +            result = 31 * result + position;
 +            hash = result;
 +        }
 +        return result;
 +    }
 +
 +    @Override
 +    public String toString()
 +    {
 +        return name.toString();
 +    }
 +
 +    public String debugString()
 +    {
 +        return MoreObjects.toStringHelper(this)
 +                          .add("name", name)
 +                          .add("type", type)
 +                          .add("kind", kind)
 +                          .add("position", position)
 +                          .toString();
 +    }
 +
 +    public boolean isPrimaryKeyColumn()
 +    {
 +        return kind.isPrimaryKeyKind();
 +    }
 +
 +    @Override
 +    public boolean selectColumns(Predicate<ColumnMetadata> predicate)
 +    {
 +        return predicate.test(this);
 +    }
 +
 +    @Override
 +    public boolean processesSelection()
 +    {
 +        return false;
 +    }
 +
 +    /**
 +     * Converts the specified column definitions into column identifiers.
 +     *
 +     * @param definitions the column definitions to convert.
 +     * @return the column identifiers corresponding to the specified definitions
 +     */
 +    public static Collection<ColumnIdentifier> toIdentifiers(Collection<ColumnMetadata> definitions)
 +    {
 +        return Collections2.transform(definitions, columnDef -> columnDef.name);
 +    }
 +
 +    public int compareTo(ColumnMetadata other)
 +    {
 +        if (this == other)
 +            return 0;
 +
 +        if (comparisonOrder != other.comparisonOrder)
 +            return Long.compare(comparisonOrder, other.comparisonOrder);
 +
 +        return this.name.compareTo(other.name);
 +    }
 +
 +    public Comparator<CellPath> cellPathComparator()
 +    {
 +        return cellPathComparator;
 +    }
 +
 +    public Comparator<Object> asymmetricCellPathComparator()
 +    {
 +        return asymmetricCellPathComparator;
 +    }
 +
 +    public Comparator<? super Cell<?>> cellComparator()
 +    {
 +        return cellComparator;
 +    }
 +
 +    public boolean isComplex()
 +    {
 +        return cellPathComparator != null;
 +    }
 +
 +    public boolean isSimple()
 +    {
 +        return !isComplex();
 +    }
 +
 +    public CellPath.Serializer cellPathSerializer()
 +    {
 +        // Collections are our only complex type so far, so keep it simple
 +        return CollectionType.cellPathSerializer;
 +    }
 +
 +    public <V> void validateCell(Cell<V> cell)
 +    {
 +        if (cell.isTombstone())
 +        {
 +            if (cell.valueSize() > 0)
 +                throw new MarshalException("A tombstone should not have a value");
 +            if (cell.path() != null)
 +                validateCellPath(cell.path());
 +        }
 +        else if(type.isUDT())
 +        {
 +            // To validate a non-frozen UDT field, both the path and the value
 +            // are needed, the path being an index into an array of value types.
 +            ((UserType)type).validateCell(cell);
 +        }
 +        else
 +        {
 +            type.validateCellValue(cell.value(), cell.accessor());
 +            if (cell.path() != null)
 +                validateCellPath(cell.path());
 +        }
 +    }
 +
 +    private void validateCellPath(CellPath path)
 +    {
 +        if (!isComplex())
 +            throw new MarshalException("Only complex cells should have a cell path");
 +
 +        assert type.isMultiCell();
 +        if (type.isCollection())
 +            ((CollectionType)type).nameComparator().validate(path.get(0));
 +        else
 +            ((UserType)type).nameComparator().validate(path.get(0));
 +    }
 +
 +    public void appendCqlTo(CqlBuilder builder)
 +    {
 +        builder.append(name)
 +               .append(' ')
 +               .append(type);
 +
 +        if (isStatic())
 +            builder.append(" static");
 +    }
 +
 +    public static String toCQLString(Iterable<ColumnMetadata> defs)
 +    {
 +        return toCQLString(defs.iterator());
 +    }
 +
 +    public static String toCQLString(Iterator<ColumnMetadata> defs)
 +    {
 +        if (!defs.hasNext())
 +            return "";
 +
 +        StringBuilder sb = new StringBuilder();
 +        sb.append(defs.next().name);
 +        while (defs.hasNext())
 +            sb.append(", ").append(defs.next().name);
 +        return sb.toString();
 +    }
 +
 +
 +    public void appendNameAndOrderTo(CqlBuilder builder)
 +    {
 +        builder.append(name.toCQLString())
 +               .append(' ')
 +               .append(clusteringOrder().toString());
 +    }
 +
 +    /**
 +     * The type of the cell values for cell belonging to this column.
 +     *
 +     * This is the same as the column type, except for non-frozen collections where it's the 'valueComparator'
 +     * of the collection.
 +     * 
 +     * This method should not be used to get the value type of a non-frozen UDT.
 +     */
 +    public AbstractType<?> cellValueType()
 +    {
 +        assert !(type instanceof UserType && type.isMultiCell());
 +        return type instanceof CollectionType && type.isMultiCell()
 +                ? ((CollectionType)type).valueComparator()
 +                : type;
 +    }
 +
 +    /**
 +     * Check if column is counter type.
 +     */
 +    public boolean isCounterColumn()
 +    {
 +        if (type instanceof CollectionType) // Possible with, for example, supercolumns
 +            return ((CollectionType) type).valueComparator().isCounter();
 +        return type.isCounter();
 +    }
 +
 +    public Selector.Factory newSelectorFactory(TableMetadata table, AbstractType<?> expectedType, List<ColumnMetadata> defs, VariableSpecifications boundNames) throws InvalidRequestException
 +    {
 +        return SimpleSelector.newFactory(this, addAndGetIndex(this, defs));
 +    }
 +
 +    public AbstractType<?> getExactTypeIfKnown(String keyspace)
 +    {
 +        return type;
 +    }
 +}
diff --cc src/java/org/apache/cassandra/utils/btree/LeafBTreeSearchIterator.java
index 29aed4b,0000000..a23f460
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/utils/btree/LeafBTreeSearchIterator.java
+++ b/src/java/org/apache/cassandra/utils/btree/LeafBTreeSearchIterator.java
@@@ -1,136 -1,0 +1,136 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.utils.btree;
 +
 +import java.util.Arrays;
 +import java.util.Comparator;
 +import java.util.NoSuchElementException;
 +
 +import static org.apache.cassandra.utils.btree.BTree.size;
 +
 +public class LeafBTreeSearchIterator<K, V> implements BTreeSearchIterator<K, V>
 +{
 +    private final boolean forwards;
 +    private final K[] keys;
 +    private final Comparator<? super K> comparator;
 +    private int nextPos;
 +    private final int lowerBound, upperBound; // inclusive
 +    private boolean hasNext;
 +    private boolean hasCurrent;
 +
 +    public LeafBTreeSearchIterator(Object[] btree, Comparator<? super K> comparator, BTree.Dir dir)
 +    {
 +        this(btree, comparator, dir, 0, size(btree) -1);
 +    }
 +
 +    LeafBTreeSearchIterator(Object[] btree, Comparator<? super K> comparator, BTree.Dir dir, int lowerBound, int upperBound)
 +    {
 +        this.keys = (K[]) btree;
 +        this.forwards = dir == BTree.Dir.ASC;
 +        this.comparator = comparator;
 +        this.lowerBound = lowerBound;
 +        this.upperBound = upperBound;
 +        rewind();
 +    }
 +
 +    public void rewind()
 +    {
 +        nextPos = forwards ? lowerBound : upperBound;
 +        hasNext = nextPos >= lowerBound && nextPos <= upperBound;
 +    }
 +
 +    public V next()
 +    {
 +        if (!hasNext)
 +            throw new NoSuchElementException();
 +        final V elem = (V) keys[nextPos];
 +        nextPos += forwards ? 1 : -1;
 +        hasNext = nextPos >= lowerBound && nextPos <= upperBound;
 +        hasCurrent = true;
 +        return elem;
 +    }
 +
 +    public boolean hasNext()
 +    {
 +        return hasNext;
 +    }
 +
 +    private int searchNext(K key)
 +    {
 +        int lb = forwards ? nextPos : lowerBound; // inclusive
 +        int ub = forwards ? upperBound : nextPos; // inclusive
 +
 +        return Arrays.binarySearch(keys, lb, ub + 1, key, comparator);
 +    }
 +
 +    private void updateHasNext()
 +    {
 +        hasNext = nextPos >= lowerBound && nextPos <= upperBound;
 +    }
 +
 +    public V next(K key)
 +    {
 +        if (!hasNext)
 +            return null;
 +        V result = null;
 +
 +        // first check the current position in case of sequential access
-         if (comparator.compare(key, keys[nextPos]) == 0)
++        if (comparator.compare(keys[nextPos], key) == 0)
 +        {
 +            hasCurrent = true;
 +            result = (V) keys[nextPos];
 +            nextPos += forwards ? 1 : -1;
 +        }
 +        updateHasNext();
 +
 +        if (result != null || !hasNext)
 +            return result;
 +
 +        // otherwise search against the remaining values
 +        int find = searchNext(key);
 +        if (find >= 0)
 +        {
 +            hasCurrent = true;
 +            result = (V) keys[find];
 +            nextPos = find + (forwards ? 1 : -1);
 +        }
 +        else
 +        {
 +            nextPos = (forwards ? -1 : -2) - find;
 +            hasCurrent = false;
 +        }
 +        updateHasNext();
 +        return result;
 +    }
 +
 +    public V current()
 +    {
 +        if (!hasCurrent)
 +            throw new NoSuchElementException();
 +        int current = forwards ? nextPos - 1 : nextPos + 1;
 +        return (V) keys[current];
 +    }
 +
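 +    /** Returns the 0-based position of the current element, counted from the start of iteration. */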
 +    public int indexOfCurrent()
 +    {
 +        if (!hasCurrent)
 +            throw new NoSuchElementException();
 +        int current = forwards ? nextPos - 1 : nextPos + 1;
 +        return forwards ? current - lowerBound : upperBound - current;
 +    }
 +}
diff --cc test/distributed/org/apache/cassandra/distributed/test/SchemaTest.java
index 0000000,2b5dab1..2118c80
mode 000000,100644..100644
--- a/test/distributed/org/apache/cassandra/distributed/test/SchemaTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/SchemaTest.java
@@@ -1,0 -1,117 +1,117 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.distributed.test;
+ 
+ import java.util.function.Consumer;
+ 
+ import org.junit.Test;
+ 
+ import org.apache.cassandra.dht.ByteOrderedPartitioner;
+ import org.apache.cassandra.distributed.Cluster;
+ import org.apache.cassandra.distributed.api.ConsistencyLevel;
+ import org.apache.cassandra.distributed.api.IInstanceConfig;
+ 
+ import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
+ 
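+ /**
+  * Exercises ALTER TABLE ADD/DROP COLUMN applied to only one node of a two-node cluster, verifying
+  * that reads coordinated by each node reflect that node's own view of the schema.
+  */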
+ public class SchemaTest extends TestBaseImpl
+ {
+     private static final Consumer<IInstanceConfig> CONFIG_CONSUMER = config -> {
+         config.set("partitioner", ByteOrderedPartitioner.class.getSimpleName());
+         config.set("initial_token", Integer.toString(config.num() * 1000));
+     };
+ 
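+     /**
+      * Drop a column on node 1 only: wildcard reads coordinated by node 1 should omit the dropped
+      * column, while node 2, which still knows about it, keeps returning it.
+      */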
+     @Test
+     public void dropColumnMixedMode() throws Throwable
+     {
+         try (Cluster cluster = init(Cluster.create(2, CONFIG_CONSUMER)))
+         {
 -            cluster.schemaChange("CREATE TABLE "+KEYSPACE+".tbl (id int primary key, v1 int, v2 int, v3 int)");
++            cluster.schemaChange("CREATE TABLE "+KEYSPACE+".tbl (id int primary key, v1 int, v2 int, v3 int) WITH read_repair='NONE'");
+             Object [][] someExpected = new Object[5][];
+             Object [][] allExpected1 = new Object[5][];
+             Object [][] allExpected2 = new Object[5][];
+             for (int i = 0; i < 5; i++)
+             {
+                 int v1 = i * 10, v2 = i * 100, v3 = i * 1000;
+                 cluster.coordinator(1).execute("INSERT INTO "+KEYSPACE+".tbl (id, v1, v2, v3) VALUES (?,?,?, ?)" , ConsistencyLevel.ALL, i, v1, v2, v3);
+                 someExpected[i] = new Object[] {i, v1};
+                 allExpected1[i] = new Object[] {i, v1, v3};
+                 allExpected2[i] = new Object[] {i, v1, v2, v3};
+             }
+             cluster.forEach((instance) -> instance.flush(KEYSPACE));
+             cluster.get(1).schemaChangeInternal("ALTER TABLE "+KEYSPACE+".tbl DROP v2");
+             assertRows(cluster.coordinator(1).execute("SELECT id, v1 FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), someExpected);
+             assertRows(cluster.coordinator(1).execute("SELECT * FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), allExpected1);
+             assertRows(cluster.coordinator(2).execute("SELECT id, v1 FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), someExpected);
+             assertRows(cluster.coordinator(2).execute("SELECT * FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), allExpected2);
+         }
+     }
+ 
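+     /**
+      * Add a column on node 1 only: node 1's wildcard reads include the new (null) column, while
+      * node 2's reads still return only the original columns.
+      */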
+     @Test
+     public void addColumnMixedMode() throws Throwable
+     {
+         try (Cluster cluster = init(Cluster.create(2, CONFIG_CONSUMER)))
+         {
 -            cluster.schemaChange("CREATE TABLE "+KEYSPACE+".tbl (id int primary key, v1 int, v2 int)");
++            cluster.schemaChange("CREATE TABLE "+KEYSPACE+".tbl (id int primary key, v1 int, v2 int) WITH read_repair='NONE'");
+             Object [][] someExpected = new Object[5][];
+             Object [][] allExpected1 = new Object[5][];
+             Object [][] allExpected2 = new Object[5][];
+             for (int i = 0; i < 5; i++)
+             {
+                 int v1 = i * 10, v2 = i * 100;
+                 cluster.coordinator(1).execute("INSERT INTO "+KEYSPACE+".tbl (id, v1, v2) VALUES (?,?,?)" , ConsistencyLevel.ALL, i, v1, v2);
+                 someExpected[i] = new Object[] {i, v1};
+                 allExpected1[i] = new Object[] {i, v1, v2, null};
+                 allExpected2[i] = new Object[] {i, v1, v2};
+             }
+             cluster.forEach((instance) -> instance.flush(KEYSPACE));
+             cluster.get(1).schemaChangeInternal("ALTER TABLE "+KEYSPACE+".tbl ADD v3 int");
+             assertRows(cluster.coordinator(1).execute("SELECT id, v1 FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), someExpected);
+             assertRows(cluster.coordinator(1).execute("SELECT * FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), allExpected1);
+             assertRows(cluster.coordinator(2).execute("SELECT id, v1 FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), someExpected);
+             assertRows(cluster.coordinator(2).execute("SELECT * FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), allExpected2);
+         }
+     }
+ 
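+     /**
+      * Add a column on node 1 and drop a different one on node 2: each coordinator's wildcard read
+      * reflects its own schema (node 1 sees v2 plus a null v3, node 2 sees neither).
+      */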
+     @Test
+     public void addDropColumnMixedMode() throws Throwable
+     {
+         try (Cluster cluster = init(Cluster.create(2, CONFIG_CONSUMER)))
+         {
 -            cluster.schemaChange("CREATE TABLE "+KEYSPACE+".tbl (id int primary key, v1 int, v2 int)");
++            cluster.schemaChange("CREATE TABLE "+KEYSPACE+".tbl (id int primary key, v1 int, v2 int) WITH read_repair='NONE'");
+             Object [][] someExpected = new Object[5][];
+             Object [][] allExpected1 = new Object[5][];
+             Object [][] allExpected2 = new Object[5][];
+             for (int i = 0; i < 5; i++)
+             {
+                 int v1 = i * 10, v2 = i * 100;
+                 cluster.coordinator(1).execute("INSERT INTO "+KEYSPACE+".tbl (id, v1, v2) VALUES (?,?,?)" , ConsistencyLevel.ALL, i, v1, v2);
+                 someExpected[i] = new Object[] {i, v1};
+                 allExpected1[i] = new Object[] {i, v1, v2, null};
+                 allExpected2[i] = new Object[] {i, v1};
+             }
+             cluster.forEach((instance) -> instance.flush(KEYSPACE));
+             cluster.get(1).schemaChangeInternal("ALTER TABLE "+KEYSPACE+".tbl ADD v3 int");
+             cluster.get(2).schemaChangeInternal("ALTER TABLE "+KEYSPACE+".tbl DROP v2");
+             assertRows(cluster.coordinator(1).execute("SELECT id, v1 FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), someExpected);
+             assertRows(cluster.coordinator(1).execute("SELECT * FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), allExpected1);
+             assertRows(cluster.coordinator(2).execute("SELECT id, v1 FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), someExpected);
+             assertRows(cluster.coordinator(2).execute("SELECT * FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), allExpected2);
+         }
+     }
+ }
diff --cc test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java
index fb0537f,05f3458..dd30a7e
--- a/test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java
@@@ -161,120 -74,124 +161,169 @@@ public class SimpleReadWriteTest extend
      }
  
      @Test
 -    public void readRepairTest() throws Throwable
 +    public void failingReadRepairTest() throws Throwable
      {
 -        cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
 +        // This test makes an explicit assumption about which nodes are read from: that 1 and 2 will be "contacts", and that 3 will be ignored.
 +        // This is an implementation detail of org.apache.cassandra.locator.ReplicaPlans#contactForRead and
 +        // org.apache.cassandra.locator.AbstractReplicationStrategy.getNaturalReplicasForToken that may change
 +        // in a future release; if that happens this test could start to fail, but it should only fail at the explicit
 +        // check that detects the behavior has changed.
 +        // see CASSANDRA-15507
 +        try (Cluster cluster = init(Cluster.create(3)))
 +        {
 +            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='blocking'");
 +
 +            // nodes 1 and 3 are identical
 +            cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1) USING TIMESTAMP 43");
 +            cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1) USING TIMESTAMP 43");
  
 -        cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
 -        cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
 +            // node 2 is different because of the timestamp; a repair is needed
 +            cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1) USING TIMESTAMP 42");
  
 -        assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"));
 +            // 2 is out of date, so it needs to be repaired.  Dropping the read repair message ensures the repair does
 +            // not happen on that node, which will trigger the coordinator to speculatively write to node 3 instead
 +            cluster.verbs(READ_REPAIR_REQ).to(2).drop();
  
 -        assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
 -                                                  ConsistencyLevel.ALL), // ensure node3 in preflist
 -                   row(1, 1, 1));
 +            // save the state of the counters so it's known whether the contact list changed
 +            long readRepairRequestsBefore = cluster.get(1).callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").metric.readRepairRequests.getCount());
 +            long speculatedWriteBefore = cluster.get(1).callOnInstance(() -> ReadRepairMetrics.speculatedWrite.getCount());
  
 -        // Verify that data got repaired to the third node
 -        assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"),
 -                   row(1, 1, 1));
 +            Object[][] rows = cluster.coordinator(1)
 +                       .execute("SELECT pk, ck, v, WRITETIME(v) FROM " + KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.QUORUM);
 +
 +            // make sure to check the counters first, so it can be detected whether read repair executed as expected
 +            long readRepairRequestsAfter = cluster.get(1).callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").metric.readRepairRequests.getCount());
 +            long speculatedWriteAfter = cluster.get(1).callOnInstance(() -> ReadRepairMetrics.speculatedWrite.getCount());
 +
 +            // defensive checks to make sure the nodes selected are the ones expected
 +            Assert.assertEquals("number of read repairs after query does not match expected; its possible the nodes involved with the query did not match expected", readRepairRequestsBefore + 1, readRepairRequestsAfter);
 +            Assert.assertEquals("number of read repairs speculated writes after query does not match expected; its possible the nodes involved with the query did not match expected", speculatedWriteBefore + 1, speculatedWriteAfter);
 +
 +            // 1 has newer data than 2 so its write timestamp will be used for the result
 +            assertRows(rows, row(1, 1, 1, 43L));
 +
 +            // check each node's local state
 +            // 1 and 3 should match quorum response
 +            assertRows(cluster.get(1).executeInternal("SELECT pk, ck, v, WRITETIME(v) FROM " + KEYSPACE + ".tbl WHERE pk = 1"), row(1, 1, 1, 43L));
 +            assertRows(cluster.get(3).executeInternal("SELECT pk, ck, v, WRITETIME(v) FROM " + KEYSPACE + ".tbl WHERE pk = 1"), row(1, 1, 1, 43L));
 +
 +            // 2 was not repaired (message was dropped), so still has old data
 +            assertRows(cluster.get(2).executeInternal("SELECT pk, ck, v, WRITETIME(v) FROM " + KEYSPACE + ".tbl WHERE pk = 1"), row(1, 1, 1, 42L));
 +        }
      }
  
+     /**
+      * If a node receives a mutation for a column it's not aware of, it should fail, since it can't write the data.
+      */
      @Test
      public void writeWithSchemaDisagreement() throws Throwable
      {
-         try (ICluster cluster = init(builder().withNodes(3).withConfig(config -> config.with(NETWORK)).start()))
 -        cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck))");
++        try (Cluster cluster = init(builder().withNodes(3).withConfig(config -> config.with(NETWORK)).start()))
 +        {
 +            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck))");
  
 -        cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
 -        cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
 -        cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
 +            cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
 +            cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
 +            cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
  
 -        // Introduce schema disagreement
 -        cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v2 int", 1);
 +            // Introduce schema disagreement
 +            cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v2 int", 1);
  
 -        Exception thrown = null;
 -        try
 -        {
 -            cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (2, 2, 2, 2)",
 -                                           ConsistencyLevel.ALL);
 -        }
 -        catch (RuntimeException e)
 -        {
 -            thrown = e;
 -        }
 +            try
 +            {
 +                cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (2, 2, 2, 2)",
-                                               ConsistencyLevel.QUORUM);
++                                              ConsistencyLevel.ALL);
 +                fail("Should have failed because of schema disagreement.");
 +            }
 +            catch (Exception e)
 +            {
 +                // the exception class can't be checked directly because the in-jvm dtest framework loads
 +                // instance classes with separate classloaders, so match on the class name instead
 +                Assert.assertTrue(e.getClass().toString().contains("WriteFailureException"));
 +                // we may see 1 or 2 failures in here, because of the fail-fast behavior of AbstractWriteResponseHandler
 +                Assert.assertTrue(e.getMessage().contains("INCOMPATIBLE_SCHEMA from ") &&
 +                                  (e.getMessage().contains("/127.0.0.2") || e.getMessage().contains("/127.0.0.3")));
  
 -        Assert.assertTrue(thrown.getMessage().contains("Exception occurred on node"));
 -        Assert.assertTrue(thrown.getCause().getCause().getCause().getMessage().contains("Unknown column v2"));
 +            }
 +        }
      }
  
+     /**
+      * If a node receives a mutation for a column it knows has been dropped, the write should succeed
+      */
      @Test
-     public void readWithSchemaDisagreement() throws Throwable
+     public void writeWithSchemaDisagreement2() throws Throwable
+     {
 -        cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, v2 int, PRIMARY KEY (pk, ck))");
 -
 -        cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (1, 1, 1, 1)");
 -        cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (1, 1, 1, 1)");
 -        cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (1, 1, 1, 1)");
 -
 -        for (int i=0; i<cluster.size(); i++)
 -            cluster.get(i+1).flush(KEYSPACE);;
 -
 -        // Introduce schema disagreement
 -        cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl DROP v2", 1);
 -
 -        // execute a write including the dropped column where the coordinator is not yet aware of the drop
 -        // all nodes should process this without error
 -        cluster.coordinator(2).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (2, 2, 2, 2)",
 -                                           ConsistencyLevel.ALL);
 -        // and flushing should also be fine
 -        for (int i=0; i<cluster.size(); i++)
 -            cluster.get(i+1).flush(KEYSPACE);;
 -        // the results of reads will vary depending on whether the coordinator has seen the schema change
 -        // note: read repairs will propagate the v2 value to node1, but this is safe and handled correctly
 -        assertRows(cluster.coordinator(2).execute("SELECT * FROM " + KEYSPACE + ".tbl", ConsistencyLevel.ALL),
 -                   rows(row(1,1,1,1), row(2,2,2,2)));
 -        assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl", ConsistencyLevel.ALL),
 -                   rows(row(1,1,1), row(2,2,2)));
++        try (Cluster cluster = init(builder().withNodes(3).withConfig(config -> config.with(NETWORK)).start()))
++        {
++            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, v2 int, PRIMARY KEY (pk, ck))");
++
++            cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (1, 1, 1, 1)");
++            cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (1, 1, 1, 1)");
++            cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (1, 1, 1, 1)");
++            cluster.forEach((instance) -> instance.flush(KEYSPACE));
++
++            // Introduce schema disagreement
++            cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl DROP v2", 1);
++
++            // execute a write including the dropped column where the coordinator is not yet aware of the drop
++            // all nodes should process this without error
++            cluster.coordinator(2).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (2, 2, 2, 2)",
++                                               ConsistencyLevel.ALL);
++            // and flushing should also be fine
++            cluster.forEach((instance) -> instance.flush(KEYSPACE));
++            // the results of reads will vary depending on whether the coordinator has seen the schema change
++            // note: read repairs will propagate the v2 value to node1, but this is safe and handled correctly
++            assertRows(cluster.coordinator(2).execute("SELECT * FROM " + KEYSPACE + ".tbl", ConsistencyLevel.ALL),
++                       rows(row(1,1,1,1), row(2,2,2,2)));
++            assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl", ConsistencyLevel.ALL),
++                       rows(row(1,1,1), row(2,2,2)));
++        }
+     }
+ 
+     /**
+      * If a node isn't aware of a column, but receives a mutation without that column, the write should succeed
+      */
+     @Test
+     public void writeWithInconsequentialSchemaDisagreement() throws Throwable
      {
 -        cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck))");
 +        try (ICluster cluster = init(builder().withNodes(3).withConfig(config -> config.with(NETWORK)).start()))
 +        {
 +            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck))");
  
 -        cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
 -        cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
 -        cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
 +            cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
 +            cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
 +            cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
  
 -        // Introduce schema disagreement
 -        cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v2 int", 1);
 +            // Introduce schema disagreement
 +            cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v2 int", 1);
  
-             try
-             {
-                 cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.ALL);
-                 fail("Should have failed because of schema disagreement.");
-             }
-             catch (Exception e)
-             {
-                 // for some reason, we get weird errors when trying to check class directly
-                 // I suppose it has to do with some classloader manipulation going on
-                 Assert.assertTrue(e.getClass().toString().contains("ReadFailureException"));
-                 // we may see 1 or 2 failures in here, because of the fail-fast behavior of ReadCallback
-                 Assert.assertTrue(e.getMessage().contains("INCOMPATIBLE_SCHEMA from ") &&
-                                   (e.getMessage().contains("/127.0.0.2") || e.getMessage().contains("/127.0.0.3")));
-             }
 -        // this write shouldn't cause any problems because it doesn't write to the new column
 -        cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (2, 2, 2)",
 -                                       ConsistencyLevel.ALL);
++            // this write shouldn't cause any problems because it doesn't write to the new column
++            cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (2, 2, 2)",
++                                           ConsistencyLevel.ALL);
++        }
+     }
+ 
+     /**
+      * If a node receives a read for a column it's not aware of, it shouldn't complain, since it won't have any data for that column
+      */
+     @Test
+     public void readWithSchemaDisagreement() throws Throwable
+     {
 -        cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck))");
++        try (ICluster cluster = init(builder().withNodes(3).withConfig(config -> config.with(NETWORK)).start()))
++        {
++            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck))");
  
 -        cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
 -        cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
 -        cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
++            cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
++            cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
++            cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
+ 
 -        // Introduce schema disagreement
 -        cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v2 int", 1);
++            // Introduce schema disagreement
++            cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v2 int", 1);
+ 
 -        Object[][] expected = new Object[][]{new Object[]{1, 1, 1, null}};
 -        assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.ALL), expected);
++            Object[][] expected = new Object[][]{new Object[]{1, 1, 1, null}};
++            assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.ALL), expected);
 +        }
      }
  
      @Test

