Posted to commits@cassandra.apache.org by sn...@apache.org on 2020/01/31 09:10:27 UTC

[cassandra] 01/02: C* 3.0 sstables w/ UDTs are corrupted in C* 3.11 and 4.0

This is an automated email from the ASF dual-hosted git repository.

snazy pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 4f27a37d7dd2750cc25261773a67ee8b4a07142c
Author: Robert Stupp <sn...@snazy.de>
AuthorDate: Fri Feb 15 14:24:39 2019 +0100

    C* 3.0 sstables w/ UDTs are corrupted in C* 3.11 and 4.0
    
    patch by Robert Stupp; reviewed by Brandon Williams for CASSANDRA-15035
---
 CHANGES.txt                                        |   1 +
 NEWS.txt                                           |  18 +
 src/java/org/apache/cassandra/cql3/CQL3Type.java   |   4 +-
 .../apache/cassandra/db/SerializationHeader.java   |  12 +
 .../org/apache/cassandra/db/SystemKeyspace.java    |   2 +
 .../org/apache/cassandra/db/rows/AbstractCell.java |  15 +-
 .../org/apache/cassandra/db/rows/AbstractRow.java  |  22 +-
 .../db/rows/AbstractTypeVersionComparator.java     | 121 ---
 .../db/rows/ColumnMetadataVersionComparator.java   |  85 ++
 src/java/org/apache/cassandra/db/rows/Row.java     |   2 +-
 src/java/org/apache/cassandra/db/rows/Rows.java    |   2 +-
 .../cassandra/io/sstable/SSTableHeaderFix.java     | 918 ++++++++++++++++++++
 .../cassandra/io/sstable/format/SSTableReader.java |   5 +
 .../io/sstable/metadata/IMetadataSerializer.java   |   5 +
 .../io/sstable/metadata/MetadataSerializer.java    |   2 +-
 .../apache/cassandra/service/CassandraDaemon.java  |   3 +
 .../apache/cassandra/tools/StandaloneScrubber.java | 144 ++-
 .../org/apache/cassandra/utils/FBUtilities.java    |  12 +
 ...va => ColumnMetadataVersionComparatorTest.java} |  29 +-
 .../cassandra/io/sstable/SSTableHeaderFixTest.java | 964 +++++++++++++++++++++
 .../schema/TupleTypesRepresentationTest.java       | 403 +++++++++
 21 files changed, 2621 insertions(+), 148 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 98c189a..9554a00 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -25,6 +25,7 @@
    otherwise synchronize their clocks, and that clocks are mostly in sync, since
    this is a requirement for general correctness of last write wins. (CASSANDRA-15216)
 Merged from 3.11:
+ * Fix bad UDT sstable metadata serialization headers written by C* 3.0 on upgrade and in sstablescrub (CASSANDRA-15035)
  * Fix nodetool compactionstats showing extra pending task for TWCS - patch implemented (CASSANDRA-15409)
  * Fix SELECT JSON formatting for the "duration" type (CASSANDRA-15075)
  * Update nodetool help stop output (CASSANDRA-15401)
diff --git a/NEWS.txt b/NEWS.txt
index 9c6b43f..7d716fc 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -113,6 +113,24 @@ New features
 
 Upgrading
 ---------
+    - Sstables written by C* 3.0 for tables using a frozen UDT appear as corrupted.
+
+      Background: The serialization-header in the -Statistics.db sstable component contains the type information
+      of the table columns. C* 3.0 write incorrect type information for frozen UDTs by omitting the
+      "frozen" information. Non-frozen UDTs were introduced by CASSANDRA-7423 in C* 3.6. Since then, the missing
+      "frozen" information leads to deserialization issues that result in CorruptSSTableExceptions, potentially other
+      exceptions as well.
+
+      As a mitigation, the sstable serialization-headers are rewritten to contain the missing "frozen" information for
+      UDTs once, when an upgrade from C* 3.0 is detected. This migration does not touch snapshots or backups.
+
+      The sstablescrub tool now checks the sstable serialization-header against the schema. A mismatch between
+      the types in the serialization-header and the schema will cause sstablescrub to error out and stop by default.
+      See the new `-e` option. `-e off` disables the new validation code. `-e fix` or `-e fix-only`, e.g.
+      `sstablescrub -e fix keyspace table`, will validate the serialization-header, rewrite non-frozen UDTs
+      in the serialization-header to frozen UDTs where that matches the schema, and continue with the scrub.
+      See `sstablescrub -h`.
+      (CASSANDRA-15035)
    - CASSANDRA-13241 lowered the default chunk_length_in_kb for compressed tables from
       64kb to 16kb. For highly compressible data this can have a noticeable impact
       on space utilization. You may want to consider manually specifying this value.
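
For illustration, the same fix can also be driven programmatically through the SSTableHeaderFix
builder API added below. A minimal sketch, where the class name, the data-directory path and the
dry-run choice are assumptions for illustration, not part of this commit:

    import java.nio.file.Paths;
    import org.apache.cassandra.io.sstable.SSTableHeaderFix;
    import org.apache.cassandra.schema.Schema;

    public class HeaderFixDryRun
    {
        public static void main(String[] args)
        {
            // Validate (dry-run) the serialization-headers of all sstables under one
            // table directory, resolving each sstable's schema via the live Schema.
            SSTableHeaderFix fix = SSTableHeaderFix.builder()
                                                   .withPath(Paths.get("/var/lib/cassandra/data/ks/tbl"))  // hypothetical path
                                                   .schemaCallback(() -> Schema.instance::getTableMetadata)
                                                   .dryRun()  // drop this to actually rewrite the headers
                                                   .build();
            fix.execute();
            if (fix.hasError() || fix.hasChanges())
                System.err.println("serialization-header mismatches detected");
        }
    }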
diff --git a/src/java/org/apache/cassandra/cql3/CQL3Type.java b/src/java/org/apache/cassandra/cql3/CQL3Type.java
index 340a992..ee2db68 100644
--- a/src/java/org/apache/cassandra/cql3/CQL3Type.java
+++ b/src/java/org/apache/cassandra/cql3/CQL3Type.java
@@ -704,8 +704,10 @@ public interface CQL3Type
             {
                 if (innerType instanceof RawCollection)
                     throw new InvalidRequestException("Non-frozen collections are not allowed inside collections: " + this);
-                else
+                else if (innerType.isUDT())
                     throw new InvalidRequestException("Non-frozen UDTs are not allowed inside collections: " + this);
+                else
+                    throw new InvalidRequestException("Non-frozen tuples are not allowed inside collections: " + this);
             }
 
             public boolean referencesUserType(String name)
diff --git a/src/java/org/apache/cassandra/db/SerializationHeader.java b/src/java/org/apache/cassandra/db/SerializationHeader.java
index 2e5211c..15ef268 100644
--- a/src/java/org/apache/cassandra/db/SerializationHeader.java
+++ b/src/java/org/apache/cassandra/db/SerializationHeader.java
@@ -288,6 +288,18 @@ public class SerializationHeader
             this.stats = stats;
         }
 
+        /**
+         * <em>Only</em> exposed for {@link org.apache.cassandra.io.sstable.SSTableHeaderFix}.
+         */
+        public static Component buildComponentForTools(AbstractType<?> keyType,
+                                                       List<AbstractType<?>> clusteringTypes,
+                                                       Map<ByteBuffer, AbstractType<?>> staticColumns,
+                                                       Map<ByteBuffer, AbstractType<?>> regularColumns,
+                                                       EncodingStats stats)
+        {
+            return new Component(keyType, clusteringTypes, staticColumns, regularColumns, stats);
+        }
+
         public MetadataType getType()
         {
             return MetadataType.HEADER;
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index 0d79ae9..c427c8f 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -1400,6 +1400,8 @@ public final class SystemKeyspace
         String previous = getPreviousVersionString();
         String next = FBUtilities.getReleaseVersionString();
 
+        FBUtilities.setPreviousReleaseVersionString(previous);
+
         // if we're restarting after an upgrade, snapshot the system and schema keyspaces
         if (!previous.equals(NULL_VERSION.toString()) && !previous.equals(next))
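
The value stored above is consumed at startup to gate the one-time header rewrite. A minimal
sketch of that gate, with an illustrative version string (the real check lives in
SSTableHeaderFix.fixNonFrozenUDTIfUpgradeFrom30() later in this commit):

    import org.apache.cassandra.utils.CassandraVersion;
    import org.apache.cassandra.utils.FBUtilities;

    public class UpgradeFrom30Gate
    {
        public static void main(String[] args)
        {
            String prev = FBUtilities.getPreviousReleaseVersionString();  // e.g. "3.0.18" after an upgrade
            boolean upgradeFrom30 = prev != null
                                    && new CassandraVersion(prev).major == 3
                                    && new CassandraVersion(prev).minor == 0;
            System.out.println(upgradeFrom30);  // true -> rewrite serialization-headers once
        }
    }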
 
diff --git a/src/java/org/apache/cassandra/db/rows/AbstractCell.java b/src/java/org/apache/cassandra/db/rows/AbstractCell.java
index 51c9ff4..3f2da96 100644
--- a/src/java/org/apache/cassandra/db/rows/AbstractCell.java
+++ b/src/java/org/apache/cassandra/db/rows/AbstractCell.java
@@ -28,6 +28,7 @@ import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.CollectionType;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.serializers.MarshalException;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.memory.AbstractAllocator;
 
 /**
@@ -204,7 +205,19 @@ public abstract class AbstractCell extends Cell
         if (isTombstone())
             return String.format("[%s=<tombstone> %s]", column().name, livenessInfoString());
         else
-            return String.format("[%s=%s %s]", column().name, type.getString(value()), livenessInfoString());
+            return String.format("[%s=%s %s]", column().name, safeToString(type, value()), livenessInfoString());
+    }
+
+    private static String safeToString(AbstractType<?> type, ByteBuffer data)
+    {
+        try
+        {
+            return type.getString(data);
+        }
+        catch (Exception e)
+        {
+            return "0x" + ByteBufferUtil.bytesToHex(data);
+        }
     }
 
     private String livenessInfoString()
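
The fall-back-to-hex pattern in isolation, as a self-contained sketch (plain JDK, not Cassandra
API): when the serialization-header lies about a column's type, decoding the raw cell value can
throw, and toString() now degrades to a hex dump instead of propagating the exception:

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    public class SafeToStringDemo
    {
        static String safeToString(ByteBuffer data)
        {
            try
            {
                // stands in for AbstractType.getString(), which may throw on bad input
                return StandardCharsets.UTF_8.newDecoder().decode(data.duplicate()).toString();
            }
            catch (Exception e)
            {
                StringBuilder sb = new StringBuilder("0x");
                for (int i = data.position(); i < data.limit(); i++)
                    sb.append(String.format("%02x", data.get(i)));
                return sb.toString();
            }
        }

        public static void main(String[] args)
        {
            ByteBuffer bad = ByteBuffer.wrap(new byte[] { (byte) 0xc0, (byte) 0x80 });  // malformed UTF-8
            System.out.println(safeToString(bad));  // prints "0xc080" instead of throwing
        }
    }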
diff --git a/src/java/org/apache/cassandra/db/rows/AbstractRow.java b/src/java/org/apache/cassandra/db/rows/AbstractRow.java
index 2018d4e..957ffd4 100644
--- a/src/java/org/apache/cassandra/db/rows/AbstractRow.java
+++ b/src/java/org/apache/cassandra/db/rows/AbstractRow.java
@@ -77,15 +77,31 @@ public abstract class AbstractRow implements Row
         {
             ByteBuffer value = clustering.get(i);
             if (value != null)
-                metadata.comparator.subtype(i).validate(value);
+            {
+                try
+                {
+                    metadata.comparator.subtype(i).validate(value);
+                }
+                catch (Exception e)
+                {
+                    throw new MarshalException("comparator #" + i + " '" + metadata.comparator.subtype(i) + "' in '" + metadata + "' didn't validate", e);
+                }
+            }
         }
 
         primaryKeyLivenessInfo().validate();
         if (deletion().time().localDeletionTime() < 0)
-            throw new MarshalException("A local deletion time should not be negative");
+            throw new MarshalException("A local deletion time should not be negative in '" + metadata + "'");
 
         for (ColumnData cd : this)
-            cd.validate();
+            try
+            {
+                cd.validate();
+            }
+            catch (Exception e)
+            {
+                throw new MarshalException("data for '" + cd.column.debugString() + "', " + cd + " in '" + metadata + "' didn't validate", e);
+            }
     }
 
     public boolean hasInvalidDeletions()
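
The shape of this change in isolation: per-element validation is wrapped so a failure names the
offending element and its container. A minimal sketch with illustrative names, not Cassandra API:

    import java.util.List;

    public class ValidateWithContext
    {
        // Validate each element, rethrowing with enough context to identify the culprit.
        static void validateAll(List<Runnable> validators, String container)
        {
            for (int i = 0; i < validators.size(); i++)
            {
                try
                {
                    validators.get(i).run();
                }
                catch (Exception e)
                {
                    throw new RuntimeException("element #" + i + " in '" + container + "' didn't validate", e);
                }
            }
        }

        public static void main(String[] args)
        {
            validateAll(List.of(() -> {}, () -> { throw new IllegalStateException("bad value"); }), "ks.tbl");
            // throws: RuntimeException: element #1 in 'ks.tbl' didn't validate
        }
    }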
diff --git a/src/java/org/apache/cassandra/db/rows/AbstractTypeVersionComparator.java b/src/java/org/apache/cassandra/db/rows/AbstractTypeVersionComparator.java
deleted file mode 100644
index e47f681..0000000
--- a/src/java/org/apache/cassandra/db/rows/AbstractTypeVersionComparator.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.rows;
-
-import java.util.Comparator;
-import java.util.List;
-
-import org.apache.cassandra.db.marshal.*;
-
-/**
- * A {@code Comparator} use to determine which version of a type should be used.
- * <p>In the case of UDTs it is possible to have 2 versions or more of the same type, if some fields has been added to
- * the type. To avoid problems the latest type need to be used.</p>
- */
-final class AbstractTypeVersionComparator implements Comparator<AbstractType<?>>
-{
-    public static final Comparator<AbstractType<?>> INSTANCE = new AbstractTypeVersionComparator();
-
-    private AbstractTypeVersionComparator()
-    {
-    }
-
-    @Override
-    public int compare(AbstractType<?> type, AbstractType<?> otherType)
-    {
-        if (!type.getClass().equals(otherType.getClass()))
-            throw new IllegalArgumentException(String.format("Trying to compare 2 different types: %s and %s",
-                                                             type,
-                                                             otherType));
-
-        if (type.equals(otherType))
-            return 0;
-
-        // The only case where 2 types can differ is if they contains some UDTs and one of them has more
-        // fields (due to an ALTER type ADD) than in the other type. In this case we need to pick the type with
-        // the bigger amount of fields.
-        if (type.isUDT())
-            return compareUserType((UserType) type, (UserType) otherType);
-
-        if (type.isTuple())
-            return compareTuple((TupleType) type, (TupleType) otherType);
-
-        if (type.isCollection())
-            return compareCollectionTypes(type, otherType);
-
-        if (type instanceof CompositeType)
-            return compareCompositeTypes((CompositeType) type, (CompositeType) otherType);
-
-        // In theory we should never reach that point but to be on the safe side we allow it.
-        return 0;
-    }
-
-    private int compareCompositeTypes(CompositeType type, CompositeType otherType)
-    {
-        List<AbstractType<?>> types = type.getComponents();
-        List<AbstractType<?>> otherTypes = otherType.getComponents();
-
-        if (types.size() != otherTypes.size())
-            return Integer.compare(types.size(), otherTypes.size());
-
-        for (int i = 0, m = type.componentsCount(); i < m ; i++)
-        {
-            int test = compare(types.get(i), otherTypes.get(i));
-            if (test != 0);
-                return test;
-        }
-        return 0;
-    }
-
-    private int compareCollectionTypes(AbstractType<?> type, AbstractType<?> otherType)
-    {
-        if (type instanceof MapType)
-            return compareMapType((MapType<?, ?>) type, (MapType<?, ?>) otherType);
-
-        if (type instanceof SetType)
-            return compare(((SetType<?>) type).getElementsType(), ((SetType<?>) otherType).getElementsType());
-
-        return compare(((ListType<?>) type).getElementsType(), ((ListType<?>) otherType).getElementsType());
-    }
-
-    private int compareMapType(MapType<?, ?> type, MapType<?, ?> otherType)
-    {
-        int test = compare(type.getKeysType(), otherType.getKeysType());
-        return test != 0 ? test : compare(type.getValuesType(), otherType.getValuesType());
-    }
-
-    private int compareUserType(UserType type, UserType otherType)
-    {
-        return compareTuple(type, otherType);
-    }
-
-    private int compareTuple(TupleType type, TupleType otherType)
-    {
-        if (type.size() != otherType.size())
-            return Integer.compare(type.size(), otherType.size());
-
-        int test = 0;
-        int i = 0;
-        while (test == 0 && i < type.size())
-        {
-            test = compare(type.type(i), otherType.type(i));
-            i++;
-        }
-        return test;
-    }
-}
diff --git a/src/java/org/apache/cassandra/db/rows/ColumnMetadataVersionComparator.java b/src/java/org/apache/cassandra/db/rows/ColumnMetadataVersionComparator.java
new file mode 100644
index 0000000..6b2d97c
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/ColumnMetadataVersionComparator.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.util.Comparator;
+
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.schema.ColumnMetadata;
+
+/**
+ * A {@code Comparator} used to determine which version of a {@link ColumnMetadata} should be used.
+ * <p>
+ * We can sometimes get 2 different versions of the definition of a given column due to differing types. This can happen
+ * in at least 2 cases:
+ * <ul>
+ *     <li>for UDTs, where new fields can be added (see CASSANDRA-13776).</li>
+ *     <li>pre-CASSANDRA-12443, when we allowed type altering. And while we don't allow it anymore, it is possible
+ *     to still have sstables with metadata mentioning an old pre-altering type (such old versions of pre-altering
+ *     types will eventually be eliminated from the system by compaction and thanks to this comparator, but we
+ *     cannot guarantee when that's fully done).</li>
+ * </ul>
+ */
+final class ColumnMetadataVersionComparator implements Comparator<ColumnMetadata>
+{
+    public static final Comparator<ColumnMetadata> INSTANCE = new ColumnMetadataVersionComparator();
+
+    private ColumnMetadataVersionComparator()
+    {
+    }
+
+    @Override
+    public int compare(ColumnMetadata v1, ColumnMetadata v2)
+    {
+        assert v1.ksName.equals(v2.ksName)
+               && v1.cfName.equals(v2.cfName)
+               && v1.name.equals(v2.name) : v1.debugString() + " != " + v2.debugString();
+
+        AbstractType<?> v1Type = v1.type;
+        AbstractType<?> v2Type = v2.type;
+
+        // In most cases, this is used on equal types, and on most types, equality is cheap (most are singleton classes
+        // and just use reference equality), so we handle that case first.
+        if (v1Type.equals(v2Type))
+            return 0;
+
+        // If those aren't the same type, one must be "more general" than the other, that is accept strictly more values.
+        if (v1Type.isValueCompatibleWith(v2Type))
+        {
+            // Note: if both accept the same values, there is really no good way to prefer one over the other and so we
+            // consider them equal here. In practice, this means we have 2 types that accept the same values but are
+            // not equal. For internal types, TimestampType/DateType/LongType is, afaik, the only example, but as users
+            // can write custom types, who knows when this can happen. But excluding any user custom type weirdness
+            // (that would really be a bug of their type), such types should only differ in the way they sort, and as
+            // this method is only used for regular/static columns in practice, where sorting has no impact whatsoever,
+            // it shouldn't matter too much what we return here.
+            return v2Type.isValueCompatibleWith(v1Type) ? 0 : 1;
+        }
+        else if (v2Type.isValueCompatibleWith(v1Type))
+        {
+            return -1;
+        }
+        else
+        {
+            // Neither is a super type of the other: something is pretty wrong and we probably shouldn't ignore it.
+            throw new IllegalArgumentException(String.format("Found 2 incompatible versions of column %s in %s.%s: one " +
+                                                             "of type %s and one of type %s (neither type is compatible with the other)",
+                                                             v1.name, v1.ksName, v1.cfName, v1Type, v2Type));
+        }
+    }
+}
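
The tie case called out in the comment above, as a hedged sketch; it assumes, per that comment,
that TimestampType and LongType report value-compatibility in both directions:

    import org.apache.cassandra.db.marshal.AbstractType;
    import org.apache.cassandra.db.marshal.LongType;
    import org.apache.cassandra.db.marshal.TimestampType;

    public class ValueCompatTieDemo
    {
        public static void main(String[] args)
        {
            AbstractType<?> a = TimestampType.instance;
            AbstractType<?> b = LongType.instance;
            // unequal types that are mutually value-compatible -> the comparator returns 0
            System.out.println(!a.equals(b)
                               && a.isValueCompatibleWith(b)
                               && b.isValueCompatibleWith(a));
        }
    }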
diff --git a/src/java/org/apache/cassandra/db/rows/Row.java b/src/java/org/apache/cassandra/db/rows/Row.java
index 6f0b43e..2f752b8 100644
--- a/src/java/org/apache/cassandra/db/rows/Row.java
+++ b/src/java/org/apache/cassandra/db/rows/Row.java
@@ -764,7 +764,7 @@ public interface Row extends Unfiltered, Iterable<ColumnData>
                 if (column == null)
                     return true;
 
-                return AbstractTypeVersionComparator.INSTANCE.compare(column.type, dataColumn.type) < 0;
+                return ColumnMetadataVersionComparator.INSTANCE.compare(column, dataColumn) < 0;
             }
 
             @SuppressWarnings("resource")
diff --git a/src/java/org/apache/cassandra/db/rows/Rows.java b/src/java/org/apache/cassandra/db/rows/Rows.java
index c0c84b6..d62d3b5 100644
--- a/src/java/org/apache/cassandra/db/rows/Rows.java
+++ b/src/java/org/apache/cassandra/db/rows/Rows.java
@@ -410,7 +410,7 @@ public abstract class Rows
         if (curb == null)
             return cura.column;
 
-        if (AbstractTypeVersionComparator.INSTANCE.compare(cura.column.type, curb.column.type) >= 0)
+        if (ColumnMetadataVersionComparator.INSTANCE.compare(cura.column, curb.column) >= 0)
             return cura.column;
 
         return curb.column;
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableHeaderFix.java b/src/java/org/apache/cassandra/io/sstable/SSTableHeaderFix.java
new file mode 100644
index 0000000..3577259
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableHeaderFix.java
@@ -0,0 +1,918 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.cql3.CQL3Type;
+import org.apache.cassandra.cql3.statements.schema.IndexTarget;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.db.marshal.AbstractCompositeType;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CollectionType;
+import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.db.marshal.DynamicCompositeType;
+import org.apache.cassandra.db.marshal.ListType;
+import org.apache.cassandra.db.marshal.MapType;
+import org.apache.cassandra.db.marshal.SetType;
+import org.apache.cassandra.db.marshal.TupleType;
+import org.apache.cassandra.db.marshal.UserType;
+import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
+import org.apache.cassandra.io.sstable.metadata.MetadataType;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.IndexMetadata;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.CassandraVersion;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
+
+/**
+ * Validates and fixes type issues in the serialization-header of sstables.
+ */
+public abstract class SSTableHeaderFix
+{
+    // C* 3.0 upgrade code
+
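+    // Set -Dcassandra.skipautomaticudtfix=true to skip the automatic serialization-header
+    // rewrite on upgrade from 3.0 (see fixNonFrozenUDTIfUpgradeFrom30() below).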
+    private static final String SKIPAUTOMATICUDTFIX = "cassandra.skipautomaticudtfix";
+    private static final boolean SKIP_AUTOMATIC_FIX_ON_UPGRADE = Boolean.getBoolean(SKIPAUTOMATICUDTFIX);
+
+    public static void fixNonFrozenUDTIfUpgradeFrom30()
+    {
+        String previousVersionString = FBUtilities.getPreviousReleaseVersionString();
+        if (previousVersionString == null)
+            return;
+        CassandraVersion previousVersion = new CassandraVersion(previousVersionString);
+        if (previousVersion.major != 3 || previousVersion.minor > 0)
+        {
+            // Not an upgrade from 3.0 to 3.x, nothing to do here
+            return;
+        }
+
+        if (SKIP_AUTOMATIC_FIX_ON_UPGRADE)
+        {
+            logger.warn("Detected upgrade from {} to {}, but -D{}=true, NOT fixing UDT type references in " +
+                        "sstable metadata serialization-headers",
+                        previousVersionString,
+                        FBUtilities.getReleaseVersionString(),
+                        SKIPAUTOMATICUDTFIX);
+            return;
+        }
+
+        logger.info("Detected upgrade from {} to {}, fixing UDT type references in sstable metadata serialization-headers",
+                    previousVersionString,
+                    FBUtilities.getReleaseVersionString());
+
+        SSTableHeaderFix instance = SSTableHeaderFix.builder()
+                                                    .schemaCallback(() -> Schema.instance::getTableMetadata)
+                                                    .build();
+        instance.execute();
+    }
+
+    // "regular" SSTableHeaderFix code, also used by StandaloneScrubber.
+
+    private static final Logger logger = LoggerFactory.getLogger(SSTableHeaderFix.class);
+
+    protected final Consumer<String> info;
+    protected final Consumer<String> warn;
+    protected final Consumer<String> error;
+    protected final boolean dryRun;
+    protected final Function<Descriptor, TableMetadata> schemaCallback;
+
+    private final List<Descriptor> descriptors;
+
+    private final List<Pair<Descriptor, Map<MetadataType, MetadataComponent>>> updates = new ArrayList<>();
+    private boolean hasErrors;
+
+    SSTableHeaderFix(Builder builder)
+    {
+        this.info = builder.info;
+        this.warn = builder.warn;
+        this.error = builder.error;
+        this.dryRun = builder.dryRun;
+        this.schemaCallback = builder.schemaCallback.get();
+        this.descriptors = new ArrayList<>(builder.descriptors);
+        Objects.requireNonNull(this.info, "info is null");
+        Objects.requireNonNull(this.warn, "warn is null");
+        Objects.requireNonNull(this.error, "error is null");
+        Objects.requireNonNull(this.schemaCallback, "schemaCallback is null");
+    }
+
+    public static Builder builder()
+    {
+        return new Builder();
+    }
+
+    /**
+     * Builder to configure and construct an instance of {@link SSTableHeaderFix}.
+     * Default settings:
+     * <ul>
+     *     <li>log via the slf4j logger of {@link SSTableHeaderFix}</li>
+     *     <li>no dry-run (i.e. validate and fix, if no serious errors are detected)</li>
+     *     <li>no schema callback</li>
+     * </ul>
+     * If neither {@link #withDescriptor(Descriptor)} nor {@link #withPath(Path)} are used,
+     * all "live" sstables in all data directories will be scanned.
+     */
+    public static class Builder
+    {
+        private final List<Path> paths = new ArrayList<>();
+        private final List<Descriptor> descriptors = new ArrayList<>();
+        private Consumer<String> info = (ln) -> logger.info("{}", ln);
+        private Consumer<String> warn = (ln) -> logger.warn("{}", ln);
+        private Consumer<String> error = (ln) -> logger.error("{}", ln);
+        private boolean dryRun;
+        private Supplier<Function<Descriptor, TableMetadata>> schemaCallback = () -> null;
+
+        private Builder()
+        {}
+
+        /**
+         * Only validate and prepare fix, but do not write updated (fixed) sstable serialization-headers.
+         */
+        public Builder dryRun()
+        {
+            dryRun = true;
+            return this;
+        }
+
+        public Builder info(Consumer<String> output)
+        {
+            this.info = output;
+            return this;
+        }
+
+        public Builder warn(Consumer<String> warn)
+        {
+            this.warn = warn;
+            return this;
+        }
+
+        public Builder error(Consumer<String> error)
+        {
+            this.error = error;
+            return this;
+        }
+
+        /**
+         * Manually provide an individual sstable or directory containing sstables.
+         *
+         * Implementation note: processing "live" sstables in their data directories as well as sstables
+         * in snapshots and backups in the data directories works.
+         *
+         * But processing sstables that reside somewhere else (e.g. verifying sstables before import)
+         * requires the use of {@link #withDescriptor(Descriptor)}.
+         */
+        public Builder withPath(Path path)
+        {
+            this.paths.add(path);
+            return this;
+        }
+
+        public Builder withDescriptor(Descriptor descriptor)
+        {
+            this.descriptors.add(descriptor);
+            return this;
+        }
+
+        /**
+         * Schema callback to retrieve the schema of a table. Production code always delegates to the
+         * live schema ({@code Schema.instance}). Unit tests use this method to feed a custom schema.
+         */
+        public Builder schemaCallback(Supplier<Function<Descriptor, TableMetadata>> schemaCallback)
+        {
+            this.schemaCallback = schemaCallback;
+            return this;
+        }
+
+        public SSTableHeaderFix build()
+        {
+            if (paths.isEmpty() && descriptors.isEmpty())
+                return new AutomaticHeaderFix(this);
+
+            return new ManualHeaderFix(this);
+        }
+
+        public Builder logToList(List<String> output)
+        {
+            return info(ln -> output.add("INFO  " + ln))
+                   .warn(ln -> output.add("WARN  " + ln))
+                   .error(ln -> output.add("ERROR " + ln));
+        }
+    }
+
+    public final void execute()
+    {
+        prepare();
+
+        logger.debug("Processing {} sstables:{}",
+                     descriptors.size(),
+                     descriptors.stream().map(Descriptor::toString).collect(Collectors.joining("\n    ", "\n    ", "")));
+
+        descriptors.forEach(this::processSSTable);
+
+        if (updates.isEmpty())
+            return;
+
+        if (hasErrors)
+        {
+            info.accept("Stopping due to previous errors. Either fix the errors or specify the ignore-errors option.");
+            return;
+        }
+
+        if (dryRun)
+        {
+            info.accept("Not fixing identified and fixable serialization-header issues.");
+            return;
+        }
+
+        info.accept("Writing new metadata files");
+        updates.forEach(descAndMeta -> writeNewMetadata(descAndMeta.left, descAndMeta.right));
+        info.accept("Finished writing new metadata files");
+    }
+
+    /**
+     * Whether {@link #execute()} encountered an error.
+     */
+    public boolean hasError()
+    {
+        return hasErrors;
+    }
+
+    /**
+     * Whether {@link #execute()} found mismatches.
+     */
+    public boolean hasChanges()
+    {
+        return !updates.isEmpty();
+    }
+
+    abstract void prepare();
+
+    private void error(String format, Object... args)
+    {
+        hasErrors = true;
+        error.accept(String.format(format, args));
+    }
+
+    void processFileOrDirectory(Path path)
+    {
+        Stream.of(path)
+              .flatMap(SSTableHeaderFix::maybeExpandDirectory)
+              .filter(p -> Descriptor.fromFilenameWithComponent(p.toFile()).right.type == Component.Type.DATA)
+              .map(Path::toString)
+              .map(Descriptor::fromFilename)
+              .forEach(descriptors::add);
+    }
+
+    private static Stream<Path> maybeExpandDirectory(Path path)
+    {
+        if (Files.isRegularFile(path))
+            return Stream.of(path);
+        return LifecycleTransaction.getFiles(path, (file, fileType) -> fileType == Directories.FileType.FINAL, Directories.OnTxnErr.IGNORE)
+                                   .stream()
+                                   .map(File::toPath);
+    }
+
+    private void processSSTable(Descriptor desc)
+    {
+        if (desc.cfname.indexOf('.') != -1)
+        {
+            // secondary index not checked
+
+            // partition-key is the indexed column type
+            // clustering-key is org.apache.cassandra.db.marshal.PartitionerDefinedOrder
+            // no static columns, no regular columns
+            return;
+        }
+
+        TableMetadata tableMetadata = schemaCallback.apply(desc);
+        if (tableMetadata == null)
+        {
+            error("Table %s.%s not found in the schema - NOT checking sstable %s", desc.ksname, desc.cfname, desc);
+            return;
+        }
+
+        Set<Component> components = SSTable.discoverComponentsFor(desc);
+        if (components.stream().noneMatch(c -> c.type == Component.Type.STATS))
+        {
+            error("sstable %s has no -Statistics.db component.", desc);
+            return;
+        }
+
+        Map<MetadataType, MetadataComponent> metadata = readSSTableMetadata(desc);
+        if (metadata == null)
+            return;
+
+        MetadataComponent component = metadata.get(MetadataType.HEADER);
+        if (!(component instanceof SerializationHeader.Component))
+        {
+            error("sstable %s: Expected %s, but got %s from metadata.get(MetadataType.HEADER)",
+                  desc,
+                  SerializationHeader.Component.class.getName(),
+                  component != null ? component.getClass().getName() : "'null'");
+            return;
+        }
+        SerializationHeader.Component header = (SerializationHeader.Component) component;
+
+        // check partition key type
+        AbstractType<?> keyType = validatePartitionKey(desc, tableMetadata, header);
+
+        // check clustering columns
+        List<AbstractType<?>> clusteringTypes = validateClusteringColumns(desc, tableMetadata, header);
+
+        // check static and regular columns
+        Map<ByteBuffer, AbstractType<?>> staticColumns = validateColumns(desc, tableMetadata, header.getStaticColumns(), ColumnMetadata.Kind.STATIC);
+        Map<ByteBuffer, AbstractType<?>> regularColumns = validateColumns(desc, tableMetadata, header.getRegularColumns(), ColumnMetadata.Kind.REGULAR);
+
+        SerializationHeader.Component newHeader = SerializationHeader.Component.buildComponentForTools(keyType,
+                                                                                                       clusteringTypes,
+                                                                                                       staticColumns,
+                                                                                                       regularColumns,
+                                                                                                       header.getEncodingStats());
+
+        // SerializationHeader.Component has no equals(), but a "good" toString()
+        if (header.toString().equals(newHeader.toString()))
+            return;
+
+        Map<MetadataType, MetadataComponent> newMetadata = new LinkedHashMap<>(metadata);
+        newMetadata.put(MetadataType.HEADER, newHeader);
+
+        updates.add(Pair.create(desc, newMetadata));
+    }
+
+    private AbstractType<?> validatePartitionKey(Descriptor desc, TableMetadata tableMetadata, SerializationHeader.Component header)
+    {
+        boolean keyMismatch = false;
+        AbstractType<?> headerKeyType = header.getKeyType();
+        AbstractType<?> schemaKeyType = tableMetadata.partitionKeyType;
+        boolean headerKeyComposite = headerKeyType instanceof CompositeType;
+        boolean schemaKeyComposite = schemaKeyType instanceof CompositeType;
+        if (headerKeyComposite != schemaKeyComposite)
+        {
+            // one is a composite partition key, the other is not - very suspicious
+            keyMismatch = true;
+        }
+        else if (headerKeyComposite) // && schemaKeyComposite
+        {
+            // Note, the logic is similar as just calling 'fixType()' using the composite partition key,
+            // but the log messages should use the composite partition key column names.
+            List<AbstractType<?>> headerKeyComponents = ((CompositeType) headerKeyType).types;
+            List<AbstractType<?>> schemaKeyComponents = ((CompositeType) schemaKeyType).types;
+            if (headerKeyComponents.size() != schemaKeyComponents.size())
+            {
+                // different number of components in composite partition keys - very suspicious
+                keyMismatch = true;
+                // Just use the original type from the header. Since the number of partition key components
+                // don't match, there's nothing to meaningfully validate against.
+            }
+            else
+            {
+                // fix components in composite partition key, if necessary
+                List<AbstractType<?>> newComponents = new ArrayList<>(schemaKeyComponents.size());
+                for (int i = 0; i < schemaKeyComponents.size(); i++)
+                {
+                    AbstractType<?> headerKeyComponent = headerKeyComponents.get(i);
+                    AbstractType<?> schemaKeyComponent = schemaKeyComponents.get(i);
+                    AbstractType<?> fixedType = fixType(desc,
+                                                        tableMetadata.partitionKeyColumns().get(i).name.bytes,
+                                                        headerKeyComponent,
+                                                        schemaKeyComponent,
+                                                        false);
+                    if (fixedType == null)
+                        keyMismatch = true;
+                    else
+                        headerKeyComponent = fixedType;
+                    // reuse the already-computed fix; calling fixType() a second time would
+                    // log the same mismatch twice
+                    newComponents.add(headerKeyComponent);
+                }
+                headerKeyType = CompositeType.getInstance(newComponents);
+            }
+        }
+        else
+        {
+            // fix non-composite partition key, if necessary
+            AbstractType<?> fixedType = fixType(desc, tableMetadata.partitionKeyColumns().get(0).name.bytes, headerKeyType, schemaKeyType, false);
+            if (fixedType == null)
+                // non-composite partition key doesn't match and cannot be fixed
+                keyMismatch = true;
+            else
+                headerKeyType = fixedType;
+        }
+        if (keyMismatch)
+            error("sstable %s: Mismatch in partition key type between sstable serialization-header and schema (%s vs %s)",
+                  desc,
+                  headerKeyType.asCQL3Type(),
+                  schemaKeyType.asCQL3Type());
+        return headerKeyType;
+    }
+
+    private List<AbstractType<?>> validateClusteringColumns(Descriptor desc, TableMetadata tableMetadata, SerializationHeader.Component header)
+    {
+        List<AbstractType<?>> headerClusteringTypes = header.getClusteringTypes();
+        List<AbstractType<?>> clusteringTypes = new ArrayList<>();
+        boolean clusteringMismatch = false;
+        List<ColumnMetadata> schemaClustering = tableMetadata.clusteringColumns();
+        if (schemaClustering.size() != headerClusteringTypes.size())
+        {
+            clusteringMismatch = true;
+            // Just use the original types. Since the number of clustering columns don't match, there's nothing to
+            // meaningfully validate against.
+            clusteringTypes.addAll(headerClusteringTypes);
+        }
+        else
+        {
+            for (int i = 0; i < headerClusteringTypes.size(); i++)
+            {
+                AbstractType<?> headerType = headerClusteringTypes.get(i);
+                ColumnMetadata column = schemaClustering.get(i);
+                AbstractType<?> schemaType = column.type;
+                AbstractType<?> fixedType = fixType(desc, column.name.bytes, headerType, schemaType, false);
+                if (fixedType == null)
+                    clusteringMismatch = true;
+                else
+                    headerType = fixedType;
+                clusteringTypes.add(headerType);
+            }
+        }
+        if (clusteringMismatch)
+            error("sstable %s: mismatch in clustering columns between sstable serialization-header and schema (%s vs %s)",
+                  desc,
+                  headerClusteringTypes.stream().map(AbstractType::asCQL3Type).map(CQL3Type::toString).collect(Collectors.joining(",")),
+                  schemaClustering.stream().map(cd -> cd.type.asCQL3Type().toString()).collect(Collectors.joining(",")));
+        return clusteringTypes;
+    }
+
+    private Map<ByteBuffer, AbstractType<?>> validateColumns(Descriptor desc, TableMetadata tableMetadata, Map<ByteBuffer, AbstractType<?>> columns, ColumnMetadata.Kind kind)
+    {
+        Map<ByteBuffer, AbstractType<?>> target = new LinkedHashMap<>();
+        for (Map.Entry<ByteBuffer, AbstractType<?>> nameAndType : columns.entrySet())
+        {
+            ByteBuffer name = nameAndType.getKey();
+            AbstractType<?> type = nameAndType.getValue();
+
+            AbstractType<?> fixedType = validateColumn(desc, tableMetadata, kind, name, type);
+            if (fixedType == null)
+            {
+                error("sstable %s: contains column '%s' of type '%s', which could not be validated",
+                      desc,
+                      logColumnName(name),
+                      type);
+                // don't use a "null" type instance
+                fixedType = type;
+            }
+
+            target.put(name, fixedType);
+        }
+        return target;
+    }
+
+    private AbstractType<?> validateColumn(Descriptor desc, TableMetadata tableMetadata, ColumnMetadata.Kind kind, ByteBuffer name, AbstractType<?> type)
+    {
+        ColumnMetadata cd = tableMetadata.getColumn(name);
+        if (cd == null)
+        {
+            // In case the column was dropped, there is not much that we can actually validate.
+            // The column could have been recreated using the same or a different kind or the same or
+            // a different type. Lottery...
+
+            cd = tableMetadata.getDroppedColumn(name, kind == ColumnMetadata.Kind.STATIC);
+            if (cd == null)
+            {
+                for (IndexMetadata indexMetadata : tableMetadata.indexes)
+                {
+                    String target = indexMetadata.options.get(IndexTarget.TARGET_OPTION_NAME);
+                    if (target != null && ByteBufferUtil.bytes(target).equals(name))
+                    {
+                        warn.accept(String.format("sstable %s: contains column '%s', which is not a column in the table '%s.%s', but a target for that table's index '%s'",
+                                desc,
+                                logColumnName(name),
+                                tableMetadata.keyspace,
+                                tableMetadata.name,
+                                indexMetadata.name));
+                        return type;
+                    }
+                }
+
+                warn.accept(String.format("sstable %s: contains column '%s', which is not present in the schema",
+                                          desc,
+                                          logColumnName(name)));
+            }
+            else
+            {
+                // This is a best-effort approach to handle the case of a UDT column created *AND* dropped in
+                // C* 3.0.
+                if (type instanceof UserType && cd.type instanceof TupleType)
+                {
+                    // At this point, we know that the type belongs to a dropped column, recorded with the
+                    // dropped column type "TupleType" and using "UserType" in the sstable. So it is very
+                    // likely, that this belongs to a dropped UDT. Fix that information to tuple-type.
+                    return fixType(desc, name, type, cd.type, true);
+                }
+            }
+
+            return type;
+        }
+
+        // At this point, the column name is known to be a "non-dropped" column in the table.
+        if (cd.kind != kind)
+            error("sstable %s: contains column '%s' as a %s column, but is of kind %s in the schema",
+                  desc,
+                  logColumnName(name),
+                  kind.name().toLowerCase(),
+                  cd.kind.name().toLowerCase());
+        else
+            type = fixType(desc, name, type, cd.type, false);
+        return type;
+    }
+
+    private AbstractType<?> fixType(Descriptor desc, ByteBuffer name, AbstractType<?> typeInHeader, AbstractType<?> typeInSchema, boolean droppedColumnMode)
+    {
+        AbstractType<?> fixedType = fixTypeInner(typeInHeader, typeInSchema, droppedColumnMode);
+        if (fixedType != null)
+        {
+            if (fixedType != typeInHeader)
+                info.accept(String.format("sstable %s: Column '%s' needs to be updated from type '%s' to '%s'",
+                                          desc,
+                                          logColumnName(name),
+                                          typeInHeader.asCQL3Type(),
+                                          fixedType.asCQL3Type()));
+            return fixedType;
+        }
+
+        error("sstable %s: contains column '%s' as type '%s', but schema mentions '%s'",
+              desc,
+              logColumnName(name),
+              typeInHeader.asCQL3Type(),
+              typeInSchema.asCQL3Type());
+
+        return typeInHeader;
+    }
+
+    private AbstractType<?> fixTypeInner(AbstractType<?> typeInHeader, AbstractType<?> typeInSchema, boolean droppedColumnMode)
+    {
+        if (typeEquals(typeInHeader, typeInSchema))
+            return typeInHeader;
+
+        if (typeInHeader instanceof CollectionType)
+            return fixTypeInnerCollection(typeInHeader, typeInSchema, droppedColumnMode);
+
+        if (typeInHeader instanceof AbstractCompositeType)
+            return fixTypeInnerAbstractComposite(typeInHeader, typeInSchema, droppedColumnMode);
+
+        if (typeInHeader instanceof TupleType)
+            return fixTypeInnerAbstractTuple(typeInHeader, typeInSchema, droppedColumnMode);
+
+        // all types, beside CollectionType + AbstractCompositeType + TupleType, should be ok (no nested types) - just check for compatibility
+        if (typeInHeader.isCompatibleWith(typeInSchema))
+            return typeInHeader;
+
+        return null;
+    }
+
+    private AbstractType<?> fixTypeInnerAbstractTuple(AbstractType<?> typeInHeader, AbstractType<?> typeInSchema, boolean droppedColumnMode)
+    {
+        // This first 'if' handles the case when a UDT has been dropped, as a dropped UDT is recorded as a tuple
+        // in dropped_columns. If a UDT is to be replaced with a tuple, then also do that for the inner UDTs.
+        if (droppedColumnMode && typeInHeader.getClass() == UserType.class && typeInSchema instanceof TupleType)
+            return fixTypeInnerUserTypeDropped((UserType) typeInHeader, (TupleType) typeInSchema);
+
+        if (typeInHeader.getClass() != typeInSchema.getClass())
+            return null;
+
+        if (typeInHeader.getClass() == UserType.class)
+            return fixTypeInnerUserType((UserType) typeInHeader, (UserType) typeInSchema);
+
+        if (typeInHeader.getClass() == TupleType.class)
+            return fixTypeInnerTuple((TupleType) typeInHeader, (TupleType) typeInSchema, droppedColumnMode);
+
+        throw new IllegalArgumentException("Unknown tuple type class " + typeInHeader.getClass().getName());
+    }
+
+    private AbstractType<?> fixTypeInnerCollection(AbstractType<?> typeInHeader, AbstractType<?> typeInSchema, boolean droppedColumnMode)
+    {
+        if (typeInHeader.getClass() != typeInSchema.getClass())
+            return null;
+
+        if (typeInHeader.getClass() == ListType.class)
+            return fixTypeInnerList((ListType<?>) typeInHeader, (ListType<?>) typeInSchema, droppedColumnMode);
+
+        if (typeInHeader.getClass() == SetType.class)
+            return fixTypeInnerSet((SetType<?>) typeInHeader, (SetType<?>) typeInSchema, droppedColumnMode);
+
+        if (typeInHeader.getClass() == MapType.class)
+            return fixTypeInnerMap((MapType<?, ?>) typeInHeader, (MapType<?, ?>) typeInSchema, droppedColumnMode);
+
+        throw new IllegalArgumentException("Unknown collection type class " + typeInHeader.getClass().getName());
+    }
+
+    private AbstractType<?> fixTypeInnerAbstractComposite(AbstractType<?> typeInHeader, AbstractType<?> typeInSchema, boolean droppedColumnMode)
+    {
+        if (typeInHeader.getClass() != typeInSchema.getClass())
+            return null;
+
+        if (typeInHeader.getClass() == CompositeType.class)
+            return fixTypeInnerComposite((CompositeType) typeInHeader, (CompositeType) typeInSchema, droppedColumnMode);
+
+        if (typeInHeader.getClass() == DynamicCompositeType.class)
+        {
+            // Not sure if we should care about UDTs in DynamicCompositeType at all...
+            if (!typeInHeader.isCompatibleWith(typeInSchema))
+                return null;
+
+            return typeInHeader;
+        }
+
+        throw new IllegalArgumentException("Unknown composite type class " + typeInHeader.getClass().getName());
+    }
+
+    private AbstractType<?> fixTypeInnerUserType(UserType cHeader, UserType cSchema)
+    {
+        if (!cHeader.keyspace.equals(cSchema.keyspace) || !cHeader.name.equals(cSchema.name))
+            // different UDT - bummer...
+            return null;
+
+        if (cHeader.isMultiCell() != cSchema.isMultiCell())
+        {
+            if (cHeader.isMultiCell() && !cSchema.isMultiCell())
+            {
+                // C* 3.0 writes broken SerializationHeader.Component instances - i.e. broken UDT type
+                // definitions into the sstable -Statistics.db file, because 3.0 does not enclose frozen UDTs
+                // (and all UDTs in 3.0 were frozen) in a 'frozen' bracket. Since CASSANDRA-7423 (support
+                // for non-frozen UDTs, committed to C* 3.6), that frozen-bracket is quite important.
+                // Non-frozen (= multi-cell) UDTs are serialized in a fundamentally different way than
+                // frozen UDTs in sstables - most importantly, the order of serialized columns depends on
+                // the type: fixed-width types first, then variable length types (like frozen types),
+                // multi-cell types last. If C* >= 3.6 reads an sstable with a UDT that's written by
+                // C* < 3.6, a variety of CorruptSSTableExceptions get logged and clients will encounter
+                // read errors.
+                // At this point, we know that the type belongs to a "live" (non-dropped) column, so it
+                // is safe to correct the information from the header.
+                return cSchema;
+            }
+
+            // In all other cases, there's not much we can do.
+            return null;
+        }
+
+        return cHeader;
+    }
+
+    private AbstractType<?> fixTypeInnerUserTypeDropped(UserType cHeader, TupleType cSchema)
+    {
+        // Do not mess around with the UserType in the serialization header, if the column has been dropped.
+        // Only fix the multi-cell status when the header contains it as a multicell (non-frozen) UserType,
+        // but the schema says "frozen".
+        if (cHeader.isMultiCell() && !cSchema.isMultiCell())
+        {
+            return new UserType(cHeader.keyspace, cHeader.name, cHeader.fieldNames(), cHeader.fieldTypes(), cSchema.isMultiCell());
+        }
+
+        return cHeader;
+    }
+
+    private AbstractType<?> fixTypeInnerTuple(TupleType cHeader, TupleType cSchema, boolean droppedColumnMode)
+    {
+        if (cHeader.size() != cSchema.size())
+            // different number of components - bummer...
+            return null;
+        List<AbstractType<?>> cHeaderFixed = new ArrayList<>(cHeader.size());
+        boolean anyChanged = false;
+        for (int i = 0; i < cHeader.size(); i++)
+        {
+            AbstractType<?> cHeaderComp = cHeader.type(i);
+            AbstractType<?> cHeaderCompFixed = fixTypeInner(cHeaderComp, cSchema.type(i), droppedColumnMode);
+            if (cHeaderCompFixed == null)
+                // incompatible, bummer...
+                return null;
+            cHeaderFixed.add(cHeaderCompFixed);
+            anyChanged |= cHeaderComp != cHeaderCompFixed;
+        }
+        if (anyChanged || cSchema.isMultiCell() != cHeader.isMultiCell())
+            // TODO this should create a non-frozen tuple type for the sake of handling a dropped, non-frozen UDT
+            return new TupleType(cHeaderFixed);
+        return cHeader;
+    }
+
+    private AbstractType<?> fixTypeInnerComposite(CompositeType cHeader, CompositeType cSchema, boolean droppedColumnMode)
+    {
+        if (cHeader.types.size() != cSchema.types.size())
+            // different number of components - bummer...
+            return null;
+        List<AbstractType<?>> cHeaderFixed = new ArrayList<>(cHeader.types.size());
+        boolean anyChanged = false;
+        for (int i = 0; i < cHeader.types.size(); i++)
+        {
+            AbstractType<?> cHeaderComp = cHeader.types.get(i);
+            AbstractType<?> cHeaderCompFixed = fixTypeInner(cHeaderComp, cSchema.types.get(i), droppedColumnMode);
+            if (cHeaderCompFixed == null)
+                // incompatible, bummer...
+                return null;
+            cHeaderFixed.add(cHeaderCompFixed);
+            anyChanged |= cHeaderComp != cHeaderCompFixed;
+        }
+        if (anyChanged)
+            return CompositeType.getInstance(cHeaderFixed);
+        return cHeader;
+    }
+
+    private AbstractType<?> fixTypeInnerList(ListType<?> cHeader, ListType<?> cSchema, boolean droppedColumnMode)
+    {
+        AbstractType<?> cHeaderElem = cHeader.getElementsType();
+        AbstractType<?> cHeaderElemFixed = fixTypeInner(cHeaderElem, cSchema.getElementsType(), droppedColumnMode);
+        if (cHeaderElemFixed == null)
+            // bummer...
+            return null;
+        if (cHeaderElem != cHeaderElemFixed)
+            // element type changed
+            return ListType.getInstance(cHeaderElemFixed, cHeader.isMultiCell());
+        return cHeader;
+    }
+
+    private AbstractType<?> fixTypeInnerSet(SetType<?> cHeader, SetType<?> cSchema, boolean droppedColumnMode)
+    {
+        AbstractType<?> cHeaderElem = cHeader.getElementsType();
+        AbstractType<?> cHeaderElemFixed = fixTypeInner(cHeaderElem, cSchema.getElementsType(), droppedColumnMode);
+        if (cHeaderElemFixed == null)
+            // bummer...
+            return null;
+        if (cHeaderElem != cHeaderElemFixed)
+            // element type changed
+            return SetType.getInstance(cHeaderElemFixed, cHeader.isMultiCell());
+        return cHeader;
+    }
+
+    private AbstractType<?> fixTypeInnerMap(MapType<?, ?> cHeader, MapType<?, ?> cSchema, boolean droppedColumnMode)
+    {
+        AbstractType<?> cHeaderKey = cHeader.getKeysType();
+        AbstractType<?> cHeaderVal = cHeader.getValuesType();
+        AbstractType<?> cHeaderKeyFixed = fixTypeInner(cHeaderKey, cSchema.getKeysType(), droppedColumnMode);
+        AbstractType<?> cHeaderValFixed = fixTypeInner(cHeaderVal, cSchema.getValuesType(), droppedColumnMode);
+        if (cHeaderKeyFixed == null || cHeaderValFixed == null)
+            // bummer...
+            return null;
+        if (cHeaderKey != cHeaderKeyFixed || cHeaderVal != cHeaderValFixed)
+            // key or value type changed
+            return MapType.getInstance(cHeaderKeyFixed, cHeaderValFixed, cHeader.isMultiCell());
+        return cHeader;
+    }
+
+    private boolean typeEquals(AbstractType<?> typeInHeader, AbstractType<?> typeInSchema)
+    {
+        // Quite annoying: the equals() implementations of some AbstractType subclasses appear to be wrong,
+        // but toString() works reliably in those cases.
+        return typeInHeader.equals(typeInSchema) || typeInHeader.toString().equals(typeInSchema.toString());
+    }
+
+    private static String logColumnName(ByteBuffer columnName)
+    {
+        try
+        {
+            return ByteBufferUtil.string(columnName);
+        }
+        catch (CharacterCodingException e)
+        {
+            return "?? " + e;
+        }
+    }
+
+    private Map<MetadataType, MetadataComponent> readSSTableMetadata(Descriptor desc)
+    {
+        Map<MetadataType, MetadataComponent> metadata;
+        try
+        {
+            metadata = desc.getMetadataSerializer().deserialize(desc, EnumSet.allOf(MetadataType.class));
+        }
+        catch (IOException e)
+        {
+            error("Failed to deserialize metadata for sstable %s: %s", desc, e.toString());
+            return null;
+        }
+        return metadata;
+    }
+
+    private void writeNewMetadata(Descriptor desc, Map<MetadataType, MetadataComponent> newMetadata)
+    {
+        String file = desc.filenameFor(Component.STATS);
+        info.accept(String.format("  Writing new metadata file %s", file));
+        try
+        {
+            desc.getMetadataSerializer().rewriteSSTableMetadata(desc, newMetadata);
+        }
+        catch (IOException e)
+        {
+            error("Failed to write metadata component for %s: %s", file, e.toString());
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Fix individually provided sstables or directories containing sstables.
+     */
+    static class ManualHeaderFix extends SSTableHeaderFix
+    {
+        private final List<Path> paths;
+
+        ManualHeaderFix(Builder builder)
+        {
+            super(builder);
+            this.paths = builder.paths;
+        }
+
+        public void prepare()
+        {
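+            // processFileOrDirectory (defined in the base class) expands a directory path into the sstables it contains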
+            paths.forEach(this::processFileOrDirectory);
+        }
+    }
+
+    /**
+     * Fix all sstables in the configured data-directories.
+     */
+    static class AutomaticHeaderFix extends SSTableHeaderFix
+    {
+        AutomaticHeaderFix(Builder builder)
+        {
+            super(builder);
+        }
+
+        public void prepare()
+        {
+            info.accept("Scanning all data directories...");
+            for (Directories.DataDirectory dataDirectory : Directories.dataDirectories)
+                scanDataDirectory(dataDirectory);
+            info.accept("Finished scanning all data directories...");
+        }
+
+        private void scanDataDirectory(Directories.DataDirectory dataDirectory)
+        {
+            info.accept(String.format("Scanning data directory %s", dataDirectory.location));
+            File[] ksDirs = dataDirectory.location.listFiles();
+            if (ksDirs == null)
+                return;
+            for (File ksDir : ksDirs)
+            {
+                if (!ksDir.isDirectory() || !ksDir.canRead())
+                    continue;
+
+                String name = ksDir.getName();
+
+                // silently ignore all system keyspaces
+                if (SchemaConstants.isLocalSystemKeyspace(name) || SchemaConstants.isReplicatedSystemKeyspace(name))
+                    continue;
+
+                File[] tabDirs = ksDir.listFiles();
+                if (tabDirs == null)
+                    continue;
+                for (File tabDir : tabDirs)
+                {
+                    if (!tabDir.isDirectory() || !tabDir.canRead())
+                        continue;
+
+                    processFileOrDirectory(tabDir.toPath());
+                }
+            }
+        }
+    }
+}
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index 36a1e63..9a467af 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -1821,6 +1821,11 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
 
     public void createLinks(String snapshotDirectoryPath)
     {
+        createLinks(descriptor, components, snapshotDirectoryPath);
+    }
+
+    public static void createLinks(Descriptor descriptor, Set<Component> components, String snapshotDirectoryPath)
+    {
         for (Component component : components)
         {
             File sourceFile = new File(descriptor.filenameFor(component));
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java
index eb7b2c7..c842d02 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java
@@ -74,4 +74,9 @@ public interface IMetadataSerializer
      * Mutate the repairedAt time, pendingRepair ID, and transient status
      */
     public void mutateRepairMetadata(Descriptor descriptor, long newRepairedAt, UUID newPendingRepair, boolean isTransient) throws IOException;
+
+    /**
+     * Replace the sstable metadata file ({@code -Statistics.db}) with the given components.
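+     * ({@link MetadataSerializer} implements this by first writing a temporary {@code -Statistics.db} file.)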
+     */
+    void rewriteSSTableMetadata(Descriptor descriptor, Map<MetadataType, MetadataComponent> currentComponents) throws IOException;
 }
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
index f76db2d..9cb9a20 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
@@ -242,7 +242,7 @@ public class MetadataSerializer implements IMetadataSerializer
         rewriteSSTableMetadata(descriptor, currentComponents);
     }
 
-    private void rewriteSSTableMetadata(Descriptor descriptor, Map<MetadataType, MetadataComponent> currentComponents) throws IOException
+    public void rewriteSSTableMetadata(Descriptor descriptor, Map<MetadataType, MetadataComponent> currentComponents) throws IOException
     {
         String filePath = descriptor.tmpFilenameFor(Component.STATS);
         try (DataOutputStreamPlus out = new BufferedDataOutputStreamPlus(new FileOutputStream(filePath)))
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index b3cfd19..7465bb3 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -62,6 +62,7 @@ import org.apache.cassandra.exceptions.StartupException;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.io.FSError;
 import org.apache.cassandra.io.sstable.CorruptSSTableException;
+import org.apache.cassandra.io.sstable.SSTableHeaderFix;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.metrics.CassandraMetricsRegistry;
 import org.apache.cassandra.metrics.DefaultNameFactory;
@@ -246,6 +247,8 @@ public class CassandraDaemon
 
         setupVirtualKeyspaces();
 
+        SSTableHeaderFix.fixNonFrozenUDTIfUpgradeFrom30();
+
         // clean up debris in the rest of the keyspaces
         for (String keyspaceName : Schema.instance.getKeyspaces())
         {
diff --git a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
index f50f937..d9d8db1 100644
--- a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
+++ b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
@@ -19,6 +19,7 @@
 package org.apache.cassandra.tools;
 
 import java.io.File;
+import java.nio.file.Paths;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
 
@@ -38,6 +39,7 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.OutputHandler;
+import org.apache.cassandra.utils.Pair;
 
 import static org.apache.cassandra.tools.BulkLoader.CmdLineOptions;
 
@@ -57,6 +59,7 @@ public class StandaloneScrubber
     private static final String SKIP_CORRUPTED_OPTION = "skip-corrupted";
     private static final String NO_VALIDATE_OPTION = "no-validate";
     private static final String REINSERT_OVERFLOWED_TTL_OPTION = "reinsert-overflowed-ttl";
+    private static final String HEADERFIX_OPTION = "header-fix";
 
     public static void main(String args[])
     {
@@ -93,34 +96,106 @@ public class StandaloneScrubber
 
             OutputHandler handler = new OutputHandler.SystemOutput(options.verbose, options.debug);
             Directories.SSTableLister lister = cfs.getDirectories().sstableLister(Directories.OnTxnErr.THROW).skipTemporary(true);
+            List<Pair<Descriptor, Set<Component>>> listResult = new ArrayList<>();
 
-            List<SSTableReader> sstables = new ArrayList<>();
-
-            // Scrub sstables
+            // create snapshot
             for (Map.Entry<Descriptor, Set<Component>> entry : lister.list().entrySet())
             {
+                Descriptor descriptor = entry.getKey();
                 Set<Component> components = entry.getValue();
                 if (!components.contains(Component.DATA))
                     continue;
 
+                listResult.add(Pair.create(descriptor, components));
+
+                File snapshotDirectory = Directories.getSnapshotDirectory(descriptor, snapshotName);
+                SSTableReader.createLinks(descriptor, components, snapshotDirectory.getPath());
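+                // createLinks hard-links the component files into the snapshot directory, so no data is copied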
+            }
+            System.out.println(String.format("Pre-scrub sstables snapshotted into snapshot %s", snapshotName));
+
+            if (options.headerFixMode != Options.HeaderFixMode.OFF)
+            {
+                // Run the frozen-UDT checks _before_ the sstables are opened
+
+                List<String> logOutput = new ArrayList<>();
+
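+                // The schema callback gives the header-fix access to the current table metadata, so the sstable
+                // serialization-headers can be compared against the live schema; dry-run mode only validates.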
+                SSTableHeaderFix.Builder headerFixBuilder = SSTableHeaderFix.builder()
+                                                                            .logToList(logOutput)
+                                                                            .schemaCallback(() -> Schema.instance::getTableMetadata);
+                if (options.headerFixMode == Options.HeaderFixMode.VALIDATE)
+                    headerFixBuilder = headerFixBuilder.dryRun();
+
+                for (Pair<Descriptor, Set<Component>> p : listResult)
+                    headerFixBuilder.withPath(Paths.get(p.left.filenameFor(Component.DATA)));
+
+                SSTableHeaderFix headerFix = headerFixBuilder.build();
                 try
                 {
-                    SSTableReader sstable = SSTableReader.openNoValidation(entry.getKey(), components, cfs);
-                    sstables.add(sstable);
+                    headerFix.execute();
+                }
+                catch (Exception e)
+                {
+                    JVMStabilityInspector.inspectThrowable(e);
+                    if (options.debug)
+                        e.printStackTrace(System.err);
+                }
+
+                if (headerFix.hasChanges() || headerFix.hasError())
+                    logOutput.forEach(System.out::println);
+
+                if (headerFix.hasError())
+                {
+                    System.err.println("Errors in serialization-header detected, aborting.");
+                    System.exit(1);
+                }
+
+                switch (options.headerFixMode)
+                {
+                    case VALIDATE_ONLY:
+                    case FIX_ONLY:
+                        System.out.printf("Not continuing with scrub, since '--%s %s' was specified.%n",
+                                          HEADERFIX_OPTION,
+                                          options.headerFixMode.asCommandLineOption());
+                        System.exit(0);
+                    case VALIDATE:
+                        if (headerFix.hasChanges())
+                        {
+                            System.err.printf("Unfixed, but fixable errors in serialization-header detected, aborting. " +
+                                              "Use a non-validating mode ('-e %s' or '-e %s') for --%s%n",
+                                              Options.HeaderFixMode.FIX.asCommandLineOption(),
+                                              Options.HeaderFixMode.FIX_ONLY.asCommandLineOption(),
+                                              HEADERFIX_OPTION);
+                            System.exit(2);
+                        }
+                        break;
+                    case FIX:
+                        break;
+                }
+            }
+
+            List<SSTableReader> sstables = new ArrayList<>();
 
-                    File snapshotDirectory = Directories.getSnapshotDirectory(sstable.descriptor, snapshotName);
-                    sstable.createLinks(snapshotDirectory.getPath());
+            // Open sstables
+            for (Pair<Descriptor, Set<Component>> pair : listResult)
+            {
+                Descriptor descriptor = pair.left;
+                Set<Component> components = pair.right;
+                if (!components.contains(Component.DATA))
+                    continue;
 
+                try
+                {
+                    SSTableReader sstable = SSTableReader.openNoValidation(descriptor, components, cfs);
+                    sstables.add(sstable);
                 }
                 catch (Exception e)
                 {
                     JVMStabilityInspector.inspectThrowable(e);
-                    System.err.println(String.format("Error Loading %s: %s", entry.getKey(), e.getMessage()));
+                    System.err.println(String.format("Error Loading %s: %s", descriptor, e.getMessage()));
                     if (options.debug)
                         e.printStackTrace(System.err);
                 }
             }
-            System.out.println(String.format("Pre-scrub sstables snapshotted into snapshot %s", snapshotName));
 
             if (!options.manifestCheckOnly)
             {
@@ -208,6 +283,26 @@ public class StandaloneScrubber
         public boolean skipCorrupted;
         public boolean noValidate;
         public boolean reinserOverflowedTTL;
+        public HeaderFixMode headerFixMode = HeaderFixMode.VALIDATE;
+
+        enum HeaderFixMode
+        {
+            VALIDATE_ONLY,
+            VALIDATE,
+            FIX_ONLY,
+            FIX,
+            OFF;
+
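+            // e.g. "fix-only" on the command line maps to FIX_ONLY; asCommandLineOption() maps FIX_ONLY back to "fix-only"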
+            static HeaderFixMode fromCommandLine(String value)
+            {
+                return valueOf(value.replace('-', '_').toUpperCase().trim());
+            }
+
+            String asCommandLineOption()
+            {
+                return name().toLowerCase().replace('_', '-');
+            }
+        }
 
         private Options(String keyspaceName, String cfName)
         {
@@ -249,7 +344,18 @@ public class StandaloneScrubber
                 opts.skipCorrupted = cmd.hasOption(SKIP_CORRUPTED_OPTION);
                 opts.noValidate = cmd.hasOption(NO_VALIDATE_OPTION);
                 opts.reinserOverflowedTTL = cmd.hasOption(REINSERT_OVERFLOWED_TTL_OPTION);
-
+                if (cmd.hasOption(HEADERFIX_OPTION))
+                {
+                    try
+                    {
+                        opts.headerFixMode = HeaderFixMode.fromCommandLine(cmd.getOptionValue(HEADERFIX_OPTION));
+                    }
+                    catch (Exception e)
+                    {
+                        errorMsg(String.format("Invalid argument value '%s' for --%s", cmd.getOptionValue(HEADERFIX_OPTION), HEADERFIX_OPTION), options);
+                        return null;
+                    }
+                }
                 return opts;
             }
             catch (ParseException e)
@@ -275,6 +381,22 @@ public class StandaloneScrubber
             options.addOption("m",  MANIFEST_CHECK_OPTION, "only check and repair the leveled manifest, without actually scrubbing the sstables");
             options.addOption("s",  SKIP_CORRUPTED_OPTION, "skip corrupt rows in counter tables");
             options.addOption("n",  NO_VALIDATE_OPTION,    "do not validate columns using column validator");
+            options.addOption("e",  HEADERFIX_OPTION,      true, "Option whether and how to perform a " +
+                                                                 "check of the sstable serialization-headers and fix known, " +
+                                                                 "fixable issues.\n" +
+                                                                 "Possible argument values:\n" +
+                                                                 "- validate-only: validate the serialization-headers, " +
+                                                                 "but do not fix those. Do not continue with scrub - " +
+                                                                 "i.e. only validate the header (dry-run of fix-only).\n" +
+                                                                 "- validate: (default) validate the serialization-headers, " +
+                                                                 "but do not fix those and only continue with scrub if no " +
+                                                                 "error were detected.\n" +
+                                                                 "- fix-only: validate and fix the serialization-headers, " +
+                                                                 "don't continue with scrub.\n" +
+                                                                 "- fix: validate and fix the serialization-headers, do not " +
+                                                                 "fix and do not continue with scrub if the serialization-header " +
+                                                                 "check encountered errors.\n" +
+                                                                 "- off: don't perform the serialization-header checks.");
             options.addOption("r", REINSERT_OVERFLOWED_TTL_OPTION, REINSERT_OVERFLOWED_TTL_OPTION_DESCRIPTION);
             return options;
         }
@@ -287,7 +409,7 @@ public class StandaloneScrubber
             header.append("Scrub the sstable for the provided table." );
             header.append("\n--\n");
             header.append("Options are:");
-            new HelpFormatter().printHelp(usage, header.toString(), options, "");
+            new HelpFormatter().printHelp(120, usage, header.toString(), options, "");
         }
     }
 }
diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java
index 1797087..1df84ab 100644
--- a/src/java/org/apache/cassandra/utils/FBUtilities.java
+++ b/src/java/org/apache/cassandra/utils/FBUtilities.java
@@ -92,6 +92,8 @@ public class FBUtilities
     private static volatile InetAddressAndPort broadcastInetAddressAndPort;
     private static volatile InetAddressAndPort localInetAddressAndPort;
 
+    private static volatile String previousReleaseVersionString;
+
     public static int getAvailableProcessors()
     {
         String availableProcessors = System.getProperty("cassandra.available_processors");
@@ -338,6 +340,16 @@ public class FBUtilities
         return triggerDir;
     }
 
+    public static void setPreviousReleaseVersionString(String previousReleaseVersionString)
+    {
+        FBUtilities.previousReleaseVersionString = previousReleaseVersionString;
+    }
+
+    public static String getPreviousReleaseVersionString()
+    {
+        return previousReleaseVersionString;
+    }
+
     public static String getReleaseVersionString()
     {
         try (InputStream in = FBUtilities.class.getClassLoader().getResourceAsStream("org/apache/cassandra/config/version.properties"))
diff --git a/test/unit/org/apache/cassandra/db/rows/AbstractTypeVersionComparatorTest.java b/test/unit/org/apache/cassandra/db/rows/ColumnMetadataVersionComparatorTest.java
similarity index 71%
rename from test/unit/org/apache/cassandra/db/rows/AbstractTypeVersionComparatorTest.java
rename to test/unit/org/apache/cassandra/db/rows/ColumnMetadataVersionComparatorTest.java
index 7170696..854421a 100644
--- a/test/unit/org/apache/cassandra/db/rows/AbstractTypeVersionComparatorTest.java
+++ b/test/unit/org/apache/cassandra/db/rows/ColumnMetadataVersionComparatorTest.java
@@ -25,6 +25,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.schema.ColumnMetadata;
 
 import static java.util.Arrays.asList;
 import static org.apache.cassandra.cql3.FieldIdentifier.forUnquoted;
@@ -32,7 +33,7 @@ import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
-public class AbstractTypeVersionComparatorTest
+public class ColumnMetadataVersionComparatorTest
 {
     private UserType udtWith2Fields;
     private UserType udtWith3Fields;
@@ -60,6 +61,13 @@ public class AbstractTypeVersionComparatorTest
     }
 
     @Test
+    public void testWithSimpleTypes()
+    {
+        checkComparisonResults(Int32Type.instance, BytesType.instance);
+        checkComparisonResults(EmptyType.instance, BytesType.instance);
+    }
+
+    @Test
     public void testWithTuples()
     {
         checkComparisonResults(new TupleType(asList(Int32Type.instance, Int32Type.instance)),
@@ -142,19 +150,22 @@ public class AbstractTypeVersionComparatorTest
     @Test
     public void testInvalidComparison()
     {
-        assertInvalidComparison("Trying to compare 2 different types: org.apache.cassandra.db.marshal.FrozenType(org.apache.cassandra.db.marshal.UserType(ks,6d7954797065,61:org.apache.cassandra.db.marshal.Int32Type,62:org.apache.cassandra.db.marshal.Int32Type)) and org.apache.cassandra.db.marshal.Int32Type",
+        assertInvalidComparison("Found 2 incompatible versions of column c in ks.t: one of type org.apache.cassandra.db.marshal.Int32Type and one of type org.apache.cassandra.db.marshal.UTF8Type (but both types are incompatible)",
+                                Int32Type.instance,
+                                UTF8Type.instance);
+        assertInvalidComparison("Found 2 incompatible versions of column c in ks.t: one of type org.apache.cassandra.db.marshal.FrozenType(org.apache.cassandra.db.marshal.UserType(ks,6d7954797065,61:org.apache.cassandra.db.marshal.Int32Type,62:org.apache.cassandra.db.marshal.Int32Type)) and one of type org.apache.cassandra.db.marshal.Int32Type (but both types are incompatible)",
                                 udtWith2Fields,
                                 Int32Type.instance);
-        assertInvalidComparison("Trying to compare 2 different types: org.apache.cassandra.db.marshal.UTF8Type and org.apache.cassandra.db.marshal.InetAddressType",
+        assertInvalidComparison("Found 2 incompatible versions of column c in ks.t: one of type org.apache.cassandra.db.marshal.SetType(org.apache.cassandra.db.marshal.UTF8Type) and one of type org.apache.cassandra.db.marshal.SetType(org.apache.cassandra.db.marshal.InetAddressType) (but both types are incompatible)",
                                 SetType.getInstance(UTF8Type.instance, true),
                                 SetType.getInstance(InetAddressType.instance, true));
-        assertInvalidComparison("Trying to compare 2 different types: org.apache.cassandra.db.marshal.UTF8Type and org.apache.cassandra.db.marshal.InetAddressType",
+        assertInvalidComparison("Found 2 incompatible versions of column c in ks.t: one of type org.apache.cassandra.db.marshal.ListType(org.apache.cassandra.db.marshal.UTF8Type) and one of type org.apache.cassandra.db.marshal.ListType(org.apache.cassandra.db.marshal.InetAddressType) (but both types are incompatible)",
                                 ListType.getInstance(UTF8Type.instance, true),
                                 ListType.getInstance(InetAddressType.instance, true));
-        assertInvalidComparison("Trying to compare 2 different types: org.apache.cassandra.db.marshal.UTF8Type and org.apache.cassandra.db.marshal.InetAddressType",
+        assertInvalidComparison("Found 2 incompatible versions of column c in ks.t: one of type org.apache.cassandra.db.marshal.MapType(org.apache.cassandra.db.marshal.UTF8Type,org.apache.cassandra.db.marshal.IntegerType) and one of type org.apache.cassandra.db.marshal.MapType(org.apache.cassandra.db.marshal.InetAddressType,org.apache.cassandra.db.marshal.IntegerType) (but both types are incompatible)",
                                 MapType.getInstance(UTF8Type.instance, IntegerType.instance, true),
                                 MapType.getInstance(InetAddressType.instance, IntegerType.instance, true));
-        assertInvalidComparison("Trying to compare 2 different types: org.apache.cassandra.db.marshal.UTF8Type and org.apache.cassandra.db.marshal.InetAddressType",
+        assertInvalidComparison("Found 2 incompatible versions of column c in ks.t: one of type org.apache.cassandra.db.marshal.MapType(org.apache.cassandra.db.marshal.IntegerType,org.apache.cassandra.db.marshal.UTF8Type) and one of type org.apache.cassandra.db.marshal.MapType(org.apache.cassandra.db.marshal.IntegerType,org.apache.cassandra.db.marshal.InetAddressType) (but both types are incompatible)",
                                 MapType.getInstance(IntegerType.instance, UTF8Type.instance, true),
                                 MapType.getInstance(IntegerType.instance, InetAddressType.instance, true));
     }
@@ -169,7 +180,7 @@ public class AbstractTypeVersionComparatorTest
         catch (IllegalArgumentException e)
         {
             System.out.println(e.getMessage());
-            assertEquals(e.getMessage(), expectedMessage);
+            assertEquals(expectedMessage, e.getMessage());
         }
     }
 
@@ -183,6 +194,8 @@ public class AbstractTypeVersionComparatorTest
 
     private static int compare(AbstractType<?> left, AbstractType<?> right)
     {
-        return AbstractTypeVersionComparator.INSTANCE.compare(left, right);
+        ColumnMetadata v1 = ColumnMetadata.regularColumn("ks", "t", "c", left);
+        ColumnMetadata v2 = ColumnMetadata.regularColumn("ks", "t", "c", right);
+        return ColumnMetadataVersionComparator.INSTANCE.compare(v1, v2);
     }
 }
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableHeaderFixTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableHeaderFixTest.java
new file mode 100644
index 0000000..d07187b
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableHeaderFixTest.java
@@ -0,0 +1,964 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import com.google.common.collect.Sets;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.cql3.FieldIdentifier;
+import org.apache.cassandra.cql3.statements.schema.IndexTarget;
+import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.marshal.AbstractCompositeType;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CollectionType;
+import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.db.marshal.FloatType;
+import org.apache.cassandra.db.marshal.FrozenType;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.marshal.ListType;
+import org.apache.cassandra.db.marshal.MapType;
+import org.apache.cassandra.db.marshal.SetType;
+import org.apache.cassandra.db.marshal.TupleType;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.db.marshal.UserType;
+import org.apache.cassandra.db.rows.EncodingStats;
+import org.apache.cassandra.io.sstable.format.SSTableFormat;
+import org.apache.cassandra.io.sstable.format.Version;
+import org.apache.cassandra.io.sstable.format.big.BigFormat;
+import org.apache.cassandra.io.sstable.metadata.MetadataType;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.io.util.SequentialWriter;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.IndexMetadata;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Test the functionality of {@link SSTableHeaderFix}.
+ * It writes sstables in the 'big' format, version 'mc', and executes against those.
+ */
+public class SSTableHeaderFixTest
+{
+    static
+    {
+        DatabaseDescriptor.toolInitialization();
+    }
+
+    private File temporaryFolder;
+
+    @Before
+    public void setup()
+    {
+        File f = FileUtils.createTempFile("SSTableUDTFixTest", "");
+        f.delete();
+        f.mkdirs();
+        temporaryFolder = f;
+    }
+
+    @After
+    public void teardown()
+    {
+        FileUtils.deleteRecursive(temporaryFolder);
+    }
+
+    private static final AbstractType<?> udtPK = makeUDT("udt_pk");
+    private static final AbstractType<?> udtCK = makeUDT("udt_ck");
+    private static final AbstractType<?> udtStatic = makeUDT("udt_static");
+    private static final AbstractType<?> udtRegular = makeUDT("udt_regular");
+    private static final AbstractType<?> udtInner = makeUDT("udt_inner");
+    private static final AbstractType<?> udtNested = new UserType("ks",
+                                                                  ByteBufferUtil.bytes("udt_nested"),
+                                                                  Arrays.asList(new FieldIdentifier(ByteBufferUtil.bytes("a_field")),
+                                                                                new FieldIdentifier(ByteBufferUtil.bytes("a_udt"))),
+                                                                  Arrays.asList(UTF8Type.instance,
+                                                                                udtInner),
+                                                                  true);
+    private static final AbstractType<?> tupleInTuple = makeTuple(makeTuple());
+    private static final AbstractType<?> udtInTuple = makeTuple(udtInner);
+    private static final AbstractType<?> tupleInComposite = CompositeType.getInstance(UTF8Type.instance, makeTuple());
+    private static final AbstractType<?> udtInComposite = CompositeType.getInstance(UTF8Type.instance, udtInner);
+    private static final AbstractType<?> udtInList = ListType.getInstance(udtInner, true);
+    private static final AbstractType<?> udtInSet = SetType.getInstance(udtInner, true);
+    private static final AbstractType<?> udtInMap = MapType.getInstance(UTF8Type.instance, udtInner, true);
+    private static final AbstractType<?> udtInFrozenList = ListType.getInstance(udtInner, false);
+    private static final AbstractType<?> udtInFrozenSet = SetType.getInstance(udtInner, false);
+    private static final AbstractType<?> udtInFrozenMap = MapType.getInstance(UTF8Type.instance, udtInner, false);
+
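+    // builds a two-field UDT (a_field: text, a_udt: udt_inner) with the given multi-cell flag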
+    private static AbstractType<?> makeUDT2(String udtName, boolean multiCell)
+    {
+        return new UserType("ks",
+                            ByteBufferUtil.bytes(udtName),
+                            Arrays.asList(new FieldIdentifier(ByteBufferUtil.bytes("a_field")),
+                                          new FieldIdentifier(ByteBufferUtil.bytes("a_udt"))),
+                            Arrays.asList(UTF8Type.instance,
+                                          udtInner),
+                            multiCell);
+    }
+
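+    // builds a multi-cell single-field UDT with one text field named "a_field"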
+    private static AbstractType<?> makeUDT(String udtName)
+    {
+        return new UserType("ks",
+                            ByteBufferUtil.bytes(udtName),
+                            Collections.singletonList(new FieldIdentifier(ByteBufferUtil.bytes("a_field"))),
+                            Collections.singletonList(UTF8Type.instance),
+                            true);
+    }
+
+    private static TupleType makeTuple()
+    {
+        return makeTuple(Int32Type.instance);
+    }
+
+    private static TupleType makeTuple(AbstractType<?> second)
+    {
+        return new TupleType(Arrays.asList(UTF8Type.instance,
+                                           second));
+    }
+
+    private static TupleType makeTupleSimple()
+    {
+        // TODO this should create a non-frozen tuple type for the sake of handling a dropped, non-frozen UDT
+        return new TupleType(Collections.singletonList(UTF8Type.instance));
+    }
+
+    private static final Version version = BigFormat.instance.getVersion("mc");
+
+    private TableMetadata tableMetadata;
+    private final Set<String> updatedColumns = new HashSet<>();
+
+    private ColumnMetadata getColDef(String n)
+    {
+        return tableMetadata.getColumn(ByteBufferUtil.bytes(n));
+    }
+
+    /**
+     * Very basic test whether {@link SSTableHeaderFix} detects a type mismatch (regular_c 'int' vs 'float').
+     */
+    @Test
+    public void verifyTypeMismatchTest() throws Exception
+    {
+        File dir = temporaryFolder;
+        File sstable = generateFakeSSTable(dir, 1);
+
+        SerializationHeader.Component header = readHeader(sstable);
+        assertFrozenUdt(header, false, true);
+
+        ColumnMetadata cd = getColDef("regular_c");
+        tableMetadata = tableMetadata.unbuild()
+                                     .removeRegularOrStaticColumn(cd.name)
+                                     .addRegularColumn("regular_c", FloatType.instance)
+                                     .build();
+
+        SSTableHeaderFix headerFix = builder().withPath(sstable.toPath())
+                                              .build();
+        headerFix.execute();
+
+        assertTrue(headerFix.hasError());
+        assertTrue(headerFix.hasChanges());
+
+        // must not have re-written the stats-component
+        header = readHeader(sstable);
+        assertFrozenUdt(header, false, true);
+    }
+
+    @Test
+    public void verifyTypeMatchTest() throws Exception
+    {
+        File dir = temporaryFolder;
+
+        TableMetadata.Builder cols = TableMetadata.builder("ks", "cf")
+                                                  .addPartitionKeyColumn("pk", udtPK)
+                                                  .addClusteringColumn("ck", udtCK);
+        commonColumns(cols);
+        File sstable = buildFakeSSTable(dir, 1, cols, false);
+
+        SerializationHeader.Component header = readHeader(sstable);
+        assertFrozenUdt(header, false, true);
+
+        SSTableHeaderFix headerFix = builder().withPath(sstable.toPath())
+                                              .build();
+        headerFix.execute();
+
+        assertTrue(updatedColumns.isEmpty());
+        assertFalse(headerFix.hasError());
+        assertFalse(headerFix.hasChanges());
+
+        // must not have re-written the stats-component
+        header = readHeader(sstable);
+        assertFrozenUdt(header, false, true);
+    }
+
+    /**
+     * Simulates the case when an sstable contains a column not present in the schema, which can just be ignored.
+     */
+    @Test
+    public void verifyWithUnknownColumnTest() throws Exception
+    {
+        File dir = temporaryFolder;
+        TableMetadata.Builder cols = TableMetadata.builder("ks", "cf")
+                                                  .addPartitionKeyColumn("pk", udtPK)
+                                                  .addClusteringColumn("ck", udtCK);
+        commonColumns(cols);
+        cols.addRegularColumn("solr_query", UTF8Type.instance);
+        File sstable = buildFakeSSTable(dir, 1, cols, true);
+
+        SerializationHeader.Component header = readHeader(sstable);
+        assertFrozenUdt(header, false, true);
+
+        ColumnMetadata cd = getColDef("solr_query");
+        tableMetadata = tableMetadata.unbuild()
+                                     .removeRegularOrStaticColumn(cd.name)
+                                     .build();
+
+        SSTableHeaderFix headerFix = builder().withPath(sstable.toPath())
+                                              .build();
+        headerFix.execute();
+
+        assertFalse(headerFix.hasError());
+        assertTrue(headerFix.hasChanges());
+
+        // must not have re-written the stats-component
+        header = readHeader(sstable);
+        assertFrozenUdt(header, true, true);
+    }
+
+    /**
+     * Simulates the case when an sstable contains a column that is not present in the table itself but is the target of an index.
+     * It can just be ignored.
+     */
+    @Test
+    public void verifyWithIndexedUnknownColumnTest() throws Exception
+    {
+        File dir = temporaryFolder;
+        TableMetadata.Builder cols = TableMetadata.builder("ks", "cf")
+                                                  .addPartitionKeyColumn("pk", udtPK)
+                                                  .addClusteringColumn("ck", udtCK);
+        commonColumns(cols);
+        cols.addRegularColumn("solr_query", UTF8Type.instance);
+        File sstable = buildFakeSSTable(dir, 1, cols, true);
+
+        SerializationHeader.Component header = readHeader(sstable);
+        assertFrozenUdt(header, false, true);
+
+        ColumnMetadata cd = getColDef("solr_query");
+        tableMetadata = tableMetadata.unbuild()
+                                     .indexes(tableMetadata.indexes.with(IndexMetadata.fromSchemaMetadata("some search index", IndexMetadata.Kind.CUSTOM, Collections.singletonMap(IndexTarget.TARGET_OPTION_NAME, "solr_query"))))
+                                     .removeRegularOrStaticColumn(cd.name)
+                                     .build();
+
+        SSTableHeaderFix headerFix = builder().withPath(sstable.toPath())
+                                              .build();
+        headerFix.execute();
+
+        assertFalse(headerFix.hasError());
+        assertTrue(headerFix.hasChanges());
+
+        // must not have re-written the stats-component
+        header = readHeader(sstable);
+        assertFrozenUdt(header, true, true);
+    }
+
+    @Test
+    public void complexTypeMatchTest() throws Exception
+    {
+        File dir = temporaryFolder;
+
+        TableMetadata.Builder cols = TableMetadata.builder("ks", "cf")
+                                                  .addPartitionKeyColumn("pk", udtPK)
+                                                  .addClusteringColumn("ck", udtCK);
+        commonColumns(cols);
+        cols.addRegularColumn("tuple_in_tuple", tupleInTuple)
+            .addRegularColumn("udt_nested", udtNested)
+            .addRegularColumn("udt_in_tuple", udtInTuple)
+            .addRegularColumn("tuple_in_composite", tupleInComposite)
+            .addRegularColumn("udt_in_composite", udtInComposite)
+            .addRegularColumn("udt_in_list", udtInList)
+            .addRegularColumn("udt_in_set", udtInSet)
+            .addRegularColumn("udt_in_map", udtInMap)
+            .addRegularColumn("udt_in_frozen_list", udtInFrozenList)
+            .addRegularColumn("udt_in_frozen_set", udtInFrozenSet)
+            .addRegularColumn("udt_in_frozen_map", udtInFrozenMap);
+        File sstable = buildFakeSSTable(dir, 1, cols, true);
+
+        SerializationHeader.Component header = readHeader(sstable);
+        assertFrozenUdt(header, false, true);
+
+        SSTableHeaderFix headerFix = builder().withPath(sstable.toPath())
+                                              .build();
+        headerFix.execute();
+
+        assertFalse(headerFix.hasError());
+        assertTrue(headerFix.hasChanges());
+        assertEquals(Sets.newHashSet("pk", "ck", "regular_b", "static_b",
+                                     "udt_nested", "udt_in_composite", "udt_in_list", "udt_in_set", "udt_in_map"), updatedColumns);
+
+        // must not have re-written the stats-component
+        header = readHeader(sstable);
+        assertFrozenUdt(header, true, true);
+    }
+
+    @Test
+    public void complexTypeDroppedColumnsMatchTest() throws Exception
+    {
+        File dir = temporaryFolder;
+
+        TableMetadata.Builder cols = TableMetadata.builder("ks", "cf")
+                                                  .addPartitionKeyColumn("pk", udtPK)
+                                                  .addClusteringColumn("ck", udtCK);
+        commonColumns(cols);
+        cols.addRegularColumn("tuple_in_tuple", tupleInTuple)
+            .addRegularColumn("udt_nested", udtNested)
+            .addRegularColumn("udt_in_tuple", udtInTuple)
+            .addRegularColumn("tuple_in_composite", tupleInComposite)
+            .addRegularColumn("udt_in_composite", udtInComposite)
+            .addRegularColumn("udt_in_list", udtInList)
+            .addRegularColumn("udt_in_set", udtInSet)
+            .addRegularColumn("udt_in_map", udtInMap)
+            .addRegularColumn("udt_in_frozen_list", udtInFrozenList)
+            .addRegularColumn("udt_in_frozen_set", udtInFrozenSet)
+            .addRegularColumn("udt_in_frozen_map", udtInFrozenMap);
+        File sstable = buildFakeSSTable(dir, 1, cols, true);
+
+        cols = tableMetadata.unbuild();
+        for (String col : new String[]{"tuple_in_tuple", "udt_nested", "udt_in_tuple",
+                                       "tuple_in_composite", "udt_in_composite",
+                                       "udt_in_list", "udt_in_set", "udt_in_map",
+                                       "udt_in_frozen_list", "udt_in_frozen_set", "udt_in_frozen_map"})
+        {
+            ColumnIdentifier ci = new ColumnIdentifier(col, true);
+            ColumnMetadata cd = getColDef(col);
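+            // dropped columns record their type with UDTs expanded into tuples, since the dropped UDT definition may no longer exist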
+            AbstractType<?> dropType = cd.type.expandUserTypes();
+            cols.removeRegularOrStaticColumn(ci)
+                .recordColumnDrop(new ColumnMetadata(cd.ksName, cd.cfName, cd.name, dropType, cd.position(), cd.kind), FBUtilities.timestampMicros());
+        }
+        tableMetadata = cols.build();
+
+        SerializationHeader.Component header = readHeader(sstable);
+        assertFrozenUdt(header, false, true);
+
+        SSTableHeaderFix headerFix = builder().withPath(sstable.toPath())
+                                              .build();
+        headerFix.execute();
+
+        assertFalse(headerFix.hasError());
+        assertTrue(headerFix.hasChanges());
+        assertEquals(Sets.newHashSet("pk", "ck", "regular_b", "static_b", "udt_nested"), updatedColumns);
+
+        // must not have re-written the stats-component
+        header = readHeader(sstable);
+        // do not check the inner types, as the inner types were not fixed in the serialization-header (an artifact of this test's setup)
+        assertFrozenUdt(header, true, false);
+    }
+
+    @Test
+    public void variousDroppedUserTypes() throws Exception
+    {
+        File dir = temporaryFolder;
+
+        TableMetadata.Builder cols = TableMetadata.builder("ks", "cf")
+                                                  .addPartitionKeyColumn("pk", udtPK)
+                                                  .addClusteringColumn("ck", udtCK);
+
+        ColSpec[] colSpecs = new ColSpec[]
+                {
+                        // 'frozen<udt>' / live
+                        new ColSpec("frozen_udt_as_frozen_udt_live",
+                                    makeUDT2("frozen_udt_as_frozen_udt_live", false),
+                                    makeUDT2("frozen_udt_as_frozen_udt_live", false),
+                                    false,
+                                    false),
+                        // 'frozen<udt>' / live / as 'udt'
+                        new ColSpec("frozen_udt_as_unfrozen_udt_live",
+                                    makeUDT2("frozen_udt_as_unfrozen_udt_live", false),
+                                    makeUDT2("frozen_udt_as_unfrozen_udt_live", true),
+                                    false,
+                                    true),
+                        // 'frozen<udt>' / dropped
+                        new ColSpec("frozen_udt_as_frozen_udt_dropped",
+                                    makeUDT2("frozen_udt_as_frozen_udt_dropped", true).freezeNestedMulticellTypes().freeze().expandUserTypes(),
+                                    makeUDT2("frozen_udt_as_frozen_udt_dropped", false),
+                                    makeUDT2("frozen_udt_as_frozen_udt_dropped", false),
+                                    true,
+                                    false),
+                        // 'frozen<udt>' / dropped / as 'udt'
+                        new ColSpec("frozen_udt_as_unfrozen_udt_dropped",
+                                    makeUDT2("frozen_udt_as_unfrozen_udt_dropped", true).freezeNestedMulticellTypes().freeze().expandUserTypes(),
+                                    makeUDT2("frozen_udt_as_unfrozen_udt_dropped", true),
+                                    makeUDT2("frozen_udt_as_unfrozen_udt_dropped", false),
+                                    true,
+                                    true),
+                        // 'udt' / live
+                        new ColSpec("unfrozen_udt_as_unfrozen_udt_live",
+                                    makeUDT2("unfrozen_udt_as_unfrozen_udt_live", true),
+                                    makeUDT2("unfrozen_udt_as_unfrozen_udt_live", true),
+                                    false,
+                                    false),
+                        // 'udt' / dropped
+// TODO unable to test dropping a non-frozen UDT, as that requires an unfrozen tuple as well
+//                        new ColSpec("unfrozen_udt_as_unfrozen_udt_dropped",
+//                                    makeUDT2("unfrozen_udt_as_unfrozen_udt_dropped", true).freezeNestedMulticellTypes().expandUserTypes(),
+//                                    makeUDT2("unfrozen_udt_as_unfrozen_udt_dropped", true),
+//                                    makeUDT2("unfrozen_udt_as_unfrozen_udt_dropped", true),
+//                                    true,
+//                                    false),
+                        // 'frozen<tuple>' is represented as a TupleType with multiCell=false (there is no 'FrozenType(TupleType(...))' representation)
+                        new ColSpec("frozen_tuple_as_frozen_tuple_live",
+                                    makeTupleSimple(),
+                                    makeTupleSimple(),
+                                    false,
+                                    false),
+                        // 'frozen<tuple>' is represented as a TupleType with multiCell=false (there is no 'FrozenType(TupleType(...))' representation)
+                        new ColSpec("frozen_tuple_as_frozen_tuple_dropped",
+                                    makeTupleSimple(),
+                                    makeTupleSimple(),
+                                    true,
+                                    false)
+                };
+
+        Arrays.stream(colSpecs).forEach(c -> cols.addRegularColumn(c.name,
+                                                                   // use the initial column type for the serialization header.
+                                                                   c.preFix));
+
+        Map<String, ColSpec> colSpecMap = Arrays.stream(colSpecs).collect(Collectors.toMap(c -> c.name, c -> c));
+        File sstable = buildFakeSSTable(dir, 1, cols, c -> {
+            ColSpec cs = colSpecMap.get(c.name.toString());
+            if (cs == null)
+                return c;
+            // update the column type in the schema to the "correct" one.
+            return c.withNewType(cs.schema);
+        });
+
+        Arrays.stream(colSpecs)
+              .filter(c -> c.dropped)
+              .forEach(c -> {
+                  ColumnMetadata cd = getColDef(c.name);
+                  tableMetadata = tableMetadata.unbuild()
+                                               .removeRegularOrStaticColumn(cd.name)
+                                               .recordColumnDrop(cd, FBUtilities.timestampMicros())
+                                               .build();
+              });
+
+        SerializationHeader.Component header = readHeader(sstable);
+        for (ColSpec colSpec : colSpecs)
+        {
+            AbstractType<?> hdrType = header.getRegularColumns().get(ByteBufferUtil.bytes(colSpec.name));
+            assertEquals(colSpec.name, colSpec.preFix, hdrType);
+            assertEquals(colSpec.name, colSpec.preFix.isMultiCell(), hdrType.isMultiCell());
+        }
+
+        SSTableHeaderFix headerFix = builder().withPath(sstable.toPath())
+                                              .build();
+        headerFix.execute();
+
+        assertFalse(headerFix.hasError());
+        assertTrue(headerFix.hasChanges());
+        // Verify that all columns that must be fixed are in the updatedColumns set (admittedly paranoid)
+        Arrays.stream(colSpecs)
+              .filter(c -> c.mustFix)
+              .forEach(c -> assertTrue("expect " + c.name + " to be updated, but was not (" + updatedColumns + ")", updatedColumns.contains(c.name)));
+        // Verify that the number of updated columns matches the expected number of columns to fix
+        assertEquals(Arrays.stream(colSpecs).filter(c -> c.mustFix).count(), updatedColumns.size());
+
+        header = readHeader(sstable);
+        for (ColSpec colSpec : colSpecs)
+        {
+            AbstractType<?> hdrType = header.getRegularColumns().get(ByteBufferUtil.bytes(colSpec.name));
+            assertEquals(colSpec.name, colSpec.expect, hdrType);
+            assertEquals(colSpec.name, colSpec.expect.isMultiCell(), hdrType.isMultiCell());
+        }
+    }
+
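+    // Per-column test fixture: the type in the schema, the (possibly broken) type initially written into the
+    // serialization-header (preFix), the type expected after the fix, whether the column was dropped, and
+    // whether SSTableHeaderFix must rewrite it.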
+    static class ColSpec
+    {
+        final String name;
+        final AbstractType<?> schema;
+        final AbstractType<?> preFix;
+        final AbstractType<?> expect;
+        final boolean dropped;
+        final boolean mustFix;
+
+        ColSpec(String name, AbstractType<?> schema, AbstractType<?> preFix, boolean dropped, boolean mustFix)
+        {
+            this(name, schema, preFix, schema, dropped, mustFix);
+        }
+
+        ColSpec(String name, AbstractType<?> schema, AbstractType<?> preFix, AbstractType<?> expect, boolean dropped, boolean mustFix)
+        {
+            this.name = name;
+            this.schema = schema;
+            this.preFix = preFix;
+            this.expect = expect;
+            this.dropped = dropped;
+            this.mustFix = mustFix;
+        }
+    }
+
+    @Test
+    public void verifyTypeMatchCompositeKeyTest() throws Exception
+    {
+        File dir = temporaryFolder;
+
+        TableMetadata.Builder cols = TableMetadata.builder("ks", "cf")
+                                                  .addPartitionKeyColumn("pk1", UTF8Type.instance)
+                                                  .addPartitionKeyColumn("pk2", udtPK)
+                                                  .addClusteringColumn("ck", udtCK);
+        commonColumns(cols);
+        File sstable = buildFakeSSTable(dir, 1, cols, false);
+
+        SerializationHeader.Component header = readHeader(sstable);
+        assertFrozenUdt(header, false, true);
+
+        SSTableHeaderFix headerFix = builder().withPath(sstable.toPath())
+                                              .build();
+        headerFix.execute();
+
+        assertFalse(headerFix.hasError());
+        assertFalse(headerFix.hasChanges());
+        assertTrue(updatedColumns.isEmpty());
+
+        // must not have re-written the stats-component
+        header = readHeader(sstable);
+        assertFrozenUdt(header, false, true);
+    }
+
+    @Test
+    public void compositePartitionKey() throws Exception
+    {
+        TableMetadata.Builder cols = TableMetadata.builder("ks", "cf")
+                                                  .addPartitionKeyColumn("pk1", UTF8Type.instance)
+                                                  .addPartitionKeyColumn("pk2", udtPK)
+                                                  .addClusteringColumn("ck", udtCK);
+        commonColumns(cols);
+
+        File dir = temporaryFolder;
+        File sstable = buildFakeSSTable(dir, 1, cols, true);
+
+        SerializationHeader.Component header = readHeader(sstable);
+        assertTrue(header.getKeyType() instanceof CompositeType);
+        CompositeType keyType = (CompositeType) header.getKeyType();
+        assertEquals(Arrays.asList(UTF8Type.instance, udtPK), keyType.getComponents());
+
+        SSTableHeaderFix headerFix = builder().withPath(sstable.toPath())
+                                              .build();
+        headerFix.execute();
+
+        assertFalse(headerFix.hasError());
+        assertTrue(headerFix.hasChanges());
+        assertEquals(Sets.newHashSet("pk2", "ck", "regular_b", "static_b"), updatedColumns);
+
+        header = readHeader(sstable);
+        assertTrue(header.getKeyType() instanceof CompositeType);
+        keyType = (CompositeType) header.getKeyType();
+        assertEquals(Arrays.asList(UTF8Type.instance, udtPK.freeze()), keyType.getComponents());
+    }
+
+    @Test
+    public void compositeClusteringKey() throws Exception
+    {
+        TableMetadata.Builder cols = TableMetadata.builder("ks", "cf")
+                                                  .addPartitionKeyColumn("pk", udtPK)
+                                                  .addClusteringColumn("ck1", Int32Type.instance)
+                                                  .addClusteringColumn("ck2", udtCK);
+        commonColumns(cols);
+
+        File dir = temporaryFolder;
+        File sstable = buildFakeSSTable(dir, 1, cols, true);
+
+        SerializationHeader.Component header = readHeader(sstable);
+        assertEquals(Arrays.asList(Int32Type.instance, udtCK), header.getClusteringTypes());
+
+        SSTableHeaderFix headerFix = builder().withPath(sstable.toPath())
+                                              .build();
+        headerFix.execute();
+
+        assertFalse(headerFix.hasError());
+        assertTrue(headerFix.hasChanges());
+        assertEquals(Sets.newHashSet("pk", "ck2", "regular_b", "static_b"), updatedColumns);
+
+        header = readHeader(sstable);
+        assertEquals(Arrays.asList(Int32Type.instance, udtCK.freeze()), header.getClusteringTypes());
+    }
+
+    /**
+     * Check whether {@link SSTableHeaderFix} can operate on a single file.
+     */
+    @Test
+    public void singleFileUDTFixTest() throws Exception
+    {
+        File dir = temporaryFolder;
+        File sstable = generateFakeSSTable(dir, 1);
+
+        SerializationHeader.Component header = readHeader(sstable);
+        assertFrozenUdt(header, false, true);
+
+        SSTableHeaderFix headerFix = builder().withPath(sstable.toPath())
+                                              .build();
+        headerFix.execute();
+
+        assertTrue(headerFix.hasChanges());
+        assertFalse(headerFix.hasError());
+
+        header = readHeader(sstable);
+        assertFrozenUdt(header, true, true);
+    }
+
+    /**
+     * Check whether {@link SSTableHeaderFix} can operate on a single directory.
+     */
+    @Test
+    public void singleDirectoryUDTFixTest() throws Exception
+    {
+        File dir = temporaryFolder;
+        List<File> sstables = IntStream.range(1, 11)
+                                       .mapToObj(g -> generateFakeSSTable(dir, g))
+                                       .collect(Collectors.toList());
+
+        for (File sstable : sstables)
+        {
+            SerializationHeader.Component header = readHeader(sstable);
+            assertFrozenUdt(header, false, true);
+        }
+
+        SSTableHeaderFix headerFix = builder().withPath(dir.toPath())
+                                              .build();
+        headerFix.execute();
+
+        assertTrue(headerFix.hasChanges());
+        assertFalse(headerFix.hasError());
+
+        for (File sstable : sstables)
+        {
+            SerializationHeader.Component header = readHeader(sstable);
+            assertFrozenUdt(header, true, true);
+        }
+    }
+
+    /**
+     * Check whether {@link SSTableHeaderFix} can operate on multiple, individually specified files.
+     */
+    @Test
+    public void multipleFilesUDTFixTest() throws Exception
+    {
+        File dir = temporaryFolder;
+        List<File> sstables = IntStream.range(1, 11)
+                                       .mapToObj(g -> generateFakeSSTable(dir, g))
+                                       .collect(Collectors.toList());
+
+        for (File sstable : sstables)
+        {
+            SerializationHeader.Component header = readHeader(sstable);
+            assertFrozenUdt(header, false, true);
+        }
+
+        SSTableHeaderFix.Builder builder = builder();
+        sstables.stream().map(File::toPath).forEach(builder::withPath);
+        SSTableHeaderFix headerFix = builder.build();
+        headerFix.execute();
+
+        assertTrue(headerFix.hasChanges());
+        assertFalse(headerFix.hasError());
+
+        for (File sstable : sstables)
+        {
+            SerializationHeader.Component header = readHeader(sstable);
+            assertFrozenUdt(header, true, true);
+        }
+    }
+
+    /**
+     * Check whether {@link SSTableHeaderFix} can operate on multiple files in a directory.
+     */
+    @Test
+    public void multipleFilesInDirectoryUDTFixTest() throws Exception
+    {
+        File dir = temporaryFolder;
+        List<File> sstables = IntStream.range(1, 11)
+                                       .mapToObj(g -> generateFakeSSTable(dir, g))
+                                       .collect(Collectors.toList());
+
+        for (File sstable : sstables)
+        {
+            SerializationHeader.Component header = readHeader(sstable);
+            assertFrozenUdt(header, false, true);
+        }
+
+        SSTableHeaderFix headerFix = builder().withPath(dir.toPath())
+                                              .build();
+        headerFix.execute();
+
+        assertTrue(headerFix.hasChanges());
+        assertFalse(headerFix.hasError());
+
+        for (File sstable : sstables)
+        {
+            SerializationHeader.Component header = readHeader(sstable);
+            assertFrozenUdt(header, true, true);
+        }
+    }
+
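+    // Matches the per-column INFO message logged by SSTableHeaderFix, so the tests can record which columns
+    // were actually updated.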
+    private static final Pattern p = Pattern.compile(".* Column '([^']+)' needs to be updated from type .*");
+
+    private SSTableHeaderFix.Builder builder()
+    {
+        updatedColumns.clear();
+        return SSTableHeaderFix.builder()
+                               .schemaCallback(() -> (desc) -> tableMetadata)
+                               .info(ln -> {
+                                   System.out.println("INFO: " + ln);
+                                   Matcher m = p.matcher(ln);
+                                   if (m.matches())
+                                       updatedColumns.add(m.group(1));
+                               })
+                               .warn(ln -> System.out.println("WARN: " + ln))
+                               .error(ln -> System.out.println("ERROR: " + ln));
+    }
+
+    private File generateFakeSSTable(File dir, int generation)
+    {
+        TableMetadata.Builder cols = TableMetadata.builder("ks", "cf")
+                                                  .addPartitionKeyColumn("pk", udtPK)
+                                                  .addClusteringColumn("ck", udtCK);
+        commonColumns(cols);
+        return buildFakeSSTable(dir, generation, cols, true);
+    }
+
+    private void commonColumns(TableMetadata.Builder cols)
+    {
+        cols.addRegularColumn("regular_a", UTF8Type.instance)
+            .addRegularColumn("regular_b", udtRegular)
+            .addRegularColumn("regular_c", Int32Type.instance)
+            .addStaticColumn("static_a", UTF8Type.instance)
+            .addStaticColumn("static_b", udtStatic)
+            .addStaticColumn("static_c", Int32Type.instance);
+    }
+
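+    /**
+     * Build a fake sstable whose serialization header uses the types from {@code cols} as-is. If
+     * {@code freezeInSchema} is true, the schema handed to {@link SSTableHeaderFix} has all UDTs frozen,
+     * i.e. the header intentionally contains the broken, unfrozen representation.
+     */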
+    private File buildFakeSSTable(File dir, int generation, TableMetadata.Builder cols, boolean freezeInSchema)
+    {
+        return buildFakeSSTable(dir, generation, cols, freezeInSchema
+                                                       ? c -> c.withNewType(freezeUdt(c.type))
+                                                       : c -> c);
+    }
+
+    private File buildFakeSSTable(File dir, int generation, TableMetadata.Builder cols, Function<ColumnMetadata, ColumnMetadata> freezer)
+    {
+        TableMetadata headerMetadata = cols.build();
+
+        TableMetadata.Builder schemaCols = TableMetadata.builder("ks", "cf");
+        for (ColumnMetadata cm : cols.columns())
+            schemaCols.addColumn(freezer.apply(cm));
+        tableMetadata = schemaCols.build();
+
+        try
+        {
+            Descriptor desc = new Descriptor(version, dir, "ks", "cf", generation, SSTableFormat.Type.BIG);
+
+            // Just create the component files - we don't really need those.
+            for (Component component : requiredComponents)
+                assertTrue(new File(desc.filenameFor(component)).createNewFile());
+
+            AbstractType<?> partitionKey = headerMetadata.partitionKeyType;
+            List<AbstractType<?>> clusteringKey = headerMetadata.clusteringColumns()
+                                                                .stream()
+                                                                .map(cd -> cd.type)
+                                                                .collect(Collectors.toList());
+            Map<ByteBuffer, AbstractType<?>> staticColumns = headerMetadata.columns()
+                                                                           .stream()
+                                                                           .filter(cd -> cd.kind == ColumnMetadata.Kind.STATIC)
+                                                                           .collect(Collectors.toMap(cd -> cd.name.bytes, cd -> cd.type, (a, b) -> a));
+            Map<ByteBuffer, AbstractType<?>> regularColumns = headerMetadata.columns()
+                                                                            .stream()
+                                                                            .filter(cd -> cd.kind == ColumnMetadata.Kind.REGULAR)
+                                                                            .collect(Collectors.toMap(cd -> cd.name.bytes, cd -> cd.type, (a, b) -> a));
+
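+            // Only the -Statistics.db component gets real content: a serialization header built from the
+            // (potentially unfrozen) types in headerMetadata.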
+            File statsFile = new File(desc.filenameFor(Component.STATS));
+            SerializationHeader.Component header = SerializationHeader.Component.buildComponentForTools(partitionKey,
+                                                                                                        clusteringKey,
+                                                                                                        staticColumns,
+                                                                                                        regularColumns,
+                                                                                                        EncodingStats.NO_STATS);
+
+            try (SequentialWriter out = new SequentialWriter(statsFile))
+            {
+                desc.getMetadataSerializer().serialize(Collections.singletonMap(MetadataType.HEADER, header), out, version);
+                out.finish();
+            }
+
+            return new File(desc.filenameFor(Component.DATA));
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
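+    /**
+     * Recursively freeze every {@link UserType} in {@code type}, descending into collection, composite and
+     * UDT field types; all other types are returned unchanged.
+     */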
+    private AbstractType<?> freezeUdt(AbstractType<?> type)
+    {
+        if (type instanceof CollectionType)
+        {
+            if (type.getClass() == ListType.class)
+            {
+                ListType<?> cHeader = (ListType<?>) type;
+                return ListType.getInstance(freezeUdt(cHeader.getElementsType()), cHeader.isMultiCell());
+            }
+            else if (type.getClass() == SetType.class)
+            {
+                SetType<?> cHeader = (SetType<?>) type;
+                return SetType.getInstance(freezeUdt(cHeader.getElementsType()), cHeader.isMultiCell());
+            }
+            else if (type.getClass() == MapType.class)
+            {
+                MapType<?, ?> cHeader = (MapType<?, ?>) type;
+                return MapType.getInstance(freezeUdt(cHeader.getKeysType()), freezeUdt(cHeader.getValuesType()), cHeader.isMultiCell());
+            }
+        }
+        else if (type instanceof AbstractCompositeType)
+        {
+            if (type.getClass() == CompositeType.class)
+            {
+                CompositeType cHeader = (CompositeType) type;
+                return CompositeType.getInstance(cHeader.types.stream().map(this::freezeUdt).collect(Collectors.toList()));
+            }
+        }
+        else if (type instanceof TupleType)
+        {
+            if (type.getClass() == UserType.class)
+            {
+                UserType cHeader = (UserType) type;
+                cHeader = cHeader.freeze();
+                return new UserType(cHeader.keyspace, cHeader.name, cHeader.fieldNames(),
+                                    cHeader.allTypes().stream().map(this::freezeUdt).collect(Collectors.toList()),
+                                    cHeader.isMultiCell());
+            }
+        }
+        return type;
+    }
+
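+    /**
+     * Assert the expected frozen-ness of every UDT referenced by the header: the partition key (including the
+     * components of a composite key), the clustering types, and all static and regular columns.
+     */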
+    private void assertFrozenUdt(SerializationHeader.Component header, boolean frozen, boolean checkInner)
+    {
+        AbstractType<?> keyType = header.getKeyType();
+        if (keyType instanceof CompositeType)
+        {
+            for (AbstractType<?> component : ((CompositeType) keyType).types)
+                assertFrozenUdt("partition-key-component", component, frozen, checkInner);
+        }
+        assertFrozenUdt("partition-key", keyType, frozen, checkInner);
+
+        for (AbstractType<?> type : header.getClusteringTypes())
+            assertFrozenUdt("clustering-part", type, frozen, checkInner);
+        for (Map.Entry<ByteBuffer, AbstractType<?>> col : header.getStaticColumns().entrySet())
+            assertFrozenUdt(UTF8Type.instance.compose(col.getKey()), col.getValue(), frozen, checkInner);
+        for (Map.Entry<ByteBuffer, AbstractType<?>> col : header.getRegularColumns().entrySet())
+            assertFrozenUdt(UTF8Type.instance.compose(col.getKey()), col.getValue(), frozen, checkInner);
+    }
+
+    private void assertFrozenUdt(String name, AbstractType<?> type, boolean frozen, boolean checkInner)
+    {
+        if (type instanceof CompositeType)
+        {
+            if (checkInner)
+                for (AbstractType<?> component : ((CompositeType) type).types)
+                    assertFrozenUdt(name, component, frozen, true);
+        }
+        else if (type instanceof CollectionType)
+        {
+            if (checkInner)
+            {
+                if (type instanceof MapType)
+                {
+                    MapType map = (MapType) type;
+                    // only descend into non-frozen types (checking frozen inside frozen is redundant)
+                    if (map.isMultiCell())
+                    {
+                        assertFrozenUdt(name + "<map-key>", map.getKeysType(), frozen, true);
+                        assertFrozenUdt(name + "<map-value>", map.getValuesType(), frozen, true);
+                    }
+                }
+                else if (type instanceof SetType)
+                {
+                    SetType set = (SetType) type;
+                    // only descend into non-frozen types (checking frozen inside frozen is redundant)
+                    if (set.isMultiCell())
+                        assertFrozenUdt(name + "<set>", set.getElementsType(), frozen, true);
+                }
+                else if (type instanceof ListType)
+                {
+                    ListType list = (ListType) type;
+                    // only descend into non-frozen types (checking frozen inside frozen is redundant)
+                    if (list.isMultiCell())
+                        assertFrozenUdt(name + "<list>", list.getElementsType(), frozen, true);
+                }
+            }
+        }
+        else if (type instanceof TupleType)
+        {
+            if (checkInner)
+            {
+                TupleType tuple = (TupleType) type;
+                // only descend into non-frozen types (checking frozen inside frozen is redundant)
+                if (tuple.isMultiCell())
+                    for (AbstractType<?> component : tuple.allTypes())
+                        assertFrozenUdt(name + "<tuple>", component, frozen, true);
+            }
+        }
+
+        if (type instanceof UserType)
+        {
+            String typeString = type.toString();
+            assertEquals(name + ": " + typeString, frozen, !type.isMultiCell());
+            if (typeString.startsWith(UserType.class.getName() + '('))
+                if (frozen)
+                    fail(name + ": " + typeString);
+            if (typeString.startsWith(FrozenType.class.getName() + '(' + UserType.class.getName() + '('))
+                if (!frozen)
+                    fail(name + ": " + typeString);
+        }
+    }
+
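+    /** Deserialize the {@code HEADER} metadata component from the sstable's -Statistics.db file. */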
+    private SerializationHeader.Component readHeader(File sstable) throws Exception
+    {
+        Descriptor desc = Descriptor.fromFilename(sstable);
+        return (SerializationHeader.Component) desc.getMetadataSerializer().deserialize(desc, MetadataType.HEADER);
+    }
+
+    private static final Component[] requiredComponents = new Component[]{ Component.DATA, Component.FILTER, Component.PRIMARY_INDEX, Component.TOC };
+}
diff --git a/test/unit/org/apache/cassandra/schema/TupleTypesRepresentationTest.java b/test/unit/org/apache/cassandra/schema/TupleTypesRepresentationTest.java
new file mode 100644
index 0000000..e6daa1f
--- /dev/null
+++ b/test/unit/org/apache/cassandra/schema/TupleTypesRepresentationTest.java
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.schema;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import org.junit.Ignore;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.CQL3Type;
+import org.apache.cassandra.cql3.CQLFragmentParser;
+import org.apache.cassandra.cql3.CqlParser;
+import org.apache.cassandra.cql3.FieldIdentifier;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.TypeParser;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.db.marshal.UserType;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Verifies that the string representations of {@link AbstractType} and {@link CQL3Type} are as expected and compatible.
+ *
+ * C* 3.0 is known to <em>not</em> enclose a frozen UDT in a {@code FrozenType(...)} bracket in the
+ * {@link AbstractType} representation. The string representation of a frozen UDT using the {@link CQL3Type}
+ * type hierarchy is correct in C* 3.0.
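+ *
+ * For example, a column declared as {@code frozen<f_udt>} should serialize its type as
+ * {@code FrozenType(UserType(...))}, but C* 3.0 writes just {@code UserType(...)} without the enclosing
+ * {@code FrozenType}, although the CQL-level representation correctly reads {@code frozen<f_udt>}.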
+ */
+public class TupleTypesRepresentationTest
+{
+    static
+    {
+        DatabaseDescriptor.toolInitialization();
+    }
+
+    private static final String keyspace = "ks";
+    private static final String mcUdtName = "mc_udt";
+    private static final ByteBuffer mcUdtNameBytes = ByteBufferUtil.bytes(mcUdtName);
+    private static final String iUdtName = "i_udt";
+    private static final ByteBuffer iUdtNameBytes = ByteBufferUtil.bytes(iUdtName);
+    private static final String fUdtName = "f_udt";
+    private static final ByteBuffer fUdtNameBytes = ByteBufferUtil.bytes(fUdtName);
+    private static final String udtField1 = "a";
+    private static final String udtField2 = "b";
+    private static final AbstractType<?> udtType1 = UTF8Type.instance;
+    private static final AbstractType<?> udtType2 = UTF8Type.instance;
+
+    private static final Types types = Types.builder()
+                                            .add(new UserType(keyspace,
+                                                              mcUdtNameBytes,
+                                                              Arrays.asList(new FieldIdentifier(ByteBufferUtil.bytes(udtField1)),
+                                                                            new FieldIdentifier(ByteBufferUtil.bytes(udtField2))),
+                                                              Arrays.asList(udtType1,
+                                                                            udtType2),
+                                                              true))
+                                            .add(new UserType(keyspace,
+                                                              iUdtNameBytes,
+                                                              Arrays.asList(new FieldIdentifier(ByteBufferUtil.bytes(udtField1)),
+                                                                            new FieldIdentifier(ByteBufferUtil.bytes(udtField2))),
+                                                              Arrays.asList(udtType1,
+                                                                            udtType2),
+                                                              true))
+                                            .add(new UserType(keyspace,
+                                                              fUdtNameBytes,
+                                                              Arrays.asList(new FieldIdentifier(ByteBufferUtil.bytes(udtField1)),
+                                                                            new FieldIdentifier(ByteBufferUtil.bytes(udtField2))),
+                                                              Arrays.asList(udtType1,
+                                                                            udtType2),
+                                                              true))
+                                            .build();
+
+    static class TypeDef
+    {
+        final String typeString;
+        final String cqlTypeString;
+        final String droppedCqlTypeString;
+        final boolean multiCell;
+        final String cqlValue;
+
+        final AbstractType<?> type;
+        final CQL3Type cqlType;
+
+        final AbstractType<?> droppedType;
+        final CQL3Type droppedCqlType;
+
+        TypeDef(String typeString, String cqlTypeString, String droppedCqlTypeString, boolean multiCell, String cqlValue)
+        {
+            this.typeString = typeString;
+            this.cqlTypeString = cqlTypeString;
+            this.droppedCqlTypeString = droppedCqlTypeString;
+            this.multiCell = multiCell;
+            this.cqlValue = cqlValue;
+
+            cqlType = CQLFragmentParser.parseAny(CqlParser::comparatorType, cqlTypeString, "non-dropped type")
+                                       .prepare(keyspace, types);
+            type = cqlType.getType();
+
+            droppedCqlType = CQLFragmentParser.parseAny(CqlParser::comparatorType, droppedCqlTypeString, "dropped type")
+                                              .prepare(keyspace, types);
+            // NOTE: TupleType is *always* parsed as frozen, but never toString()'d with the surrounding FrozenType
+            droppedType = droppedCqlType.getType();
+        }
+
+        @Override
+        public String toString()
+        {
+            return "TypeDef{\n" +
+                   "typeString='" + typeString + "'\n" +
+                   ", type=" + type + '\n' +
+                   ", cqlTypeString='" + cqlTypeString + "'\n" +
+                   ", cqlType=" + cqlType + '\n' +
+                   ", droppedType=" + droppedType + '\n' +
+                   ", droppedCqlTypeString='" + droppedCqlTypeString + "'\n" +
+                   ", droppedCqlType=" + droppedCqlType + '\n' +
+                   '}';
+        }
+    }
+
+    private static final TypeDef text = new TypeDef(
+            "org.apache.cassandra.db.marshal.UTF8Type",
+            "text",
+            "text",
+            false,
+            "'foobar'");
+
+    private static final TypeDef tuple_text__text_ = new TypeDef(
+            "org.apache.cassandra.db.marshal.TupleType(org.apache.cassandra.db.marshal.UTF8Type,org.apache.cassandra.db.marshal.UTF8Type)",
+            "tuple<text, text>",
+            "frozen<tuple<text, text>>",
+            false,
+            "('foo','bar')");
+
+    // Currently, dropped non-frozen-UDT columns are recorded as frozen<tuple<...>>, which is technically wrong
+    //private static final TypeDef mc_udt = new TypeDef(
+    //        "org.apache.cassandra.db.marshal.UserType(ks,6d635f756474,61:org.apache.cassandra.db.marshal.UTF8Type,62:org.apache.cassandra.db.marshal.UTF8Type)",
+    //        "mc_udt",
+    //        "tuple<text, text>",
+    //        true,
+    //        "{a:'foo',b:'bar'}");
+
+    private static final TypeDef frozen_f_udt_ = new TypeDef(
+            "org.apache.cassandra.db.marshal.FrozenType(org.apache.cassandra.db.marshal.UserType(ks,665f756474,61:org.apache.cassandra.db.marshal.UTF8Type,62:org.apache.cassandra.db.marshal.UTF8Type))",
+            "frozen<f_udt>",
+            "frozen<tuple<text, text>>",
+            false,
+            "{a:'foo',b:'bar'}");
+
+    private static final TypeDef list_text_ = new TypeDef(
+            "org.apache.cassandra.db.marshal.ListType(org.apache.cassandra.db.marshal.UTF8Type)",
+            "list<text>",
+            "list<text>",
+            true,
+            "['foobar']");
+
+    private static final TypeDef frozen_list_text__ = new TypeDef(
+            "org.apache.cassandra.db.marshal.FrozenType(org.apache.cassandra.db.marshal.ListType(org.apache.cassandra.db.marshal.UTF8Type))",
+            "frozen<list<text>>",
+            "frozen<list<text>>",
+            true,
+            "['foobar']");
+
+    private static final TypeDef set_text_ = new TypeDef(
+            "org.apache.cassandra.db.marshal.SetType(org.apache.cassandra.db.marshal.UTF8Type)",
+            "set<text>",
+            "set<text>",
+            true,
+            "{'foobar'}");
+
+    private static final TypeDef frozen_set_text__ = new TypeDef(
+            "org.apache.cassandra.db.marshal.FrozenType(org.apache.cassandra.db.marshal.SetType(org.apache.cassandra.db.marshal.UTF8Type))",
+            "frozen<set<text>>",
+            "frozen<set<text>>",
+            true,
+            "{'foobar'}");
+
+    private static final TypeDef map_text__text_ = new TypeDef(
+            "org.apache.cassandra.db.marshal.MapType(org.apache.cassandra.db.marshal.UTF8Type,org.apache.cassandra.db.marshal.UTF8Type)",
+            "map<text, text>",
+            "map<text, text>",
+            true,
+            "{'foo':'bar'}");
+
+    private static final TypeDef frozen_map_text__text__ = new TypeDef(
+            "org.apache.cassandra.db.marshal.FrozenType(org.apache.cassandra.db.marshal.MapType(org.apache.cassandra.db.marshal.UTF8Type,org.apache.cassandra.db.marshal.UTF8Type))",
+            "frozen<map<text, text>>",
+            "frozen<map<text, text>>",
+            true,
+            "{'foo':'bar'}");
+
+    private static final TypeDef list_frozen_tuple_text__text___ = new TypeDef(
+            // in consequence, this should be:
+            // "org.apache.cassandra.db.marshal.ListType(org.apache.cassandra.db.marshal.FrozenType(org.apache.cassandra.db.marshal.TupleType(org.apache.cassandra.db.marshal.UTF8Type,org.apache.cassandra.db.marshal.UTF8Type)))",
+            "org.apache.cassandra.db.marshal.ListType(org.apache.cassandra.db.marshal.TupleType(org.apache.cassandra.db.marshal.UTF8Type,org.apache.cassandra.db.marshal.UTF8Type))",
+            "list<frozen<tuple<text, text>>>",
+            "list<frozen<tuple<text, text>>>",
+            true,
+            "[('foo','bar')]");
+
+    private static final TypeDef frozen_list_tuple_text__text___ = new TypeDef(
+            "org.apache.cassandra.db.marshal.FrozenType(org.apache.cassandra.db.marshal.ListType(org.apache.cassandra.db.marshal.TupleType(org.apache.cassandra.db.marshal.UTF8Type,org.apache.cassandra.db.marshal.UTF8Type)))",
+            "frozen<list<frozen<tuple<text, text>>>>",
+            "frozen<list<frozen<tuple<text, text>>>>",
+            true,
+            "[('foo','bar')]");
+
+    private static final TypeDef set_frozen_tuple_text__text___ = new TypeDef(
+            // in consequence, this should be:
+            // "org.apache.cassandra.db.marshal.SetType(org.apache.cassandra.db.marshal.FrozenType(org.apache.cassandra.db.marshal.TupleType(org.apache.cassandra.db.marshal.UTF8Type,org.apache.cassandra.db.marshal.UTF8Type)))",
+            "org.apache.cassandra.db.marshal.SetType(org.apache.cassandra.db.marshal.TupleType(org.apache.cassandra.db.marshal.UTF8Type,org.apache.cassandra.db.marshal.UTF8Type))",
+            "set<frozen<tuple<text, text>>>",
+            "set<frozen<tuple<text, text>>>",
+            true,
+            "{('foo','bar')}");
+
+    private static final TypeDef frozen_set_tuple_text__text___ = new TypeDef(
+            "org.apache.cassandra.db.marshal.FrozenType(org.apache.cassandra.db.marshal.SetType(org.apache.cassandra.db.marshal.TupleType(org.apache.cassandra.db.marshal.UTF8Type,org.apache.cassandra.db.marshal.UTF8Type)))",
+            "frozen<set<frozen<tuple<text, text>>>>",
+            "frozen<set<frozen<tuple<text, text>>>>",
+            true,
+            "{('foo','bar')}");
+
+    private static final TypeDef map_text__frozen_tuple_text__text___ = new TypeDef(
+            // in consequence, this should be:
+            // "org.apache.cassandra.db.marshal.MapType(org.apache.cassandra.db.marshal.UTF8Type,org.apache.cassandra.db.marshal.FrozenType(org.apache.cassandra.db.marshal.TupleType(org.apache.cassandra.db.marshal.UTF8Type,org.apache.cassandra.db.marshal.UTF8Type)))",
+            "org.apache.cassandra.db.marshal.MapType(org.apache.cassandra.db.marshal.UTF8Type,org.apache.cassandra.db.marshal.TupleType(org.apache.cassandra.db.marshal.UTF8Type,org.apache.cassandra.db.marshal.UTF8Type))",
+            "map<text, frozen<tuple<text, text>>>",
+            "map<text, frozen<tuple<text, text>>>",
+            true,
+            "{'foobar':('foo','bar')}");
+
+    private static final TypeDef frozen_map_text__tuple_text__text___ = new TypeDef(
+            "org.apache.cassandra.db.marshal.FrozenType(org.apache.cassandra.db.marshal.MapType(org.apache.cassandra.db.marshal.UTF8Type,org.apache.cassandra.db.marshal.TupleType(org.apache.cassandra.db.marshal.UTF8Type,org.apache.cassandra.db.marshal.UTF8Type)))",
+            "frozen<map<text, frozen<tuple<text, text>>>>",
+            "frozen<map<text, frozen<tuple<text, text>>>>",
+            true,
+            "{'foobar':('foo','bar')}");
+
+    private static final TypeDef list_frozen_i_udt__ = new TypeDef(
+            "org.apache.cassandra.db.marshal.ListType(org.apache.cassandra.db.marshal.FrozenType(org.apache.cassandra.db.marshal.UserType(ks,695f756474,61:org.apache.cassandra.db.marshal.UTF8Type,62:org.apache.cassandra.db.marshal.UTF8Type)))",
+            "list<frozen<i_udt>>",
+            "list<frozen<tuple<text, text>>>",
+            true,
+            "[{a:'foo',b:'bar'}]");
+
+    private static final TypeDef frozen_list_i_udt__ = new TypeDef(
+            "org.apache.cassandra.db.marshal.FrozenType(org.apache.cassandra.db.marshal.ListType(org.apache.cassandra.db.marshal.UserType(ks,695f756474,61:org.apache.cassandra.db.marshal.UTF8Type,62:org.apache.cassandra.db.marshal.UTF8Type)))",
+            "frozen<list<frozen<i_udt>>>",
+            "frozen<list<frozen<tuple<text, text>>>>",
+            true,
+            "[{a:'foo',b:'bar'}]");
+
+    private static final TypeDef set_frozen_i_udt__ = new TypeDef(
+            "org.apache.cassandra.db.marshal.SetType(org.apache.cassandra.db.marshal.FrozenType(org.apache.cassandra.db.marshal.UserType(ks,695f756474,61:org.apache.cassandra.db.marshal.UTF8Type,62:org.apache.cassandra.db.marshal.UTF8Type)))",
+            "set<frozen<i_udt>>",
+            "set<frozen<tuple<text, text>>>",
+            true,
+            "{{a:'foo',b:'bar'}}");
+
+    private static final TypeDef frozen_set_i_udt__ = new TypeDef(
+            "org.apache.cassandra.db.marshal.FrozenType(org.apache.cassandra.db.marshal.SetType(org.apache.cassandra.db.marshal.UserType(ks,695f756474,61:org.apache.cassandra.db.marshal.UTF8Type,62:org.apache.cassandra.db.marshal.UTF8Type)))",
+            "frozen<set<frozen<i_udt>>>",
+            "frozen<set<frozen<tuple<text, text>>>>",
+            true,
+            "{{a:'foo',b:'bar'}}");
+
+    private static final TypeDef map_text__frozen_i_udt__ = new TypeDef(
+            "org.apache.cassandra.db.marshal.MapType(org.apache.cassandra.db.marshal.UTF8Type,org.apache.cassandra.db.marshal.FrozenType(org.apache.cassandra.db.marshal.UserType(ks,695f756474,61:org.apache.cassandra.db.marshal.UTF8Type,62:org.apache.cassandra.db.marshal.UTF8Type)))",
+            "map<text, frozen<i_udt>>",
+            "map<text, frozen<tuple<text, text>>>",
+            true,
+            "{'foobar':{a:'foo',b:'bar'}}");
+
+    private static final TypeDef frozen_map_text__i_udt__ = new TypeDef(
+            "org.apache.cassandra.db.marshal.FrozenType(org.apache.cassandra.db.marshal.MapType(org.apache.cassandra.db.marshal.UTF8Type,org.apache.cassandra.db.marshal.UserType(ks,695f756474,61:org.apache.cassandra.db.marshal.UTF8Type,62:org.apache.cassandra.db.marshal.UTF8Type)))",
+            "frozen<map<text, frozen<i_udt>>>",
+            "frozen<map<text, frozen<tuple<text, text>>>>",
+            true,
+            "{'foobar':{a:'foo',b:'bar'}}");
+
+    private static final TypeDef[] allTypes = {
+            text,
+            tuple_text__text_,
+            frozen_f_udt_,
+            list_text_,
+            frozen_list_text__,
+            set_text_,
+            frozen_set_text__,
+            map_text__text_,
+            frozen_map_text__text__,
+            list_frozen_tuple_text__text___,
+            frozen_list_tuple_text__text___,
+            set_frozen_tuple_text__text___,
+            frozen_set_tuple_text__text___,
+            map_text__frozen_tuple_text__text___,
+            frozen_map_text__tuple_text__text___,
+            list_frozen_i_udt__,
+            frozen_list_i_udt__,
+            set_frozen_i_udt__,
+            frozen_set_i_udt__,
+            map_text__frozen_i_udt__,
+            frozen_map_text__i_udt__,
+            };
+
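+    /**
+     * Not a real test: prints the CQL statements that create and populate a table covering every
+     * {@link TypeDef}, so the resulting sstables can be inspected with the sstablemetadata tool.
+     */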
+    @Ignore("Only used to ")
+    @Test
+    public void generateCqlStatements() throws InterruptedException
+    {
+        StringWriter sw = new StringWriter();
+        PrintWriter pw = new PrintWriter(sw);
+
+        pw.println("DROP TABLE sstableheaderfixtest;");
+        pw.println();
+        pw.println("CREATE TYPE i_udt (a text, b text);");
+        pw.println("CREATE TYPE f_udt (a text, b text);");
+        pw.println("CREATE TYPE mc_udt (a text, b text);");
+        pw.println();
+        pw.println("CREATE TABLE sstableheaderfixtest (");
+        pw.print("  id int PRIMARY KEY");
+        for (TypeDef typeDef : allTypes)
+        {
+            String cname = typeDef.cqlTypeString.replaceAll("[, <>]", "_");
+            pw.printf(",%n  %s %s", cname, typeDef.cqlTypeString);
+        }
+        pw.println(");");
+        pw.println();
+
+        pw.printf("INSERT INTO sstableheaderfixtest%n  (id");
+        for (TypeDef typeDef : allTypes)
+        {
+            String cname = typeDef.cqlTypeString.replaceAll("[, <>]", "_");
+            pw.printf(",%n    %s", cname);
+        }
+        pw.printf(")%n  VALUES%n  (1");
+        for (TypeDef typeDef : allTypes)
+        {
+            pw.printf(",%n    %s", typeDef.cqlValue);
+        }
+        pw.println(");");
+
+        pw.println();
+        pw.println();
+        pw.println("-- Run tools/bin/sstablemetadata data/data/<keyspace>/<table>/*-Data.db to show the sstable");
+        pw.println("-- serialization-header (types not shown in the C* 3.0 variant of the sstablemetadata tool)");
+
+        sw.flush();
+
+        System.out.println(sw.toString());
+
+        Thread.sleep(1000);
+    }
+
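+    /**
+     * Verify, for every {@link TypeDef}, that the {@link AbstractType} and {@link CQL3Type} string
+     * representations round-trip through parsing, and that expanding UDTs into tuples yields the expected
+     * "dropped column" representation.
+     */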
+    @Test
+    public void verifyTypes()
+    {
+        AssertionError master = null;
+        for (TypeDef typeDef : allTypes)
+        {
+            try
+            {
+                assertEquals(typeDef.toString() + "\n typeString vs type\n", typeDef.typeString, typeDef.type.toString());
+                assertEquals(typeDef.toString() + "\n typeString vs cqlType.getType()\n", typeDef.typeString, typeDef.cqlType.getType().toString());
+                AbstractType<?> expanded = typeDef.type.expandUserTypes();
+                CQL3Type expandedCQL = expanded.asCQL3Type();
+                // Note: an assertion on the declared CQL type string is omitted here, because the parsed
+                // CQL3Type instance for 'frozen<list<tuple<text, text>>>' returns 'frozen<list<frozen<tuple<text, text>>>>'
+                // from its CQL3Type.toString() implementation.
+                assertEquals(typeDef.toString() + "\n droppedCqlType\n", typeDef.droppedCqlType, expandedCQL);
+                assertEquals(typeDef.toString() + "\n droppedCqlTypeString\n", typeDef.droppedCqlTypeString, expandedCQL.toString());
+                assertEquals(typeDef.toString() + "\n multiCell\n", typeDef.type.isMultiCell(), typeDef.droppedType.isMultiCell());
+
+                AbstractType<?> parsedType = TypeParser.parse(typeDef.typeString);
+                assertEquals(typeDef.toString(), typeDef.typeString, parsedType.toString());
+            }
+            catch (AssertionError ae)
+            {
+                if (master == null)
+                    master = ae;
+                else
+                    master.addSuppressed(ae);
+            }
+        }
+        if (master != null)
+            throw master;
+    }
+}

