Posted to commits@cassandra.apache.org by sn...@apache.org on 2020/01/31 09:10:26 UTC

[cassandra] branch trunk updated (5f7c886 -> 7a7eece)

This is an automated email from the ASF dual-hosted git repository.

snazy pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git.


    from 5f7c886  Merge branch 'cassandra-3.11' into trunk
     new 4f27a37  C* 3.0 sstables w/ UDTs are corrupted in C* 3.11 and 4.0
     new ffab2b8  C* 3.0 sstables w/ UDTs are corrupted in C* 3.11 and 4.0
     new 7a7eece  Merge branch 'cassandra-3.11' into trunk

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 CHANGES.txt                                        |   1 +
 NEWS.txt                                           |  18 +
 src/java/org/apache/cassandra/cql3/CQL3Type.java   |   4 +-
 .../apache/cassandra/db/SerializationHeader.java   |  12 +
 .../org/apache/cassandra/db/SystemKeyspace.java    |   2 +
 .../org/apache/cassandra/db/rows/AbstractCell.java |  15 +-
 .../org/apache/cassandra/db/rows/AbstractRow.java  |  22 +-
 .../db/rows/AbstractTypeVersionComparator.java     | 121 ---
 .../db/rows/ColumnMetadataVersionComparator.java   |  85 ++
 src/java/org/apache/cassandra/db/rows/Row.java     |   2 +-
 src/java/org/apache/cassandra/db/rows/Rows.java    |   2 +-
 .../cassandra/io/sstable/SSTableHeaderFix.java     | 918 ++++++++++++++++++++
 .../cassandra/io/sstable/format/SSTableReader.java |   5 +
 .../io/sstable/metadata/IMetadataSerializer.java   |   5 +
 .../io/sstable/metadata/MetadataSerializer.java    |   2 +-
 .../apache/cassandra/service/CassandraDaemon.java  |   3 +
 .../apache/cassandra/tools/StandaloneScrubber.java | 144 ++-
 .../org/apache/cassandra/utils/FBUtilities.java    |  12 +
 ...va => ColumnMetadataVersionComparatorTest.java} |  29 +-
 .../cassandra/io/sstable/SSTableHeaderFixTest.java | 964 +++++++++++++++++++++
 .../schema/TupleTypesRepresentationTest.java       | 403 +++++++++
 21 files changed, 2621 insertions(+), 148 deletions(-)
 delete mode 100644 src/java/org/apache/cassandra/db/rows/AbstractTypeVersionComparator.java
 create mode 100644 src/java/org/apache/cassandra/db/rows/ColumnMetadataVersionComparator.java
 create mode 100644 src/java/org/apache/cassandra/io/sstable/SSTableHeaderFix.java
 rename test/unit/org/apache/cassandra/db/rows/{AbstractTypeVersionComparatorTest.java => ColumnMetadataVersionComparatorTest.java} (71%)
 create mode 100644 test/unit/org/apache/cassandra/io/sstable/SSTableHeaderFixTest.java
 create mode 100644 test/unit/org/apache/cassandra/schema/TupleTypesRepresentationTest.java


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[cassandra] 02/02: Merge branch 'cassandra-3.11' into trunk

Posted by sn...@apache.org.

snazy pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 7a7eece9578312a2f9d77de6e0755a3c3c542e99
Merge: 4f27a37 ffab2b8
Author: Robert Stupp <sn...@snazy.de>
AuthorDate: Fri Jan 31 10:08:33 2020 +0100

    Merge branch 'cassandra-3.11' into trunk





[cassandra] 01/02: C* 3.0 sstables w/ UDTs are corrupted in C* 3.11 and 4.0

Posted by sn...@apache.org.

snazy pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 4f27a37d7dd2750cc25261773a67ee8b4a07142c
Author: Robert Stupp <sn...@snazy.de>
AuthorDate: Fri Feb 15 14:24:39 2019 +0100

    C* 3.0 sstables w/ UDTs are corrupted in C* 3.11 and 4.0
    
    patch by Robert Stupp; reviewed by Brandon Williams for CASSANDRA-15035
---
 CHANGES.txt                                        |   1 +
 NEWS.txt                                           |  18 +
 src/java/org/apache/cassandra/cql3/CQL3Type.java   |   4 +-
 .../apache/cassandra/db/SerializationHeader.java   |  12 +
 .../org/apache/cassandra/db/SystemKeyspace.java    |   2 +
 .../org/apache/cassandra/db/rows/AbstractCell.java |  15 +-
 .../org/apache/cassandra/db/rows/AbstractRow.java  |  22 +-
 .../db/rows/AbstractTypeVersionComparator.java     | 121 ---
 .../db/rows/ColumnMetadataVersionComparator.java   |  85 ++
 src/java/org/apache/cassandra/db/rows/Row.java     |   2 +-
 src/java/org/apache/cassandra/db/rows/Rows.java    |   2 +-
 .../cassandra/io/sstable/SSTableHeaderFix.java     | 918 ++++++++++++++++++++
 .../cassandra/io/sstable/format/SSTableReader.java |   5 +
 .../io/sstable/metadata/IMetadataSerializer.java   |   5 +
 .../io/sstable/metadata/MetadataSerializer.java    |   2 +-
 .../apache/cassandra/service/CassandraDaemon.java  |   3 +
 .../apache/cassandra/tools/StandaloneScrubber.java | 144 ++-
 .../org/apache/cassandra/utils/FBUtilities.java    |  12 +
 ...va => ColumnMetadataVersionComparatorTest.java} |  29 +-
 .../cassandra/io/sstable/SSTableHeaderFixTest.java | 964 +++++++++++++++++++++
 .../schema/TupleTypesRepresentationTest.java       | 403 +++++++++
 21 files changed, 2621 insertions(+), 148 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 98c189a..9554a00 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -25,6 +25,7 @@
    otherwise synchronize their clocks, and that clocks are mostly in sync, since
    this is a requirement for general correctness of last write wins. (CASSANDRA-15216)
 Merged from 3.11:
+ * Fix bad UDT sstable metadata serialization headers written by C* 3.0 on upgrade and in sstablescrub (CASSANDRA-15035)
  * Fix nodetool compactionstats showing extra pending task for TWCS - patch implemented (CASSANDRA-15409)
  * Fix SELECT JSON formatting for the "duration" type (CASSANDRA-15075)
  * Update nodetool help stop output (CASSANDRA-15401)
diff --git a/NEWS.txt b/NEWS.txt
index 9c6b43f..7d716fc 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -113,6 +113,24 @@ New features
 
 Upgrading
 ---------
+    - Sstables for tables using a frozen UDT written by C* 3.0 appear as corrupted.
+
+      Background: The serialization-header in the -Statistics.db sstable component contains the type information
+      of the table columns. C* 3.0 writes incorrect type information for frozen UDTs by omitting the
+      "frozen" information. Non-frozen UDTs were introduced by CASSANDRA-7423 in C* 3.6. Since then, the missing
+      "frozen" information leads to deserialization issues that result in CorruptSSTableExceptions and potentially
+      other exceptions as well.
+
+      As a mitigation, the sstable serialization-headers are rewritten to contain the missing "frozen" information for
+      UDTs once, when an upgrade from C* 3.0 is detected. This migration does not touch snapshots or backups.
+
+      The sstablescrub tool now performs a check of the sstable serialization-header against the schema. A mismatch of
+      the types in the serialization-header and the schema will cause sstablescrub to error out and stop by default.
+      See the new `-e` option. `-e off` disables the new validation code. `-e fix` or `-e fix-only`, e.g.
+      `sstablescrub -e fix keyspace table`, will validate the serialization-header, rewrite the non-frozen UDTs
+      in the serialization-header to frozen UDTs, if that matches the schema, and continue with scrub.
+      See `sstablescrub -h`.
+      (CASSANDRA-15035)
     - CASSANDRA-13241 lowered the default chunk_lengh_in_kb for compresesd tables from
       64kb to 16kb. For highly compressible data this can have a noticeable impact
       on space utilization. You may want to consider manually specifying this value.
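
Editorial sketch (not part of the patch): the header check described in the NEWS.txt entry above can be pictured with a small, self-contained Java example. It is only an illustration under stated assumptions. `FrozenHeaderSketch`, `reconcile`, the `fix` flag, and the CQL-style type strings such as `frozen<address>` are hypothetical names; the code in this commit works on Cassandra's internal type objects, not on strings.

```java
// Toy model of the serialization-header check; every name here is hypothetical.
final class FrozenHeaderSketch
{
    /**
     * Returns the type string the header should carry for one column.
     * headerType is what the sstable header says, schemaType is what the schema says.
     * fix mirrors the idea of "-e fix": when false, any mismatch is an error.
     */
    static String reconcile(String headerType, String schemaType, boolean fix)
    {
        // Header and schema agree: nothing to do.
        if (headerType.equals(schemaType))
            return headerType;

        // The C* 3.0 case: the schema says frozen<T> but the header only says T.
        boolean missingFrozen = schemaType.equals("frozen<" + headerType + ">");
        if (missingFrozen && fix)
            return schemaType;

        // Any other mismatch, or a fixable one with fixing disabled, is reported.
        throw new IllegalStateException("header type " + headerType
                                        + " does not match schema type " + schemaType);
    }

    public static void main(String[] args)
    {
        System.out.println(reconcile("address", "frozen<address>", true));
    }
}
```

In this toy model a header that only lacks the "frozen" wrapper is rewritten to the schema's type, while every other disagreement stops processing, which is the default behaviour the NEWS entry describes for sstablescrub.
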
diff --git a/src/java/org/apache/cassandra/cql3/CQL3Type.java b/src/java/org/apache/cassandra/cql3/CQL3Type.java
index 340a992..ee2db68 100644
--- a/src/java/org/apache/cassandra/cql3/CQL3Type.java
+++ b/src/java/org/apache/cassandra/cql3/CQL3Type.java
@@ -704,8 +704,10 @@ public interface CQL3Type
             {
                 if (innerType instanceof RawCollection)
                     throw new InvalidRequestException("Non-frozen collections are not allowed inside collections: " + this);
-                else
+                else if (innerType.isUDT())
                     throw new InvalidRequestException("Non-frozen UDTs are not allowed inside collections: " + this);
+                else
+                    throw new InvalidRequestException("Non-frozen tuples are not allowed inside collections: " + this);
             }
 
             public boolean referencesUserType(String name)
diff --git a/src/java/org/apache/cassandra/db/SerializationHeader.java b/src/java/org/apache/cassandra/db/SerializationHeader.java
index 2e5211c..15ef268 100644
--- a/src/java/org/apache/cassandra/db/SerializationHeader.java
+++ b/src/java/org/apache/cassandra/db/SerializationHeader.java
@@ -288,6 +288,18 @@ public class SerializationHeader
             this.stats = stats;
         }
 
+        /**
+         * <em>Only</em> exposed for {@link org.apache.cassandra.io.sstable.SSTableHeaderFix}.
+         */
+        public static Component buildComponentForTools(AbstractType<?> keyType,
+                                                       List<AbstractType<?>> clusteringTypes,
+                                                       Map<ByteBuffer, AbstractType<?>> staticColumns,
+                                                       Map<ByteBuffer, AbstractType<?>> regularColumns,
+                                                       EncodingStats stats)
+        {
+            return new Component(keyType, clusteringTypes, staticColumns, regularColumns, stats);
+        }
+
         public MetadataType getType()
         {
             return MetadataType.HEADER;
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index 0d79ae9..c427c8f 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -1400,6 +1400,8 @@ public final class SystemKeyspace
         String previous = getPreviousVersionString();
         String next = FBUtilities.getReleaseVersionString();
 
+        FBUtilities.setPreviousReleaseVersionString(previous);
+
         // if we're restarting after an upgrade, snapshot the system and schema keyspaces
         if (!previous.equals(NULL_VERSION.toString()) && !previous.equals(next))
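
Editorial sketch (not part of the patch): the previous release version recorded here is what the upgrade path in SSTableHeaderFix, later in this diff, inspects to decide whether the node was upgraded from a 3.0.x release. The following self-contained Java example shows that decision in isolation. `UpgradeFrom30Sketch` and `isUpgradeFrom30` are hypothetical names, and the regex parsing is an assumption made for illustration; the patch itself uses `CassandraVersion`.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: decides whether a previous-version string denotes a 3.0.x release.
final class UpgradeFrom30Sketch
{
    // Captures the leading "major.minor" of strings such as "3.0.17" or "4.0-alpha1".
    private static final Pattern MAJOR_MINOR = Pattern.compile("^(\\d+)\\.(\\d+)");

    static boolean isUpgradeFrom30(String previousVersion)
    {
        // No recorded previous version: nothing to fix.
        if (previousVersion == null)
            return false;

        Matcher m = MAJOR_MINOR.matcher(previousVersion);
        if (!m.find())
            return false;

        int major = Integer.parseInt(m.group(1));
        int minor = Integer.parseInt(m.group(2));

        // Same condition as in the patch: only major 3 with minor 0 qualifies.
        return major == 3 && minor == 0;
    }

    public static void main(String[] args)
    {
        System.out.println(isUpgradeFrom30("3.0.17"));
        System.out.println(isUpgradeFrom30("3.11.4"));
    }
}
```
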
 
diff --git a/src/java/org/apache/cassandra/db/rows/AbstractCell.java b/src/java/org/apache/cassandra/db/rows/AbstractCell.java
index 51c9ff4..3f2da96 100644
--- a/src/java/org/apache/cassandra/db/rows/AbstractCell.java
+++ b/src/java/org/apache/cassandra/db/rows/AbstractCell.java
@@ -28,6 +28,7 @@ import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.CollectionType;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.serializers.MarshalException;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.memory.AbstractAllocator;
 
 /**
@@ -204,7 +205,19 @@ public abstract class AbstractCell extends Cell
         if (isTombstone())
             return String.format("[%s=<tombstone> %s]", column().name, livenessInfoString());
         else
-            return String.format("[%s=%s %s]", column().name, type.getString(value()), livenessInfoString());
+            return String.format("[%s=%s %s]", column().name, safeToString(type, value()), livenessInfoString());
+    }
+
+    private static String safeToString(AbstractType<?> type, ByteBuffer data)
+    {
+        try
+        {
+            return type.getString(data);
+        }
+        catch (Exception e)
+        {
+            return "0x" + ByteBufferUtil.bytesToHex(data);
+        }
     }
 
     private String livenessInfoString()
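
Editorial sketch (not part of the patch): the `safeToString` change above falls back to a hex dump when a value cannot be rendered by its type, so that `toString()` never throws while an error message is being built. A minimal standalone Java version of that pattern follows. It assumes a strict UTF-8 decoder in place of Cassandra's `AbstractType.getString`, and `SafeToStringSketch` and `toHex` are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for the "render or fall back to hex" pattern.
final class SafeToStringSketch
{
    static String safeToString(ByteBuffer data)
    {
        try
        {
            // A decoder from newDecoder() reports malformed input by throwing
            // instead of silently substituting replacement characters.
            return StandardCharsets.UTF_8.newDecoder().decode(data.duplicate()).toString();
        }
        catch (CharacterCodingException e)
        {
            // The value is not valid for the type: show the raw bytes instead.
            return "0x" + toHex(data);
        }
    }

    static String toHex(ByteBuffer data)
    {
        StringBuilder sb = new StringBuilder();
        // Work on a duplicate so the caller's buffer position is left untouched.
        ByteBuffer copy = data.duplicate();
        while (copy.hasRemaining())
            sb.append(String.format("%02x", copy.get() & 0xff));
        return sb.toString();
    }

    public static void main(String[] args)
    {
        System.out.println(safeToString(ByteBuffer.wrap(new byte[]{ (byte) 0xff, (byte) 0xfe })));
    }
}
```
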
diff --git a/src/java/org/apache/cassandra/db/rows/AbstractRow.java b/src/java/org/apache/cassandra/db/rows/AbstractRow.java
index 2018d4e..957ffd4 100644
--- a/src/java/org/apache/cassandra/db/rows/AbstractRow.java
+++ b/src/java/org/apache/cassandra/db/rows/AbstractRow.java
@@ -77,15 +77,31 @@ public abstract class AbstractRow implements Row
         {
             ByteBuffer value = clustering.get(i);
             if (value != null)
-                metadata.comparator.subtype(i).validate(value);
+            {
+                try
+                {
+                    metadata.comparator.subtype(i).validate(value);
+                }
+                catch (Exception e)
+                {
+                    throw new MarshalException("comparator #" + i + " '" + metadata.comparator.subtype(i) + "' in '" + metadata + "' didn't validate", e);
+                }
+            }
         }
 
         primaryKeyLivenessInfo().validate();
         if (deletion().time().localDeletionTime() < 0)
-            throw new MarshalException("A local deletion time should not be negative");
+            throw new MarshalException("A local deletion time should not be negative in '" + metadata + "'");
 
         for (ColumnData cd : this)
-            cd.validate();
+            try
+            {
+                cd.validate();
+            }
+            catch (Exception e)
+            {
+                throw new MarshalException("data for '" + cd.column.debugString() + "', " + cd + " in '" + metadata + "' didn't validate", e);
+            }
     }
 
     public boolean hasInvalidDeletions()
diff --git a/src/java/org/apache/cassandra/db/rows/AbstractTypeVersionComparator.java b/src/java/org/apache/cassandra/db/rows/AbstractTypeVersionComparator.java
deleted file mode 100644
index e47f681..0000000
--- a/src/java/org/apache/cassandra/db/rows/AbstractTypeVersionComparator.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.rows;
-
-import java.util.Comparator;
-import java.util.List;
-
-import org.apache.cassandra.db.marshal.*;
-
-/**
- * A {@code Comparator} use to determine which version of a type should be used.
- * <p>In the case of UDTs it is possible to have 2 versions or more of the same type, if some fields has been added to
- * the type. To avoid problems the latest type need to be used.</p>
- */
-final class AbstractTypeVersionComparator implements Comparator<AbstractType<?>>
-{
-    public static final Comparator<AbstractType<?>> INSTANCE = new AbstractTypeVersionComparator();
-
-    private AbstractTypeVersionComparator()
-    {
-    }
-
-    @Override
-    public int compare(AbstractType<?> type, AbstractType<?> otherType)
-    {
-        if (!type.getClass().equals(otherType.getClass()))
-            throw new IllegalArgumentException(String.format("Trying to compare 2 different types: %s and %s",
-                                                             type,
-                                                             otherType));
-
-        if (type.equals(otherType))
-            return 0;
-
-        // The only case where 2 types can differ is if they contains some UDTs and one of them has more
-        // fields (due to an ALTER type ADD) than in the other type. In this case we need to pick the type with
-        // the bigger amount of fields.
-        if (type.isUDT())
-            return compareUserType((UserType) type, (UserType) otherType);
-
-        if (type.isTuple())
-            return compareTuple((TupleType) type, (TupleType) otherType);
-
-        if (type.isCollection())
-            return compareCollectionTypes(type, otherType);
-
-        if (type instanceof CompositeType)
-            return compareCompositeTypes((CompositeType) type, (CompositeType) otherType);
-
-        // In theory we should never reach that point but to be on the safe side we allow it.
-        return 0;
-    }
-
-    private int compareCompositeTypes(CompositeType type, CompositeType otherType)
-    {
-        List<AbstractType<?>> types = type.getComponents();
-        List<AbstractType<?>> otherTypes = otherType.getComponents();
-
-        if (types.size() != otherTypes.size())
-            return Integer.compare(types.size(), otherTypes.size());
-
-        for (int i = 0, m = type.componentsCount(); i < m ; i++)
-        {
-            int test = compare(types.get(i), otherTypes.get(i));
-            if (test != 0);
-                return test;
-        }
-        return 0;
-    }
-
-    private int compareCollectionTypes(AbstractType<?> type, AbstractType<?> otherType)
-    {
-        if (type instanceof MapType)
-            return compareMapType((MapType<?, ?>) type, (MapType<?, ?>) otherType);
-
-        if (type instanceof SetType)
-            return compare(((SetType<?>) type).getElementsType(), ((SetType<?>) otherType).getElementsType());
-
-        return compare(((ListType<?>) type).getElementsType(), ((ListType<?>) otherType).getElementsType());
-    }
-
-    private int compareMapType(MapType<?, ?> type, MapType<?, ?> otherType)
-    {
-        int test = compare(type.getKeysType(), otherType.getKeysType());
-        return test != 0 ? test : compare(type.getValuesType(), otherType.getValuesType());
-    }
-
-    private int compareUserType(UserType type, UserType otherType)
-    {
-        return compareTuple(type, otherType);
-    }
-
-    private int compareTuple(TupleType type, TupleType otherType)
-    {
-        if (type.size() != otherType.size())
-            return Integer.compare(type.size(), otherType.size());
-
-        int test = 0;
-        int i = 0;
-        while (test == 0 && i < type.size())
-        {
-            test = compare(type.type(i), otherType.type(i));
-            i++;
-        }
-        return test;
-    }
-}
diff --git a/src/java/org/apache/cassandra/db/rows/ColumnMetadataVersionComparator.java b/src/java/org/apache/cassandra/db/rows/ColumnMetadataVersionComparator.java
new file mode 100644
index 0000000..6b2d97c
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/ColumnMetadataVersionComparator.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.util.Comparator;
+
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.schema.ColumnMetadata;
+
+/**
+ * A {@code Comparator} used to determine which version of a {@link ColumnMetadata} should be used.
+ * <p>
+ * We can sometimes get 2 different versions of the definition of a given column due to differing types. This can happen
+ * in at least 2 cases:
+ * <ul>
+ *     <li>for UDT, where new fields can be added (see CASSANDRA-13776).</li>
+ *     <li>pre-CASSANDRA-12443, when we allowed type altering. And while we don't allow it anymore, it is possible
+ *     to still have sstables with metadata mentioning an old pre-altering type (such old version of pre-altering
+ *     types will be eventually eliminated from the system by compaction and thanks to this comparator, but we
+ *     cannot guarantee when that's fully done).</li>
+ * </ul>
+ */
+final class ColumnMetadataVersionComparator implements Comparator<ColumnMetadata>
+{
+    public static final Comparator<ColumnMetadata> INSTANCE = new ColumnMetadataVersionComparator();
+
+    private ColumnMetadataVersionComparator()
+    {
+    }
+
+    @Override
+    public int compare(ColumnMetadata v1, ColumnMetadata v2)
+    {
+        assert v1.ksName.equals(v2.ksName)
+               && v1.cfName.equals(v2.cfName)
+               && v1.name.equals(v2.name) : v1.debugString() + " != " + v2.debugString();
+
+        AbstractType<?> v1Type = v1.type;
+        AbstractType<?> v2Type = v2.type;
+
+        // In most cases, this is used on equal types, and on most types, equality is cheap (most are singleton classes
+        // and just use reference equality), so evacuating that case first.
+        if (v1Type.equals(v2Type))
+            return 0;
+
+        // If those aren't the same type, one must be "more general" than the other, that is accept strictly more values.
+        if (v1Type.isValueCompatibleWith(v2Type))
+        {
+            // Note: if both accept the same values, there is really no good way to prefer one over the other and so we
+            // consider them equal here. In practice, this means we have 2 types that accept the same values but are
+            // not equal. For internal types, TimestampType/DateType/LongType is, afaik, the only example, but as users
+            // can write custom types, who knows when this can happen. But excluding any user custom type weirdness
+            // (that would really be a bug of their type), such types should only differ in the way they sort, and as
+            // this method is only used for regular/static columns in practice, where sorting has no impact whatsoever,
+            // it shouldn't matter too much what we return here.
+            return v2Type.isValueCompatibleWith(v1Type) ? 0 : 1;
+        }
+        else if (v2Type.isValueCompatibleWith(v1Type))
+        {
+            return -1;
+        }
+        else
+        {
+            // Neither is a super type of the other: something is pretty wrong and we probably shouldn't ignore it.
+            throw new IllegalArgumentException(String.format("Found 2 incompatible versions of column %s in %s.%s: one " +
+                                                             "of type %s and one of type %s (but both types are incompatible)",
+                                                             v1.name, v1.ksName, v1.cfName, v1Type, v2Type));
+        }
+    }
+}
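
Editorial sketch (not part of the patch): the ordering implemented by `ColumnMetadataVersionComparator` above can be reduced to "the more general type wins". The standalone Java example below illustrates that rule with a toy type that is described only by the set of values it accepts. `ToyType`, `VersionPreferenceSketch`, and `MORE_GENERAL_WINS` are hypothetical names, and the set-based `isValueCompatibleWith` is a simplification of what Cassandra's types do.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for a column type: modelled as the set of values it accepts.
final class ToyType
{
    final String name;
    final Set<String> accepted;

    ToyType(String name, String... values)
    {
        this.name = name;
        this.accepted = new HashSet<>(Arrays.asList(values));
    }

    // True if every value accepted by 'previous' is also accepted by this type.
    boolean isValueCompatibleWith(ToyType previous)
    {
        return accepted.containsAll(previous.accepted);
    }
}

final class VersionPreferenceSketch
{
    // Positive result: the first type is strictly more general and should be preferred.
    static final Comparator<ToyType> MORE_GENERAL_WINS = (t1, t2) -> {
        // Cheap identity check first (ToyType does not override equals).
        if (t1.equals(t2))
            return 0;

        if (t1.isValueCompatibleWith(t2))
            // Both accept the same values: no basis to prefer either one.
            return t2.isValueCompatibleWith(t1) ? 0 : 1;

        if (t2.isValueCompatibleWith(t1))
            return -1;

        // Neither accepts everything the other does: refuse to pick one.
        throw new IllegalArgumentException("incompatible types " + t1.name + " and " + t2.name);
    };

    public static void main(String[] args)
    {
        ToyType oldUdt = new ToyType("udt_v1", "a");
        ToyType newUdt = new ToyType("udt_v2", "a", "b");
        System.out.println(MORE_GENERAL_WINS.compare(newUdt, oldUdt));
    }
}
```

In this model a UDT that gained a field accepts strictly more values than its older version, so it compares greater and is the definition callers such as `Rows` end up keeping.
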
diff --git a/src/java/org/apache/cassandra/db/rows/Row.java b/src/java/org/apache/cassandra/db/rows/Row.java
index 6f0b43e..2f752b8 100644
--- a/src/java/org/apache/cassandra/db/rows/Row.java
+++ b/src/java/org/apache/cassandra/db/rows/Row.java
@@ -764,7 +764,7 @@ public interface Row extends Unfiltered, Iterable<ColumnData>
                 if (column == null)
                     return true;
 
-                return AbstractTypeVersionComparator.INSTANCE.compare(column.type, dataColumn.type) < 0;
+                return ColumnMetadataVersionComparator.INSTANCE.compare(column, dataColumn) < 0;
             }
 
             @SuppressWarnings("resource")
diff --git a/src/java/org/apache/cassandra/db/rows/Rows.java b/src/java/org/apache/cassandra/db/rows/Rows.java
index c0c84b6..d62d3b5 100644
--- a/src/java/org/apache/cassandra/db/rows/Rows.java
+++ b/src/java/org/apache/cassandra/db/rows/Rows.java
@@ -410,7 +410,7 @@ public abstract class Rows
         if (curb == null)
             return cura.column;
 
-        if (AbstractTypeVersionComparator.INSTANCE.compare(cura.column.type, curb.column.type) >= 0)
+        if (ColumnMetadataVersionComparator.INSTANCE.compare(cura.column, curb.column) >= 0)
             return cura.column;
 
         return curb.column;
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableHeaderFix.java b/src/java/org/apache/cassandra/io/sstable/SSTableHeaderFix.java
new file mode 100644
index 0000000..3577259
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableHeaderFix.java
@@ -0,0 +1,918 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.cql3.CQL3Type;
+import org.apache.cassandra.cql3.statements.schema.IndexTarget;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.db.marshal.AbstractCompositeType;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CollectionType;
+import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.db.marshal.DynamicCompositeType;
+import org.apache.cassandra.db.marshal.ListType;
+import org.apache.cassandra.db.marshal.MapType;
+import org.apache.cassandra.db.marshal.SetType;
+import org.apache.cassandra.db.marshal.TupleType;
+import org.apache.cassandra.db.marshal.UserType;
+import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
+import org.apache.cassandra.io.sstable.metadata.MetadataType;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.IndexMetadata;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.CassandraVersion;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
+
+/**
+ * Validates and fixes type issues in the serialization-header of sstables.
+ */
+public abstract class SSTableHeaderFix
+{
+    // C* 3.0 upgrade code
+
+    private static final String SKIPAUTOMATICUDTFIX = "cassandra.skipautomaticudtfix";
+    private static final boolean SKIP_AUTOMATIC_FIX_ON_UPGRADE = Boolean.getBoolean(SKIPAUTOMATICUDTFIX);
+
+    public static void fixNonFrozenUDTIfUpgradeFrom30()
+    {
+        String previousVersionString = FBUtilities.getPreviousReleaseVersionString();
+        if (previousVersionString == null)
+            return;
+        CassandraVersion previousVersion = new CassandraVersion(previousVersionString);
+        if (previousVersion.major != 3 || previousVersion.minor > 0)
+        {
+            // Not an upgrade from 3.0 to 3.x, nothing to do here
+            return;
+        }
+
+        if (SKIP_AUTOMATIC_FIX_ON_UPGRADE)
+        {
+            logger.warn("Detected upgrade from {} to {}, but -D{}=true, NOT fixing UDT type references in " +
+                        "sstable metadata serialization-headers",
+                        previousVersionString,
+                        FBUtilities.getReleaseVersionString(),
+                        SKIPAUTOMATICUDTFIX);
+            return;
+        }
+
+        logger.info("Detected upgrade from {} to {}, fixing UDT type references in sstable metadata serialization-headers",
+                    previousVersionString,
+                    FBUtilities.getReleaseVersionString());
+
+        SSTableHeaderFix instance = SSTableHeaderFix.builder()
+                                                    .schemaCallback(() -> Schema.instance::getTableMetadata)
+                                                    .build();
+        instance.execute();
+    }
+
+    // "regular" SSTableHeaderFix code, also used by StandaloneScrubber.
+
+    private static final Logger logger = LoggerFactory.getLogger(SSTableHeaderFix.class);
+
+    protected final Consumer<String> info;
+    protected final Consumer<String> warn;
+    protected final Consumer<String> error;
+    protected final boolean dryRun;
+    protected final Function<Descriptor, TableMetadata> schemaCallback;
+
+    private final List<Descriptor> descriptors;
+
+    private final List<Pair<Descriptor, Map<MetadataType, MetadataComponent>>> updates = new ArrayList<>();
+    private boolean hasErrors;
+
+    SSTableHeaderFix(Builder builder)
+    {
+        this.info = builder.info;
+        this.warn = builder.warn;
+        this.error = builder.error;
+        this.dryRun = builder.dryRun;
+        this.schemaCallback = builder.schemaCallback.get();
+        this.descriptors = new ArrayList<>(builder.descriptors);
+        Objects.requireNonNull(this.info, "info is null");
+        Objects.requireNonNull(this.warn, "warn is null");
+        Objects.requireNonNull(this.error, "error is null");
+        Objects.requireNonNull(this.schemaCallback, "schemaCallback is null");
+    }
+
+    public static Builder builder()
+    {
+        return new Builder();
+    }
+
+    /**
+     * Builder to configure and construct an instance of {@link SSTableHeaderFix}.
+     * Default settings:
+     * <ul>
+     *     <li>log via the slf4j logger of {@link SSTableHeaderFix}</li>
+     *     <li>no dry-run (i.e. validate and fix, if no serious errors are detected)</li>
+     *     <li>no schema callback</li>
+     * </ul>
+     * If neither {@link #withDescriptor(Descriptor)} nor {@link #withPath(Path)} are used,
+     * all "live" sstables in all data directories will be scanned.
+     */
+    public static class Builder
+    {
+        private final List<Path> paths = new ArrayList<>();
+        private final List<Descriptor> descriptors = new ArrayList<>();
+        private Consumer<String> info = (ln) -> logger.info("{}", ln);
+        private Consumer<String> warn = (ln) -> logger.warn("{}", ln);
+        private Consumer<String> error = (ln) -> logger.error("{}", ln);
+        private boolean dryRun;
+        private Supplier<Function<Descriptor, TableMetadata>> schemaCallback = () -> null;
+
+        private Builder()
+        {}
+
+        /**
+         * Only validate and prepare fix, but do not write updated (fixed) sstable serialization-headers.
+         */
+        public Builder dryRun()
+        {
+            dryRun = true;
+            return this;
+        }
+
+        public Builder info(Consumer<String> output)
+        {
+            this.info = output;
+            return this;
+        }
+
+        public Builder warn(Consumer<String> warn)
+        {
+            this.warn = warn;
+            return this;
+        }
+
+        public Builder error(Consumer<String> error)
+        {
+            this.error = error;
+            return this;
+        }
+
+        /**
+         * Manually provide an individual sstable or directory containing sstables.
+         *
+         * Implementation note: processing "live" sstables in their data directories as well as sstables
+         * in snapshots and backups in the data directories works.
+         *
+         * But processing sstables that reside somewhere else (e.g. verifying sstables before import)
+         * requires the use of {@link #withDescriptor(Descriptor)}.
+         */
+        public Builder withPath(Path path)
+        {
+            this.paths.add(path);
+            return this;
+        }
+
+        public Builder withDescriptor(Descriptor descriptor)
+        {
+            this.descriptors.add(descriptor);
+            return this;
+        }
+
+        /**
+         * Schema callback to retrieve the schema of a table. Production code always delegates to the
+         * live schema ({@code Schema.instance}). Unit tests use this method to feed a custom schema.
+         */
+        public Builder schemaCallback(Supplier<Function<Descriptor, TableMetadata>> schemaCallback)
+        {
+            this.schemaCallback = schemaCallback;
+            return this;
+        }
+
+        public SSTableHeaderFix build()
+        {
+            if (paths.isEmpty() && descriptors.isEmpty())
+                return new AutomaticHeaderFix(this);
+
+            return new ManualHeaderFix(this);
+        }
+
+        public Builder logToList(List<String> output)
+        {
+            return info(ln -> output.add("INFO  " + ln))
+                   .warn(ln -> output.add("WARN  " + ln))
+                   .error(ln -> output.add("ERROR " + ln));
+        }
+    }
+
+    public final void execute()
+    {
+        prepare();
+
+        logger.debug("Processing {} sstables:{}",
+                     descriptors.size(),
+                     descriptors.stream().map(Descriptor::toString).collect(Collectors.joining("\n    ", "\n    ", "")));
+
+        descriptors.forEach(this::processSSTable);
+
+        if (updates.isEmpty())
+            return;
+
+        if (hasErrors)
+        {
+            info.accept("Stopping due to previous errors. Either fix the errors or specify the ignore-errors option.");
+            return;
+        }
+
+        if (dryRun)
+        {
+            info.accept("Not fixing identified and fixable serialization-header issues.");
+            return;
+        }
+
+        info.accept("Writing new metadata files");
+        updates.forEach(descAndMeta -> writeNewMetadata(descAndMeta.left, descAndMeta.right));
+        info.accept("Finished writing new metadata files");
+    }
+
+    /**
+     * Whether {@link #execute()} encountered an error.
+     */
+    public boolean hasError()
+    {
+        return hasErrors;
+    }
+
+    /**
+     * Whether {@link #execute()} found mismatches.
+     */
+    public boolean hasChanges()
+    {
+        return !updates.isEmpty();
+    }
+
+    abstract void prepare();
+
+    private void error(String format, Object... args)
+    {
+        hasErrors = true;
+        error.accept(String.format(format, args));
+    }
+
+    void processFileOrDirectory(Path path)
+    {
+        Stream.of(path)
+              .flatMap(SSTableHeaderFix::maybeExpandDirectory)
+              .filter(p -> Descriptor.fromFilenameWithComponent(p.toFile()).right.type == Component.Type.DATA)
+              .map(Path::toString)
+              .map(Descriptor::fromFilename)
+              .forEach(descriptors::add);
+    }
+
+    private static Stream<Path> maybeExpandDirectory(Path path)
+    {
+        if (Files.isRegularFile(path))
+            return Stream.of(path);
+        return LifecycleTransaction.getFiles(path, (file, fileType) -> fileType == Directories.FileType.FINAL, Directories.OnTxnErr.IGNORE)
+                                   .stream()
+                                   .map(File::toPath);
+    }
+
+    private void processSSTable(Descriptor desc)
+    {
+        if (desc.cfname.indexOf('.') != -1)
+        {
+            // secondary index not checked
+
+            // partition-key is the indexed column type
+            // clustering-key is org.apache.cassandra.db.marshal.PartitionerDefinedOrder
+            // no static columns, no regular columns
+            return;
+        }
+
+        TableMetadata tableMetadata = schemaCallback.apply(desc);
+        if (tableMetadata == null)
+        {
+            error("Table %s.%s not found in the schema - NOT checking sstable %s", desc.ksname, desc.cfname, desc);
+            return;
+        }
+
+        Set<Component> components = SSTable.discoverComponentsFor(desc);
+        if (components.stream().noneMatch(c -> c.type == Component.Type.STATS))
+        {
+            error("sstable %s has no -Statistics.db component.", desc);
+            return;
+        }
+
+        Map<MetadataType, MetadataComponent> metadata = readSSTableMetadata(desc);
+        if (metadata == null)
+            return;
+
+        MetadataComponent component = metadata.get(MetadataType.HEADER);
+        if (!(component instanceof SerializationHeader.Component))
+        {
+            error("sstable %s: Expected %s, but got %s from metadata.get(MetadataType.HEADER)",
+                  desc,
+                  SerializationHeader.Component.class.getName(),
+                  component != null ? component.getClass().getName() : "'null'");
+            return;
+        }
+        SerializationHeader.Component header = (SerializationHeader.Component) component;
+
+        // check partition key type
+        AbstractType<?> keyType = validatePartitionKey(desc, tableMetadata, header);
+
+        // check clustering columns
+        List<AbstractType<?>> clusteringTypes = validateClusteringColumns(desc, tableMetadata, header);
+
+        // check static and regular columns
+        Map<ByteBuffer, AbstractType<?>> staticColumns = validateColumns(desc, tableMetadata, header.getStaticColumns(), ColumnMetadata.Kind.STATIC);
+        Map<ByteBuffer, AbstractType<?>> regularColumns = validateColumns(desc, tableMetadata, header.getRegularColumns(), ColumnMetadata.Kind.REGULAR);
+
+        SerializationHeader.Component newHeader = SerializationHeader.Component.buildComponentForTools(keyType,
+                                                                                                       clusteringTypes,
+                                                                                                       staticColumns,
+                                                                                                       regularColumns,
+                                                                                                       header.getEncodingStats());
+
+        // SerializationHeader.Component has no equals(), but a "good" toString()
+        if (header.toString().equals(newHeader.toString()))
+            return;
+
+        Map<MetadataType, MetadataComponent> newMetadata = new LinkedHashMap<>(metadata);
+        newMetadata.put(MetadataType.HEADER, newHeader);
+
+        updates.add(Pair.create(desc, newMetadata));
+    }
+
+    private AbstractType<?> validatePartitionKey(Descriptor desc, TableMetadata tableMetadata, SerializationHeader.Component header)
+    {
+        boolean keyMismatch = false;
+        AbstractType<?> headerKeyType = header.getKeyType();
+        AbstractType<?> schemaKeyType = tableMetadata.partitionKeyType;
+        boolean headerKeyComposite = headerKeyType instanceof CompositeType;
+        boolean schemaKeyComposite = schemaKeyType instanceof CompositeType;
+        if (headerKeyComposite != schemaKeyComposite)
+        {
+            // one is a composite partition key, the other is not - very suspicious
+            keyMismatch = true;
+        }
+        else if (headerKeyComposite) // && schemaKeyComposite
+        {
+            // Note, the logic is similar to just calling 'fixType()' using the composite partition key,
+            // but the log messages should use the composite partition key column names.
+            List<AbstractType<?>> headerKeyComponents = ((CompositeType) headerKeyType).types;
+            List<AbstractType<?>> schemaKeyComponents = ((CompositeType) schemaKeyType).types;
+            if (headerKeyComponents.size() != schemaKeyComponents.size())
+            {
+                // different number of components in composite partition keys - very suspicious
+                keyMismatch = true;
+                // Just use the original type from the header. Since the number of partition key components
+                // doesn't match, there's nothing to meaningfully validate against.
+            }
+            else
+            {
+                // fix components in composite partition key, if necessary
+                List<AbstractType<?>> newComponents = new ArrayList<>(schemaKeyComponents.size());
+                for (int i = 0; i < schemaKeyComponents.size(); i++)
+                {
+                    AbstractType<?> headerKeyComponent = headerKeyComponents.get(i);
+                    AbstractType<?> schemaKeyComponent = schemaKeyComponents.get(i);
+                    AbstractType<?> fixedType = fixType(desc,
+                                                        tableMetadata.partitionKeyColumns().get(i).name.bytes,
+                                                        headerKeyComponent,
+                                                        schemaKeyComponent,
+                                                        false);
+                    if (fixedType == null)
+                        keyMismatch = true;
+                    else
+                        headerKeyComponent = fixedType;
+                    newComponents.add(fixType(desc,
+                                              tableMetadata.partitionKeyColumns().get(i).name.bytes,
+                                              headerKeyComponent,
+                                              schemaKeyComponent,
+                                              false));
+                }
+                headerKeyType = CompositeType.getInstance(newComponents);
+            }
+        }
+        else
+        {
+            // fix non-composite partition key, if necessary
+            AbstractType<?> fixedType = fixType(desc, tableMetadata.partitionKeyColumns().get(0).name.bytes, headerKeyType, schemaKeyType, false);
+            if (fixedType == null)
+                // non-composite partition key doesn't match and cannot be fixed
+                keyMismatch = true;
+            else
+                headerKeyType = fixedType;
+        }
+        if (keyMismatch)
+            error("sstable %s: Mismatch in partition key type between sstable serialization-header and schema (%s vs %s)",
+                  desc,
+                  headerKeyType.asCQL3Type(),
+                  schemaKeyType.asCQL3Type());
+        return headerKeyType;
+    }
+
+    private List<AbstractType<?>> validateClusteringColumns(Descriptor desc, TableMetadata tableMetadata, SerializationHeader.Component header)
+    {
+        List<AbstractType<?>> headerClusteringTypes = header.getClusteringTypes();
+        List<AbstractType<?>> clusteringTypes = new ArrayList<>();
+        boolean clusteringMismatch = false;
+        List<ColumnMetadata> schemaClustering = tableMetadata.clusteringColumns();
+        if (schemaClustering.size() != headerClusteringTypes.size())
+        {
+            clusteringMismatch = true;
+            // Just use the original types. Since the number of clustering columns doesn't match, there's nothing to
+            // meaningfully validate against.
+            clusteringTypes.addAll(headerClusteringTypes);
+        }
+        else
+        {
+            for (int i = 0; i < headerClusteringTypes.size(); i++)
+            {
+                AbstractType<?> headerType = headerClusteringTypes.get(i);
+                ColumnMetadata column = schemaClustering.get(i);
+                AbstractType<?> schemaType = column.type;
+                AbstractType<?> fixedType = fixType(desc, column.name.bytes, headerType, schemaType, false);
+                if (fixedType == null)
+                    clusteringMismatch = true;
+                else
+                    headerType = fixedType;
+                clusteringTypes.add(headerType);
+            }
+        }
+        if (clusteringMismatch)
+            error("sstable %s: mismatch in clustering columns between sstable serialization-header and schema (%s vs %s)",
+                  desc,
+                  headerClusteringTypes.stream().map(AbstractType::asCQL3Type).map(CQL3Type::toString).collect(Collectors.joining(",")),
+                  schemaClustering.stream().map(cd -> cd.type.asCQL3Type().toString()).collect(Collectors.joining(",")));
+        return clusteringTypes;
+    }
+
+    private Map<ByteBuffer, AbstractType<?>> validateColumns(Descriptor desc, TableMetadata tableMetadata, Map<ByteBuffer, AbstractType<?>> columns, ColumnMetadata.Kind kind)
+    {
+        Map<ByteBuffer, AbstractType<?>> target = new LinkedHashMap<>();
+        for (Map.Entry<ByteBuffer, AbstractType<?>> nameAndType : columns.entrySet())
+        {
+            ByteBuffer name = nameAndType.getKey();
+            AbstractType<?> type = nameAndType.getValue();
+
+            AbstractType<?> fixedType = validateColumn(desc, tableMetadata, kind, name, type);
+            if (fixedType == null)
+            {
+                error("sstable %s: contains column '%s' of type '%s', which could not be validated",
+                      desc,
+                      logColumnName(name),
+                      type);
+                // don't use a "null" type instance
+                fixedType = type;
+            }
+
+            target.put(name, fixedType);
+        }
+        return target;
+    }
+
+    private AbstractType<?> validateColumn(Descriptor desc, TableMetadata tableMetadata, ColumnMetadata.Kind kind, ByteBuffer name, AbstractType<?> type)
+    {
+        ColumnMetadata cd = tableMetadata.getColumn(name);
+        if (cd == null)
+        {
+            // In case the column was dropped, there is not much that we can actually validate.
+            // The column could have been recreated using the same or a different kind or the same or
+            // a different type. Lottery...
+
+            cd = tableMetadata.getDroppedColumn(name, kind == ColumnMetadata.Kind.STATIC);
+            if (cd == null)
+            {
+                for (IndexMetadata indexMetadata : tableMetadata.indexes)
+                {
+                    String target = indexMetadata.options.get(IndexTarget.TARGET_OPTION_NAME);
+                    if (target != null && ByteBufferUtil.bytes(target).equals(name))
+                    {
+                        warn.accept(String.format("sstable %s: contains column '%s', which is not a column in the table '%s.%s', but a target for that table's index '%s'",
+                                desc,
+                                logColumnName(name),
+                                tableMetadata.keyspace,
+                                tableMetadata.name,
+                                indexMetadata.name));
+                        return type;
+                    }
+                }
+
+                warn.accept(String.format("sstable %s: contains column '%s', which is not present in the schema",
+                                          desc,
+                                          logColumnName(name)));
+            }
+            else
+            {
+                // This is a best-effort approach to handle the case of a UDT column created *AND* dropped in
+                // C* 3.0.
+                if (type instanceof UserType && cd.type instanceof TupleType)
+                {
+                    // At this point, we know that the type belongs to a dropped column, recorded with the
+                    // dropped column type "TupleType" and using "UserType" in the sstable. So it is very
+                    // likely that this belongs to a dropped UDT. Fix that information to tuple-type.
+                    return fixType(desc, name, type, cd.type, true);
+                }
+            }
+
+            return type;
+        }
+
+        // At this point, the column name is known to be a "non-dropped" column in the table.
+        if (cd.kind != kind)
+            error("sstable %s: contains column '%s' as a %s column, but is of kind %s in the schema",
+                  desc,
+                  logColumnName(name),
+                  kind.name().toLowerCase(),
+                  cd.kind.name().toLowerCase());
+        else
+            type = fixType(desc, name, type, cd.type, false);
+        return type;
+    }
+
+    private AbstractType<?> fixType(Descriptor desc, ByteBuffer name, AbstractType<?> typeInHeader, AbstractType<?> typeInSchema, boolean droppedColumnMode)
+    {
+        AbstractType<?> fixedType = fixTypeInner(typeInHeader, typeInSchema, droppedColumnMode);
+        if (fixedType != null)
+        {
+            if (fixedType != typeInHeader)
+                info.accept(String.format("sstable %s: Column '%s' needs to be updated from type '%s' to '%s'",
+                                          desc,
+                                          logColumnName(name),
+                                          typeInHeader.asCQL3Type(),
+                                          fixedType.asCQL3Type()));
+            return fixedType;
+        }
+
+        error("sstable %s: contains column '%s' as type '%s', but schema mentions '%s'",
+              desc,
+              logColumnName(name),
+              typeInHeader.asCQL3Type(),
+              typeInSchema.asCQL3Type());
+
+        return typeInHeader;
+    }
+
+    private AbstractType<?> fixTypeInner(AbstractType<?> typeInHeader, AbstractType<?> typeInSchema, boolean droppedColumnMode)
+    {
+        if (typeEquals(typeInHeader, typeInSchema))
+            return typeInHeader;
+
+        if (typeInHeader instanceof CollectionType)
+            return fixTypeInnerCollection(typeInHeader, typeInSchema, droppedColumnMode);
+
+        if (typeInHeader instanceof AbstractCompositeType)
+            return fixTypeInnerAbstractComposite(typeInHeader, typeInSchema, droppedColumnMode);
+
+        if (typeInHeader instanceof TupleType)
+            return fixTypeInnerAbstractTuple(typeInHeader, typeInSchema, droppedColumnMode);
+
+        // all types, besides CollectionType + AbstractCompositeType + TupleType, should be ok (no nested types) - just check for compatibility
+        if (typeInHeader.isCompatibleWith(typeInSchema))
+            return typeInHeader;
+
+        return null;
+    }
+
+    private AbstractType<?> fixTypeInnerAbstractTuple(AbstractType<?> typeInHeader, AbstractType<?> typeInSchema, boolean droppedColumnMode)
+    {
+        // This first 'if' handles the case when a UDT has been dropped, as a dropped UDT is recorded as a tuple
+        // in dropped_columns. If a UDT is to be replaced with a tuple, then also do that for the inner UDTs.
+        if (droppedColumnMode && typeInHeader.getClass() == UserType.class && typeInSchema instanceof TupleType)
+            return fixTypeInnerUserTypeDropped((UserType) typeInHeader, (TupleType) typeInSchema);
+
+        if (typeInHeader.getClass() != typeInSchema.getClass())
+            return null;
+
+        if (typeInHeader.getClass() == UserType.class)
+            return fixTypeInnerUserType((UserType) typeInHeader, (UserType) typeInSchema);
+
+        if (typeInHeader.getClass() == TupleType.class)
+            return fixTypeInnerTuple((TupleType) typeInHeader, (TupleType) typeInSchema, droppedColumnMode);
+
+        throw new IllegalArgumentException("Unknown tuple type class " + typeInHeader.getClass().getName());
+    }
+
+    private AbstractType<?> fixTypeInnerCollection(AbstractType<?> typeInHeader, AbstractType<?> typeInSchema, boolean droppedColumnMode)
+    {
+        if (typeInHeader.getClass() != typeInSchema.getClass())
+            return null;
+
+        if (typeInHeader.getClass() == ListType.class)
+            return fixTypeInnerList((ListType<?>) typeInHeader, (ListType<?>) typeInSchema, droppedColumnMode);
+
+        if (typeInHeader.getClass() == SetType.class)
+            return fixTypeInnerSet((SetType<?>) typeInHeader, (SetType<?>) typeInSchema, droppedColumnMode);
+
+        if (typeInHeader.getClass() == MapType.class)
+            return fixTypeInnerMap((MapType<?, ?>) typeInHeader, (MapType<?, ?>) typeInSchema, droppedColumnMode);
+
+        throw new IllegalArgumentException("Unknown collection type class " + typeInHeader.getClass().getName());
+    }
+
+    private AbstractType<?> fixTypeInnerAbstractComposite(AbstractType<?> typeInHeader, AbstractType<?> typeInSchema, boolean droppedColumnMode)
+    {
+        if (typeInHeader.getClass() != typeInSchema.getClass())
+            return null;
+
+        if (typeInHeader.getClass() == CompositeType.class)
+            return fixTypeInnerComposite((CompositeType) typeInHeader, (CompositeType) typeInSchema, droppedColumnMode);
+
+        if (typeInHeader.getClass() == DynamicCompositeType.class)
+        {
+            // Not sure if we should care about UDTs in DynamicCompositeType at all...
+            if (!typeInHeader.isCompatibleWith(typeInSchema))
+                return null;
+
+            return typeInHeader;
+        }
+
+        throw new IllegalArgumentException("Unknown composite type class " + typeInHeader.getClass().getName());
+    }
+
+    private AbstractType<?> fixTypeInnerUserType(UserType cHeader, UserType cSchema)
+    {
+        if (!cHeader.keyspace.equals(cSchema.keyspace) || !cHeader.name.equals(cSchema.name))
+            // different UDT - bummer...
+            return null;
+
+        if (cHeader.isMultiCell() != cSchema.isMultiCell())
+        {
+            if (cHeader.isMultiCell() && !cSchema.isMultiCell())
+            {
+                // C* 3.0 writes broken SerializationHeader.Component instances - i.e. broken UDT type
+                // definitions into the sstable -Statistics.db file, because 3.0 does not enclose frozen UDTs
+                // (and all UDTs in 3.0 were frozen) with a 'frozen<>' bracket. Since CASSANDRA-7423 (support
+                // for non-frozen UDTs, committed to C* 3.6), that frozen-bracket is quite important.
+                // Non-frozen (= multi-cell) UDTs are serialized in a fundamentally different way than
+                // frozen UDTs in sstables - most importantly, the order of serialized columns depends on
+                // the type: fixed-width types first, then variable length types (like frozen types),
+                // multi-cell types last. If C* >= 3.6 reads an sstable with a UDT that's written by
+                // C* < 3.6, a variety of CorruptSSTableExceptions get logged and clients will encounter
+                // read errors.
+                // At this point, we know that the type belongs to a "live" (non-dropped) column, so it
+                // is safe to correct the information from the header.
+                return cSchema;
+            }
+
+            // In all other cases, there's not much we can do.
+            return null;
+        }
+
+        return cHeader;
+    }
+
+    private AbstractType<?> fixTypeInnerUserTypeDropped(UserType cHeader, TupleType cSchema)
+    {
+        // Do not mess around with the UserType in the serialization header, if the column has been dropped.
+        // Only fix the multi-cell status when the header contains it as a multicell (non-frozen) UserType,
+        // but the schema says "frozen".
+        if (cHeader.isMultiCell() && !cSchema.isMultiCell())
+        {
+            return new UserType(cHeader.keyspace, cHeader.name, cHeader.fieldNames(), cHeader.fieldTypes(), cSchema.isMultiCell());
+        }
+
+        return cHeader;
+    }
+
+    private AbstractType<?> fixTypeInnerTuple(TupleType cHeader, TupleType cSchema, boolean droppedColumnMode)
+    {
+        if (cHeader.size() != cSchema.size())
+            // different number of components - bummer...
+            return null;
+        List<AbstractType<?>> cHeaderFixed = new ArrayList<>(cHeader.size());
+        boolean anyChanged = false;
+        for (int i = 0; i < cHeader.size(); i++)
+        {
+            AbstractType<?> cHeaderComp = cHeader.type(i);
+            AbstractType<?> cHeaderCompFixed = fixTypeInner(cHeaderComp, cSchema.type(i), droppedColumnMode);
+            if (cHeaderCompFixed == null)
+                // incompatible, bummer...
+                return null;
+            cHeaderFixed.add(cHeaderCompFixed);
+            anyChanged |= cHeaderComp != cHeaderCompFixed;
+        }
+        if (anyChanged || cSchema.isMultiCell() != cHeader.isMultiCell())
+            // TODO this should create a non-frozen tuple type for the sake of handling a dropped, non-frozen UDT
+            return new TupleType(cHeaderFixed);
+        return cHeader;
+    }
+
+    private AbstractType<?> fixTypeInnerComposite(CompositeType cHeader, CompositeType cSchema, boolean droppedColumnMode)
+    {
+        if (cHeader.types.size() != cSchema.types.size())
+            // different number of components - bummer...
+            return null;
+        List<AbstractType<?>> cHeaderFixed = new ArrayList<>(cHeader.types.size());
+        boolean anyChanged = false;
+        for (int i = 0; i < cHeader.types.size(); i++)
+        {
+            AbstractType<?> cHeaderComp = cHeader.types.get(i);
+            AbstractType<?> cHeaderCompFixed = fixTypeInner(cHeaderComp, cSchema.types.get(i), droppedColumnMode);
+            if (cHeaderCompFixed == null)
+                // incompatible, bummer...
+                return null;
+            cHeaderFixed.add(cHeaderCompFixed);
+            anyChanged |= cHeaderComp != cHeaderCompFixed;
+        }
+        if (anyChanged)
+            return CompositeType.getInstance(cHeaderFixed);
+        return cHeader;
+    }
+
+    private AbstractType<?> fixTypeInnerList(ListType<?> cHeader, ListType<?> cSchema, boolean droppedColumnMode)
+    {
+        AbstractType<?> cHeaderElem = cHeader.getElementsType();
+        AbstractType<?> cHeaderElemFixed = fixTypeInner(cHeaderElem, cSchema.getElementsType(), droppedColumnMode);
+        if (cHeaderElemFixed == null)
+            // bummer...
+            return null;
+        if (cHeaderElem != cHeaderElemFixed)
+            // element type changed
+            return ListType.getInstance(cHeaderElemFixed, cHeader.isMultiCell());
+        return cHeader;
+    }
+
+    private AbstractType<?> fixTypeInnerSet(SetType<?> cHeader, SetType<?> cSchema, boolean droppedColumnMode)
+    {
+        AbstractType<?> cHeaderElem = cHeader.getElementsType();
+        AbstractType<?> cHeaderElemFixed = fixTypeInner(cHeaderElem, cSchema.getElementsType(), droppedColumnMode);
+        if (cHeaderElemFixed == null)
+            // bummer...
+            return null;
+        if (cHeaderElem != cHeaderElemFixed)
+            // element type changed
+            return SetType.getInstance(cHeaderElemFixed, cHeader.isMultiCell());
+        return cHeader;
+    }
+
+    private AbstractType<?> fixTypeInnerMap(MapType<?, ?> cHeader, MapType<?, ?> cSchema, boolean droppedColumnMode)
+    {
+        AbstractType<?> cHeaderKey = cHeader.getKeysType();
+        AbstractType<?> cHeaderVal = cHeader.getValuesType();
+        AbstractType<?> cHeaderKeyFixed = fixTypeInner(cHeaderKey, cSchema.getKeysType(), droppedColumnMode);
+        AbstractType<?> cHeaderValFixed = fixTypeInner(cHeaderVal, cSchema.getValuesType(), droppedColumnMode);
+        if (cHeaderKeyFixed == null || cHeaderValFixed == null)
+            // bummer...
+            return null;
+        if (cHeaderKey != cHeaderKeyFixed || cHeaderVal != cHeaderValFixed)
+            // element type changed
+            return MapType.getInstance(cHeaderKeyFixed, cHeaderValFixed, cHeader.isMultiCell());
+        return cHeader;
+    }
+
+    private boolean typeEquals(AbstractType<?> typeInHeader, AbstractType<?> typeInSchema)
+    {
+        // Quite annoying, but the equals() implementations of some AbstractType subclasses seem to be
+        // wrong, while toString() seems to work in such cases.
+        return typeInHeader.equals(typeInSchema) || typeInHeader.toString().equals(typeInSchema.toString());
+    }
+
+    private static String logColumnName(ByteBuffer columnName)
+    {
+        try
+        {
+            return ByteBufferUtil.string(columnName);
+        }
+        catch (CharacterCodingException e)
+        {
+            return "?? " + e;
+        }
+    }
+
+    private Map<MetadataType, MetadataComponent> readSSTableMetadata(Descriptor desc)
+    {
+        Map<MetadataType, MetadataComponent> metadata;
+        try
+        {
+            metadata = desc.getMetadataSerializer().deserialize(desc, EnumSet.allOf(MetadataType.class));
+        }
+        catch (IOException e)
+        {
+            error("Failed to deserialize metadata for sstable %s: %s", desc, e.toString());
+            return null;
+        }
+        return metadata;
+    }
+
+    private void writeNewMetadata(Descriptor desc, Map<MetadataType, MetadataComponent> newMetadata)
+    {
+        String file = desc.filenameFor(Component.STATS);
+        info.accept(String.format("  Writing new metadata file %s", file));
+        try
+        {
+            desc.getMetadataSerializer().rewriteSSTableMetadata(desc, newMetadata);
+        }
+        catch (IOException e)
+        {
+            error("Failed to write metadata component for %s: %s", file, e.toString());
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Fix individually provided sstables or directories containing sstables.
+     */
+    static class ManualHeaderFix extends SSTableHeaderFix
+    {
+        private final List<Path> paths;
+
+        ManualHeaderFix(Builder builder)
+        {
+            super(builder);
+            this.paths = builder.paths;
+        }
+
+        public void prepare()
+        {
+            paths.forEach(this::processFileOrDirectory);
+        }
+    }
+
+    /**
+     * Fix all sstables in the configured data-directories.
+     */
+    static class AutomaticHeaderFix extends SSTableHeaderFix
+    {
+        AutomaticHeaderFix(Builder builder)
+        {
+            super(builder);
+        }
+
+        public void prepare()
+        {
+            info.accept("Scanning all data directories...");
+            for (Directories.DataDirectory dataDirectory : Directories.dataDirectories)
+                scanDataDirectory(dataDirectory);
+            info.accept("Finished scanning all data directories...");
+        }
+
+        private void scanDataDirectory(Directories.DataDirectory dataDirectory)
+        {
+            info.accept(String.format("Scanning data directory %s", dataDirectory.location));
+            File[] ksDirs = dataDirectory.location.listFiles();
+            if (ksDirs == null)
+                return;
+            for (File ksDir : ksDirs)
+            {
+                if (!ksDir.isDirectory() || !ksDir.canRead())
+                    continue;
+
+                String name = ksDir.getName();
+
+                // silently ignore all system keyspaces
+                if (SchemaConstants.isLocalSystemKeyspace(name) || SchemaConstants.isReplicatedSystemKeyspace(name))
+                    continue;
+
+                File[] tabDirs = ksDir.listFiles();
+                if (tabDirs == null)
+                    continue;
+                for (File tabDir : tabDirs)
+                {
+                    if (!tabDir.isDirectory() || !tabDir.canRead())
+                        continue;
+
+                    processFileOrDirectory(tabDir.toPath());
+                }
+            }
+        }
+    }
+}
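
Note on the scan above: AutomaticHeaderFix walks each data directory two levels deep (keyspace directory, then table directory) and skips system keyspaces. The following is a minimal, self-contained sketch of that traversal only, not the committed implementation. The class name DataDirectoryScanSketch, the method tableDirectories and the SKIPPED set are hypothetical stand-ins; the real code delegates the keyspace check to SchemaConstants and passes each table directory to processFileOrDirectory.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class DataDirectoryScanSketch
{
    // Hypothetical stand-in for the SchemaConstants system-keyspace checks
    private static final Set<String> SKIPPED = new HashSet<>(Arrays.asList("system", "system_schema"));

    // Collects the readable table directories found below a data directory
    public static List<File> tableDirectories(File dataDirectory)
    {
        List<File> result = new ArrayList<>();
        File[] ksDirs = dataDirectory.listFiles();
        if (ksDirs == null) // not a directory, or an I/O error occurred
            return result;

        for (File ksDir : ksDirs)
        {
            if (!ksDir.isDirectory() || !ksDir.canRead() || SKIPPED.contains(ksDir.getName()))
                continue;

            File[] tabDirs = ksDir.listFiles();
            if (tabDirs == null)
                continue;

            for (File tabDir : tabDirs)
                if (tabDir.isDirectory() && tabDir.canRead())
                    result.add(tabDir);
        }
        return result;
    }

    public static void main(String[] args)
    {
        File root = new File(args.length > 0 ? args[0] : ".");
        tableDirectories(root).forEach(System.out::println);
    }
}
```

The null checks mirror the diff: File.listFiles() returns null rather than an empty array when the path is not a directory or cannot be listed.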
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index 36a1e63..9a467af 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -1821,6 +1821,11 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
 
     public void createLinks(String snapshotDirectoryPath)
     {
+        createLinks(descriptor, components, snapshotDirectoryPath);
+    }
+
+    public static void createLinks(Descriptor descriptor, Set<Component> components, String snapshotDirectoryPath)
+    {
         for (Component component : components)
         {
             File sourceFile = new File(descriptor.filenameFor(component));
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java
index eb7b2c7..c842d02 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java
@@ -74,4 +74,9 @@ public interface IMetadataSerializer
      * Mutate the repairedAt time, pendingRepair ID, and transient status
      */
     public void mutateRepairMetadata(Descriptor descriptor, long newRepairedAt, UUID newPendingRepair, boolean isTransient) throws IOException;
+
+    /**
+     * Replace the sstable metadata file ({@code -Statistics.db}) with the given components.
+     */
+    void rewriteSSTableMetadata(Descriptor descriptor, Map<MetadataType, MetadataComponent> currentComponents) throws IOException;
 }
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
index f76db2d..9cb9a20 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
@@ -242,7 +242,7 @@ public class MetadataSerializer implements IMetadataSerializer
         rewriteSSTableMetadata(descriptor, currentComponents);
     }
 
-    private void rewriteSSTableMetadata(Descriptor descriptor, Map<MetadataType, MetadataComponent> currentComponents) throws IOException
+    public void rewriteSSTableMetadata(Descriptor descriptor, Map<MetadataType, MetadataComponent> currentComponents) throws IOException
     {
         String filePath = descriptor.tmpFilenameFor(Component.STATS);
         try (DataOutputStreamPlus out = new BufferedDataOutputStreamPlus(new FileOutputStream(filePath)))
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index b3cfd19..7465bb3 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -62,6 +62,7 @@ import org.apache.cassandra.exceptions.StartupException;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.io.FSError;
 import org.apache.cassandra.io.sstable.CorruptSSTableException;
+import org.apache.cassandra.io.sstable.SSTableHeaderFix;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.metrics.CassandraMetricsRegistry;
 import org.apache.cassandra.metrics.DefaultNameFactory;
@@ -246,6 +247,8 @@ public class CassandraDaemon
 
         setupVirtualKeyspaces();
 
+        SSTableHeaderFix.fixNonFrozenUDTIfUpgradeFrom30();
+
         // clean up debris in the rest of the keyspaces
         for (String keyspaceName : Schema.instance.getKeyspaces())
         {
diff --git a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
index f50f937..d9d8db1 100644
--- a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
+++ b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
@@ -19,6 +19,7 @@
 package org.apache.cassandra.tools;
 
 import java.io.File;
+import java.nio.file.Paths;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
 
@@ -38,6 +39,7 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.OutputHandler;
+import org.apache.cassandra.utils.Pair;
 
 import static org.apache.cassandra.tools.BulkLoader.CmdLineOptions;
 
@@ -57,6 +59,7 @@ public class StandaloneScrubber
     private static final String SKIP_CORRUPTED_OPTION = "skip-corrupted";
     private static final String NO_VALIDATE_OPTION = "no-validate";
     private static final String REINSERT_OVERFLOWED_TTL_OPTION = "reinsert-overflowed-ttl";
+    private static final String HEADERFIX_OPTION = "header-fix";
 
     public static void main(String args[])
     {
@@ -93,34 +96,106 @@ public class StandaloneScrubber
 
             OutputHandler handler = new OutputHandler.SystemOutput(options.verbose, options.debug);
             Directories.SSTableLister lister = cfs.getDirectories().sstableLister(Directories.OnTxnErr.THROW).skipTemporary(true);
+            List<Pair<Descriptor, Set<Component>>> listResult = new ArrayList<>();
 
-            List<SSTableReader> sstables = new ArrayList<>();
-
-            // Scrub sstables
+            // create snapshot
             for (Map.Entry<Descriptor, Set<Component>> entry : lister.list().entrySet())
             {
+                Descriptor descriptor = entry.getKey();
                 Set<Component> components = entry.getValue();
                 if (!components.contains(Component.DATA))
                     continue;
 
+                listResult.add(Pair.create(descriptor, components));
+
+                File snapshotDirectory = Directories.getSnapshotDirectory(descriptor, snapshotName);
+                SSTableReader.createLinks(descriptor, components, snapshotDirectory.getPath());
+            }
+            System.out.println(String.format("Pre-scrub sstables snapshotted into snapshot %s", snapshotName));
+
+            if (options.headerFixMode != Options.HeaderFixMode.OFF)
+            {
+                // Run the frozen-UDT checks _before_ the sstables are opened
+
+                List<String> logOutput = new ArrayList<>();
+
+                SSTableHeaderFix.Builder headerFixBuilder = SSTableHeaderFix.builder()
+                                                                            .logToList(logOutput)
+                                                                            .schemaCallback(() -> Schema.instance::getTableMetadata);
+                if (options.headerFixMode == Options.HeaderFixMode.VALIDATE || options.headerFixMode == Options.HeaderFixMode.VALIDATE_ONLY)
+                    headerFixBuilder = headerFixBuilder.dryRun();
+
+                for (Pair<Descriptor, Set<Component>> p : listResult)
+                    headerFixBuilder.withPath(Paths.get(p.left.filenameFor(Component.DATA)));
+
+                SSTableHeaderFix headerFix = headerFixBuilder.build();
                 try
                 {
-                    SSTableReader sstable = SSTableReader.openNoValidation(entry.getKey(), components, cfs);
-                    sstables.add(sstable);
+                    headerFix.execute();
+                }
+                catch (Exception e)
+                {
+                    JVMStabilityInspector.inspectThrowable(e);
+                    if (options.debug)
+                        e.printStackTrace(System.err);
+                }
+
+                if (headerFix.hasChanges() || headerFix.hasError())
+                    logOutput.forEach(System.out::println);
+
+                if (headerFix.hasError())
+                {
+                    System.err.println("Errors in serialization-header detected, aborting.");
+                    System.exit(1);
+                }
+
+                switch (options.headerFixMode)
+                {
+                    case VALIDATE_ONLY:
+                    case FIX_ONLY:
+                        System.out.printf("Not continuing with scrub, since '--%s %s' was specified.%n",
+                                          HEADERFIX_OPTION,
+                                          options.headerFixMode.asCommandLineOption());
+                        System.exit(0);
+                    case VALIDATE:
+                        if (headerFix.hasChanges())
+                        {
+                            System.err.printf("Unfixed, but fixable errors in serialization-header detected, aborting. " +
+                                              "Use a non-validating mode ('-e %s' or '-e %s') for --%s%n",
+                                              Options.HeaderFixMode.FIX.asCommandLineOption(),
+                                              Options.HeaderFixMode.FIX_ONLY.asCommandLineOption(),
+                                              HEADERFIX_OPTION);
+                            System.exit(2);
+                        }
+                        break;
+                    case FIX:
+                        break;
+                }
+            }
+
+            List<SSTableReader> sstables = new ArrayList<>();
 
-                    File snapshotDirectory = Directories.getSnapshotDirectory(sstable.descriptor, snapshotName);
-                    sstable.createLinks(snapshotDirectory.getPath());
+            // Open sstables
+            for (Pair<Descriptor, Set<Component>> pair : listResult)
+            {
+                Descriptor descriptor = pair.left;
+                Set<Component> components = pair.right;
+                if (!components.contains(Component.DATA))
+                    continue;
 
+                try
+                {
+                    SSTableReader sstable = SSTableReader.openNoValidation(descriptor, components, cfs);
+                    sstables.add(sstable);
                 }
                 catch (Exception e)
                 {
                     JVMStabilityInspector.inspectThrowable(e);
-                    System.err.println(String.format("Error Loading %s: %s", entry.getKey(), e.getMessage()));
+                    System.err.println(String.format("Error Loading %s: %s", descriptor, e.getMessage()));
                     if (options.debug)
                         e.printStackTrace(System.err);
                 }
             }
-            System.out.println(String.format("Pre-scrub sstables snapshotted into snapshot %s", snapshotName));
 
             if (!options.manifestCheckOnly)
             {
@@ -208,6 +283,26 @@ public class StandaloneScrubber
         public boolean skipCorrupted;
         public boolean noValidate;
         public boolean reinserOverflowedTTL;
+        public HeaderFixMode headerFixMode = HeaderFixMode.VALIDATE;
+
+        enum HeaderFixMode
+        {
+            VALIDATE_ONLY,
+            VALIDATE,
+            FIX_ONLY,
+            FIX,
+            OFF;
+
+            static HeaderFixMode fromCommandLine(String value)
+            {
+                return valueOf(value.replace('-', '_').toUpperCase().trim());
+            }
+
+            String asCommandLineOption()
+            {
+                return name().toLowerCase().replace('_', '-');
+            }
+        }
 
         private Options(String keyspaceName, String cfName)
         {
@@ -249,7 +344,18 @@ public class StandaloneScrubber
                 opts.skipCorrupted = cmd.hasOption(SKIP_CORRUPTED_OPTION);
                 opts.noValidate = cmd.hasOption(NO_VALIDATE_OPTION);
                 opts.reinserOverflowedTTL = cmd.hasOption(REINSERT_OVERFLOWED_TTL_OPTION);
-
+                if (cmd.hasOption(HEADERFIX_OPTION))
+                {
+                    try
+                    {
+                        opts.headerFixMode = HeaderFixMode.fromCommandLine(cmd.getOptionValue(HEADERFIX_OPTION));
+                    }
+                    catch (Exception e)
+                    {
+                        errorMsg(String.format("Invalid argument value '%s' for --%s", cmd.getOptionValue(HEADERFIX_OPTION), HEADERFIX_OPTION), options);
+                        return null;
+                    }
+                }
                 return opts;
             }
             catch (ParseException e)
@@ -275,6 +381,22 @@ public class StandaloneScrubber
             options.addOption("m",  MANIFEST_CHECK_OPTION, "only check and repair the leveled manifest, without actually scrubbing the sstables");
             options.addOption("s",  SKIP_CORRUPTED_OPTION, "skip corrupt rows in counter tables");
             options.addOption("n",  NO_VALIDATE_OPTION,    "do not validate columns using column validator");
+            options.addOption("e",  HEADERFIX_OPTION,      true, "Whether and how to perform a " +
+                                                                 "check of the sstable serialization-headers and fix known, " +
+                                                                 "fixable issues.\n" +
+                                                                 "Possible argument values:\n" +
+                                                                 "- validate-only: validate the serialization-headers, " +
+                                                                 "but do not fix those. Do not continue with scrub - " +
+                                                                 "i.e. only validate the header (dry-run of fix-only).\n" +
+                                                                 "- validate: (default) validate the serialization-headers, " +
+                                                                 "but do not fix those and only continue with scrub if no " +
+                                                                 "errors were detected.\n" +
+                                                                 "- fix-only: validate and fix the serialization-headers, " +
+                                                                 "don't continue with scrub.\n" +
+                                                                 "- fix: validate and fix the serialization-headers, do not " +
+                                                                 "fix and do not continue with scrub if the serialization-header " +
+                                                                 "check encountered errors.\n" +
+                                                                 "- off: don't perform the serialization-header checks.");
             options.addOption("r", REINSERT_OVERFLOWED_TTL_OPTION, REINSERT_OVERFLOWED_TTL_OPTION_DESCRIPTION);
             return options;
         }
@@ -287,7 +409,7 @@ public class StandaloneScrubber
             header.append("Scrub the sstable for the provided table." );
             header.append("\n--\n");
             header.append("Options are:");
-            new HelpFormatter().printHelp(usage, header.toString(), options, "");
+            new HelpFormatter().printHelp(120, usage, header.toString(), options, "");
         }
     }
 }
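
Note on the --header-fix values: the option maps between hyphenated command-line values and enum constants. The following is a minimal, self-contained sketch of that round trip, mirroring the HeaderFixMode enum from the diff. The wrapper class HeaderFixModeSketch and the nested name Mode are hypothetical, and the use of Locale.ROOT is an assumption added here to keep the case conversion locale-independent (the diff calls toUpperCase() and toLowerCase() without a locale).

```java
import java.util.Locale;

public class HeaderFixModeSketch
{
    public enum Mode
    {
        VALIDATE_ONLY,
        VALIDATE,
        FIX_ONLY,
        FIX,
        OFF;

        // "fix-only" -> FIX_ONLY; throws IllegalArgumentException for unknown values
        public static Mode fromCommandLine(String value)
        {
            return valueOf(value.trim().replace('-', '_').toUpperCase(Locale.ROOT));
        }

        // FIX_ONLY -> "fix-only"
        public String asCommandLineOption()
        {
            return name().toLowerCase(Locale.ROOT).replace('_', '-');
        }
    }

    public static void main(String[] args)
    {
        // Print every mode as it would be written on the command line
        for (Mode mode : Mode.values())
        {
            String option = mode.asCommandLineOption();
            System.out.println(option + " -> " + Mode.fromCommandLine(option));
        }
    }
}
```

An unknown value surfaces as an IllegalArgumentException from valueOf, which is why the option parsing in the diff wraps the call in a try/catch and reports "Invalid argument value".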
diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java
index 1797087..1df84ab 100644
--- a/src/java/org/apache/cassandra/utils/FBUtilities.java
+++ b/src/java/org/apache/cassandra/utils/FBUtilities.java
@@ -92,6 +92,8 @@ public class FBUtilities
     private static volatile InetAddressAndPort broadcastInetAddressAndPort;
     private static volatile InetAddressAndPort localInetAddressAndPort;
 
+    private static volatile String previousReleaseVersionString;
+
     public static int getAvailableProcessors()
     {
         String availableProcessors = System.getProperty("cassandra.available_processors");
@@ -338,6 +340,16 @@ public class FBUtilities
         return triggerDir;
     }
 
+    public static void setPreviousReleaseVersionString(String previousReleaseVersionString)
+    {
+        FBUtilities.previousReleaseVersionString = previousReleaseVersionString;
+    }
+
+    public static String getPreviousReleaseVersionString()
+    {
+        return previousReleaseVersionString;
+    }
+
     public static String getReleaseVersionString()
     {
         try (InputStream in = FBUtilities.class.getClassLoader().getResourceAsStream("org/apache/cassandra/config/version.properties"))
diff --git a/test/unit/org/apache/cassandra/db/rows/AbstractTypeVersionComparatorTest.java b/test/unit/org/apache/cassandra/db/rows/ColumnMetadataVersionComparatorTest.java
similarity index 71%
rename from test/unit/org/apache/cassandra/db/rows/AbstractTypeVersionComparatorTest.java
rename to test/unit/org/apache/cassandra/db/rows/ColumnMetadataVersionComparatorTest.java
index 7170696..854421a 100644
--- a/test/unit/org/apache/cassandra/db/rows/AbstractTypeVersionComparatorTest.java
+++ b/test/unit/org/apache/cassandra/db/rows/ColumnMetadataVersionComparatorTest.java
@@ -25,6 +25,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.schema.ColumnMetadata;
 
 import static java.util.Arrays.asList;
 import static org.apache.cassandra.cql3.FieldIdentifier.forUnquoted;
@@ -32,7 +33,7 @@ import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
-public class AbstractTypeVersionComparatorTest
+public class ColumnMetadataVersionComparatorTest
 {
     private UserType udtWith2Fields;
     private UserType udtWith3Fields;
@@ -60,6 +61,13 @@ public class AbstractTypeVersionComparatorTest
     }
 
     @Test
+    public void testWithSimpleTypes()
+    {
+        checkComparisonResults(Int32Type.instance, BytesType.instance);
+        checkComparisonResults(EmptyType.instance, BytesType.instance);
+    }
+
+    @Test
     public void testWithTuples()
     {
         checkComparisonResults(new TupleType(asList(Int32Type.instance, Int32Type.instance)),
@@ -142,19 +150,22 @@ public class AbstractTypeVersionComparatorTest
     @Test
     public void testInvalidComparison()
     {
-        assertInvalidComparison("Trying to compare 2 different types: org.apache.cassandra.db.marshal.FrozenType(org.apache.cassandra.db.marshal.UserType(ks,6d7954797065,61:org.apache.cassandra.db.marshal.Int32Type,62:org.apache.cassandra.db.marshal.Int32Type)) and org.apache.cassandra.db.marshal.Int32Type",
+        assertInvalidComparison("Found 2 incompatible versions of column c in ks.t: one of type org.apache.cassandra.db.marshal.Int32Type and one of type org.apache.cassandra.db.marshal.UTF8Type (but both types are incompatible)",
+                                Int32Type.instance,
+                                UTF8Type.instance);
+        assertInvalidComparison("Found 2 incompatible versions of column c in ks.t: one of type org.apache.cassandra.db.marshal.FrozenType(org.apache.cassandra.db.marshal.UserType(ks,6d7954797065,61:org.apache.cassandra.db.marshal.Int32Type,62:org.apache.cassandra.db.marshal.Int32Type)) and one of type org.apache.cassandra.db.marshal.Int32Type (but both types are incompatible)",
                                 udtWith2Fields,
                                 Int32Type.instance);
-        assertInvalidComparison("Trying to compare 2 different types: org.apache.cassandra.db.marshal.UTF8Type and org.apache.cassandra.db.marshal.InetAddressType",
+        assertInvalidComparison("Found 2 incompatible versions of column c in ks.t: one of type org.apache.cassandra.db.marshal.SetType(org.apache.cassandra.db.marshal.UTF8Type) and one of type org.apache.cassandra.db.marshal.SetType(org.apache.cassandra.db.marshal.InetAddressType) (but both types are incompatible)",
                                 SetType.getInstance(UTF8Type.instance, true),
                                 SetType.getInstance(InetAddressType.instance, true));
-        assertInvalidComparison("Trying to compare 2 different types: org.apache.cassandra.db.marshal.UTF8Type and org.apache.cassandra.db.marshal.InetAddressType",
+        assertInvalidComparison("Found 2 incompatible versions of column c in ks.t: one of type org.apache.cassandra.db.marshal.ListType(org.apache.cassandra.db.marshal.UTF8Type) and one of type org.apache.cassandra.db.marshal.ListType(org.apache.cassandra.db.marshal.InetAddressType) (but both types are incompatible)",
                                 ListType.getInstance(UTF8Type.instance, true),
                                 ListType.getInstance(InetAddressType.instance, true));
-        assertInvalidComparison("Trying to compare 2 different types: org.apache.cassandra.db.marshal.UTF8Type and org.apache.cassandra.db.marshal.InetAddressType",
+        assertInvalidComparison("Found 2 incompatible versions of column c in ks.t: one of type org.apache.cassandra.db.marshal.MapType(org.apache.cassandra.db.marshal.UTF8Type,org.apache.cassandra.db.marshal.IntegerType) and one of type org.apache.cassandra.db.marshal.MapType(org.apache.cassandra.db.marshal.InetAddressType,org.apache.cassandra.db.marshal.IntegerType) (but both types are incompatible)",
                                 MapType.getInstance(UTF8Type.instance, IntegerType.instance, true),
                                 MapType.getInstance(InetAddressType.instance, IntegerType.instance, true));
-        assertInvalidComparison("Trying to compare 2 different types: org.apache.cassandra.db.marshal.UTF8Type and org.apache.cassandra.db.marshal.InetAddressType",
+        assertInvalidComparison("Found 2 incompatible versions of column c in ks.t: one of type org.apache.cassandra.db.marshal.MapType(org.apache.cassandra.db.marshal.IntegerType,org.apache.cassandra.db.marshal.UTF8Type) and one of type org.apache.cassandra.db.marshal.MapType(org.apache.cassandra.db.marshal.IntegerType,org.apache.cassandra.db.marshal.InetAddressType) (but both types are incompatible)",
                                 MapType.getInstance(IntegerType.instance, UTF8Type.instance, true),
                                 MapType.getInstance(IntegerType.instance, InetAddressType.instance, true));
     }
@@ -169,7 +180,7 @@ public class AbstractTypeVersionComparatorTest
         catch (IllegalArgumentException e)
         {
             System.out.println(e.getMessage());
-            assertEquals(e.getMessage(), expectedMessage);
+            assertEquals(expectedMessage, e.getMessage());
         }
     }
 
@@ -183,6 +194,8 @@ public class AbstractTypeVersionComparatorTest
 
     private static int compare(AbstractType<?> left, AbstractType<?> right)
     {
-        return AbstractTypeVersionComparator.INSTANCE.compare(left, right);
+        ColumnMetadata v1 = ColumnMetadata.regularColumn("ks", "t", "c", left);
+        ColumnMetadata v2 = ColumnMetadata.regularColumn("ks", "t", "c", right);
+        return ColumnMetadataVersionComparator.INSTANCE.compare(v1, v2);
     }
 }
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableHeaderFixTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableHeaderFixTest.java
new file mode 100644
index 0000000..d07187b
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableHeaderFixTest.java
@@ -0,0 +1,964 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import com.google.common.collect.Sets;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.cql3.FieldIdentifier;
+import org.apache.cassandra.cql3.statements.schema.IndexTarget;
+import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.marshal.AbstractCompositeType;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CollectionType;
+import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.db.marshal.FloatType;
+import org.apache.cassandra.db.marshal.FrozenType;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.marshal.ListType;
+import org.apache.cassandra.db.marshal.MapType;
+import org.apache.cassandra.db.marshal.SetType;
+import org.apache.cassandra.db.marshal.TupleType;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.db.marshal.UserType;
+import org.apache.cassandra.db.rows.EncodingStats;
+import org.apache.cassandra.io.sstable.format.SSTableFormat;
+import org.apache.cassandra.io.sstable.format.Version;
+import org.apache.cassandra.io.sstable.format.big.BigFormat;
+import org.apache.cassandra.io.sstable.metadata.MetadataType;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.io.util.SequentialWriter;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.IndexMetadata;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Test the functionality of {@link SSTableHeaderFix}.
+ * It writes 'big-m' version sstable(s) and runs the header fix against them.
+ */
+public class SSTableHeaderFixTest
+{
+    static
+    {
+        DatabaseDescriptor.toolInitialization();
+    }
+
+    private File temporaryFolder;
+
+    @Before
+    public void setup()
+    {
+        File f = FileUtils.createTempFile("SSTableUDTFixTest", "");
+        f.delete();
+        f.mkdirs();
+        temporaryFolder = f;
+    }
+
+    @After
+    public void teardown()
+    {
+        FileUtils.deleteRecursive(temporaryFolder);
+    }
+
+    private static final AbstractType<?> udtPK = makeUDT("udt_pk");
+    private static final AbstractType<?> udtCK = makeUDT("udt_ck");
+    private static final AbstractType<?> udtStatic = makeUDT("udt_static");
+    private static final AbstractType<?> udtRegular = makeUDT("udt_regular");
+    private static final AbstractType<?> udtInner = makeUDT("udt_inner");
+    private static final AbstractType<?> udtNested = new UserType("ks",
+                                                                  ByteBufferUtil.bytes("udt_nested"),
+                                                                  Arrays.asList(new FieldIdentifier(ByteBufferUtil.bytes("a_field")),
+                                                                                new FieldIdentifier(ByteBufferUtil.bytes("a_udt"))),
+                                                                  Arrays.asList(UTF8Type.instance,
+                                                                                udtInner),
+                                                                  true);
+    private static final AbstractType<?> tupleInTuple = makeTuple(makeTuple());
+    private static final AbstractType<?> udtInTuple = makeTuple(udtInner);
+    private static final AbstractType<?> tupleInComposite = CompositeType.getInstance(UTF8Type.instance, makeTuple());
+    private static final AbstractType<?> udtInComposite = CompositeType.getInstance(UTF8Type.instance, udtInner);
+    private static final AbstractType<?> udtInList = ListType.getInstance(udtInner, true);
+    private static final AbstractType<?> udtInSet = SetType.getInstance(udtInner, true);
+    private static final AbstractType<?> udtInMap = MapType.getInstance(UTF8Type.instance, udtInner, true);
+    private static final AbstractType<?> udtInFrozenList = ListType.getInstance(udtInner, false);
+    private static final AbstractType<?> udtInFrozenSet = SetType.getInstance(udtInner, false);
+    private static final AbstractType<?> udtInFrozenMap = MapType.getInstance(UTF8Type.instance, udtInner, false);
+
+    private static AbstractType<?> makeUDT2(String udtName, boolean multiCell)
+    {
+        return new UserType("ks",
+                            ByteBufferUtil.bytes(udtName),
+                            Arrays.asList(new FieldIdentifier(ByteBufferUtil.bytes("a_field")),
+                                          new FieldIdentifier(ByteBufferUtil.bytes("a_udt"))),
+                            Arrays.asList(UTF8Type.instance,
+                                          udtInner),
+                            multiCell);
+    }
+
+    private static AbstractType<?> makeUDT(String udtName)
+    {
+        return new UserType("ks",
+                            ByteBufferUtil.bytes(udtName),
+                            Collections.singletonList(new FieldIdentifier(ByteBufferUtil.bytes("a_field"))),
+                            Collections.singletonList(UTF8Type.instance),
+                            true);
+    }
+
+    private static TupleType makeTuple()
+    {
+        return makeTuple(Int32Type.instance);
+    }
+
+    private static TupleType makeTuple(AbstractType<?> second)
+    {
+        return new TupleType(Arrays.asList(UTF8Type.instance,
+                                           second));
+    }
+
+    private static TupleType makeTupleSimple()
+    {
+        // TODO this should create a non-frozen tuple type for the sake of handling a dropped, non-frozen UDT
+        return new TupleType(Collections.singletonList(UTF8Type.instance));
+    }
+
+    private static final Version version = BigFormat.instance.getVersion("mc");
+
+    private TableMetadata tableMetadata;
+    private final Set<String> updatedColumns = new HashSet<>();
+
+    private ColumnMetadata getColDef(String n)
+    {
+        return tableMetadata.getColumn(ByteBufferUtil.bytes(n));
+    }
+
+    /**
+     * Very basic test of whether {@link SSTableHeaderFix} detects a type mismatch (regular_c 'int' vs 'float').
+     */
+    @Test
+    public void verifyTypeMismatchTest() throws Exception
+    {
+        File dir = temporaryFolder;
+        File sstable = generateFakeSSTable(dir, 1);
+
+        SerializationHeader.Component header = readHeader(sstable);
+        assertFrozenUdt(header, false, true);
+
+        ColumnMetadata cd = getColDef("regular_c");
+        tableMetadata = tableMetadata.unbuild()
+                                     .removeRegularOrStaticColumn(cd.name)
+                                     .addRegularColumn("regular_c", FloatType.instance)
+                                     .build();
+
+        SSTableHeaderFix headerFix = builder().withPath(sstable.toPath())
+                                              .build();
+        headerFix.execute();
+
+        assertTrue(headerFix.hasError());
+        assertTrue(headerFix.hasChanges());
+
+        // must not have re-written the stats-component
+        header = readHeader(sstable);
+        assertFrozenUdt(header, false, true);
+    }
+
+    @Test
+    public void verifyTypeMatchTest() throws Exception
+    {
+        File dir = temporaryFolder;
+
+        TableMetadata.Builder cols = TableMetadata.builder("ks", "cf")
+                                                  .addPartitionKeyColumn("pk", udtPK)
+                                                  .addClusteringColumn("ck", udtCK);
+        commonColumns(cols);
+        File sstable = buildFakeSSTable(dir, 1, cols, false);
+
+        SerializationHeader.Component header = readHeader(sstable);
+        assertFrozenUdt(header, false, true);
+
+        SSTableHeaderFix headerFix = builder().withPath(sstable.toPath())
+                                              .build();
+        headerFix.execute();
+
+        assertTrue(updatedColumns.isEmpty());
+        assertFalse(headerFix.hasError());
+        assertFalse(headerFix.hasChanges());
+
+        // must not have re-written the stats-component
+        header = readHeader(sstable);
+        assertFrozenUdt(header, false, true);
+    }
+
+    /**
+     * Simulates the case when an sstable contains a column not present in the schema, which can just be ignored.
+     */
+    @Test
+    public void verifyWithUnknownColumnTest() throws Exception
+    {
+        File dir = temporaryFolder;
+        TableMetadata.Builder cols = TableMetadata.builder("ks", "cf")
+                                                  .addPartitionKeyColumn("pk", udtPK)
+                                                  .addClusteringColumn("ck", udtCK);
+        commonColumns(cols);
+        cols.addRegularColumn("solr_query", UTF8Type.instance);
+        File sstable = buildFakeSSTable(dir, 1, cols, true);
+
+        SerializationHeader.Component header = readHeader(sstable);
+        assertFrozenUdt(header, false, true);
+
+        ColumnMetadata cd = getColDef("solr_query");
+        tableMetadata = tableMetadata.unbuild()
+                                     .removeRegularOrStaticColumn(cd.name)
+                                     .build();
+
+        SSTableHeaderFix headerFix = builder().withPath(sstable.toPath())
+                                              .build();
+        headerFix.execute();
+
+        assertFalse(headerFix.hasError());
+        assertTrue(headerFix.hasChanges());
+
+        // must have re-written the stats-component: UDTs in the header are now frozen
+        header = readHeader(sstable);
+        assertFrozenUdt(header, true, true);
+    }
+
+    /**
+     * Simulates the case when an sstable contains a column that is not present in the table but is the target of an index.
+     * It can just be ignored.
+     */
+    @Test
+    public void verifyWithIndexedUnknownColumnTest() throws Exception
+    {
+        File dir = temporaryFolder;
+        TableMetadata.Builder cols = TableMetadata.builder("ks", "cf")
+                                                  .addPartitionKeyColumn("pk", udtPK)
+                                                  .addClusteringColumn("ck", udtCK);
+        commonColumns(cols);
+        cols.addRegularColumn("solr_query", UTF8Type.instance);
+        File sstable = buildFakeSSTable(dir, 1, cols, true);
+
+        SerializationHeader.Component header = readHeader(sstable);
+        assertFrozenUdt(header, false, true);
+
+        ColumnMetadata cd = getColDef("solr_query");
+        tableMetadata = tableMetadata.unbuild()
+                                     .indexes(tableMetadata.indexes.with(IndexMetadata.fromSchemaMetadata("some search index", IndexMetadata.Kind.CUSTOM, Collections.singletonMap(IndexTarget.TARGET_OPTION_NAME, "solr_query"))))
+                                     .removeRegularOrStaticColumn(cd.name)
+                                     .build();
+
+        SSTableHeaderFix headerFix = builder().withPath(sstable.toPath())
+                                              .build();
+        headerFix.execute();
+
+        assertFalse(headerFix.hasError());
+        assertTrue(headerFix.hasChanges());
+
+        // must have re-written the stats-component: UDTs in the header are now frozen
+        header = readHeader(sstable);
+        assertFrozenUdt(header, true, true);
+    }
+
+    @Test
+    public void complexTypeMatchTest() throws Exception
+    {
+        File dir = temporaryFolder;
+
+        TableMetadata.Builder cols = TableMetadata.builder("ks", "cf")
+                                                  .addPartitionKeyColumn("pk", udtPK)
+                                                  .addClusteringColumn("ck", udtCK);
+        commonColumns(cols);
+        cols.addRegularColumn("tuple_in_tuple", tupleInTuple)
+            .addRegularColumn("udt_nested", udtNested)
+            .addRegularColumn("udt_in_tuple", udtInTuple)
+            .addRegularColumn("tuple_in_composite", tupleInComposite)
+            .addRegularColumn("udt_in_composite", udtInComposite)
+            .addRegularColumn("udt_in_list", udtInList)
+            .addRegularColumn("udt_in_set", udtInSet)
+            .addRegularColumn("udt_in_map", udtInMap)
+            .addRegularColumn("udt_in_frozen_list", udtInFrozenList)
+            .addRegularColumn("udt_in_frozen_set", udtInFrozenSet)
+            .addRegularColumn("udt_in_frozen_map", udtInFrozenMap);
+        File sstable = buildFakeSSTable(dir, 1, cols, true);
+
+        SerializationHeader.Component header = readHeader(sstable);
+        assertFrozenUdt(header, false, true);
+
+        SSTableHeaderFix headerFix = builder().withPath(sstable.toPath())
+                                              .build();
+        headerFix.execute();
+
+        assertFalse(headerFix.hasError());
+        assertTrue(headerFix.hasChanges());
+        assertEquals(Sets.newHashSet("pk", "ck", "regular_b", "static_b",
+                                     "udt_nested", "udt_in_composite", "udt_in_list", "udt_in_set", "udt_in_map"), updatedColumns);
+
+        // must have re-written the stats-component: UDTs in the header are now frozen
+        header = readHeader(sstable);
+        assertFrozenUdt(header, true, true);
+    }
+
+    @Test
+    public void complexTypeDroppedColumnsMatchTest() throws Exception
+    {
+        File dir = temporaryFolder;
+
+        TableMetadata.Builder cols = TableMetadata.builder("ks", "cf")
+                                                  .addPartitionKeyColumn("pk", udtPK)
+                                                  .addClusteringColumn("ck", udtCK);
+        commonColumns(cols);
+        cols.addRegularColumn("tuple_in_tuple", tupleInTuple)
+            .addRegularColumn("udt_nested", udtNested)
+            .addRegularColumn("udt_in_tuple", udtInTuple)
+            .addRegularColumn("tuple_in_composite", tupleInComposite)
+            .addRegularColumn("udt_in_composite", udtInComposite)
+            .addRegularColumn("udt_in_list", udtInList)
+            .addRegularColumn("udt_in_set", udtInSet)
+            .addRegularColumn("udt_in_map", udtInMap)
+            .addRegularColumn("udt_in_frozen_list", udtInFrozenList)
+            .addRegularColumn("udt_in_frozen_set", udtInFrozenSet)
+            .addRegularColumn("udt_in_frozen_map", udtInFrozenMap);
+        File sstable = buildFakeSSTable(dir, 1, cols, true);
+
+        cols = tableMetadata.unbuild();
+        for (String col : new String[]{"tuple_in_tuple", "udt_nested", "udt_in_tuple",
+                                       "tuple_in_composite", "udt_in_composite",
+                                       "udt_in_list", "udt_in_set", "udt_in_map",
+                                       "udt_in_frozen_list", "udt_in_frozen_set", "udt_in_frozen_map"})
+        {
+            ColumnIdentifier ci = new ColumnIdentifier(col, true);
+            ColumnMetadata cd = getColDef(col);
+            AbstractType<?> dropType = cd.type.expandUserTypes();
+            cols.removeRegularOrStaticColumn(ci)
+                .recordColumnDrop(new ColumnMetadata(cd.ksName, cd.cfName, cd.name, dropType, cd.position(), cd.kind), FBUtilities.timestampMicros());
+        }
+        tableMetadata = cols.build();
+
+        SerializationHeader.Component header = readHeader(sstable);
+        assertFrozenUdt(header, false, true);
+
+        SSTableHeaderFix headerFix = builder().withPath(sstable.toPath())
+                                              .build();
+        headerFix.execute();
+
+        assertFalse(headerFix.hasError());
+        assertTrue(headerFix.hasChanges());
+        assertEquals(Sets.newHashSet("pk", "ck", "regular_b", "static_b", "udt_nested"), updatedColumns);
+
+        // must have re-written the stats-component
+        header = readHeader(sstable);
+        // do not check the inner types, as the inner types were not fixed in the serialization-header (test thing)
+        assertFrozenUdt(header, true, false);
+    }
+
+    @Test
+    public void variousDroppedUserTypes() throws Exception
+    {
+        File dir = temporaryFolder;
+
+        TableMetadata.Builder cols = TableMetadata.builder("ks", "cf")
+                                                  .addPartitionKeyColumn("pk", udtPK)
+                                                  .addClusteringColumn("ck", udtCK);
+
+        ColSpec[] colSpecs = new ColSpec[]
+                {
+                        // 'frozen<udt>' / live
+                        new ColSpec("frozen_udt_as_frozen_udt_live",
+                                    makeUDT2("frozen_udt_as_frozen_udt_live", false),
+                                    makeUDT2("frozen_udt_as_frozen_udt_live", false),
+                                    false,
+                                    false),
+                        // 'frozen<udt>' / live / as 'udt'
+                        new ColSpec("frozen_udt_as_unfrozen_udt_live",
+                                    makeUDT2("frozen_udt_as_unfrozen_udt_live", false),
+                                    makeUDT2("frozen_udt_as_unfrozen_udt_live", true),
+                                    false,
+                                    true),
+                        // 'frozen<udt>' / dropped
+                        new ColSpec("frozen_udt_as_frozen_udt_dropped",
+                                    makeUDT2("frozen_udt_as_frozen_udt_dropped", true).freezeNestedMulticellTypes().freeze().expandUserTypes(),
+                                    makeUDT2("frozen_udt_as_frozen_udt_dropped", false),
+                                    makeUDT2("frozen_udt_as_frozen_udt_dropped", false),
+                                    true,
+                                    false),
+                        // 'frozen<udt>' / dropped / as 'udt'
+                        new ColSpec("frozen_udt_as_unfrozen_udt_dropped",
+                                    makeUDT2("frozen_udt_as_unfrozen_udt_dropped", true).freezeNestedMulticellTypes().freeze().expandUserTypes(),
+                                    makeUDT2("frozen_udt_as_unfrozen_udt_dropped", true),
+                                    makeUDT2("frozen_udt_as_unfrozen_udt_dropped", false),
+                                    true,
+                                    true),
+                        // 'udt' / live
+                        new ColSpec("unfrozen_udt_as_unfrozen_udt_live",
+                                    makeUDT2("unfrozen_udt_as_unfrozen_udt_live", true),
+                                    makeUDT2("unfrozen_udt_as_unfrozen_udt_live", true),
+                                    false,
+                                    false),
+                        // 'udt' / dropped
+// TODO unable to test dropping a non-frozen UDT, as that requires an unfrozen tuple as well
+//                        new ColSpec("unfrozen_udt_as_unfrozen_udt_dropped",
+//                                    makeUDT2("unfrozen_udt_as_unfrozen_udt_dropped", true).freezeNestedMulticellTypes().expandUserTypes(),
+//                                    makeUDT2("unfrozen_udt_as_unfrozen_udt_dropped", true),
+//                                    makeUDT2("unfrozen_udt_as_unfrozen_udt_dropped", true),
+//                                    true,
+//                                    false),
+                        // 'frozen<tuple>' as 'TupleType(multiCell=false' (there is nothing like 'FrozenType(TupleType(')
+                        new ColSpec("frozen_tuple_as_frozen_tuple_live",
+                                    makeTupleSimple(),
+                                    makeTupleSimple(),
+                                    false,
+                                    false),
+                        // 'frozen<tuple>' as 'TupleType(multiCell=false' (there is nothing like 'FrozenType(TupleType(')
+                        new ColSpec("frozen_tuple_as_frozen_tuple_dropped",
+                                    makeTupleSimple(),
+                                    makeTupleSimple(),
+                                    true,
+                                    false)
+                };
+
+        Arrays.stream(colSpecs).forEach(c -> cols.addRegularColumn(c.name,
+                                                                   // use the initial column type for the serialization header.
+                                                                   c.preFix));
+
+        Map<String, ColSpec> colSpecMap = Arrays.stream(colSpecs).collect(Collectors.toMap(c -> c.name, c -> c));
+        File sstable = buildFakeSSTable(dir, 1, cols, c -> {
+            ColSpec cs = colSpecMap.get(c.name.toString());
+            if (cs == null)
+                return c;
+            // update the column type in the schema to the "correct" one.
+            return c.withNewType(cs.schema);
+        });
+
+        Arrays.stream(colSpecs)
+              .filter(c -> c.dropped)
+              .forEach(c -> {
+                  ColumnMetadata cd = getColDef(c.name);
+                  tableMetadata = tableMetadata.unbuild()
+                                               .removeRegularOrStaticColumn(cd.name)
+                                               .recordColumnDrop(cd, FBUtilities.timestampMicros())
+                                               .build();
+              });
+
+        SerializationHeader.Component header = readHeader(sstable);
+        for (ColSpec colSpec : colSpecs)
+        {
+            AbstractType<?> hdrType = header.getRegularColumns().get(ByteBufferUtil.bytes(colSpec.name));
+            assertEquals(colSpec.name, colSpec.preFix, hdrType);
+            assertEquals(colSpec.name, colSpec.preFix.isMultiCell(), hdrType.isMultiCell());
+        }
+
+        SSTableHeaderFix headerFix = builder().withPath(sstable.toPath())
+                                              .build();
+        headerFix.execute();
+
+        assertFalse(headerFix.hasError());
+        assertTrue(headerFix.hasChanges());
+        // Verify that all columns to fix are in the updatedColumns set (a paranoid check)
+        Arrays.stream(colSpecs)
+              .filter(c -> c.mustFix)
+              .forEach(c -> assertTrue("expect " + c.name + " to be updated, but was not (" + updatedColumns + ")", updatedColumns.contains(c.name)));
+        // Verify that the number of updated columns matches the expected number of columns to fix
+        assertEquals(Arrays.stream(colSpecs).filter(c -> c.mustFix).count(), updatedColumns.size());
+
+        header = readHeader(sstable);
+        for (ColSpec colSpec : colSpecs)
+        {
+            AbstractType<?> hdrType = header.getRegularColumns().get(ByteBufferUtil.bytes(colSpec.name));
+            assertEquals(colSpec.name, colSpec.expect, hdrType);
+            assertEquals(colSpec.name, colSpec.expect.isMultiCell(), hdrType.isMultiCell());
+        }
+    }
+
+    static class ColSpec
+    {
+        final String name;
+        final AbstractType<?> schema;
+        final AbstractType<?> preFix;
+        final AbstractType<?> expect;
+        final boolean dropped;
+        final boolean mustFix;
+
+        ColSpec(String name, AbstractType<?> schema, AbstractType<?> preFix, boolean dropped, boolean mustFix)
+        {
+            this(name, schema, preFix, schema, dropped, mustFix);
+        }
+
+        ColSpec(String name, AbstractType<?> schema, AbstractType<?> preFix, AbstractType<?> expect, boolean dropped, boolean mustFix)
+        {
+            this.name = name;
+            this.schema = schema;
+            this.preFix = preFix;
+            this.expect = expect;
+            this.dropped = dropped;
+            this.mustFix = mustFix;
+        }
+    }
+
+    @Test
+    public void verifyTypeMatchCompositeKeyTest() throws Exception
+    {
+        File dir = temporaryFolder;
+
+        TableMetadata.Builder cols = TableMetadata.builder("ks", "cf")
+                                                  .addPartitionKeyColumn("pk1", UTF8Type.instance)
+                                                  .addPartitionKeyColumn("pk2", udtPK)
+                                                  .addClusteringColumn("ck", udtCK);
+        commonColumns(cols);
+        File sstable = buildFakeSSTable(dir, 1, cols, false);
+
+        SerializationHeader.Component header = readHeader(sstable);
+        assertFrozenUdt(header, false, true);
+
+        SSTableHeaderFix headerFix = builder().withPath(sstable.toPath())
+                                              .build();
+        headerFix.execute();
+
+        assertFalse(headerFix.hasError());
+        assertFalse(headerFix.hasChanges());
+        assertTrue(updatedColumns.isEmpty());
+
+        // must not have re-written the stats-component
+        header = readHeader(sstable);
+        assertFrozenUdt(header, false, true);
+    }
+
+    @Test
+    public void compositePartitionKey() throws Exception
+    {
+        TableMetadata.Builder cols = TableMetadata.builder("ks", "cf")
+                                                  .addPartitionKeyColumn("pk1", UTF8Type.instance)
+                                                  .addPartitionKeyColumn("pk2", udtPK)
+                                                  .addClusteringColumn("ck", udtCK);
+        commonColumns(cols);
+
+        File dir = temporaryFolder;
+        File sstable = buildFakeSSTable(dir, 1, cols, true);
+
+        SerializationHeader.Component header = readHeader(sstable);
+        assertTrue(header.getKeyType() instanceof CompositeType);
+        CompositeType keyType = (CompositeType) header.getKeyType();
+        assertEquals(Arrays.asList(UTF8Type.instance, udtPK), keyType.getComponents());
+
+        SSTableHeaderFix headerFix = builder().withPath(sstable.toPath())
+                                              .build();
+        headerFix.execute();
+
+        assertFalse(headerFix.hasError());
+        assertTrue(headerFix.hasChanges());
+        assertEquals(Sets.newHashSet("pk2", "ck", "regular_b", "static_b"), updatedColumns);
+
+        header = readHeader(sstable);
+        assertTrue(header.getKeyType() instanceof CompositeType);
+        keyType = (CompositeType) header.getKeyType();
+        assertEquals(Arrays.asList(UTF8Type.instance, udtPK.freeze()), keyType.getComponents());
+    }
+
+    @Test
+    public void compositeClusteringKey() throws Exception
+    {
+        TableMetadata.Builder cols = TableMetadata.builder("ks", "cf")
+                                                  .addPartitionKeyColumn("pk", udtPK)
+                                                  .addClusteringColumn("ck1", Int32Type.instance)
+                                                  .addClusteringColumn("ck2", udtCK);
+        commonColumns(cols);
+
+        File dir = temporaryFolder;
+        File sstable = buildFakeSSTable(dir, 1, cols, true);
+
+        SerializationHeader.Component header = readHeader(sstable);
+        assertEquals(Arrays.asList(Int32Type.instance, udtCK), header.getClusteringTypes());
+
+        SSTableHeaderFix headerFix = builder().withPath(sstable.toPath())
+                                              .build();
+        headerFix.execute();
+
+        assertFalse(headerFix.hasError());
+        assertTrue(headerFix.hasChanges());
+        assertEquals(Sets.newHashSet("pk", "ck2", "regular_b", "static_b"), updatedColumns);
+
+        header = readHeader(sstable);
+        assertEquals(Arrays.asList(Int32Type.instance, udtCK.freeze()), header.getClusteringTypes());
+    }
+
+    /**
+     * Check whether {@link SSTableHeaderFix} can operate on a single file.
+     */
+    @Test
+    public void singleFileUDTFixTest() throws Exception
+    {
+        File dir = temporaryFolder;
+        File sstable = generateFakeSSTable(dir, 1);
+
+        SerializationHeader.Component header = readHeader(sstable);
+        assertFrozenUdt(header, false, true);
+
+        SSTableHeaderFix headerFix = builder().withPath(sstable.toPath())
+                                              .build();
+        headerFix.execute();
+
+        assertTrue(headerFix.hasChanges());
+        assertFalse(headerFix.hasError());
+
+        header = readHeader(sstable);
+        assertFrozenUdt(header, true, true);
+    }
+
+    /**
+     * Check whether {@link SSTableHeaderFix} can operate on a file in a directory.
+     */
+    @Test
+    public void singleDirectoryUDTFixTest() throws Exception
+    {
+        File dir = temporaryFolder;
+        List<File> sstables = IntStream.range(1, 11)
+                                       .mapToObj(g -> generateFakeSSTable(dir, g))
+                                       .collect(Collectors.toList());
+
+        for (File sstable : sstables)
+        {
+            SerializationHeader.Component header = readHeader(sstable);
+            assertFrozenUdt(header, false, true);
+        }
+
+        SSTableHeaderFix headerFix = builder().withPath(dir.toPath())
+                                              .build();
+        headerFix.execute();
+
+        assertTrue(headerFix.hasChanges());
+        assertFalse(headerFix.hasError());
+
+        for (File sstable : sstables)
+        {
+            SerializationHeader.Component header = readHeader(sstable);
+            assertFrozenUdt(header, true, true);
+        }
+    }
+
+    /**
+     * Check whether {@link SSTableHeaderFix} can operate on multiple single files.
+     */
+    @Test
+    public void multipleFilesUDTFixTest() throws Exception
+    {
+        File dir = temporaryFolder;
+        List<File> sstables = IntStream.range(1, 11)
+                                       .mapToObj(g -> generateFakeSSTable(dir, g))
+                                       .collect(Collectors.toList());
+
+        for (File sstable : sstables)
+        {
+            SerializationHeader.Component header = readHeader(sstable);
+            assertFrozenUdt(header, false, true);
+        }
+
+        SSTableHeaderFix.Builder builder = builder();
+        sstables.stream().map(File::toPath).forEach(builder::withPath);
+        SSTableHeaderFix headerFix = builder.build();
+        headerFix.execute();
+
+        assertTrue(headerFix.hasChanges());
+        assertFalse(headerFix.hasError());
+
+        for (File sstable : sstables)
+        {
+            SerializationHeader.Component header = readHeader(sstable);
+            assertFrozenUdt(header, true, true);
+        }
+    }
+
+    /**
+     * Check whether {@link SSTableHeaderFix} can operate on multiple files in a directory.
+     */
+    @Test
+    public void multipleFilesInDirectoryUDTFixTest() throws Exception
+    {
+        File dir = temporaryFolder;
+        List<File> sstables = IntStream.range(1, 11)
+                                       .mapToObj(g -> generateFakeSSTable(dir, g))
+                                       .collect(Collectors.toList());
+
+        for (File sstable : sstables)
+        {
+            SerializationHeader.Component header = readHeader(sstable);
+            assertFrozenUdt(header, false, true);
+        }
+
+        SSTableHeaderFix headerFix = builder().withPath(dir.toPath())
+                                              .build();
+        headerFix.execute();
+
+        assertTrue(headerFix.hasChanges());
+        assertFalse(headerFix.hasError());
+
+        for (File sstable : sstables)
+        {
+            SerializationHeader.Component header = readHeader(sstable);
+            assertFrozenUdt(header, true, true);
+        }
+    }
+
+    private static final Pattern p = Pattern.compile(".* Column '([^']+)' needs to be updated from type .*");
+
+    private SSTableHeaderFix.Builder builder()
+    {
+        updatedColumns.clear();
+        return SSTableHeaderFix.builder()
+                               .schemaCallback(() -> (desc) -> tableMetadata)
+                               .info(ln -> {
+                                   System.out.println("INFO: " + ln);
+                                   Matcher m = p.matcher(ln);
+                                   if (m.matches())
+                                       updatedColumns.add(m.group(1));
+                               })
+                               .warn(ln -> System.out.println("WARN: " + ln))
+                               .error(ln -> System.out.println("ERROR: " + ln));
+    }
+
+    private File generateFakeSSTable(File dir, int generation)
+    {
+        TableMetadata.Builder cols = TableMetadata.builder("ks", "cf")
+                                                  .addPartitionKeyColumn("pk", udtPK)
+                                                  .addClusteringColumn("ck", udtCK);
+        commonColumns(cols);
+        return buildFakeSSTable(dir, generation, cols, true);
+    }
+
+    private void commonColumns(TableMetadata.Builder cols)
+    {
+        cols.addRegularColumn("regular_a", UTF8Type.instance)
+            .addRegularColumn("regular_b", udtRegular)
+            .addRegularColumn("regular_c", Int32Type.instance)
+            .addStaticColumn("static_a", UTF8Type.instance)
+            .addStaticColumn("static_b", udtStatic)
+            .addStaticColumn("static_c", Int32Type.instance);
+    }
+
+    private File buildFakeSSTable(File dir, int generation, TableMetadata.Builder cols, boolean freezeInSchema)
+    {
+        return buildFakeSSTable(dir, generation, cols, freezeInSchema
+                                                       ? c -> c.withNewType(freezeUdt(c.type))
+                                                       : c -> c);
+    }
+
+    private File buildFakeSSTable(File dir, int generation, TableMetadata.Builder cols, Function<ColumnMetadata, ColumnMetadata> freezer)
+    {
+        TableMetadata headerMetadata = cols.build();
+
+        TableMetadata.Builder schemaCols = TableMetadata.builder("ks", "cf");
+        for (ColumnMetadata cm : cols.columns())
+            schemaCols.addColumn(freezer.apply(cm));
+        tableMetadata = schemaCols.build();
+
+        try
+        {
+
+            Descriptor desc = new Descriptor(version, dir, "ks", "cf", generation, SSTableFormat.Type.BIG);
+
+            // Just create the component files - we don't really need those.
+            for (Component component : requiredComponents)
+                assertTrue(new File(desc.filenameFor(component)).createNewFile());
+
+            AbstractType<?> partitionKey = headerMetadata.partitionKeyType;
+            List<AbstractType<?>> clusteringKey = headerMetadata.clusteringColumns()
+                                                                .stream()
+                                                                .map(cd -> cd.type)
+                                                                .collect(Collectors.toList());
+            Map<ByteBuffer, AbstractType<?>> staticColumns = headerMetadata.columns()
+                                                                           .stream()
+                                                                           .filter(cd -> cd.kind == ColumnMetadata.Kind.STATIC)
+                                                                           .collect(Collectors.toMap(cd -> cd.name.bytes, cd -> cd.type, (a, b) -> a));
+            Map<ByteBuffer, AbstractType<?>> regularColumns = headerMetadata.columns()
+                                                                            .stream()
+                                                                            .filter(cd -> cd.kind == ColumnMetadata.Kind.REGULAR)
+                                                                            .collect(Collectors.toMap(cd -> cd.name.bytes, cd -> cd.type, (a, b) -> a));
+
+            File statsFile = new File(desc.filenameFor(Component.STATS));
+            SerializationHeader.Component header = SerializationHeader.Component.buildComponentForTools(partitionKey,
+                                                                                                        clusteringKey,
+                                                                                                        staticColumns,
+                                                                                                        regularColumns,
+                                                                                                        EncodingStats.NO_STATS);
+
+            try (SequentialWriter out = new SequentialWriter(statsFile))
+            {
+                desc.getMetadataSerializer().serialize(Collections.singletonMap(MetadataType.HEADER, header), out, version);
+                out.finish();
+            }
+
+            return new File(desc.filenameFor(Component.DATA));
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private AbstractType<?> freezeUdt(AbstractType<?> type)
+    {
+        if (type instanceof CollectionType)
+        {
+            if (type.getClass() == ListType.class)
+            {
+                ListType<?> cHeader = (ListType<?>) type;
+                return ListType.getInstance(freezeUdt(cHeader.getElementsType()), cHeader.isMultiCell());
+            }
+            else if (type.getClass() == SetType.class)
+            {
+                SetType<?> cHeader = (SetType<?>) type;
+                return SetType.getInstance(freezeUdt(cHeader.getElementsType()), cHeader.isMultiCell());
+            }
+            else if (type.getClass() == MapType.class)
+            {
+                MapType<?, ?> cHeader = (MapType<?, ?>) type;
+                return MapType.getInstance(freezeUdt(cHeader.getKeysType()), freezeUdt(cHeader.getValuesType()), cHeader.isMultiCell());
+            }
+        }
+        else if (type instanceof AbstractCompositeType)
+        {
+            if (type.getClass() == CompositeType.class)
+            {
+                CompositeType cHeader = (CompositeType) type;
+                return CompositeType.getInstance(cHeader.types.stream().map(this::freezeUdt).collect(Collectors.toList()));
+            }
+        }
+        else if (type instanceof TupleType)
+        {
+            if (type.getClass() == UserType.class)
+            {
+                UserType cHeader = (UserType) type;
+                cHeader = cHeader.freeze();
+                return new UserType(cHeader.keyspace, cHeader.name, cHeader.fieldNames(),
+                                    cHeader.allTypes().stream().map(this::freezeUdt).collect(Collectors.toList()),
+                                    cHeader.isMultiCell());
+            }
+        }
+        return type;
+    }
+
+    private void assertFrozenUdt(SerializationHeader.Component header, boolean frozen, boolean checkInner)
+    {
+        AbstractType<?> keyType = header.getKeyType();
+        if (keyType instanceof CompositeType)
+        {
+            for (AbstractType<?> component : ((CompositeType) keyType).types)
+                assertFrozenUdt("partition-key-component", component, frozen, checkInner);
+        }
+        assertFrozenUdt("partition-key", keyType, frozen, checkInner);
+
+        for (AbstractType<?> type : header.getClusteringTypes())
+            assertFrozenUdt("clustering-part", type, frozen, checkInner);
+        for (Map.Entry<ByteBuffer, AbstractType<?>> col : header.getStaticColumns().entrySet())
+            assertFrozenUdt(UTF8Type.instance.compose(col.getKey()), col.getValue(), frozen, checkInner);
+        for (Map.Entry<ByteBuffer, AbstractType<?>> col : header.getRegularColumns().entrySet())
+            assertFrozenUdt(UTF8Type.instance.compose(col.getKey()), col.getValue(), frozen, checkInner);
+    }
+
+    private void assertFrozenUdt(String name, AbstractType<?> type, boolean frozen, boolean checkInner)
+    {
+        if (type instanceof CompositeType)
+        {
+            if (checkInner)
+                for (AbstractType<?> component : ((CompositeType) type).types)
+                    assertFrozenUdt(name, component, frozen, true);
+        }
+        else if (type instanceof CollectionType)
+        {
+            if (checkInner)
+            {
+                if (type instanceof MapType)
+                {
+                    MapType map = (MapType) type;
+                    // only descend for non-frozen types (checking frozen in frozen is just stupid)
+                    if (map.isMultiCell())
+                    {
+                        assertFrozenUdt(name + "<map-key>", map.getKeysType(), frozen, true);
+                        assertFrozenUdt(name + "<map-value>", map.getValuesType(), frozen, true);
+                    }
+                }
+                else if (type instanceof SetType)
+                {
+                    SetType set = (SetType) type;
+                    // only descend for non-frozen types (checking frozen in frozen is just stupid)
+                    if (set.isMultiCell())
+                        assertFrozenUdt(name + "<set>", set.getElementsType(), frozen, true);
+                }
+                else if (type instanceof ListType)
+                {
+                    ListType list = (ListType) type;
+                    // only descend for non-frozen types (checking frozen in frozen is just stupid)
+                    if (list.isMultiCell())
+                        assertFrozenUdt(name + "<list>", list.getElementsType(), frozen, true);
+                }
+            }
+        }
+        else if (type instanceof TupleType)
+        {
+            if (checkInner)
+            {
+                TupleType tuple = (TupleType) type;
+                // only descend for non-frozen types (checking frozen in frozen is just stupid)
+                if (tuple.isMultiCell())
+                    for (AbstractType<?> component : tuple.allTypes())
+                        assertFrozenUdt(name + "<tuple>", component, frozen, true);
+            }
+        }
+
+        if (type instanceof UserType)
+        {
+            String typeString = type.toString();
+            assertEquals(name + ": " + typeString, frozen, !type.isMultiCell());
+            if (typeString.startsWith(UserType.class.getName() + '('))
+                if (frozen)
+                    fail(name + ": " + typeString);
+            if (typeString.startsWith(FrozenType.class.getName() + '(' + UserType.class.getName() + '('))
+                if (!frozen)
+                    fail(name + ": " + typeString);
+        }
+    }
+
+    private SerializationHeader.Component readHeader(File sstable) throws Exception
+    {
+        Descriptor desc = Descriptor.fromFilename(sstable);
+        return (SerializationHeader.Component) desc.getMetadataSerializer().deserialize(desc, MetadataType.HEADER);
+    }
+
+    private static final Component[] requiredComponents = new Component[]{ Component.DATA, Component.FILTER, Component.PRIMARY_INDEX, Component.TOC };
+}
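
Note on the string checks in assertFrozenUdt above: besides isMultiCell(), the test also looks at the prefix of the type's toString(). The following is a minimal standalone sketch of that prefix logic only, assuming the two class names as they appear in the type strings of this commit. FrozenUdtPrefixCheck, Kind and classify are hypothetical names used for illustration and are not part of the patch.

```java
public class FrozenUdtPrefixCheck
{
    // Class names as they appear in the serialized type strings in this commit.
    static final String USER_TYPE = "org.apache.cassandra.db.marshal.UserType";
    static final String FROZEN_TYPE = "org.apache.cassandra.db.marshal.FrozenType";

    enum Kind { NON_FROZEN_UDT, FROZEN_UDT, OTHER }

    // Classifies a type string purely by its prefix, like the startsWith() checks in the test.
    static Kind classify(String typeString)
    {
        if (typeString.startsWith(USER_TYPE + '('))
            return Kind.NON_FROZEN_UDT;
        if (typeString.startsWith(FROZEN_TYPE + '(' + USER_TYPE + '('))
            return Kind.FROZEN_UDT;
        return Kind.OTHER;
    }

    public static void main(String[] args)
    {
        String fields = "61:org.apache.cassandra.db.marshal.UTF8Type,62:org.apache.cassandra.db.marshal.UTF8Type";
        // Bare UserType(...) is how a non-frozen UDT is written.
        System.out.println(classify(USER_TYPE + "(ks,6d635f756474," + fields + ")"));
        // FrozenType(UserType(...)) is how a frozen UDT is written.
        System.out.println(classify(FROZEN_TYPE + "(" + USER_TYPE + "(ks,665f756474," + fields + "))"));
        // Anything else, e.g. a plain text column.
        System.out.println(classify("org.apache.cassandra.db.marshal.UTF8Type"));
    }
}
```

The sketch only inspects the outermost wrapper, so a UDT nested inside a collection is reported as OTHER; the test handles nesting by descending into the component types first.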
diff --git a/test/unit/org/apache/cassandra/schema/TupleTypesRepresentationTest.java b/test/unit/org/apache/cassandra/schema/TupleTypesRepresentationTest.java
new file mode 100644
index 0000000..e6daa1f
--- /dev/null
+++ b/test/unit/org/apache/cassandra/schema/TupleTypesRepresentationTest.java
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.schema;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import org.junit.Ignore;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.CQL3Type;
+import org.apache.cassandra.cql3.CQLFragmentParser;
+import org.apache.cassandra.cql3.CqlParser;
+import org.apache.cassandra.cql3.FieldIdentifier;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.TypeParser;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.db.marshal.UserType;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Verifies that the string representations of {@link AbstractType} and {@link CQL3Type} are as expected and compatible.
+ *
+ * C* 3.0 is known to <em>not</em> enclose a frozen UDT in a "frozen bracket" in the {@link AbstractType}.
+ * The string representation of a frozen UDT using the {@link CQL3Type} type hierarchy is correct in C* 3.0.
+ */
+public class TupleTypesRepresentationTest
+{
+    static
+    {
+        DatabaseDescriptor.toolInitialization();
+    }
+
+    private static final String keyspace = "ks";
+    private static final String mcUdtName = "mc_udt";
+    private static final ByteBuffer mcUdtNameBytes = ByteBufferUtil.bytes(mcUdtName);
+    private static final String iUdtName = "i_udt";
+    private static final ByteBuffer iUdtNameBytes = ByteBufferUtil.bytes(iUdtName);
+    private static final String fUdtName = "f_udt";
+    private static final ByteBuffer fUdtNameBytes = ByteBufferUtil.bytes(fUdtName);
+    private static final String udtField1 = "a";
+    private static final String udtField2 = "b";
+    private static final AbstractType<?> udtType1 = UTF8Type.instance;
+    private static final AbstractType<?> udtType2 = UTF8Type.instance;
+
+    private static final Types types = Types.builder()
+                                            .add(new UserType(keyspace,
+                                                              mcUdtNameBytes,
+                                                              Arrays.asList(new FieldIdentifier(ByteBufferUtil.bytes(udtField1)),
+                                                                            new FieldIdentifier(ByteBufferUtil.bytes(udtField2))),
+                                                              Arrays.asList(udtType1,
+                                                                            udtType2),
+                                                              true))
+                                            .add(new UserType(keyspace,
+                                                              iUdtNameBytes,
+                                                              Arrays.asList(new FieldIdentifier(ByteBufferUtil.bytes(udtField1)),
+                                                                            new FieldIdentifier(ByteBufferUtil.bytes(udtField2))),
+                                                              Arrays.asList(udtType1,
+                                                                            udtType2),
+                                                              true))
+                                            .add(new UserType(keyspace,
+                                                              fUdtNameBytes,
+                                                              Arrays.asList(new FieldIdentifier(ByteBufferUtil.bytes(udtField1)),
+                                                                            new FieldIdentifier(ByteBufferUtil.bytes(udtField2))),
+                                                              Arrays.asList(udtType1,
+                                                                            udtType2),
+                                                              true))
+                                            .build();
+
+    static class TypeDef
+    {
+        final String typeString;
+        final String cqlTypeString;
+        final String droppedCqlTypeString;
+        final boolean multiCell;
+        final String cqlValue;
+
+        final AbstractType<?> type;
+        final CQL3Type cqlType;
+
+        final AbstractType<?> droppedType;
+        final CQL3Type droppedCqlType;
+
+        TypeDef(String typeString, String cqlTypeString, String droppedCqlTypeString, boolean multiCell, String cqlValue)
+        {
+            this.typeString = typeString;
+            this.cqlTypeString = cqlTypeString;
+            this.droppedCqlTypeString = droppedCqlTypeString;
+            this.multiCell = multiCell;
+            this.cqlValue = cqlValue;
+
+            cqlType = CQLFragmentParser.parseAny(CqlParser::comparatorType, cqlTypeString, "non-dropped type")
+                                       .prepare(keyspace, types);
+            type = cqlType.getType();
+
+            droppedCqlType = CQLFragmentParser.parseAny(CqlParser::comparatorType, droppedCqlTypeString, "dropped type")
+                                              .prepare(keyspace, types);
+            // NOTE: TupleType is *always* parsed as frozen, but never toString()'d with the surrounding FrozenType
+            droppedType = droppedCqlType.getType();
+        }
+
+        @Override
+        public String toString()
+        {
+            return "TypeDef{\n" +
+                   "typeString='" + typeString + "'\n" +
+                   ", type=" + type + '\n' +
+                   ", cqlTypeString='" + cqlTypeString + "'\n" +
+                   ", cqlType=" + cqlType + '\n' +
+                   ", droppedType=" + droppedType + '\n' +
+                   ", droppedCqlTypeString='" + droppedCqlTypeString + "'\n" +
+                   ", droppedCqlType=" + droppedCqlType + '\n' +
+                   '}';
+        }
+    }
+
+    private static final TypeDef text = new TypeDef(
+            "org.apache.cassandra.db.marshal.UTF8Type",
+            "text",
+            "text",
+            false,
+            "'foobar'");
+
+    private static final TypeDef tuple_text__text_ = new TypeDef(
+            "org.apache.cassandra.db.marshal.TupleType(org.apache.cassandra.db.marshal.UTF8Type,org.apache.cassandra.db.marshal.UTF8Type)",
+            "tuple<text, text>",
+            "frozen<tuple<text, text>>",
+            false,
+            "('foo','bar')");
+
+    // Currently, dropped non-frozen-UDT columns are recorded as frozen<tuple<...>>, which is technically wrong
+    //private static final TypeDef mc_udt = new TypeDef(
+    //        "org.apache.cassandra.db.marshal.UserType(ks,6d635f756474,61:org.apache.cassandra.db.marshal.UTF8Type,62:org.apache.cassandra.db.marshal.UTF8Type)",
+    //        "mc_udt",
+    //        "tuple<text, text>",
+    //        true,
+    //        "{a:'foo',b:'bar'}");
+
+    private static final TypeDef frozen_f_udt_ = new TypeDef(
+            "org.apache.cassandra.db.marshal.FrozenType(org.apache.cassandra.db.marshal.UserType(ks,665f756474,61:org.apache.cassandra.db.marshal.UTF8Type,62:org.apache.cassandra.db.marshal.UTF8Type))",
+            "frozen<f_udt>",
+            "frozen<tuple<text, text>>",
+            false,
+            "{a:'foo',b:'bar'}");
+
+    private static final TypeDef list_text_ = new TypeDef(
+            "org.apache.cassandra.db.marshal.ListType(org.apache.cassandra.db.marshal.UTF8Type)",
+            "list<text>",
+            "list<text>",
+            true,
+            "['foobar']");
+
+    private static final TypeDef frozen_list_text__ = new TypeDef(
+            "org.apache.cassandra.db.marshal.FrozenType(org.apache.cassandra.db.marshal.ListType(org.apache.cassandra.db.marshal.UTF8Type))",
+            "frozen<list<text>>",
+            "frozen<list<text>>",
+            true,
+            "['foobar']");
+
+    private static final TypeDef set_text_ = new TypeDef(
+            "org.apache.cassandra.db.marshal.SetType(org.apache.cassandra.db.marshal.UTF8Type)",
+            "set<text>",
+            "set<text>",
+            true,
+            "{'foobar'}");
+
+    private static final TypeDef frozen_set_text__ = new TypeDef(
+            "org.apache.cassandra.db.marshal.FrozenType(org.apache.cassandra.db.marshal.SetType(org.apache.cassandra.db.marshal.UTF8Type))",
+            "frozen<set<text>>",
+            "frozen<set<text>>",
+            true,
+            "{'foobar'}");
+
+    private static final TypeDef map_text__text_ = new TypeDef(
+            "org.apache.cassandra.db.marshal.MapType(org.apache.cassandra.db.marshal.UTF8Type,org.apache.cassandra.db.marshal.UTF8Type)",
+            "map<text, text>",
+            "map<text, text>",
+            true,
+            "{'foo':'bar'}");
+
+    private static final TypeDef frozen_map_text__text__ = new TypeDef(
+            "org.apache.cassandra.db.marshal.FrozenType(org.apache.cassandra.db.marshal.MapType(org.apache.cassandra.db.marshal.UTF8Type,org.apache.cassandra.db.marshal.UTF8Type))",
+            "frozen<map<text, text>>",
+            "frozen<map<text, text>>",
+            true,
+            "{'foo':'bar'}");
+
+    private static final TypeDef list_frozen_tuple_text__text___ = new TypeDef(
+            // in consequence, this should be:
+            // "org.apache.cassandra.db.marshal.ListType(org.apache.cassandra.db.marshal.FrozenType(org.apache.cassandra.db.marshal.TupleType(org.apache.cassandra.db.marshal.UTF8Type,org.apache.cassandra.db.marshal.UTF8Type)))",
+            "org.apache.cassandra.db.marshal.ListType(org.apache.cassandra.db.marshal.TupleType(org.apache.cassandra.db.marshal.UTF8Type,org.apache.cassandra.db.marshal.UTF8Type))",
+            "list<frozen<tuple<text, text>>>",
+            "list<frozen<tuple<text, text>>>",
+            true,
+            "[('foo','bar')]");
+
+    private static final TypeDef frozen_list_tuple_text__text___ = new TypeDef(
+            "org.apache.cassandra.db.marshal.FrozenType(org.apache.cassandra.db.marshal.ListType(org.apache.cassandra.db.marshal.TupleType(org.apache.cassandra.db.marshal.UTF8Type,org.apache.cassandra.db.marshal.UTF8Type)))",
+            "frozen<list<frozen<tuple<text, text>>>>",
+            "frozen<list<frozen<tuple<text, text>>>>",
+            true,
+            "[('foo','bar')]");
+
+    private static final TypeDef set_frozen_tuple_text__text___ = new TypeDef(
+            // in consequence, this should be:
+            // "org.apache.cassandra.db.marshal.SetType(org.apache.cassandra.db.marshal.FrozenType(org.apache.cassandra.db.marshal.TupleType(org.apache.cassandra.db.marshal.UTF8Type,org.apache.cassandra.db.marshal.UTF8Type)))",
+            "org.apache.cassandra.db.marshal.SetType(org.apache.cassandra.db.marshal.TupleType(org.apache.cassandra.db.marshal.UTF8Type,org.apache.cassandra.db.marshal.UTF8Type))",
+            "set<frozen<tuple<text, text>>>",
+            "set<frozen<tuple<text, text>>>",
+            true,
+            "{('foo','bar')}");
+
+    private static final TypeDef frozen_set_tuple_text__text___ = new TypeDef(
+            "org.apache.cassandra.db.marshal.FrozenType(org.apache.cassandra.db.marshal.SetType(org.apache.cassandra.db.marshal.TupleType(org.apache.cassandra.db.marshal.UTF8Type,org.apache.cassandra.db.marshal.UTF8Type)))",
+            "frozen<set<frozen<tuple<text, text>>>>",
+            "frozen<set<frozen<tuple<text, text>>>>",
+            true,
+            "{('foo','bar')}");
+
+    private static final TypeDef map_text__frozen_tuple_text__text___ = new TypeDef(
+            // in consequence, this should be:
+            // "org.apache.cassandra.db.marshal.MapType(org.apache.cassandra.db.marshal.UTF8Type,org.apache.cassandra.db.marshal.FrozenType(org.apache.cassandra.db.marshal.TupleType(org.apache.cassandra.db.marshal.UTF8Type,org.apache.cassandra.db.marshal.UTF8Type)))",
+            "org.apache.cassandra.db.marshal.MapType(org.apache.cassandra.db.marshal.UTF8Type,org.apache.cassandra.db.marshal.TupleType(org.apache.cassandra.db.marshal.UTF8Type,org.apache.cassandra.db.marshal.UTF8Type))",
+            "map<text, frozen<tuple<text, text>>>",
+            "map<text, frozen<tuple<text, text>>>",
+            true,
+            "{'foobar':('foo','bar')}");
+
+    private static final TypeDef frozen_map_text__tuple_text__text___ = new TypeDef(
+            "org.apache.cassandra.db.marshal.FrozenType(org.apache.cassandra.db.marshal.MapType(org.apache.cassandra.db.marshal.UTF8Type,org.apache.cassandra.db.marshal.TupleType(org.apache.cassandra.db.marshal.UTF8Type,org.apache.cassandra.db.marshal.UTF8Type)))",
+            "frozen<map<text, frozen<tuple<text, text>>>>",
+            "frozen<map<text, frozen<tuple<text, text>>>>",
+            true,
+            "{'foobar':('foo','bar')}");
+
+    private static final TypeDef list_frozen_i_udt__ = new TypeDef(
+            "org.apache.cassandra.db.marshal.ListType(org.apache.cassandra.db.marshal.FrozenType(org.apache.cassandra.db.marshal.UserType(ks,695f756474,61:org.apache.cassandra.db.marshal.UTF8Type,62:org.apache.cassandra.db.marshal.UTF8Type)))",
+            "list<frozen<i_udt>>",
+            "list<frozen<tuple<text, text>>>",
+            true,
+            "[{a:'foo',b:'bar'}]");
+
+    private static final TypeDef frozen_list_i_udt__ = new TypeDef(
+            "org.apache.cassandra.db.marshal.FrozenType(org.apache.cassandra.db.marshal.ListType(org.apache.cassandra.db.marshal.UserType(ks,695f756474,61:org.apache.cassandra.db.marshal.UTF8Type,62:org.apache.cassandra.db.marshal.UTF8Type)))",
+            "frozen<list<frozen<i_udt>>>",
+            "frozen<list<frozen<tuple<text, text>>>>",
+            true,
+            "[{a:'foo',b:'bar'}]");
+
+    private static final TypeDef set_frozen_i_udt__ = new TypeDef(
+            "org.apache.cassandra.db.marshal.SetType(org.apache.cassandra.db.marshal.FrozenType(org.apache.cassandra.db.marshal.UserType(ks,695f756474,61:org.apache.cassandra.db.marshal.UTF8Type,62:org.apache.cassandra.db.marshal.UTF8Type)))",
+            "set<frozen<i_udt>>",
+            "set<frozen<tuple<text, text>>>",
+            true,
+            "{{a:'foo',b:'bar'}}");
+
+    private static final TypeDef frozen_set_i_udt__ = new TypeDef(
+            "org.apache.cassandra.db.marshal.FrozenType(org.apache.cassandra.db.marshal.SetType(org.apache.cassandra.db.marshal.UserType(ks,695f756474,61:org.apache.cassandra.db.marshal.UTF8Type,62:org.apache.cassandra.db.marshal.UTF8Type)))",
+            "frozen<set<frozen<i_udt>>>",
+            "frozen<set<frozen<tuple<text, text>>>>",
+            true,
+            "{{a:'foo',b:'bar'}}");
+
+    private static final TypeDef map_text__frozen_i_udt__ = new TypeDef(
+            "org.apache.cassandra.db.marshal.MapType(org.apache.cassandra.db.marshal.UTF8Type,org.apache.cassandra.db.marshal.FrozenType(org.apache.cassandra.db.marshal.UserType(ks,695f756474,61:org.apache.cassandra.db.marshal.UTF8Type,62:org.apache.cassandra.db.marshal.UTF8Type)))",
+            "map<text, frozen<i_udt>>",
+            "map<text, frozen<tuple<text, text>>>",
+            true,
+            "{'foobar':{a:'foo',b:'bar'}}");
+
+    private static final TypeDef frozen_map_text__i_udt__ = new TypeDef(
+            "org.apache.cassandra.db.marshal.FrozenType(org.apache.cassandra.db.marshal.MapType(org.apache.cassandra.db.marshal.UTF8Type,org.apache.cassandra.db.marshal.UserType(ks,695f756474,61:org.apache.cassandra.db.marshal.UTF8Type,62:org.apache.cassandra.db.marshal.UTF8Type)))",
+            "frozen<map<text, frozen<i_udt>>>",
+            "frozen<map<text, frozen<tuple<text, text>>>>",
+            true,
+            "{'foobar':{a:'foo',b:'bar'}}");
+
+    private static final TypeDef[] allTypes = {
+            text,
+            tuple_text__text_,
+            frozen_f_udt_,
+            list_text_,
+            frozen_list_text__,
+            set_text_,
+            frozen_set_text__,
+            map_text__text_,
+            frozen_map_text__text__,
+            list_frozen_tuple_text__text___,
+            frozen_list_tuple_text__text___,
+            set_frozen_tuple_text__text___,
+            frozen_set_tuple_text__text___,
+            map_text__frozen_tuple_text__text___,
+            frozen_map_text__tuple_text__text___,
+            list_frozen_i_udt__,
+            frozen_list_i_udt__,
+            set_frozen_i_udt__,
+            frozen_set_i_udt__,
+            map_text__frozen_i_udt__,
+            frozen_map_text__i_udt__,
+            };
+
+    @Ignore("Only used to generate CQL statements for manual verification")
+    @Test
+    public void generateCqlStatements() throws InterruptedException
+    {
+        StringWriter sw = new StringWriter();
+        PrintWriter pw = new PrintWriter(sw);
+
+        pw.println("DROP TABLE sstableheaderfixtest;");
+        pw.println();
+        pw.println("CREATE TYPE i_udt (a text, b text);");
+        pw.println("CREATE TYPE f_udt (a text, b text);");
+        pw.println("CREATE TYPE mc_udt (a text, b text);");
+        pw.println();
+        pw.println("CREATE TABLE sstableheaderfixtest (");
+        pw.print("  id int PRIMARY KEY");
+        for (TypeDef typeDef : allTypes)
+        {
+            String cname = typeDef.cqlTypeString.replaceAll("[, <>]", "_");
+            pw.printf(",%n  %s %s", cname, typeDef.cqlTypeString);
+        }
+        pw.println(");");
+        pw.println();
+
+        pw.printf("INSERT INTO sstableheaderfixtest%n  (id");
+        for (TypeDef typeDef : allTypes)
+        {
+            String cname = typeDef.cqlTypeString.replaceAll("[, <>]", "_");
+            pw.printf(",%n    %s", cname);
+        }
+        pw.printf(")%n  VALUES%n  (1");
+        for (TypeDef typeDef : allTypes)
+        {
+            pw.printf(",%n    %s", typeDef.cqlValue);
+        }
+        pw.println(");");
+
+        pw.println();
+        pw.println();
+        pw.println("-- Run tools/bin/sstablemetadata data/data/<keyspace>/<table>/*-Data.db to show the sstable");
+        pw.println("-- serialization-header (types not shown in the C* 3.0 variant of the sstablemetadata tool)");
+
+        sw.flush();
+
+        System.out.println(sw.toString());
+
+        Thread.sleep(1000);
+    }
+
+    @Test
+    public void verifyTypes()
+    {
+        AssertionError master = null;
+        for (TypeDef typeDef : allTypes)
+        {
+            try
+            {
+                assertEquals(typeDef.toString() + "\n typeString vs type\n", typeDef.typeString, typeDef.type.toString());
+                assertEquals(typeDef.toString() + "\n typeString vs cqlType.getType()\n", typeDef.typeString, typeDef.cqlType.getType().toString());
+                AbstractType<?> expanded = typeDef.type.expandUserTypes();
+                CQL3Type expandedCQL = expanded.asCQL3Type();
+                // Note: cannot include this commented-out assertion, because the parsed CQL3Type instance for
+                // 'frozen<list<tuple<text, text>>>' returns 'frozen<list<frozen<tuple<text, text>>>>' via its CQL3Type.toString()
+                // implementation.
+                assertEquals(typeDef.toString() + "\n droppedCqlType\n", typeDef.droppedCqlType, expandedCQL);
+                assertEquals(typeDef.toString() + "\n droppedCqlTypeString\n", typeDef.droppedCqlTypeString, expandedCQL.toString());
+                assertEquals(typeDef.toString() + "\n multiCell\n", typeDef.type.isMultiCell(), typeDef.droppedType.isMultiCell());
+
+                AbstractType<?> parsedType = TypeParser.parse(typeDef.typeString);
+                assertEquals(typeDef.toString(), typeDef.typeString, parsedType.toString());
+            }
+            catch (AssertionError ae)
+            {
+                if (master == null)
+                    master = ae;
+                else
+                    master.addSuppressed(ae);
+            }
+        }
+        if (master != null)
+            throw master;
+    }
+}
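
Note on two string conventions in TupleTypesRepresentationTest: generateCqlStatements() derives each column name from the CQL type string with replaceAll("[, <>]", "_"), and the UserType(...) type strings carry the UDT name as hex-encoded bytes. The standalone sketch below reproduces both, assuming UTF-8 for the name bytes. TypeDefNaming, columnName and hexName are hypothetical names for illustration and are not part of the patch.

```java
import java.nio.charset.StandardCharsets;

public class TypeDefNaming
{
    // Same regex as in generateCqlStatements(): ',', ' ', '<' and '>' each become '_'.
    static String columnName(String cqlTypeString)
    {
        return cqlTypeString.replaceAll("[, <>]", "_");
    }

    // Lower-case hex of the UTF-8 bytes of a name, two digits per byte.
    static String hexName(String name)
    {
        StringBuilder sb = new StringBuilder();
        for (byte b : name.getBytes(StandardCharsets.UTF_8))
            sb.append(String.format("%02x", b & 0xff));
        return sb.toString();
    }

    public static void main(String[] args)
    {
        System.out.println(columnName("list<frozen<tuple<text, text>>>")); // list_frozen_tuple_text__text___
        System.out.println(hexName("f_udt"));                              // 665f756474
        System.out.println(hexName("mc_udt"));                             // 6d635f756474
    }
}
```

The hex values match the ones in the expected type strings above (665f756474 for f_udt, 695f756474 for i_udt, and 61 and 62 for the fields a and b).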

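Note on verifyTypes(): the loop does not stop at the first failing TypeDef. It keeps the first AssertionError and attaches every later one via Throwable.addSuppressed(), so a single run reports all mismatches. A minimal standalone sketch of that pattern follows; it uses plain Runnable checks instead of JUnit assertions, and CollectAssertionErrors and verifyAll are hypothetical names for illustration only.

```java
import java.util.Arrays;
import java.util.List;

public class CollectAssertionErrors
{
    // Runs every check; rethrows the first failure with all later failures attached as suppressed.
    static void verifyAll(List<Runnable> checks)
    {
        AssertionError master = null;
        for (Runnable check : checks)
        {
            try
            {
                check.run();
            }
            catch (AssertionError ae)
            {
                if (master == null)
                    master = ae;
                else
                    master.addSuppressed(ae);
            }
        }
        if (master != null)
            throw master;
    }

    public static void main(String[] args)
    {
        List<Runnable> checks = Arrays.<Runnable>asList(
            () -> { throw new AssertionError("first"); },
            () -> { /* passes */ },
            () -> { throw new AssertionError("second"); });
        try
        {
            verifyAll(checks);
        }
        catch (AssertionError e)
        {
            // prints "first + 1 suppressed"
            System.out.println(e.getMessage() + " + " + e.getSuppressed().length + " suppressed");
        }
    }
}
```

The suppressed errors show up in the stack trace of the rethrown error, which is why the test can report every failing type definition at once.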
