You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ad...@apache.org on 2020/10/13 15:21:09 UTC

[cassandra] branch trunk updated (2ae1ec5 -> d890b7a)

This is an automated email from the ASF dual-hosted git repository.

adelapena pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git.


    from 2ae1ec5  Merge branch 'cassandra-3.11' into trunk
     new 0700dfa  Check SSTables for latest version before dropping compact storage
     new 45982f5  Merge branch 'cassandra-3.0' into cassandra-3.11
     new d890b7a  Merge branch 'cassandra-3.11' into trunk

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 CHANGES.txt                                        |  1 +
 .../cassandra/exceptions/StartupException.java     |  1 +
 .../apache/cassandra/schema/SchemaKeyspace.java    | 32 ++++++++++++++++++++++
 .../org/apache/cassandra/schema/TableMetadata.java | 12 --------
 .../apache/cassandra/service/StartupChecks.java    |  4 +++
 5 files changed, 38 insertions(+), 12 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[cassandra] 01/01: Merge branch 'cassandra-3.11' into trunk

Posted by ad...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

adelapena pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit d890b7a5f14108bf60fbd52f94eefb53aaab1e18
Merge: 2ae1ec5 45982f5
Author: Andrés de la Peña <a....@gmail.com>
AuthorDate: Tue Oct 13 16:19:51 2020 +0100

    Merge branch 'cassandra-3.11' into trunk
    
    # Conflicts:
    #	src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java

 CHANGES.txt                                        |  1 +
 .../cassandra/exceptions/StartupException.java     |  1 +
 .../apache/cassandra/schema/SchemaKeyspace.java    | 32 ++++++++++++++++++++++
 .../org/apache/cassandra/schema/TableMetadata.java | 12 --------
 .../apache/cassandra/service/StartupChecks.java    |  4 +++
 5 files changed, 38 insertions(+), 12 deletions(-)

diff --cc CHANGES.txt
index 6829dac,98c23c5..a7701c7
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,32 -1,10 +1,33 @@@
 -3.11.9
 +4.0-beta3
++ * Move compact storage validation earlier in startup process (CASSANDRA-16063)
 + * Fix ByteBufferAccessor cast exceptions are thrown when trying to query a virtual table (CASSANDRA-16155)
 + * Consolidate node liveness check for forced repair (CASSANDRA-16113)
 + * Use unsigned short in ValueAccessor.sliceWithShortLength (CASSANDRA-16147)
 + * Abort repairs when getting a truncation request (CASSANDRA-15854)
 + * Remove bad assert when getting active compactions for an sstable (CASSANDRA-15457)
 + * Avoid failing compactions with very large partitions (CASSANDRA-15164)
 + * Prevent NPE in StreamMessage in type lookup (CASSANDRA-16131)
 + * Avoid invalid state transition exception during incremental repair (CASSANDRA-16067)
 + * Allow zero padding in timestamp serialization (CASSANDRA-16105)
 + * Add byte array backed cells (CASSANDRA-15393)
 + * Correctly handle pending ranges with adjacent range movements (CASSANDRA-14801)
 + * Avoid adding localhost when streaming trivial ranges (CASSANDRA-16099)
 + * Add nodetool getfullquerylog (CASSANDRA-15988)
 + * Fix yaml format and alignment in tpstats (CASSANDRA-11402)
 + * Avoid trying to keep track of RTs for endpoints we won't write to during read repair (CASSANDRA-16084)
 + * When compaction gets interrupted, the exception should include the compactionId (CASSANDRA-15954)
 + * Make Table/Keyspace Metric Names Consistent With Each Other (CASSANDRA-15909)
 + * Mutating sstable component may race with entire-sstable-streaming(ZCS) causing checksum validation failure (CASSANDRA-15861)
 + * NPE thrown while updating speculative execution time if keyspace is removed during task execution (CASSANDRA-15949)
 + * Show the progress of data streaming and index build (CASSANDRA-15406)
 + * Add flag to disable chunk cache and disable by default (CASSANDRA-16036)
 + * Upgrade to snakeyaml >= 1.26 version for CVE-2017-18640 fix (CASSANDRA-16150)
 +Merged from 3.11:
   * Fix memory leak in CompressedChunkReader (CASSANDRA-15880)
   * Don't attempt value skipping with mixed version cluster (CASSANDRA-15833)
 - * Avoid failing compactions with very large partitions (CASSANDRA-15164)
 + * Use IF NOT EXISTS for index and UDT create statements in snapshot schema files (CASSANDRA-13935)
   * Make sure LCS handles duplicate sstable added/removed notifications correctly (CASSANDRA-14103)
  Merged from 3.0:
 - * Check SSTables for latest version before dropping compact storage (CASSANDRA-16063)
   * Handle unexpected columns due to schema races (CASSANDRA-15899)
   * Add flag to ignore unreplicated keyspaces during repair (CASSANDRA-15160)
  Merged from 2.2:
diff --cc src/java/org/apache/cassandra/exceptions/StartupException.java
index 1513cf9,1513cf9..85fd64e
--- a/src/java/org/apache/cassandra/exceptions/StartupException.java
+++ b/src/java/org/apache/cassandra/exceptions/StartupException.java
@@@ -26,6 -26,6 +26,7 @@@ public class StartupException extends E
      public final static int ERR_WRONG_MACHINE_STATE = 1;
      public final static int ERR_WRONG_DISK_STATE = 3;
      public final static int ERR_WRONG_CONFIG = 100;
++    public final static int ERR_OUTDATED_SCHEMA = 101;
  
      public final int returnCode;
  
diff --cc src/java/org/apache/cassandra/schema/SchemaKeyspace.java
index c404403,8b7ac84..0333ee6
--- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
+++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
@@@ -39,11 -41,10 +39,12 @@@ import org.apache.cassandra.db.filter.C
  import org.apache.cassandra.db.marshal.*;
  import org.apache.cassandra.db.partitions.*;
  import org.apache.cassandra.db.rows.*;
 -import org.apache.cassandra.db.filter.ColumnFilter;
 -import org.apache.cassandra.db.view.View;
 -import org.apache.cassandra.exceptions.ConfigurationException;
  import org.apache.cassandra.exceptions.InvalidRequestException;
++import org.apache.cassandra.exceptions.StartupException;
 +import org.apache.cassandra.service.reads.SpeculativeRetryPolicy;
 +import org.apache.cassandra.schema.ColumnMetadata.ClusteringOrder;
 +import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
 +import org.apache.cassandra.service.reads.repair.ReadRepairStrategy;
  import org.apache.cassandra.transport.ProtocolVersion;
  import org.apache.cassandra.utils.ByteBufferUtil;
  import org.apache.cassandra.utils.FBUtilities;
@@@ -51,10 -53,9 +52,11 @@@
  import static java.lang.String.format;
  
  import static java.util.stream.Collectors.toList;
 +import static java.util.stream.Collectors.toSet;
 +
++import static org.apache.cassandra.cql3.ColumnIdentifier.maybeQuote;
  import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
  import static org.apache.cassandra.cql3.QueryProcessor.executeOnceInternal;
 -import static org.apache.cassandra.schema.CQLTypeParser.parse;
  
  /**
   * system_schema.* tables and methods for manipulating them.
@@@ -857,6 -924,6 +859,36 @@@ public final class SchemaKeyspac
          return fetchKeyspacesWithout(SchemaConstants.LOCAL_SYSTEM_KEYSPACE_NAMES);
      }
  
++    public static void validateNonCompact() throws StartupException
++    {
++        String query = String.format("SELECT keyspace_name, table_name, flags FROM %s.%s", SchemaConstants.SCHEMA_KEYSPACE_NAME, TABLES);
++
++        StringBuilder messages = new StringBuilder();
++        for (UntypedResultSet.Row row : query(query))
++        {
++            if (SchemaConstants.isLocalSystemKeyspace(row.getString("keyspace_name")))
++                continue;
++
++            Set<String> flags = row.getFrozenSet("flags", UTF8Type.instance);
++            if (TableMetadata.Flag.isLegacyCompactTable(TableMetadata.Flag.fromStringSet(flags)))
++            {
++                messages.append(String.format("ALTER TABLE %s.%s DROP COMPACT STORAGE;\n",
++                                              maybeQuote(row.getString("keyspace_name")),
++                                              maybeQuote(row.getString("table_name"))));
++            }
++        }
++
++        if (messages.length() != 0)
++        {
++            throw new StartupException(StartupException.ERR_OUTDATED_SCHEMA,
++                                       String.format("Compact Tables are not allowed in Cassandra starting with 4.0 version. " +
++                                                     "In order to migrate off Compact Storage, downgrade to the latest Cassandra version, " +
++                                                     "and run the following CQL commands: \n\n%s\n" +
++                                                     "Then restart the node with the new Cassandra version.",
++                                                     messages));
++        }
++    }
++
      private static Keyspaces fetchKeyspacesWithout(Set<String> excludedKeyspaceNames)
      {
          String query = format("SELECT keyspace_name FROM %s.%s", SchemaConstants.SCHEMA_KEYSPACE_NAME, KEYSPACES);
diff --cc src/java/org/apache/cassandra/schema/TableMetadata.java
index ab90564,0000000..5c145ee
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/schema/TableMetadata.java
+++ b/src/java/org/apache/cassandra/schema/TableMetadata.java
@@@ -1,1294 -1,0 +1,1282 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.schema;
 +
 +import java.nio.ByteBuffer;
 +import java.util.*;
 +import java.util.Map.Entry;
 +
 +import javax.annotation.Nullable;
 +
 +import com.google.common.base.MoreObjects;
 +import com.google.common.collect.*;
 +
 +import org.apache.cassandra.auth.DataResource;
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.cql3.ColumnIdentifier;
 +import org.apache.cassandra.cql3.CqlBuilder;
 +import org.apache.cassandra.cql3.SchemaElement;
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.marshal.*;
 +import org.apache.cassandra.dht.IPartitioner;
 +import org.apache.cassandra.exceptions.ConfigurationException;
 +import org.apache.cassandra.exceptions.InvalidRequestException;
 +import org.apache.cassandra.service.reads.SpeculativeRetryPolicy;
 +import org.apache.cassandra.utils.AbstractIterator;
 +import org.github.jamm.Unmetered;
 +
 +import static com.google.common.collect.Iterables.any;
 +import static com.google.common.collect.Iterables.transform;
 +import static java.lang.String.format;
 +import static java.util.stream.Collectors.toList;
 +import static java.util.stream.Collectors.toSet;
 +import static org.apache.cassandra.schema.IndexMetadata.isNameValid;
 +
 +@Unmetered
 +public final class TableMetadata implements SchemaElement
 +{
-     public static final String COMPACT_STORAGE_HALT_MESSAGE =
-             "Detected table %s.%s with COMPACT STORAGE flags (%s). " +
-             "Compact Tables are not supported in Cassandra starting with version 4.0. " +
-             "Use the `ALTER ... DROP COMPACT STORAGE` command supplied in 3.x/3.11 Cassandra " +
-             "in order to migrate off Compact Storage before upgrading.";
- 
 +    // Please note that currently the only one truly useful flag is COUNTER, as the rest of the flags were about
 +    // differencing between CQL tables and the various types of COMPACT STORAGE tables (pre-4.0). As those "compact"
 +    // tables are not supported anymore, no tables should be either SUPER or DENSE, and they should all be COMPOUND.
 +    public enum Flag
 +    {
 +        // As mentioned above, all tables on 4.0+ will have the COMPOUND flag, making the flag of little value. However,
 +        // on upgrade from pre-4.0, we want to detect if a table does _not_ have this flag, in which case this would
 +        // be a compact table on which DROP COMPACT STORAGE has _not_ been used and fail startup. This is also why we
 +        // still write this flag for all tables. Once we drop support for upgrading from pre-4.0 versions (and so are
 +        // sure all tables do have the flag), we can stop writing this flag and ignore it when present (deprecate it).
 +        // Later, we'll be able to drop the flag from this enum completely.
 +        COMPOUND,
 +        COUNTER,
 +        // The only reason we still have those is that on the first startup after an upgrade from pre-4.0, we cannot
 +        // guarantee some tables won't have those flags (users having forgotten to use DROP COMPACT STORAGE before
 +        // upgrading). So we still "deserialize" those flags correctly, but otherwise prevent startup if any table
 +        // have them. Once we drop support for upgrading from pre-4.0, we can remove those values.
 +        @Deprecated SUPER,
 +        @Deprecated DENSE;
 +
 +        static boolean isLegacyCompactTable(Set<Flag> flags)
 +        {
 +            return flags.contains(Flag.DENSE) || flags.contains(Flag.SUPER) || !flags.contains(Flag.COMPOUND);
 +        }
 +
 +        public static Set<Flag> fromStringSet(Set<String> strings)
 +        {
 +            return strings.stream().map(String::toUpperCase).map(Flag::valueOf).collect(toSet());
 +        }
 +
 +        public static Set<String> toStringSet(Set<Flag> flags)
 +        {
 +            return flags.stream().map(Flag::toString).map(String::toLowerCase).collect(toSet());
 +        }
 +    }
 +
 +    public enum Kind
 +    {
 +        REGULAR, INDEX, VIEW, VIRTUAL
 +    }
 +
 +    public final String keyspace;
 +    public final String name;
 +    public final TableId id;
 +
 +    public final IPartitioner partitioner;
 +    public final Kind kind;
 +    public final TableParams params;
 +    public final ImmutableSet<Flag> flags;
 +
 +    @Nullable
 +    private final String indexName; // derived from table name
 +
 +    /*
 +     * All CQL3 columns definition are stored in the columns map.
 +     * On top of that, we keep separated collection of each kind of definition, to
 +     * 1) allow easy access to each kind and
 +     * 2) for the partition key and clustering key ones, those list are ordered by the "component index" of the elements.
 +     */
 +    public final ImmutableMap<ByteBuffer, DroppedColumn> droppedColumns;
 +    final ImmutableMap<ByteBuffer, ColumnMetadata> columns;
 +
 +    private final ImmutableList<ColumnMetadata> partitionKeyColumns;
 +    private final ImmutableList<ColumnMetadata> clusteringColumns;
 +    private final RegularAndStaticColumns regularAndStaticColumns;
 +
 +    public final Indexes indexes;
 +    public final Triggers triggers;
 +
 +    // derived automatically from flags and columns
 +    public final AbstractType<?> partitionKeyType;
 +    public final ClusteringComparator comparator;
 +
 +    // performance hacks; TODO see if all are really necessary
 +    public final DataResource resource;
 +
 +    private TableMetadata(Builder builder)
 +    {
-         if (Flag.isLegacyCompactTable(builder.flags))
-             throw new IllegalStateException(format(COMPACT_STORAGE_HALT_MESSAGE,
-                                                    builder.keyspace,
-                                                    builder.name,
-                                                    builder.flags));
- 
 +        flags = Sets.immutableEnumSet(builder.flags);
 +        keyspace = builder.keyspace;
 +        name = builder.name;
 +        id = builder.id;
 +
 +        partitioner = builder.partitioner;
 +        kind = builder.kind;
 +        params = builder.params.build();
 +
 +        indexName = kind == Kind.INDEX ? name.substring(name.indexOf('.') + 1) : null;
 +
 +        droppedColumns = ImmutableMap.copyOf(builder.droppedColumns);
 +        Collections.sort(builder.partitionKeyColumns);
 +        partitionKeyColumns = ImmutableList.copyOf(builder.partitionKeyColumns);
 +        Collections.sort(builder.clusteringColumns);
 +        clusteringColumns = ImmutableList.copyOf(builder.clusteringColumns);
 +        regularAndStaticColumns = RegularAndStaticColumns.builder().addAll(builder.regularAndStaticColumns).build();
 +        columns = ImmutableMap.copyOf(builder.columns);
 +
 +        indexes = builder.indexes;
 +        triggers = builder.triggers;
 +
 +        partitionKeyType = partitionKeyColumns.size() == 1
 +                         ? partitionKeyColumns.get(0).type
 +                         : CompositeType.getInstance(transform(partitionKeyColumns, t -> t.type));
 +
 +        comparator = new ClusteringComparator(transform(clusteringColumns, c -> c.type));
 +
 +        resource = DataResource.table(keyspace, name);
 +    }
 +
 +    public static Builder builder(String keyspace, String table)
 +    {
 +        return new Builder(keyspace, table);
 +    }
 +
 +    public static Builder builder(String keyspace, String table, TableId id)
 +    {
 +        return new Builder(keyspace, table, id);
 +    }
 +
 +    public Builder unbuild()
 +    {
 +        return builder(keyspace, name, id)
 +               .partitioner(partitioner)
 +               .kind(kind)
 +               .params(params)
 +               .flags(flags)
 +               .addColumns(columns())
 +               .droppedColumns(droppedColumns)
 +               .indexes(indexes)
 +               .triggers(triggers);
 +    }
 +
 +    public boolean isIndex()
 +    {
 +        return kind == Kind.INDEX;
 +    }
 +
 +    public TableMetadata withSwapped(TableParams params)
 +    {
 +        return unbuild().params(params).build();
 +    }
 +
 +    public TableMetadata withSwapped(Triggers triggers)
 +    {
 +        return unbuild().triggers(triggers).build();
 +    }
 +
 +    public TableMetadata withSwapped(Indexes indexes)
 +    {
 +        return unbuild().indexes(indexes).build();
 +    }
 +
 +    public boolean isView()
 +    {
 +        return kind == Kind.VIEW;
 +    }
 +
 +    public boolean isVirtual()
 +    {
 +        return kind == Kind.VIRTUAL;
 +    }
 +
 +    public Optional<String> indexName()
 +    {
 +        return Optional.ofNullable(indexName);
 +    }
 +
 +    public boolean isCounter()
 +    {
 +        return flags.contains(Flag.COUNTER);
 +    }
 +
 +    public ImmutableCollection<ColumnMetadata> columns()
 +    {
 +        return columns.values();
 +    }
 +
 +    public Iterable<ColumnMetadata> primaryKeyColumns()
 +    {
 +        return Iterables.concat(partitionKeyColumns, clusteringColumns);
 +    }
 +
 +    public ImmutableList<ColumnMetadata> partitionKeyColumns()
 +    {
 +        return partitionKeyColumns;
 +    }
 +
 +    public ImmutableList<ColumnMetadata> clusteringColumns()
 +    {
 +        return clusteringColumns;
 +    }
 +
 +    public RegularAndStaticColumns regularAndStaticColumns()
 +    {
 +        return regularAndStaticColumns;
 +    }
 +
 +    public Columns regularColumns()
 +    {
 +        return regularAndStaticColumns.regulars;
 +    }
 +
 +    public Columns staticColumns()
 +    {
 +        return regularAndStaticColumns.statics;
 +    }
 +
 +    /*
 +     * An iterator over all column definitions but that respect the order of a SELECT *.
 +     */
 +    public Iterator<ColumnMetadata> allColumnsInSelectOrder()
 +    {
 +        Iterator<ColumnMetadata> partitionKeyIter = partitionKeyColumns.iterator();
 +        Iterator<ColumnMetadata> clusteringIter = clusteringColumns.iterator();
 +        Iterator<ColumnMetadata> otherColumns = regularAndStaticColumns.selectOrderIterator();
 +
 +        return columnsIterator(partitionKeyIter, clusteringIter, otherColumns);
 +    }
 +
 +    /**
 +     * Returns an iterator over all column definitions that respect the order of the CREATE statement.
 +     */
 +    public Iterator<ColumnMetadata> allColumnsInCreateOrder()
 +    {
 +        Iterator<ColumnMetadata> partitionKeyIter = partitionKeyColumns.iterator();
 +        Iterator<ColumnMetadata> clusteringIter = clusteringColumns.iterator();
 +        Iterator<ColumnMetadata> otherColumns = regularAndStaticColumns.iterator();
 +
 +        return columnsIterator(partitionKeyIter, clusteringIter, otherColumns);
 +    }
 +
 +    private static Iterator<ColumnMetadata> columnsIterator(Iterator<ColumnMetadata> partitionKeys,
 +                                                            Iterator<ColumnMetadata> clusteringColumns,
 +                                                            Iterator<ColumnMetadata> otherColumns)
 +    {
 +        return new AbstractIterator<ColumnMetadata>()
 +        {
 +            protected ColumnMetadata computeNext()
 +            {
 +                if (partitionKeys.hasNext())
 +                    return partitionKeys.next();
 +
 +                if (clusteringColumns.hasNext())
 +                    return clusteringColumns.next();
 +
 +                return otherColumns.hasNext() ? otherColumns.next() : endOfData();
 +            }
 +        };
 +    }
 +
 +    /**
 +     * Returns the ColumnMetadata for {@code name}.
 +     */
 +    public ColumnMetadata getColumn(ColumnIdentifier name)
 +    {
 +        return columns.get(name.bytes);
 +    }
 +    /**
 +     * Returns the column of the provided name if it exists, but throws a user-visible exception if that column doesn't
 +     * exist.
 +     *
 +     * <p>This method is for finding columns from a name provided by the user, and as such it does _not_ return hidden
 +     * columns (throwing that the column is unknown instead).
 +     *
 +     * @param name the name of an existing non-hidden column of this table.
 +     * @return the column metadata corresponding to {@code name}.
 +     *
 +     * @throws InvalidRequestException if there is no non-hidden column named {@code name} in this table.
 +     */
 +    public ColumnMetadata getExistingColumn(ColumnIdentifier name)
 +    {
 +        ColumnMetadata def = getColumn(name);
 +        if (def == null)
 +            throw new InvalidRequestException(format("Undefined column name %s in table %s", name.toCQLString(), this));
 +        return def;
 +    }
 +    /*
 +     * In general it is preferable to work with ColumnIdentifier to make it
 +     * clear that we are talking about a CQL column, not a cell name, but there
 +     * is a few cases where all we have is a ByteBuffer (when dealing with IndexExpression
 +     * for instance) so...
 +     */
 +    public ColumnMetadata getColumn(ByteBuffer name)
 +    {
 +        return columns.get(name);
 +    }
 +
 +    public ColumnMetadata getDroppedColumn(ByteBuffer name)
 +    {
 +        DroppedColumn dropped = droppedColumns.get(name);
 +        return dropped == null ? null : dropped.column;
 +    }
 +
 +    /**
 +     * Returns a "fake" ColumnMetadata corresponding to the dropped column {@code name}
 +     * of {@code null} if there is no such dropped column.
 +     *
 +     * @param name - the column name
 +     * @param isStatic - whether the column was a static column, if known
 +     */
 +    public ColumnMetadata getDroppedColumn(ByteBuffer name, boolean isStatic)
 +    {
 +        DroppedColumn dropped = droppedColumns.get(name);
 +        if (dropped == null)
 +            return null;
 +
 +        if (isStatic && !dropped.column.isStatic())
 +            return ColumnMetadata.staticColumn(this, name, dropped.column.type);
 +
 +        return dropped.column;
 +    }
 +
 +    public boolean hasStaticColumns()
 +    {
 +        return !staticColumns().isEmpty();
 +    }
 +
 +    public void validate()
 +    {
 +        if (!isNameValid(keyspace))
 +            except("Keyspace name must not be empty, more than %s characters long, or contain non-alphanumeric-underscore characters (got \"%s\")", SchemaConstants.NAME_LENGTH, keyspace);
 +
 +        if (!isNameValid(name))
 +            except("Table name must not be empty, more than %s characters long, or contain non-alphanumeric-underscore characters (got \"%s\")", SchemaConstants.NAME_LENGTH, name);
 +
 +        params.validate();
 +
 +        if (partitionKeyColumns.stream().anyMatch(c -> c.type.isCounter()))
 +            except("PRIMARY KEY columns cannot contain counters");
 +
 +        // Mixing counter with non counter columns is not supported (#2614)
 +        if (isCounter())
 +        {
 +            for (ColumnMetadata column : regularAndStaticColumns)
 +                if (!(column.type.isCounter()) && !isSuperColumnMapColumnName(column.name))
 +                    except("Cannot have a non counter column (\"%s\") in a counter table", column.name);
 +        }
 +        else
 +        {
 +            for (ColumnMetadata column : regularAndStaticColumns)
 +                if (column.type.isCounter())
 +                    except("Cannot have a counter column (\"%s\") in a non counter column table", column.name);
 +        }
 +
 +        // All tables should have a partition key
 +        if (partitionKeyColumns.isEmpty())
 +            except("Missing partition keys for table %s", toString());
 +
 +        indexes.validate(this);
 +    }
 +
 +    /**
 +     * To support backward compatibility with thrift super columns in the C* 3.0+ storage engine, we encode said super
 +     * columns as a CQL {@code map<blob, blob>}. To ensure the name of this map did not conflict with any other user
 +     * defined columns, we used the empty name (which is otherwise not allowed for user created columns).
 +     * <p>
 +     * While all thrift-based tables must have been converted to "CQL" ones with "DROP COMPACT STORAGE" (before
 +     * upgrading to C* 4.0, which stop supporting non-CQL tables completely), a converted super-column table will still
 +     * have this map with an empty name. And the reason we need to recognize it still, is that for backward
 +     * compatibility we need to support counters in values of this map while it's not supported in any other map.
 +     *
 +     * TODO: it's probably worth lifting the limitation of not allowing counters as map values. It works fully
 +     *   internally (since we had to support it for this special map) and doesn't feel particularly dangerous to
 +     *   support. Doing so would remove this special case, but would also let user that do have an upgraded super-column
 +     *   table with counters to rename that weirdly name map to something more meaningful (it's not possible today
 +     *   as after renaming the validation in {@link #validate()} would trigger).
 +     */
 +    private static boolean isSuperColumnMapColumnName(ColumnIdentifier columnName)
 +    {
 +        return !columnName.bytes.hasRemaining();
 +    }
 +
 +    void validateCompatibility(TableMetadata previous)
 +    {
 +        if (isIndex())
 +            return;
 +
 +        if (!previous.keyspace.equals(keyspace))
 +            except("Keyspace mismatch (found %s; expected %s)", keyspace, previous.keyspace);
 +
 +        if (!previous.name.equals(name))
 +            except("Table mismatch (found %s; expected %s)", name, previous.name);
 +
 +        if (!previous.id.equals(id))
 +            except("Table ID mismatch (found %s; expected %s)", id, previous.id);
 +
 +        if (!previous.flags.equals(flags))
 +            except("Table type mismatch (found %s; expected %s)", flags, previous.flags);
 +
 +        if (previous.partitionKeyColumns.size() != partitionKeyColumns.size())
 +        {
 +            except("Partition keys of different length (found %s; expected %s)",
 +                   partitionKeyColumns.size(),
 +                   previous.partitionKeyColumns.size());
 +        }
 +
 +        for (int i = 0; i < partitionKeyColumns.size(); i++)
 +        {
 +            if (!partitionKeyColumns.get(i).type.isCompatibleWith(previous.partitionKeyColumns.get(i).type))
 +            {
 +                except("Partition key column mismatch (found %s; expected %s)",
 +                       partitionKeyColumns.get(i).type,
 +                       previous.partitionKeyColumns.get(i).type);
 +            }
 +        }
 +
 +        if (previous.clusteringColumns.size() != clusteringColumns.size())
 +        {
 +            except("Clustering columns of different length (found %s; expected %s)",
 +                   clusteringColumns.size(),
 +                   previous.clusteringColumns.size());
 +        }
 +
 +        for (int i = 0; i < clusteringColumns.size(); i++)
 +        {
 +            if (!clusteringColumns.get(i).type.isCompatibleWith(previous.clusteringColumns.get(i).type))
 +            {
 +                except("Clustering column mismatch (found %s; expected %s)",
 +                       clusteringColumns.get(i).type,
 +                       previous.clusteringColumns.get(i).type);
 +            }
 +        }
 +
 +        for (ColumnMetadata previousColumn : previous.regularAndStaticColumns)
 +        {
 +            ColumnMetadata column = getColumn(previousColumn.name);
 +            if (column != null && !column.type.isCompatibleWith(previousColumn.type))
 +                except("Column mismatch (found %s; expected %s)", column, previousColumn);
 +        }
 +    }
 +
 +    public ClusteringComparator partitionKeyAsClusteringComparator()
 +    {
 +        return new ClusteringComparator(partitionKeyColumns.stream().map(c -> c.type).collect(toList()));
 +    }
 +
 +    /**
 +     * Generate a table name for an index corresponding to the given column.
 +     * This is NOT the same as the index's name! This is only used in sstable filenames and is not exposed to users.
 +     *
 +     * @param info A definition of the column with index
 +     *
 +     * @return name of the index table
 +     */
 +    public String indexTableName(IndexMetadata info)
 +    {
 +        // TODO simplify this when info.index_name is guaranteed to be set
 +        return name + Directories.SECONDARY_INDEX_NAME_SEPARATOR + info.name;
 +    }
 +
 +    /**
 +     * @return true if the change as made impacts queries/updates on the table,
 +     *         e.g. any columns or indexes were added, removed, or altered; otherwise, false is returned.
 +     *         Used to determine whether prepared statements against this table need to be re-prepared.
 +     */
 +    boolean changeAffectsPreparedStatements(TableMetadata updated)
 +    {
 +        return !partitionKeyColumns.equals(updated.partitionKeyColumns)
 +            || !clusteringColumns.equals(updated.clusteringColumns)
 +            || !regularAndStaticColumns.equals(updated.regularAndStaticColumns)
 +            || !indexes.equals(updated.indexes)
 +            || params.defaultTimeToLive != updated.params.defaultTimeToLive
 +            || params.gcGraceSeconds != updated.params.gcGraceSeconds;
 +    }
 +
 +    /**
 +     * There is a couple of places in the code where we need a TableMetadata object and don't have one readily available
 +     * and know that only the keyspace and name matter. This creates such "fake" metadata. Use only if you know what
 +     * you're doing.
 +     */
 +    public static TableMetadata minimal(String keyspace, String name)
 +    {
 +        return TableMetadata.builder(keyspace, name)
 +                            .addPartitionKeyColumn("key", BytesType.instance)
 +                            .build();
 +    }
 +
 +    public TableMetadata updateIndexTableMetadata(TableParams baseTableParams)
 +    {
 +        TableParams.Builder builder = baseTableParams.unbuild().gcGraceSeconds(0);
 +
 +        // Depends on parent's cache setting, turn on its index table's cache.
 +        // Row caching is never enabled; see CASSANDRA-5732
 +        builder.caching(baseTableParams.caching.cacheKeys() ? CachingParams.CACHE_KEYS : CachingParams.CACHE_NOTHING);
 +
 +        return unbuild().params(builder.build()).build();
 +    }
 +
 +    boolean referencesUserType(ByteBuffer name)
 +    {
 +        return any(columns(), c -> c.type.referencesUserType(name));
 +    }
 +
 +    public TableMetadata withUpdatedUserType(UserType udt)
 +    {
 +        if (!referencesUserType(udt.name))
 +            return this;
 +
 +        Builder builder = unbuild();
 +        columns().forEach(c -> builder.alterColumnType(c.name, c.type.withUpdatedUserType(udt)));
 +
 +        return builder.build();
 +    }
 +
 +    private void except(String format, Object... args)
 +    {
 +        throw new ConfigurationException(keyspace + "." + name + ": " + format(format, args));
 +    }
 +
 +    @Override
 +    public boolean equals(Object o)
 +    {
 +        if (this == o)
 +            return true;
 +
 +        if (!(o instanceof TableMetadata))
 +            return false;
 +
 +        TableMetadata tm = (TableMetadata) o;
 +
 +        return equalsWithoutColumns(tm) && columns.equals(tm.columns);
 +    }
 +
 +    private boolean equalsWithoutColumns(TableMetadata tm)
 +    {
 +        return keyspace.equals(tm.keyspace)
 +            && name.equals(tm.name)
 +            && id.equals(tm.id)
 +            && partitioner.equals(tm.partitioner)
 +            && kind == tm.kind
 +            && params.equals(tm.params)
 +            && flags.equals(tm.flags)
 +            && droppedColumns.equals(tm.droppedColumns)
 +            && indexes.equals(tm.indexes)
 +            && triggers.equals(tm.triggers);
 +    }
 +
 +    Optional<Difference> compare(TableMetadata other)
 +    {
 +        return equalsWithoutColumns(other)
 +             ? compareColumns(other.columns)
 +             : Optional.of(Difference.SHALLOW);
 +    }
 +
 +    private Optional<Difference> compareColumns(Map<ByteBuffer, ColumnMetadata> other)
 +    {
 +        if (!columns.keySet().equals(other.keySet()))
 +            return Optional.of(Difference.SHALLOW);
 +
 +        boolean differsDeeply = false;
 +
 +        for (Map.Entry<ByteBuffer, ColumnMetadata> entry : columns.entrySet())
 +        {
 +            ColumnMetadata thisColumn = entry.getValue();
 +            ColumnMetadata thatColumn = other.get(entry.getKey());
 +
 +            Optional<Difference> difference = thisColumn.compare(thatColumn);
 +            if (difference.isPresent())
 +            {
 +                switch (difference.get())
 +                {
 +                    case SHALLOW:
 +                        return difference;
 +                    case DEEP:
 +                        differsDeeply = true;
 +                }
 +            }
 +        }
 +
 +        return differsDeeply ? Optional.of(Difference.DEEP) : Optional.empty();
 +    }
 +
 +    @Override
 +    public int hashCode()
 +    {
 +        return Objects.hash(keyspace, name, id, partitioner, kind, params, flags, columns, droppedColumns, indexes, triggers);
 +    }
 +
 +    @Override
 +    public String toString()
 +    {
 +        return format("%s.%s", ColumnIdentifier.maybeQuote(keyspace), ColumnIdentifier.maybeQuote(name));
 +    }
 +
 +    public String toDebugString()
 +    {
 +        return MoreObjects.toStringHelper(this)
 +                          .add("keyspace", keyspace)
 +                          .add("table", name)
 +                          .add("id", id)
 +                          .add("partitioner", partitioner)
 +                          .add("kind", kind)
 +                          .add("params", params)
 +                          .add("flags", flags)
 +                          .add("columns", columns())
 +                          .add("droppedColumns", droppedColumns.values())
 +                          .add("indexes", indexes)
 +                          .add("triggers", triggers)
 +                          .toString();
 +    }
 +
 +    public static final class Builder
 +    {
 +        final String keyspace;
 +        final String name;
 +
 +        private TableId id;
 +
 +        private IPartitioner partitioner;
 +        private Kind kind = Kind.REGULAR;
 +        private TableParams.Builder params = TableParams.builder();
 +
 +        // See the comment on Flag.COMPOUND definition for why we (still) unconditionally add this flag.
 +        private Set<Flag> flags = EnumSet.of(Flag.COMPOUND);
 +        private Triggers triggers = Triggers.none();
 +        private Indexes indexes = Indexes.none();
 +
 +        private final Map<ByteBuffer, DroppedColumn> droppedColumns = new HashMap<>();
 +        private final Map<ByteBuffer, ColumnMetadata> columns = new HashMap<>();
 +        private final List<ColumnMetadata> partitionKeyColumns = new ArrayList<>();
 +        private final List<ColumnMetadata> clusteringColumns = new ArrayList<>();
 +        private final List<ColumnMetadata> regularAndStaticColumns = new ArrayList<>();
 +
 +        private Builder(String keyspace, String name, TableId id)
 +        {
 +            this.keyspace = keyspace;
 +            this.name = name;
 +            this.id = id;
 +        }
 +
 +        private Builder(String keyspace, String name)
 +        {
 +            this.keyspace = keyspace;
 +            this.name = name;
 +        }
 +
 +        public TableMetadata build()
 +        {
 +            if (partitioner == null)
 +                partitioner = DatabaseDescriptor.getPartitioner();
 +
 +            if (id == null)
 +                id = TableId.generate();
 +
 +            return new TableMetadata(this);
 +        }
 +
 +        public Builder id(TableId val)
 +        {
 +            id = val;
 +            return this;
 +        }
 +
 +        public Builder partitioner(IPartitioner val)
 +        {
 +            partitioner = val;
 +            return this;
 +        }
 +
 +        public Builder kind(Kind val)
 +        {
 +            kind = val;
 +            return this;
 +        }
 +
 +        public Builder params(TableParams val)
 +        {
 +            params = val.unbuild();
 +            return this;
 +        }
 +
 +        public Builder bloomFilterFpChance(double val)
 +        {
 +            params.bloomFilterFpChance(val);
 +            return this;
 +        }
 +
 +        public Builder caching(CachingParams val)
 +        {
 +            params.caching(val);
 +            return this;
 +        }
 +
 +        public Builder comment(String val)
 +        {
 +            params.comment(val);
 +            return this;
 +        }
 +
 +        public Builder compaction(CompactionParams val)
 +        {
 +            params.compaction(val);
 +            return this;
 +        }
 +
 +        public Builder compression(CompressionParams val)
 +        {
 +            params.compression(val);
 +            return this;
 +        }
 +
 +        public Builder defaultTimeToLive(int val)
 +        {
 +            params.defaultTimeToLive(val);
 +            return this;
 +        }
 +
 +        public Builder gcGraceSeconds(int val)
 +        {
 +            params.gcGraceSeconds(val);
 +            return this;
 +        }
 +
 +        public Builder maxIndexInterval(int val)
 +        {
 +            params.maxIndexInterval(val);
 +            return this;
 +        }
 +
 +        public Builder memtableFlushPeriod(int val)
 +        {
 +            params.memtableFlushPeriodInMs(val);
 +            return this;
 +        }
 +
 +        public Builder minIndexInterval(int val)
 +        {
 +            params.minIndexInterval(val);
 +            return this;
 +        }
 +
 +        public Builder crcCheckChance(double val)
 +        {
 +            params.crcCheckChance(val);
 +            return this;
 +        }
 +
 +        public Builder speculativeRetry(SpeculativeRetryPolicy val)
 +        {
 +            params.speculativeRetry(val);
 +            return this;
 +        }
 +
 +        public Builder additionalWritePolicy(SpeculativeRetryPolicy val)
 +        {
 +            params.additionalWritePolicy(val);
 +            return this;
 +        }
 +
 +        public Builder extensions(Map<String, ByteBuffer> val)
 +        {
 +            params.extensions(val);
 +            return this;
 +        }
 +
 +        public Builder flags(Set<Flag> val)
 +        {
 +            flags = val;
 +            return this;
 +        }
 +
 +        public Builder isCounter(boolean val)
 +        {
 +            return flag(Flag.COUNTER, val);
 +        }
 +
 +        private Builder flag(Flag flag, boolean set)
 +        {
 +            if (set) flags.add(flag); else flags.remove(flag);
 +            return this;
 +        }
 +
 +        public Builder triggers(Triggers val)
 +        {
 +            triggers = val;
 +            return this;
 +        }
 +
 +        public Builder indexes(Indexes val)
 +        {
 +            indexes = val;
 +            return this;
 +        }
 +
 +        public Builder addPartitionKeyColumn(String name, AbstractType type)
 +        {
 +            return addPartitionKeyColumn(ColumnIdentifier.getInterned(name, false), type);
 +        }
 +
 +        public Builder addPartitionKeyColumn(ColumnIdentifier name, AbstractType type)
 +        {
 +            return addColumn(new ColumnMetadata(keyspace, this.name, name, type, partitionKeyColumns.size(), ColumnMetadata.Kind.PARTITION_KEY));
 +        }
 +
 +        public Builder addClusteringColumn(String name, AbstractType type)
 +        {
 +            return addClusteringColumn(ColumnIdentifier.getInterned(name, false), type);
 +        }
 +
 +        public Builder addClusteringColumn(ColumnIdentifier name, AbstractType type)
 +        {
 +            return addColumn(new ColumnMetadata(keyspace, this.name, name, type, clusteringColumns.size(), ColumnMetadata.Kind.CLUSTERING));
 +        }
 +
 +        public Builder addRegularColumn(String name, AbstractType type)
 +        {
 +            return addRegularColumn(ColumnIdentifier.getInterned(name, false), type);
 +        }
 +
 +        public Builder addRegularColumn(ColumnIdentifier name, AbstractType type)
 +        {
 +            return addColumn(new ColumnMetadata(keyspace, this.name, name, type, ColumnMetadata.NO_POSITION, ColumnMetadata.Kind.REGULAR));
 +        }
 +
 +        public Builder addStaticColumn(String name, AbstractType type)
 +        {
 +            return addStaticColumn(ColumnIdentifier.getInterned(name, false), type);
 +        }
 +
 +        public Builder addStaticColumn(ColumnIdentifier name, AbstractType type)
 +        {
 +            return addColumn(new ColumnMetadata(keyspace, this.name, name, type, ColumnMetadata.NO_POSITION, ColumnMetadata.Kind.STATIC));
 +        }
 +
 +        public Builder addColumn(ColumnMetadata column)
 +        {
 +            if (columns.containsKey(column.name.bytes))
 +                throw new IllegalArgumentException();
 +
 +            switch (column.kind)
 +            {
 +                case PARTITION_KEY:
 +                    partitionKeyColumns.add(column);
 +                    Collections.sort(partitionKeyColumns);
 +                    break;
 +                case CLUSTERING:
 +                    column.type.checkComparable();
 +                    clusteringColumns.add(column);
 +                    Collections.sort(clusteringColumns);
 +                    break;
 +                default:
 +                    regularAndStaticColumns.add(column);
 +            }
 +
 +            columns.put(column.name.bytes, column);
 +
 +            return this;
 +        }
 +
 +        public Builder addColumns(Iterable<ColumnMetadata> columns)
 +        {
 +            columns.forEach(this::addColumn);
 +            return this;
 +        }
 +
 +        public Builder droppedColumns(Map<ByteBuffer, DroppedColumn> droppedColumns)
 +        {
 +            this.droppedColumns.clear();
 +            this.droppedColumns.putAll(droppedColumns);
 +            return this;
 +        }
 +
 +        /**
 +         * Records a deprecated column for a system table.
 +         */
 +        public Builder recordDeprecatedSystemColumn(String name, AbstractType<?> type)
 +        {
 +            // As we play fast and loose with the removal timestamp, make sure this is not misused for a non-system table.
 +            assert SchemaConstants.isLocalSystemKeyspace(keyspace);
 +            recordColumnDrop(ColumnMetadata.regularColumn(keyspace, this.name, name, type), Long.MAX_VALUE);
 +            return this;
 +        }
 +
 +        public Builder recordColumnDrop(ColumnMetadata column, long timeMicros)
 +        {
 +            droppedColumns.put(column.name.bytes, new DroppedColumn(column.withNewType(column.type.expandUserTypes()), timeMicros));
 +            return this;
 +        }
 +
 +        public Iterable<ColumnMetadata> columns()
 +        {
 +            return columns.values();
 +        }
 +
 +        public Set<String> columnNames()
 +        {
 +            return columns.values().stream().map(c -> c.name.toString()).collect(toSet());
 +        }
 +
 +        public ColumnMetadata getColumn(ColumnIdentifier identifier)
 +        {
 +            return columns.get(identifier.bytes);
 +        }
 +
 +        public ColumnMetadata getColumn(ByteBuffer name)
 +        {
 +            return columns.get(name);
 +        }
 +
 +        public boolean hasRegularColumns()
 +        {
 +            return regularAndStaticColumns.stream().anyMatch(ColumnMetadata::isRegular);
 +        }
 +
 +        /*
 +         * The following methods all assume a Builder with valid set of partition key, clustering, regular and static columns.
 +         */
 +
 +        public Builder removeRegularOrStaticColumn(ColumnIdentifier identifier)
 +        {
 +            ColumnMetadata column = columns.get(identifier.bytes);
 +            if (column == null || column.isPrimaryKeyColumn())
 +                throw new IllegalArgumentException();
 +
 +            columns.remove(identifier.bytes);
 +            regularAndStaticColumns.remove(column);
 +
 +            return this;
 +        }
 +
 +        public Builder renamePrimaryKeyColumn(ColumnIdentifier from, ColumnIdentifier to)
 +        {
 +            if (columns.containsKey(to.bytes))
 +                throw new IllegalArgumentException();
 +
 +            ColumnMetadata column = columns.get(from.bytes);
 +            if (column == null || !column.isPrimaryKeyColumn())
 +                throw new IllegalArgumentException();
 +
 +            ColumnMetadata newColumn = column.withNewName(to);
 +            if (column.isPartitionKey())
 +                partitionKeyColumns.set(column.position(), newColumn);
 +            else
 +                clusteringColumns.set(column.position(), newColumn);
 +
 +            columns.remove(from.bytes);
 +            columns.put(to.bytes, newColumn);
 +
 +            return this;
 +        }
 +
 +        Builder alterColumnType(ColumnIdentifier name, AbstractType<?> type)
 +        {
 +            ColumnMetadata column = columns.get(name.bytes);
 +            if (column == null)
 +                throw new IllegalArgumentException();
 +
 +            ColumnMetadata newColumn = column.withNewType(type);
 +
 +            switch (column.kind)
 +            {
 +                case PARTITION_KEY:
 +                    partitionKeyColumns.set(column.position(), newColumn);
 +                    break;
 +                case CLUSTERING:
 +                    clusteringColumns.set(column.position(), newColumn);
 +                    break;
 +                case REGULAR:
 +                case STATIC:
 +                    regularAndStaticColumns.remove(column);
 +                    regularAndStaticColumns.add(newColumn);
 +                    break;
 +            }
 +
 +            columns.put(column.name.bytes, newColumn);
 +
 +            return this;
 +        }
 +    }
 +    
 +    /**
 +     * A table with strict liveness filters/ignores rows without PK liveness info,
 +     * effectively tying the row liveness to its primary key liveness.
 +     *
 +     * Currently this is only used by views with normal base column as PK column
 +     * so updates to other columns do not make the row live when the base column
 +     * is not live. See CASSANDRA-11500.
 +     *
 +     * TODO: does not belong here, should be gone
 +     */
 +    public boolean enforceStrictLiveness()
 +    {
 +        return isView() && Keyspace.open(keyspace).viewManager.getByName(name).enforceStrictLiveness();
 +    }
 +
 +    /**
 +     * Returns the names of all the user types referenced by this table.
 +     *
 +     * @return the names of all the user types referenced by this table.
 +     */
 +    public Set<ByteBuffer> getReferencedUserTypes()
 +    {
 +        Set<ByteBuffer> types = new LinkedHashSet<>();
 +        columns().forEach(c -> addUserTypes(c.type, types));
 +        return types;
 +    }
 +
 +    /**
 +     * Find all user types used by the specified type and add them to the set.
 +     *
 +     * @param type the type to check for user types.
 +     * @param types the set of UDT names to which to add new user types found in {@code type}. Note that the
 +     * insertion ordering is important and ensures that if a user type A uses another user type B, then B will appear
 +     * before A in iteration order.
 +     */
 +    private static void addUserTypes(AbstractType<?> type, Set<ByteBuffer> types)
 +    {
 +        // Reach into subtypes first, so that if the type is a UDT, its dependencies are recreated first.
 +        type.subTypes().forEach(t -> addUserTypes(t, types));
 +
 +        if (type.isUDT())
 +            types.add(((UserType)type).name);
 +    }
 +
 +    @Override
 +    public SchemaElementType elementType()
 +    {
 +        return SchemaElementType.TABLE;
 +    }
 +
 +    @Override
 +    public String elementKeyspace()
 +    {
 +        return keyspace;
 +    }
 +
 +    @Override
 +    public String elementName()
 +    {
 +        return name;
 +    }
 +
 +    @Override
 +    public String toCqlString(boolean withInternals, boolean ifNotExists)
 +    {
 +        CqlBuilder builder = new CqlBuilder(2048);
 +        appendCqlTo(builder, withInternals, withInternals, ifNotExists);
 +        return builder.toString();
 +    }
 +
 +    public String toCqlString(boolean includeDroppedColumns,
 +                              boolean internals,
 +                              boolean ifNotExists)
 +    {
 +        CqlBuilder builder = new CqlBuilder(2048);
 +        appendCqlTo(builder, includeDroppedColumns, internals, ifNotExists);
 +        return builder.toString();
 +    }
 +
 +    public void appendCqlTo(CqlBuilder builder,
 +                            boolean includeDroppedColumns,
 +                            boolean internals,
 +                            boolean ifNotExists)
 +    {
 +        assert !isView();
 +
 +        String createKeyword = "CREATE";
 +        if (isVirtual())
 +        {
 +            builder.append(String.format("/*\n" +
 +                    "Warning: Table %s is a virtual table and cannot be recreated with CQL.\n" +
 +                    "Structure, for reference:\n",
 +                                         toString()));
 +            createKeyword = "VIRTUAL";
 +        }
 +
 +        builder.append(createKeyword)
 +               .append(" TABLE ");
 +
 +        if (ifNotExists)
 +            builder.append("IF NOT EXISTS ");
 +
 +        builder.append(toString())
 +               .append(" (")
 +               .newLine()
 +               .increaseIndent();
 +
 +        boolean hasSingleColumnPrimaryKey = partitionKeyColumns.size() == 1 && clusteringColumns.isEmpty();
 +
 +        appendColumnDefinitions(builder, includeDroppedColumns, hasSingleColumnPrimaryKey);
 +
 +        if (!hasSingleColumnPrimaryKey)
 +            appendPrimaryKey(builder);
 +
 +        builder.decreaseIndent()
 +               .append(')');
 +
 +        appendTableOptions(builder, internals);
 +
 +        builder.decreaseIndent();
 +
 +        if (isVirtual())
 +        {
 +            builder.newLine()
 +                   .append("*/");
 +        }
 +
 +        if (includeDroppedColumns)
 +            appendDropColumns(builder);
 +    }
 +
 +    private void appendColumnDefinitions(CqlBuilder builder,
 +                                         boolean includeDroppedColumns,
 +                                         boolean hasSingleColumnPrimaryKey)
 +    {
 +        Iterator<ColumnMetadata> iter = allColumnsInCreateOrder();
 +        while (iter.hasNext())
 +        {
 +            ColumnMetadata column = iter.next();
 +
 +            // If the column has been re-added after a drop, we don't include it right away. Instead, we'll add the
 +            // dropped one first below, then we'll issue the DROP and then the actual ADD for this column, thus
 +            // simulating the proper sequence of events.
 +            if (includeDroppedColumns && droppedColumns.containsKey(column.name.bytes))
 +                continue;
 +
 +            column.appendCqlTo(builder);
 +
 +            if (hasSingleColumnPrimaryKey && column.isPartitionKey())
 +                builder.append(" PRIMARY KEY");
 +
 +            if (!hasSingleColumnPrimaryKey || (includeDroppedColumns && !droppedColumns.isEmpty()) || iter.hasNext())
 +                builder.append(',');
 +
 +            builder.newLine();
 +        }
 +
 +        if (includeDroppedColumns)
 +        {
 +            Iterator<DroppedColumn> iterDropped = droppedColumns.values().iterator();
 +            while (iterDropped.hasNext())
 +            {
 +                DroppedColumn dropped = iterDropped.next();
 +                dropped.column.appendCqlTo(builder);
 +
 +                if (!hasSingleColumnPrimaryKey || iter.hasNext())
 +                    builder.append(',');
 +
 +                builder.newLine();
 +            }
 +        }
 +    }
 +
 +    void appendPrimaryKey(CqlBuilder builder)
 +    {
 +        List<ColumnMetadata> partitionKeyColumns = partitionKeyColumns();
 +        List<ColumnMetadata> clusteringColumns = clusteringColumns();
 +
 +        builder.append("PRIMARY KEY (");
 +        if (partitionKeyColumns.size() > 1)
 +        {
 +            builder.append('(')
 +                   .appendWithSeparators(partitionKeyColumns, (b, c) -> b.append(c.name), ", ")
 +                   .append(')');
 +        }
 +        else
 +        {
 +            builder.append(partitionKeyColumns.get(0).name);
 +        }
 +
 +        if (!clusteringColumns.isEmpty())
 +            builder.append(", ")
 +                   .appendWithSeparators(clusteringColumns, (b, c) -> b.append(c.name), ", ");
 +
 +        builder.append(')')
 +               .newLine();
 +    }
 +
 +    void appendTableOptions(CqlBuilder builder, boolean internals)
 +    {
 +        builder.append(" WITH ")
 +               .increaseIndent();
 +
 +        if (internals)
 +            builder.append("ID = ")
 +                   .append(id.toString())
 +                   .newLine()
 +                   .append("AND ");
 +
 +        List<ColumnMetadata> clusteringColumns = clusteringColumns();
 +        if (!clusteringColumns.isEmpty())
 +        {
 +            builder.append("CLUSTERING ORDER BY (")
 +                   .appendWithSeparators(clusteringColumns, (b, c) -> c.appendNameAndOrderTo(b), ", ")
 +                   .append(')')
 +                   .newLine()
 +                   .append("AND ");
 +        }
 +
 +        if (isVirtual())
 +        {
 +            builder.append("comment = ").appendWithSingleQuotes(params.comment);
 +        }
 +        else
 +        {
 +            params.appendCqlTo(builder);
 +        }
 +        builder.append(";");
 +    }
 +
 +    private void appendDropColumns(CqlBuilder builder)
 +    {
 +        for (Entry<ByteBuffer, DroppedColumn> entry : droppedColumns.entrySet())
 +        {
 +            DroppedColumn dropped = entry.getValue();
 +
 +            builder.newLine()
 +                   .append("ALTER TABLE ")
 +                   .append(toString())
 +                   .append(" DROP ")
 +                   .append(dropped.column.name)
 +                   .append(" USING TIMESTAMP ")
 +                   .append(dropped.droppedTime)
 +                   .append(';');
 +
 +            ColumnMetadata column = getColumn(entry.getKey());
 +            if (column != null)
 +            {
 +                builder.newLine()
 +                       .append("ALTER TABLE ")
 +                       .append(toString())
 +                       .append(" ADD ");
 +
 +                column.appendCqlTo(builder);
 +
 +                builder.append(';');
 +            }
 +        }
 +    }
 +}
diff --cc src/java/org/apache/cassandra/service/StartupChecks.java
index ecf9549,cb10ab4..621898a
--- a/src/java/org/apache/cassandra/service/StartupChecks.java
+++ b/src/java/org/apache/cassandra/service/StartupChecks.java
@@@ -35,15 -35,11 +35,16 @@@ import com.google.common.collect.Iterab
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
 -import org.apache.cassandra.config.CFMetaData;
 +import net.jpountz.lz4.LZ4Factory;
 +import org.apache.cassandra.config.CassandraRelevantProperties;
 +import org.apache.cassandra.cql3.QueryProcessor;
 +import org.apache.cassandra.cql3.UntypedResultSet;
++import org.apache.cassandra.schema.SchemaKeyspace;
 +import org.apache.cassandra.schema.TableMetadata;
  import org.apache.cassandra.config.Config;
  import org.apache.cassandra.config.DatabaseDescriptor;
 -import org.apache.cassandra.config.Schema;
 -import org.apache.cassandra.config.SchemaConstants;
 +import org.apache.cassandra.schema.Schema;
 +import org.apache.cassandra.schema.SchemaConstants;
  import org.apache.cassandra.db.ColumnFamilyStore;
  import org.apache.cassandra.db.Directories;
  import org.apache.cassandra.db.SystemKeyspace;
@@@ -100,6 -93,6 +101,7 @@@ public class StartupCheck
                                                                        checkMaxMapCount,
                                                                        checkDataDirs,
                                                                        checkSSTablesFormat,
++                                                                      checkOutdatedTables,
                                                                        checkSystemKeyspaceState,
                                                                        checkDatacenter,
                                                                        checkRack,
@@@ -425,6 -406,6 +427,8 @@@
          }
      };
  
++    public static final StartupCheck checkOutdatedTables = SchemaKeyspace::validateNonCompact;
++
      public static final StartupCheck checkSystemKeyspaceState = new StartupCheck()
      {
          public void execute() throws StartupException


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org