You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by bl...@apache.org on 2020/09/16 14:57:07 UTC

[cassandra] 01/01: Merge branch cassandra-3.11 into trunk

This is an automated email from the ASF dual-hosted git repository.

blerer pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit be7c9e22b085d67cf867e53ae9e9fed3d7c2e03c
Merge: 9a30167 cbb7644
Author: Benjamin Lerer <b....@gmail.com>
AuthorDate: Wed Sep 16 16:52:38 2020 +0200

    Merge branch cassandra-3.11 into trunk

 CHANGES.txt                                        |  1 +
 .../org/apache/cassandra/cql3/SchemaElement.java   |  3 +-
 .../cassandra/cql3/functions/UDAggregate.java      | 12 +++-
 .../cassandra/cql3/functions/UDFunction.java       | 13 +++--
 .../cql3/statements/DescribeStatement.java         |  8 +--
 .../org/apache/cassandra/db/SchemaCQLHelper.java   | 17 ++++--
 .../org/apache/cassandra/db/marshal/UserType.java  | 12 +++-
 .../org/apache/cassandra/schema/IndexMetadata.java | 28 +++++++---
 .../apache/cassandra/schema/KeyspaceMetadata.java  | 12 +++-
 .../org/apache/cassandra/schema/TableMetadata.java |  4 +-
 .../org/apache/cassandra/schema/ViewMetadata.java  | 12 +---
 .../cql3/validation/entities/UFJavaTest.java       | 65 +++++++++++++++++++++-
 .../apache/cassandra/db/SchemaCQLHelperTest.java   | 19 ++++---
 13 files changed, 154 insertions(+), 52 deletions(-)

diff --cc CHANGES.txt
index a0041d8,99083b1..7761d87
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,45 -1,12 +1,46 @@@
 -3.11.9
 +4.0-beta3
 + * Avoid invalid state transition exception during incremental repair (CASSANDRA-16067)
 + * Allow zero padding in timestamp serialization (CASSANDRA-16105)
 + * Add byte array backed cells (CASSANDRA-15393)
 + * Correctly handle pending ranges with adjacent range movements (CASSANDRA-14801)
 + * Avoid adding localhost when streaming trivial ranges (CASSANDRA-16099)
 + * Add nodetool getfullquerylog (CASSANDRA-15988)
 + * Fix yaml format and alignment in tpstats (CASSANDRA-11402)
 + * Avoid trying to keep track of RTs for endpoints we won't write to during read repair (CASSANDRA-16084)
 + * When compaction gets interrupted, the exception should include the compactionId (CASSANDRA-15954)
 + * Make Table/Keyspace Metric Names Consistent With Each Other (CASSANDRA-15909)
 + * Mutating sstable component may race with entire-sstable-streaming(ZCS) causing checksum validation failure (CASSANDRA-15861)
 +Merged from 3.11:
++ * Use IF NOT EXISTS for index and UDT create statements in snapshot schema files (CASSANDRA-13935)
   * Make sure LCS handles duplicate sstable added/removed notifications correctly (CASSANDRA-14103)
  
 -3.11.8
 +4.0-beta2
 + * Add additional incremental repair visibility to nodetool repair_admin (CASSANDRA-14939)
 + * Always access system properties and environment variables via the new CassandraRelevantProperties and CassandraRelevantEnv classes (CASSANDRA-15876)
 + * Remove deprecated HintedHandOffManager (CASSANDRA-15939)
 + * Prevent repair from overrunning compaction (CASSANDRA-15817)
 + * fix cqlsh COPY functions in Python 3.8 on Mac (CASSANDRA-16053)
 + * Strip comment blocks from cqlsh input before processing statements (CASSANDRA-15802)
 + * Fix unicode chars error input (CASSANDRA-15990)
 + * Improved testability for CacheMetrics and ChunkCacheMetrics (CASSANDRA-15788)
 + * Handle errors in StreamSession#prepare (CASSANDRA-15852)
 + * FQL replay should have options to ignore DDL statements (CASSANDRA-16039)
 + * Remove COMPACT STORAGE internals (CASSANDRA-13994)
 + * Make TimestampSerializer accept fractional seconds of varying precision (CASSANDRA-15976)
 + * Improve cassandra-stress logging when using a profile file that doesn't exist (CASSANDRA-14425)
 + * Improve logging for socket connection/disconnection (CASSANDRA-15980)
 + * Throw FSWriteError upon write failures in order to apply DiskFailurePolicy (CASSANDRA-15928)
 + * Forbid altering UDTs used in partition keys (CASSANDRA-15933)
 + * Fix version parsing logic when upgrading from 3.0 (CASSANDRA-15973)
 + * Optimize NoSpamLogger use in hot paths (CASSANDRA-15766)
 + * Verify sstable components on startup (CASSANDRA-15945)
 + * Resolve JMX output inconsistencies from CASSANDRA-7544 storage-port-configurable-per-node (CASSANDRA-15937)
 +Merged from 3.11:
   * Correctly interpret SASI's `max_compaction_flush_memory_in_mb` setting in megabytes not bytes (CASSANDRA-16071)
   * Fix short read protection for GROUP BY queries (CASSANDRA-15459)
 + * stop_paranoid disk failure policy is ignored on CorruptSSTableException after node is up (CASSANDRA-15191)
   * Frozen RawTuple is not annotated with frozen in the toString method (CASSANDRA-15857)
  Merged from 3.0:
 - * Use IF NOT EXISTS for index and UDT create statements in snapshot schema files (CASSANDRA-13935)
   * Fix gossip shutdown order (CASSANDRA-15816)
   * Remove broken 'defrag-on-read' optimization (CASSANDRA-15432)
   * Check for endpoint collision with hibernating nodes (CASSANDRA-14599)
diff --cc src/java/org/apache/cassandra/cql3/SchemaElement.java
index ec0dbee,0000000..d99de7c
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/cql3/SchemaElement.java
+++ b/src/java/org/apache/cassandra/cql3/SchemaElement.java
@@@ -1,96 -1,0 +1,97 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.cql3;
 +
 +import java.util.Comparator;
 +import java.util.Locale;
 +
 +/**
 + * A schema element (keyspace, udt, udf, uda, table, index, view).
 + */
 +public interface SchemaElement
 +{
 +    /**
 +     * Comparator used to sort schema elements by name, ignoring case.
 +     */
 +    Comparator<SchemaElement> NAME_COMPARATOR = (o1, o2) -> o1.elementName().compareToIgnoreCase(o2.elementName());
 +
 +    enum SchemaElementType
 +    {
 +        KEYSPACE,
 +        TYPE,
 +        FUNCTION,
 +        AGGREGATE,
 +        TABLE,
 +        INDEX,
 +        MATERIALIZED_VIEW;
 +
 +        @Override
 +        public String toString()
 +        {
 +            return super.toString().toLowerCase(Locale.US);
 +        }
 +    }
 +
 +    /**
 +     * Return the schema element type
 +     *
 +     * @return the schema element type
 +     */
 +    SchemaElementType elementType();
 +
 +    /**
 +     * Returns the CQL name of the keyspace to which this schema element belongs.
 +     *
 +     * @return the keyspace name.
 +     */
 +    String elementKeyspace();
 +
 +    /**
 +     * Returns the CQL name of this schema element.
 +     *
 +     * @return the name of this schema element.
 +     */
 +    String elementName();
 +
 +    default String elementNameQuotedIfNeeded()
 +    {
 +        String name = elementName();
 +        if (elementType() == SchemaElementType.FUNCTION
 +                || elementType() == SchemaElementType.AGGREGATE)
 +        {
 +            int index = name.indexOf('(');
 +            return ColumnIdentifier.maybeQuote(name.substring(0, index)) + name.substring(index);
 +        }
 +
 +        return ColumnIdentifier.maybeQuote(name);
 +    }
 +
 +    default String elementKeyspaceQuotedIfNeeded()
 +    {
 +        return ColumnIdentifier.maybeQuote(elementKeyspace());
 +    }
 +
 +    /**
 +     * Returns a CQL representation of this element
 +     *
 +     * @param withInternals if the internals part of the CQL should be exposed.
++     * @param ifNotExists if "IF NOT EXISTS" should be included.
 +     * @return a CQL representation of this element
 +     */
-     String toCqlString(boolean withInternals);
++    String toCqlString(boolean withInternals, boolean ifNotExists);
 +}
diff --cc src/java/org/apache/cassandra/cql3/functions/UDAggregate.java
index db5859f,1a3174c..b201f09
--- a/src/java/org/apache/cassandra/cql3/functions/UDAggregate.java
+++ b/src/java/org/apache/cassandra/cql3/functions/UDAggregate.java
@@@ -324,41 -252,4 +324,47 @@@ public class UDAggregate extends Abstra
      {
          return Objects.hashCode(name, Functions.typeHashCode(argTypes), Functions.typeHashCode(returnType), stateFunction, finalFunction, stateType, initcond);
      }
 +
 +    @Override
 +    public SchemaElementType elementType()
 +    {
 +        return SchemaElementType.AGGREGATE;
 +    }
 +
 +    @Override
-     public String toCqlString(boolean withInternals)
++    public String toCqlString(boolean withInternals, boolean ifNotExists)
 +    {
 +        CqlBuilder builder = new CqlBuilder();
-         builder.append("CREATE AGGREGATE ")
-                .append(name())
++        builder.append("CREATE AGGREGATE ");
++
++        if (ifNotExists)
++        {
++            builder.append("IF NOT EXISTS ");
++        }
++
++        builder.append(name())
 +               .append('(')
 +               .appendWithSeparators(argTypes, (b, t) -> b.append(toCqlString(t)), ", ")
 +               .append(')')
 +               .newLine()
 +               .increaseIndent()
 +               .append("SFUNC ")
 +               .append(stateFunction().name().name)
 +               .newLine()
 +               .append("STYPE ")
 +               .append(toCqlString(stateType()));
 +
 +        if (finalFunction() != null)
 +            builder.newLine()
 +                   .append("FINALFUNC ")
 +                   .append(finalFunction().name().name);
 +
 +        if (initialCondition() != null)
 +            builder.newLine()
 +                   .append("INITCOND ")
 +                   .append(stateType().asCQL3Type().toCQLLiteral(initialCondition(), ProtocolVersion.CURRENT));
 +
 +        return builder.append(";")
 +                      .toString();
 +    }
  }
diff --cc src/java/org/apache/cassandra/cql3/functions/UDFunction.java
index bceb085,6928a06..a533669
--- a/src/java/org/apache/cassandra/cql3/functions/UDFunction.java
+++ b/src/java/org/apache/cassandra/cql3/functions/UDFunction.java
@@@ -302,48 -281,6 +302,53 @@@ public abstract class UDFunction extend
          };
      }
  
 +    @Override
 +    public SchemaElementType elementType()
 +    {
 +        return SchemaElementType.FUNCTION;
 +    }
 +
 +    @Override
-     public String toCqlString(boolean withInternals)
++    public String toCqlString(boolean withInternals, boolean ifNotExists)
 +    {
 +        CqlBuilder builder = new CqlBuilder();
-         builder.append("CREATE FUNCTION ")
-                .append(name())
-                .append("(");
++        builder.append("CREATE FUNCTION ");
++
++        if (ifNotExists)
++        {
++            builder.append("IF NOT EXISTS ");
++        }
++
++        builder.append(name()).append("(");
 +
 +        for (int i = 0, m = argNames().size(); i < m; i++)
 +        {
 +            if (i > 0)
 +                builder.append(", ");
 +            builder.append(argNames().get(i))
 +                   .append(' ')
 +                   .append(toCqlString(argTypes().get(i)));
 +        }
 +
 +        builder.append(')')
 +               .newLine()
 +               .increaseIndent()
 +               .append(isCalledOnNullInput() ? "CALLED" : "RETURNS NULL")
 +               .append(" ON NULL INPUT")
 +               .newLine()
 +               .append("RETURNS ")
 +               .append(toCqlString(returnType()))
 +               .newLine()
 +               .append("LANGUAGE ")
 +               .append(language())
 +               .newLine()
 +               .append("AS $$")
 +               .append(body())
 +               .append("$$;");
 +
 +        return builder.toString();
 +    }
 +
      public final ByteBuffer execute(ProtocolVersion protocolVersion, List<ByteBuffer> parameters)
      {
          assertUdfsEnabled(language);
diff --cc src/java/org/apache/cassandra/cql3/statements/DescribeStatement.java
index 48b4160,0000000..16671dd
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/cql3/statements/DescribeStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DescribeStatement.java
@@@ -1,750 -1,0 +1,750 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.cql3.statements;
 +
 +import java.io.IOException;
 +import java.nio.ByteBuffer;
 +import java.util.*;
 +import java.util.function.BiFunction;
 +import java.util.stream.Collectors;
 +import java.util.stream.Stream;
 +
 +import com.google.common.collect.ImmutableList;
 +
 +import org.apache.cassandra.audit.AuditLogContext;
 +import org.apache.cassandra.audit.AuditLogEntryType;
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.cql3.*;
 +import org.apache.cassandra.cql3.functions.FunctionName;
 +import org.apache.cassandra.db.KeyspaceNotDefinedException;
 +import org.apache.cassandra.db.marshal.ListType;
 +import org.apache.cassandra.db.marshal.MapType;
 +import org.apache.cassandra.db.marshal.UTF8Type;
 +import org.apache.cassandra.db.virtual.VirtualKeyspaceRegistry;
 +import org.apache.cassandra.exceptions.InvalidRequestException;
 +import org.apache.cassandra.exceptions.RequestExecutionException;
 +import org.apache.cassandra.exceptions.RequestValidationException;
 +import org.apache.cassandra.io.util.DataInputBuffer;
 +import org.apache.cassandra.io.util.DataOutputBuffer;
 +import org.apache.cassandra.schema.*;
 +import org.apache.cassandra.service.ClientState;
 +import org.apache.cassandra.service.QueryState;
 +import org.apache.cassandra.service.StorageService;
 +import org.apache.cassandra.service.pager.PagingState;
 +import org.apache.cassandra.transport.messages.ResultMessage;
 +import org.apache.cassandra.utils.ByteBufferUtil;
 +import org.apache.cassandra.utils.FBUtilities;
 +import org.apache.cassandra.utils.UUIDGen;
 +
 +import static java.lang.String.format;
 +import static org.apache.cassandra.cql3.statements.RequestValidations.checkNotEmpty;
 +import static org.apache.cassandra.cql3.statements.RequestValidations.checkNotNull;
 +import static org.apache.cassandra.cql3.statements.RequestValidations.checkTrue;
 +import static org.apache.cassandra.cql3.statements.RequestValidations.invalidRequest;
 +import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
 +
 +/**
 + * The different <code>DESCRIBE</code> statements parsed from a CQL statement.
 + */
 +public abstract class DescribeStatement<T> extends CQLStatement.Raw implements CQLStatement
 +{
 +    private static final String KS = "system";
 +    private static final String CF = "describe";
 +
 +    /**
 +     * The columns returned by the describe queries that only list elements names (e.g. DESCRIBE KEYSPACES, DESCRIBE TABLES...) 
 +     */
 +    private static final List<ColumnSpecification> LIST_METADATA = 
 +            ImmutableList.of(new ColumnSpecification(KS, CF, new ColumnIdentifier("keyspace_name", true), UTF8Type.instance),
 +                             new ColumnSpecification(KS, CF, new ColumnIdentifier("type", true), UTF8Type.instance),
 +                             new ColumnSpecification(KS, CF, new ColumnIdentifier("name", true), UTF8Type.instance));
 +
 +    /**
 +     * The columns returned by the describe queries that returns the CREATE STATEMENT for the different elements (e.g. DESCRIBE KEYSPACE, DESCRIBE TABLE ...) 
 +     */
 +    private static final List<ColumnSpecification> ELEMENT_METADATA = 
 +            ImmutableList.<ColumnSpecification>builder().addAll(LIST_METADATA)
 +                                                        .add(new ColumnSpecification(KS, CF, new ColumnIdentifier("create_statement", true), UTF8Type.instance))
 +                                                        .build();
 +
 +    /**
 +     * "Magic version" for the paging state.
 +     */
 +    private static final int PAGING_STATE_VERSION = 0x0001;
 +
 +    static final String SCHEMA_CHANGED_WHILE_PAGING_MESSAGE = "The schema has changed since the previous page of the DESCRIBE statement result. " +
 +                                                              "Please retry the DESCRIBE statement.";
 +
 +    private boolean includeInternalDetails;
 +
 +    public final void withInternalDetails()
 +    {
 +        this.includeInternalDetails = true;
 +    }
 +
 +    @Override
 +    public final CQLStatement prepare(ClientState clientState) throws RequestValidationException
 +    {
 +        return this;
 +    }
 +
 +    public final List<ColumnSpecification> getBindVariables()
 +    {
 +        return Collections.emptyList();
 +    }
 +
 +    @Override
 +    public final void authorize(ClientState state)
 +    {
 +    }
 +
 +    @Override
 +    public final void validate(ClientState state)
 +    {
 +    }
 +
 +    public final AuditLogContext getAuditLogContext()
 +    {
 +        return new AuditLogContext(AuditLogEntryType.DESCRIBE);
 +    }
 +
 +    @Override
 +    public final ResultMessage execute(QueryState state, QueryOptions options, long queryStartNanoTime) throws RequestValidationException, RequestExecutionException
 +    {
 +        return executeLocally(state, options);
 +    }
 +
 +    @Override
 +    public ResultMessage executeLocally(QueryState state, QueryOptions options)
 +    {
 +        Keyspaces keyspaces = Schema.instance.snapshot();
 +        UUID schemaVersion = Schema.instance.getVersion();
 +
 +        keyspaces = Keyspaces.builder()
 +                             .add(keyspaces)
 +                             .add(VirtualKeyspaceRegistry.instance.virtualKeyspacesMetadata())
 +                             .build();
 +
 +        PagingState pagingState = options.getPagingState();
 +
 +        // The paging implemented here uses an arbitrary row number as the partition-key for paging,
 +        // which is used to skip/limit the result from the Java Stream. This works well enough for
 +        // reasonably sized schemas. Even a 'DESCRIBE SCHEMA' for an abnormally large schema with 10000 tables
 +        // completes within a few seconds. This seems good enough for now. Once Cassandra actually supports
 +        // more than a few hundred tables, the implementation here should be reconsidered.
 +        //
 +        // Paging is only supported on row-level.
 +        //
 +        // The "partition key" in the paging-state contains a serialized object:
 +        //   (short) version, currently 0x0001
 +        //   (UTF string) server release version, then (UUID bytes) schema version
 +        //   (long) row offset
 +        //
 +
 +        long offset = getOffset(pagingState, schemaVersion);
 +        int pageSize = options.getPageSize();
 +
 +        Stream<? extends T> stream = describe(state.getClientState(), keyspaces);
 +
 +        if (offset > 0L)
 +            stream = stream.skip(offset);
 +        if (pageSize > 0)
 +            stream = stream.limit(pageSize);
 +
 +        List<List<ByteBuffer>> rows = stream.map(e -> toRow(e, includeInternalDetails))
 +                                            .collect(Collectors.toList());
 +
 +        ResultSet.ResultMetadata resultMetadata = new ResultSet.ResultMetadata(metadata(state.getClientState()));
 +        ResultSet result = new ResultSet(resultMetadata, rows);
 +
 +        if (pageSize > 0 && rows.size() == pageSize)
 +        {
 +            result.metadata.setHasMorePages(getPagingState(offset + pageSize, schemaVersion));
 +        }
 +
 +        return new ResultMessage.Rows(result);
 +    }
 +
 +    /**
 +     * Returns the columns of the {@code ResultMetadata}
 +     */
 +    protected abstract List<ColumnSpecification> metadata(ClientState state);
 +
 +    private PagingState getPagingState(long nextPageOffset, UUID schemaVersion)
 +    {
 +        try (DataOutputBuffer out = new DataOutputBuffer())
 +        {
 +            out.writeShort(PAGING_STATE_VERSION);
 +            out.writeUTF(FBUtilities.getReleaseVersionString());
 +            out.write(UUIDGen.decompose(schemaVersion));
 +            out.writeLong(nextPageOffset);
 +
 +            return new PagingState(out.asNewBuffer(),
 +                                   null,
 +                                   Integer.MAX_VALUE,
 +                                   Integer.MAX_VALUE);
 +        }
 +        catch (IOException e)
 +        {
 +            throw new InvalidRequestException("Invalid paging state.", e);
 +        }
 +    }
 +
 +    private long getOffset(PagingState pagingState, UUID schemaVersion)
 +    {
 +        if (pagingState == null)
 +            return 0L;
 +
 +        try (DataInputBuffer in = new DataInputBuffer(pagingState.partitionKey, false))
 +        {
 +            checkTrue(in.readShort() == PAGING_STATE_VERSION, "Incompatible paging state");
 +
 +            final String pagingStateServerVersion = in.readUTF();
 +            final String releaseVersion = FBUtilities.getReleaseVersionString();
 +            checkTrue(pagingStateServerVersion.equals(releaseVersion),
 +                      "The server version of the paging state %s is different from the one of the server %s",
 +                      pagingStateServerVersion,
 +                      releaseVersion);
 +
 +            byte[] bytes = new byte[UUIDGen.UUID_LEN];
 +            in.read(bytes);
 +            UUID version = UUIDGen.getUUID(ByteBuffer.wrap(bytes));
 +            checkTrue(schemaVersion.equals(version), SCHEMA_CHANGED_WHILE_PAGING_MESSAGE);
 +
 +            return in.readLong();
 +        }
 +        catch (IOException e)
 +        {
 +            throw new InvalidRequestException("Invalid paging state.", e);
 +        }
 +    }
 +
 +    protected abstract List<ByteBuffer> toRow(T element, boolean withInternals);
 +
 +    /**
 +     * Returns the schema elements that must be part of the output.
 +     */
 +    protected abstract Stream<? extends T> describe(ClientState state, Keyspaces keyspaces);
 +
 +    /**
 +     * Returns the metadata for the given keyspace or throws a {@link KeyspaceNotDefinedException} exception.
 +     */
 +    private static KeyspaceMetadata validateKeyspace(String ks, Keyspaces keyspaces)
 +    {
 +        return keyspaces.get(ks)
 +                        .orElseThrow(() -> new KeyspaceNotDefinedException(format("'%s' not found in keyspaces", ks)));
 +    }
 +
 +    /**
 +     * {@code DescribeStatement} implementation used for describe queries that only list elements names.
 +     */
 +    public static final class Listing extends DescribeStatement<SchemaElement>
 +    {
 +        private final java.util.function.Function<KeyspaceMetadata, Stream<? extends SchemaElement>> elementsProvider;
 +
 +        public Listing(java.util.function.Function<KeyspaceMetadata, Stream<? extends SchemaElement>> elementsProvider)
 +        {
 +            this.elementsProvider = elementsProvider;
 +        }
 +
 +        @Override
 +        protected Stream<? extends SchemaElement> describe(ClientState state, Keyspaces keyspaces)
 +        {
 +            String keyspace = state.getRawKeyspace();
 +            Stream<KeyspaceMetadata> stream = keyspace == null ? keyspaces.stream().sorted(SchemaElement.NAME_COMPARATOR)
 +                                                               : Stream.of(validateKeyspace(keyspace, keyspaces));
 +
 +            return stream.flatMap(k -> elementsProvider.apply(k).sorted(SchemaElement.NAME_COMPARATOR));
 +        }
 +
 +        @Override
 +        protected List<ColumnSpecification> metadata(ClientState state)
 +        {
 +            return LIST_METADATA;
 +        }
 +
 +        @Override
 +        protected List<ByteBuffer> toRow(SchemaElement element, boolean withInternals)
 +        {
 +            return ImmutableList.of(bytes(element.elementKeyspaceQuotedIfNeeded()),
 +                                    bytes(element.elementType().toString()),
 +                                    bytes(element.elementNameQuotedIfNeeded()));
 +        }
 +    }
 +
 +    /**
 +     * Creates a {@link DescribeStatement} for {@code DESCRIBE TABLES}.
 +     */
 +    public static DescribeStatement<SchemaElement> tables()
 +    {
 +        return new Listing(ks -> ks.tables.stream());
 +    }
 +
 +    /**
 +     * Creates a {@link DescribeStatement} for {@code DESCRIBE TYPES}.
 +     */
 +    public static DescribeStatement<SchemaElement> types()
 +    {
 +        return new Listing(ks -> ks.types.stream());
 +    }
 +
 +    /**
 +     * Creates a {@link DescribeStatement} for {@code DESCRIBE FUNCTIONS}.
 +     */
 +    public static DescribeStatement<SchemaElement> functions()
 +    {
 +        return new Listing(ks -> ks.functions.udfs());
 +    }
 +
 +    /**
 +     * Creates a {@link DescribeStatement} for {@code DESCRIBE AGGREGATES}.
 +     */
 +    public static DescribeStatement<SchemaElement> aggregates()
 +    {
 +        return new Listing(ks -> ks.functions.udas());
 +    }
 +
 +    /**
 +     * Creates a {@link DescribeStatement} for {@code DESCRIBE KEYSPACES}.
 +     */
 +    public static DescribeStatement<SchemaElement> keyspaces()
 +    {
 +        return new DescribeStatement<SchemaElement>()
 +        {
 +            @Override
 +            protected Stream<? extends SchemaElement> describe(ClientState state, Keyspaces keyspaces)
 +            {
 +                return keyspaces.stream().sorted(SchemaElement.NAME_COMPARATOR);
 +            }
 +
 +            @Override
 +            protected List<ColumnSpecification> metadata(ClientState state)
 +            {
 +                return LIST_METADATA;
 +            }
 +
 +            @Override
 +            protected List<ByteBuffer> toRow(SchemaElement element, boolean withInternals)
 +            {
 +                return ImmutableList.of(bytes(element.elementKeyspaceQuotedIfNeeded()),
 +                                        bytes(element.elementType().toString()),
 +                                        bytes(element.elementNameQuotedIfNeeded()));
 +            }
 +        };
 +    }
 +
 +    /**
 +     * Creates a {@link DescribeStatement} for {@code DESCRIBE [FULL] SCHEMA}.
 +     */
 +    public static DescribeStatement<SchemaElement> schema(boolean includeSystemKeyspaces)
 +    {
 +        return new DescribeStatement<SchemaElement>()
 +        {
 +            @Override
 +            protected Stream<? extends SchemaElement> describe(ClientState state, Keyspaces keyspaces)
 +            {
 +                return keyspaces.stream()
 +                                .filter(ks -> includeSystemKeyspaces || !SchemaConstants.isSystemKeyspace(ks.name))
 +                                .sorted(SchemaElement.NAME_COMPARATOR)
 +                                .flatMap(ks -> getKeyspaceElements(ks, false));
 +            }
 +
 +            @Override
 +            protected List<ColumnSpecification> metadata(ClientState state)
 +            {
 +                return ELEMENT_METADATA;
 +            }
 +
 +            @Override
 +            protected List<ByteBuffer> toRow(SchemaElement element, boolean withInternals)
 +            {
 +                return ImmutableList.of(bytes(element.elementKeyspaceQuotedIfNeeded()),
 +                                        bytes(element.elementType().toString()),
 +                                        bytes(element.elementNameQuotedIfNeeded()),
-                                         bytes(element.toCqlString(withInternals)));
++                                        bytes(element.toCqlString(withInternals, false)));
 +            }
 +        };
 +    }
 +
 +    /**
 +     * {@code DescribeStatement} implementation used for describe queries for a single schema element.
 +     */
 +    public static class Element extends DescribeStatement<SchemaElement>
 +    {
 +        /**
 +         * The keyspace name 
 +         */
 +        private final String keyspace;
 +
 +        /**
 +         * The element name
 +         */
 +        private final String name;
 +
 +        private final BiFunction<KeyspaceMetadata, String, Stream<? extends SchemaElement>> elementsProvider;
 +
 +        public Element(String keyspace, String name, BiFunction<KeyspaceMetadata, String, Stream<? extends SchemaElement>> elementsProvider)
 +        {
 +            this.keyspace = keyspace;
 +            this.name = name;
 +            this.elementsProvider = elementsProvider;
 +        }
 +
 +        @Override
 +        protected Stream<? extends SchemaElement> describe(ClientState state, Keyspaces keyspaces)
 +        {
 +            String ks = keyspace == null ? checkNotNull(state.getRawKeyspace(), "No keyspace specified and no current keyspace")
 +                                         : keyspace;
 +
 +            return elementsProvider.apply(validateKeyspace(ks, keyspaces), name);
 +        }
 +
 +        @Override
 +        protected List<ColumnSpecification> metadata(ClientState state)
 +        {
 +            return ELEMENT_METADATA;
 +        }
 +
 +        @Override
 +        protected List<ByteBuffer> toRow(SchemaElement element, boolean withInternals)
 +        {
 +            return ImmutableList.of(bytes(element.elementKeyspaceQuotedIfNeeded()),
 +                                    bytes(element.elementType().toString()),
 +                                    bytes(element.elementNameQuotedIfNeeded()),
-                                     bytes(element.toCqlString(withInternals)));
++                                    bytes(element.toCqlString(withInternals, false)));
 +        }
 +    }
 +
 +    /**
 +     * Creates a {@link DescribeStatement} for {@code DESCRIBE KEYSPACE}.
 +     */
 +    public static DescribeStatement<SchemaElement> keyspace(String keyspace, boolean onlyKeyspaceDefinition)
 +    {
 +        return new Element(keyspace, null, (ks, t) -> getKeyspaceElements(ks, onlyKeyspaceDefinition));
 +    }
 +
 +    private static Stream<? extends SchemaElement> getKeyspaceElements(KeyspaceMetadata ks, boolean onlyKeyspace)
 +    {
 +        Stream<? extends SchemaElement> s = Stream.of(ks);
 +
 +        if (!onlyKeyspace)
 +        {
 +            s = Stream.concat(s, ks.types.sortedStream());
 +            s = Stream.concat(s, ks.functions.udfs().sorted(SchemaElement.NAME_COMPARATOR));
 +            s = Stream.concat(s, ks.functions.udas().sorted(SchemaElement.NAME_COMPARATOR));
 +            s = Stream.concat(s, ks.tables.stream().sorted(SchemaElement.NAME_COMPARATOR)
 +                                                   .flatMap(tm -> getTableElements(ks, tm)));
 +        }
 +
 +        return s;
 +    }
 +
 +    private static Stream<? extends SchemaElement> getTableElements(KeyspaceMetadata ks, TableMetadata table)
 +    {
 +        Stream<? extends SchemaElement> s = Stream.of(table);
 +        s = Stream.concat(s, table.indexes.stream()
 +                                          .map(i -> toDescribable(table, i))
 +                                          .sorted(SchemaElement.NAME_COMPARATOR));
 +        s = Stream.concat(s, ks.views.stream(table.id)
 +                                     .sorted(SchemaElement.NAME_COMPARATOR));
 +        return s;
 +    }
 +
 +    /**
 +     * Creates a {@link DescribeStatement} for {@code DESCRIBE TABLE}.
 +     */
 +    public static DescribeStatement<SchemaElement> table(String keyspace, String name)
 +    {
 +        return new Element(keyspace, name, (ks, t) -> {
 +
 +            TableMetadata table = checkNotNull(ks.getTableOrViewNullable(t),
 +                                               "Table '%s' not found in keyspace '%s'", t, ks.name);
 +
 +            return Stream.concat(Stream.of(table), table.indexes.stream()
 +                                                                .map(index -> toDescribable(table, index))
 +                                                                .sorted(SchemaElement.NAME_COMPARATOR));
 +        });
 +    }
 +
 +    /**
 +     * Creates a {@link DescribeStatement} for {@code DESCRIBE INDEX}.
 +     */
 +    public static DescribeStatement<SchemaElement> index(String keyspace, String name)
 +    {
 +        return new Element(keyspace, name, (ks, index) -> {
 +
 +            TableMetadata tm = ks.findIndexedTable(index)
 +                                 .orElseThrow(() -> invalidRequest("Table for existing index '%s' not found in '%s'",
 +                                                                   index,
 +                                                                   ks.name));
 +            return tm.indexes.get(index)
 +                             .map(i -> toDescribable(tm, i))
 +                             .map(Stream::of)
 +                             .orElseThrow(() -> invalidRequest("Index '%s' not found in '%s'", index, ks.name));
 +        });
 +    }
 +
 +    /**
 +     * Creates a {@link DescribeStatement} for {@code DESCRIBE MATERIALIZED VIEW}.
 +     */
 +    public static DescribeStatement<SchemaElement> view(String keyspace, String name)
 +    {
 +        return new Element(keyspace, name, (ks, view) -> {
 +
 +            return ks.views.get(view)
 +                           .map(Stream::of)
 +                           .orElseThrow(() -> invalidRequest("Materialized view '%s' not found in '%s'", view, ks.name));
 +        });
 +    }
 +
 +    /**
 +     * Creates a {@link DescribeStatement} for {@code DESCRIBE TYPE}.
 +     */
 +    public static DescribeStatement<SchemaElement> type(String keyspace, String name)
 +    {
 +        return new Element(keyspace, name, (ks, type) -> {
 +
 +            return ks.types.get(ByteBufferUtil.bytes(type))
 +                           .map(Stream::of)
 +                           .orElseThrow(() -> invalidRequest("User defined type '%s' not found in '%s'",
 +                                                             type,
 +                                                             ks.name));
 +        });
 +    }
 +
 +    /**
 +     * Creates a {@link DescribeStatement} for {@code DESCRIBE FUNCTION}.
 +     */
 +    public static DescribeStatement<SchemaElement> function(String keyspace, String name)
 +    {
 +        return new Element(keyspace, name, (ks, n) -> {
 +
 +            return checkNotEmpty(ks.functions.getUdfs(new FunctionName(ks.name, n)),
 +                                 "User defined function '%s' not found in '%s'", n, ks.name).stream()
 +                                                                                             .sorted(SchemaElement.NAME_COMPARATOR);
 +        });
 +    }
 +
 +    /**
 +     * Creates a {@link DescribeStatement} for {@code DESCRIBE AGGREGATE}.
 +     */
 +    public static DescribeStatement<SchemaElement> aggregate(String keyspace, String name)
 +    {
 +        return new Element(keyspace, name, (ks, n) -> {
 +
 +            return checkNotEmpty(ks.functions.getUdas(new FunctionName(ks.name, n)),
 +                                 "User defined aggregate '%s' not found in '%s'", n, ks.name).stream()
 +                                                                                              .sorted(SchemaElement.NAME_COMPARATOR);
 +        });
 +    }
 +
 +    private static SchemaElement toDescribable(TableMetadata table, IndexMetadata index)
 +    {
 +        return new SchemaElement()
 +                {
 +                    @Override
 +                    public SchemaElementType elementType()
 +                    {
 +                        return SchemaElementType.INDEX;
 +                    }
 +
 +                    @Override
 +                    public String elementKeyspace()
 +                    {
 +                        return table.keyspace;
 +                    }
 +
 +                    @Override
 +                    public String elementName()
 +                    {
 +                        return index.name;
 +                    }
 +
 +                    @Override
-                     public String toCqlString(boolean withInternals)
++                    public String toCqlString(boolean withInternals, boolean ifNotExists)
 +                    {
-                         return index.toCqlString(table);
++                        return index.toCqlString(table, ifNotExists);
 +                    }
 +                };
 +    }
 +
 +    /**
 +     * Creates a {@link DescribeStatement} for the generic {@code DESCRIBE ...}.
 +     */
 +    public static DescribeStatement<SchemaElement> generic(String keyspace, String name)
 +    {
 +        return new DescribeStatement<SchemaElement>()
 +        {
 +            private DescribeStatement<SchemaElement> delegate;
 +
 +            private DescribeStatement<SchemaElement> resolve(ClientState state, Keyspaces keyspaces)
 +            {
 +                String ks = keyspace;
 +
 +                // from cqlsh help: "keyspace or a table or an index or a materialized view (in this order)."
 +                if (keyspace == null)
 +                {
 +                    if (keyspaces.containsKeyspace(name))
 +                        return keyspace(name, false);
 +
 +                    String rawKeyspace = state.getRawKeyspace();
 +                    ks = rawKeyspace == null ? name : rawKeyspace;
 +                }
 +
 +                KeyspaceMetadata keyspaceMetadata = validateKeyspace(ks, keyspaces);
 +
 +                if (keyspaceMetadata.tables.getNullable(name) != null)
 +                    return table(ks, name);
 +
 +                Optional<TableMetadata> indexed = keyspaceMetadata.findIndexedTable(name);
 +                if (indexed.isPresent())
 +                {
 +                    Optional<IndexMetadata> index = indexed.get().indexes.get(name);
 +                    if (index.isPresent())
 +                        return index(ks, name);
 +                }
 +
 +                if (keyspaceMetadata.views.getNullable(name) != null)
 +                    return view(ks, name);
 +
 +                throw invalidRequest("'%s' not found in keyspace '%s'", name, ks);
 +            }
 +
 +            @Override
 +            protected Stream<? extends SchemaElement> describe(ClientState state, Keyspaces keyspaces)
 +            {
 +                delegate = resolve(state, keyspaces);
 +                return delegate.describe(state, keyspaces);
 +            }
 +
 +            @Override
 +            protected List<ColumnSpecification> metadata(ClientState state)
 +            {
 +                return delegate.metadata(state);
 +            }
 +
 +            @Override
 +            protected List<ByteBuffer> toRow(SchemaElement element, boolean withInternals)
 +            {
 +                return delegate.toRow(element, withInternals);
 +            }
 +        };
 +    }
 +
 +    /**
 +     * Creates a {@link DescribeStatement} for {@code DESCRIBE CLUSTER}.
 +     */
 +    public static DescribeStatement<List<Object>> cluster()
 +    {
 +        return new DescribeStatement<List<Object>>()
 +        {
 +            /**
 +             * The column index of the cluster name
 +             */
 +            private static final int CLUSTER_NAME_INDEX = 0;
 +
 +            /**
 +             * The column index of the partitioner name
 +             */
 +            private static final int PARTITIONER_NAME_INDEX = 1;
 +
 +            /**
 +             * The column index of the snitch class
 +             */
 +            private static final int SNITCH_CLASS_INDEX = 2;
 +
 +            /**
 +             * The range ownerships index
 +             */
 +            private static final int RANGE_OWNERSHIPS_INDEX = 3;
 +
 +            @Override
 +            protected Stream<List<Object>> describe(ClientState state, Keyspaces keyspaces)
 +            {
 +                List<Object> list = new ArrayList<Object>();
 +                list.add(DatabaseDescriptor.getClusterName());
 +                list.add(trimIfPresent(DatabaseDescriptor.getPartitionerName(), "org.apache.cassandra.dht."));
 +                list.add(trimIfPresent(DatabaseDescriptor.getEndpointSnitch().getClass().getName(),
 +                                            "org.apache.cassandra.locator."));
 + 
 +                String useKs = state.getRawKeyspace();
 +                if (mustReturnsRangeOwnerships(useKs))
 +                {
 +                    list.add(StorageService.instance.getRangeToAddressMap(useKs)
 +                                                    .entrySet()
 +                                                    .stream()
 +                                                    .sorted(Comparator.comparing(Map.Entry::getKey))
 +                                                    .collect(Collectors.toMap(e -> e.getKey().right.toString(),
 +                                                                              e -> e.getValue()
 +                                                                                    .stream()
 +                                                                                    .map(r -> r.endpoint().toString())
 +                                                                                    .collect(Collectors.toList()))));
 +                }
 +                return Stream.of(list);
 +            }
 +
 +            private boolean mustReturnsRangeOwnerships(String useKs)
 +            {
 +                return useKs != null && !SchemaConstants.isLocalSystemKeyspace(useKs) && !SchemaConstants.isSystemKeyspace(useKs);
 +            }
 +
 +            @Override
 +            protected List<ColumnSpecification> metadata(ClientState state)
 +            {
 +                ImmutableList.Builder<ColumnSpecification> builder = ImmutableList.builder();
 +                builder.add(new ColumnSpecification(KS, CF, new ColumnIdentifier("cluster", true), UTF8Type.instance),
 +                                        new ColumnSpecification(KS, CF, new ColumnIdentifier("partitioner", true), UTF8Type.instance),
 +                                        new ColumnSpecification(KS, CF, new ColumnIdentifier("snitch", true), UTF8Type.instance));
 +
 +                if (mustReturnsRangeOwnerships(state.getRawKeyspace()))
 +                    builder.add(new ColumnSpecification(KS, CF, new ColumnIdentifier("range_ownership", true), MapType.getInstance(UTF8Type.instance,
 +                                                                                                                                   ListType.getInstance(UTF8Type.instance, false), false)));
 +
 +                return builder.build();
 +            }
 +
 +            @Override
 +            protected List<ByteBuffer> toRow(List<Object> elements, boolean withInternals)
 +            {
 +                ImmutableList.Builder<ByteBuffer> builder = ImmutableList.builder(); 
 +
 +                builder.add(UTF8Type.instance.decompose((String) elements.get(CLUSTER_NAME_INDEX)),
 +                            UTF8Type.instance.decompose((String) elements.get(PARTITIONER_NAME_INDEX)),
 +                            UTF8Type.instance.decompose((String) elements.get(SNITCH_CLASS_INDEX)));
 +
 +                if (elements.size() > 3)
 +                {
 +                    MapType<String, List<String>> rangeOwnershipType = MapType.getInstance(UTF8Type.instance,
 +                                                                                           ListType.getInstance(UTF8Type.instance, false),
 +                                                                                           false);
 +
 +                    builder.add(rangeOwnershipType.decompose((Map<String, List<String>>) elements.get(RANGE_OWNERSHIPS_INDEX)));
 +                }
 +
 +                return builder.build();
 +            }
 +
 +            private String trimIfPresent(String src, String begin)
 +            {
 +                if (src.startsWith(begin))
 +                    return src.substring(begin.length());
 +                return src;
 +            }
 +        };
 +    }
 +}
diff --cc src/java/org/apache/cassandra/db/SchemaCQLHelper.java
index ded1692,0000000..5d83a2b
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/SchemaCQLHelper.java
+++ b/src/java/org/apache/cassandra/db/SchemaCQLHelper.java
@@@ -1,180 -1,0 +1,185 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.db;
 +
 +import java.nio.ByteBuffer;
 +import java.util.regex.Matcher;
 +import java.util.regex.Pattern;
 +import java.util.stream.Stream;
 +
 +import com.google.common.annotations.VisibleForTesting;
 +
 +import org.apache.cassandra.db.marshal.AbstractType;
 +import org.apache.cassandra.db.marshal.UTF8Type;
 +import org.apache.cassandra.db.marshal.UserType;
 +import org.apache.cassandra.schema.*;
 +
 +/**
 + * Helper methods to represent TableMetadata and related objects in CQL format
 + */
 +public class SchemaCQLHelper
 +{
 +    private static final Pattern EMPTY_TYPE_REGEX = Pattern.compile("empty", Pattern.LITERAL);
 +    private static final String EMPTY_TYPE_QUOTED = Matcher.quoteReplacement("'org.apache.cassandra.db.marshal.EmptyType'");
 +
 +    /**
 +     * Generates the DDL statement for a {@code schema.cql} snapshot file.
 +     */
 +    public static Stream<String> reCreateStatementsForSchemaCql(TableMetadata metadata, Types types)
 +    {
 +        // Types come first, as table can't be created without them
-         Stream<String> udts = SchemaCQLHelper.getUserTypesAsCQL(metadata, types);
++        Stream<String> udts = SchemaCQLHelper.getUserTypesAsCQL(metadata, types, true);
 +
 +        return Stream.concat(udts,
 +                             reCreateStatements(metadata,
 +                                                true,
 +                                                true,
 +                                                true,
 +                                                true));
 +    }
 +
 +    public static Stream<String> reCreateStatements(TableMetadata metadata,
 +                                                    boolean includeDroppedColumns,
 +                                                    boolean internals,
 +                                                    boolean ifNotExists,
 +                                                    boolean includeIndexes)
 +    {
 +        // Record re-create schema statements
 +        Stream<String> r = Stream.of(metadata)
 +                                         .map((tm) -> SchemaCQLHelper.getTableMetadataAsCQL(tm,
 +                                                                                            includeDroppedColumns,
 +                                                                                            internals,
 +                                                                                            ifNotExists));
 +
 +        if (includeIndexes)
 +        {
 +            // Indexes are applied last, since otherwise they may interfere with column drops / re-additions
-             r = Stream.concat(r, SchemaCQLHelper.getIndexesAsCQL(metadata));
++            r = Stream.concat(r, SchemaCQLHelper.getIndexesAsCQL(metadata, ifNotExists));
 +        }
 +
 +        return r;
 +    }
 +
 +    /**
 +     * Build a CQL String representation of Column Family Metadata.
 +     *
 +     * *Note*: this is _only_ visible for testing; you generally shouldn't re-create a single table in isolation as
 +     * that will not contain everything needed for user types.
 +     */
 +    @VisibleForTesting
 +    public static String getTableMetadataAsCQL(TableMetadata metadata,
 +                                               boolean includeDroppedColumns,
 +                                               boolean internals,
 +                                               boolean ifNotExists)
 +    {
 +        if (metadata.isView())
 +        {
 +            KeyspaceMetadata keyspaceMetadata = Schema.instance.getKeyspaceMetadata(metadata.keyspace);
 +            ViewMetadata viewMetadata = keyspaceMetadata.views.get(metadata.name).orElse(null);
 +            assert viewMetadata != null;
 +            return viewMetadata.toCqlString(internals, ifNotExists);
 +        }
 +
 +        return metadata.toCqlString(includeDroppedColumns, internals, ifNotExists);
 +    }
 +
 +    /**
 +     * Build a CQL String representation of User Types used in the given table.
 +     *
 +     * Type order is ensured as types are built incrementally: from the innermost (most nested)
 +     * to the outermost.
 +     *
 +     * @param metadata the table for which to extract the user types CQL statements.
 +     * @param types the user types defined in the keyspace of the dumped table (which will thus contain any user type
 +     * used by {@code metadata}).
++     * @param ifNotExists set to true if IF NOT EXISTS should be appended after CREATE TYPE string.
 +     * @return a list of {@code CREATE TYPE} statements corresponding to all the types used in {@code metadata}.
 +     */
 +    @VisibleForTesting
-     public static Stream<String> getUserTypesAsCQL(TableMetadata metadata, Types types)
++    public static Stream<String> getUserTypesAsCQL(TableMetadata metadata, Types types, boolean ifNotExists)
 +    {
 +        /*
 +         * Implementation note: at first approximation, it may seem like we don't need the Types argument and instead
 +         * directly extract the user types from the provided TableMetadata. Indeed, full user types definitions are
 +         * contained in UserType instances.
 +         *
 +         * However, the UserType instance found within the TableMetadata may have been frozen in such a way that makes
 +         * it challenging.
 +         *
 +         * Consider the user has created:
 +         *   CREATE TYPE inner (a set<int>);
 +         *   CREATE TYPE outer (b inner);
 +         *   CREATE TABLE t (k int PRIMARY KEY, c1 frozen<outer>, c2 set<frozen<inner>>)
 +         * The corresponding TableMetadata would have, as types (where 'mc=true' means that the type has its isMultiCell
 +         * set to true):
 +         *   c1: UserType(mc=false, "outer", b->UserType(mc=false, "inner", a->SetType(mc=false, Int32Type)))
 +         *   c2: SetType(mc=true, UserType(mc=false, "inner", a->SetType(mc=false, Int32Type)))
 +         * From which, it's impossible to decide if we should dump the types above, or instead:
 +         *   CREATE TYPE inner (a frozen<set<int>>);
 +         *   CREATE TYPE outer (b frozen<inner>);
 +         * or anything in-between.
 +         *
 +         * And while, given the current limitation around multi-cell types (which are only supported non-frozen at
 +         * top-level), any of the generated definitions would kind of "work", 1) this could confuse users and 2) this
 +         * would break if we do lift the limitation, which wouldn't be future proof.
 +         */
 +        return metadata.getReferencedUserTypes()
 +                       .stream()
-                        .map(name -> getType(metadata, types, name).toCqlString(false));
++                       .map(name -> getType(metadata, types, name).toCqlString(false, ifNotExists));
 +    }
 +
 +    /**
 +     * Build a CQL String representation of Indexes on columns in the given Column Family
++     *
++     * @param metadata the table for which to extract the index CQL statements.
++     * @param ifNotExists set to true if IF NOT EXISTS should be appended after CREATE INDEX string.
++     * @return a list of {@code CREATE INDEX} statements corresponding to table {@code metadata}.
 +     */
 +    @VisibleForTesting
-     public static Stream<String> getIndexesAsCQL(TableMetadata metadata)
++    public static Stream<String> getIndexesAsCQL(TableMetadata metadata, boolean ifNotExists)
 +    {
 +        return metadata.indexes
 +                .stream()
-                 .map(indexMetadata -> indexMetadata.toCqlString(metadata));
++                .map(indexMetadata -> indexMetadata.toCqlString(metadata, ifNotExists));
 +    }
 +
 +    private static UserType getType(TableMetadata metadata, Types types, ByteBuffer name)
 +    {
 +        return types.get(name)
 +                    .orElseThrow(() -> new IllegalStateException(String.format("user type %s is part of table %s definition but its definition was missing", 
 +                                                                              UTF8Type.instance.getString(name),
 +                                                                              metadata)));
 +    }
 +
 +    /**
 +     * Converts the type to a CQL type.  This method special cases empty and UDTs so the string can be used in a create
 +     * statement.
 +     *
 +     * Special cases
 +     * <ul>
 +     *     <li>empty - replaced with 'org.apache.cassandra.db.marshal.EmptyType'.  "empty" is the toString of the type
 +     *     in CQL, but it is not accepted in a create statement, whereas the fully qualified class name is</li>
 +     *     <li>UserType - replaces with TupleType</li>
 +     * </ul>
 +     */
 +    public static String toCqlType(AbstractType<?> type)
 +    {
 +        return EMPTY_TYPE_REGEX.matcher(type.expandUserTypes().asCQL3Type().toString()).replaceAll(EMPTY_TYPE_QUOTED);
 +    }
 +}
diff --cc src/java/org/apache/cassandra/db/marshal/UserType.java
index f3cd7d7,febd91c..29afad9
--- a/src/java/org/apache/cassandra/db/marshal/UserType.java
+++ b/src/java/org/apache/cassandra/db/marshal/UserType.java
@@@ -452,83 -409,4 +452,89 @@@ public class UserType extends TupleTyp
      {
          return serializer;
      }
 +
 +    @Override
 +    public SchemaElementType elementType()
 +    {
 +        return SchemaElementType.TYPE;
 +    }
 +
 +    @Override
 +    public String elementKeyspace()
 +    {
 +        return keyspace;
 +    }
 +
 +    @Override
 +    public String elementName()
 +    {
 +        return getNameAsString();
 +    }
 +
 +    @Override
-     public String toCqlString(boolean withInternals)
++    public String toCqlString(boolean withInternals, boolean ifNotExists)
 +    {
 +        CqlBuilder builder = new CqlBuilder();
-         builder.append("CREATE TYPE ")
-                .appendQuotingIfNeeded(keyspace)
++        builder.append("CREATE TYPE ");
++
++        if (ifNotExists)
++        {
++            builder.append("IF NOT EXISTS ");
++        }
++
++        builder.appendQuotingIfNeeded(keyspace)
 +               .append('.')
 +               .appendQuotingIfNeeded(getNameAsString())
 +               .append(" (")
 +               .newLine()
 +               .increaseIndent();
 +
 +        for (int i = 0; i < size(); i++)
 +        {
 +            if (i > 0)
 +                builder.append(",")
 +                       .newLine();
 +
 +            builder.append(fieldNameAsString(i))
 +                   .append(' ')
 +                   .append(fieldType(i));
 +        }
 +
 +        builder.newLine()
 +               .decreaseIndent()
 +               .append(");");
 +
 +        return builder.toString();
 +    }
 +
 +    private enum ConflictBehavior
 +    {
 +        LOG {
 +            void onConflict(String keyspace, String name, String fieldName)
 +            {
 +                logger.error("Duplicate names found in UDT {}.{} for column {}",
 +                             maybeQuote(keyspace), maybeQuote(name), maybeQuote(fieldName));
 +            }
 +        },
 +        REJECT {
 +            @Override
 +            void onConflict(String keyspace, String name, String fieldName)
 +            {
 +
 +                throw new AssertionError(String.format("Duplicate names found in UDT %s.%s for column %s; " +
 +                                                       "to resolve set -D" + UDT_CONFLICT_BEHAVIOR + "=LOG on startup and remove the type",
 +                                                       maybeQuote(keyspace), maybeQuote(name), maybeQuote(fieldName)));
 +            }
 +        };
 +
 +        private static final String UDT_CONFLICT_BEHAVIOR = "cassandra.type.udt.conflict_behavior";
 +
 +        abstract void onConflict(String keyspace, String name, String fieldName);
 +
 +        static ConflictBehavior get()
 +        {
 +            String value = System.getProperty(UDT_CONFLICT_BEHAVIOR, REJECT.name());
 +            return ConflictBehavior.valueOf(value);
 +        }
 +    }
  }
diff --cc src/java/org/apache/cassandra/schema/IndexMetadata.java
index 81f48ff,04e06ab..6580624
--- a/src/java/org/apache/cassandra/schema/IndexMetadata.java
+++ b/src/java/org/apache/cassandra/schema/IndexMetadata.java
@@@ -218,56 -253,11 +218,68 @@@ public final class IndexMetadat
      public String toString()
      {
          return new ToStringBuilder(this)
 -            .append("id", id.toString())
 -            .append("name", name)
 -            .append("kind", kind)
 -            .append("options", options)
 -            .build();
 +               .append("id", id.toString())
 +               .append("name", name)
 +               .append("kind", kind)
 +               .append("options", options)
 +               .build();
 +    }
 +
-     public String toCqlString(TableMetadata table)
++    public String toCqlString(TableMetadata table, boolean ifNotExists)
 +    {
 +        CqlBuilder builder = new CqlBuilder();
-         appendCqlTo(builder, table);
++        appendCqlTo(builder, table, ifNotExists);
 +        return builder.toString();
 +    }
 +
 +    /**
 +     * Appends to the specified builder the CQL used to create this index.
-      *
 +     * @param builder the builder to which the CQL must be appended
 +     * @param table the parent table
++     * @param ifNotExists includes "IF NOT EXISTS" into statement
 +     */
-     public void appendCqlTo(CqlBuilder builder, TableMetadata table)
++    public void appendCqlTo(CqlBuilder builder, TableMetadata table, boolean ifNotExists)
 +    {
 +        if (isCustom())
 +        {
 +            Map<String, String> copyOptions = new HashMap<>(options);
 +
-             builder.append("CREATE CUSTOM INDEX ")
-                    .appendQuotingIfNeeded(name)
++            builder.append("CREATE CUSTOM INDEX ");
++
++            if (ifNotExists)
++            {
++                builder.append("IF NOT EXISTS ");
++            }
++
++            builder.appendQuotingIfNeeded(name)
 +                   .append(" ON ")
 +                   .append(table.toString())
 +                   .append(" (")
 +                   .append(copyOptions.remove(IndexTarget.TARGET_OPTION_NAME))
 +                   .append(") USING ")
 +                   .appendWithSingleQuotes(copyOptions.remove(IndexTarget.CUSTOM_INDEX_OPTION_NAME));
 +
 +            if (!copyOptions.isEmpty())
 +                builder.append(" WITH OPTIONS = ")
 +                       .append(options);
 +        }
 +        else
 +        {
-             builder.append("CREATE INDEX ")
-                    .appendQuotingIfNeeded(name)
++            builder.append("CREATE INDEX ");
++
++            if (ifNotExists)
++            {
++                builder.append("IF NOT EXISTS ");
++            }
++
++            builder.appendQuotingIfNeeded(name)
 +                   .append(" ON ")
 +                   .append(table.toString())
 +                   .append(" (")
 +                   .append(options.get(IndexTarget.TARGET_OPTION_NAME))
 +                   .append(')');
 +        }
 +        builder.append(';');
      }
  
      public static class Serializer
diff --cc src/java/org/apache/cassandra/schema/KeyspaceMetadata.java
index 23c931f,4fefd44..a029168
--- a/src/java/org/apache/cassandra/schema/KeyspaceMetadata.java
+++ b/src/java/org/apache/cassandra/schema/KeyspaceMetadata.java
@@@ -232,158 -165,14 +232,164 @@@ public final class KeyspaceMetadata imp
                            .toString();
      }
  
 +    @Override
 +    public SchemaElementType elementType()
 +    {
 +        return SchemaElementType.KEYSPACE;
 +    }
 +
 +    @Override
 +    public String elementKeyspace()
 +    {
 +        return name;
 +    }
 +
 +    @Override
 +    public String elementName()
 +    {
 +        return name;
 +    }
 +
 +    @Override
-     public String toCqlString(boolean withInternals)
++    public String toCqlString(boolean withInternals, boolean ifNotExists)
 +    {
 +        CqlBuilder builder = new CqlBuilder();
 +        if (isVirtual())
 +        {
 +            builder.append("/*")
 +                   .newLine()
 +                   .append("Warning: Keyspace ")
 +                   .appendQuotingIfNeeded(name)
 +                   .append(" is a virtual keyspace and cannot be recreated with CQL.")
 +                   .newLine()
 +                   .append("Structure, for reference:")
 +                   .newLine()
 +                   .append("VIRTUAL KEYSPACE ")
 +                   .appendQuotingIfNeeded(name)
 +                   .append(';')
 +                   .newLine()
 +                   .append("*/")
 +                   .toString();
 +        }
 +        else
 +        {
-             builder.append("CREATE KEYSPACE ")
-                    .appendQuotingIfNeeded(name)
++            builder.append("CREATE KEYSPACE ");
++
++            if (ifNotExists)
++            {
++                builder.append("IF NOT EXISTS ");
++            }
++
++            builder.appendQuotingIfNeeded(name)
 +                   .append(" WITH replication = ");
 +
 +            params.replication.appendCqlTo(builder);
 +
 +            builder.append("  AND durable_writes = ")
 +                   .append(params.durableWrites)
 +                   .append(';')
 +                   .toString();
 +        }
 +        return builder.toString();
 +    }
 +
      public void validate()
      {
 -        if (!CFMetaData.isNameValid(name))
 -            throw new ConfigurationException(String.format("Keyspace name must not be empty, more than %s characters long, "
 -                                                           + "or contain non-alphanumeric-underscore characters (got \"%s\")",
 -                                                           SchemaConstants.NAME_LENGTH,
 -                                                           name));
 +        if (!SchemaConstants.isValidName(name))
 +        {
 +            throw new ConfigurationException(format("Keyspace name must not be empty, more than %s characters long, "
 +                                                    + "or contain non-alphanumeric-underscore characters (got \"%s\")",
 +                                                    SchemaConstants.NAME_LENGTH,
 +                                                    name));
 +        }
 +
          params.validate(name);
 -        tablesAndViews().forEach(CFMetaData::validate);
 +
 +        tablesAndViews().forEach(TableMetadata::validate);
 +
 +        Set<String> indexNames = new HashSet<>();
 +        for (TableMetadata table : tables)
 +        {
 +            for (IndexMetadata index : table.indexes)
 +            {
 +                if (indexNames.contains(index.name))
 +                    throw new ConfigurationException(format("Duplicate index name %s in keyspace %s", index.name, name));
 +
 +                indexNames.add(index.name);
 +            }
 +        }
 +    }
 +
 +    public AbstractReplicationStrategy createReplicationStrategy()
 +    {
 +        return AbstractReplicationStrategy.createReplicationStrategy(name,
 +                                                                     params.replication.klass,
 +                                                                     StorageService.instance.getTokenMetadata(),
 +                                                                     DatabaseDescriptor.getEndpointSnitch(),
 +                                                                     params.replication.options);
 +    }
 +
 +    static Optional<KeyspaceDiff> diff(KeyspaceMetadata before, KeyspaceMetadata after)
 +    {
 +        return KeyspaceDiff.diff(before, after);
 +    }
 +
 +    public static final class KeyspaceDiff
 +    {
 +        public final KeyspaceMetadata before;
 +        public final KeyspaceMetadata after;
 +
 +        public final TablesDiff tables;
 +        public final ViewsDiff views;
 +        public final TypesDiff types;
 +
 +        public final FunctionsDiff<UDFunction> udfs;
 +        public final FunctionsDiff<UDAggregate> udas;
 +
 +        private KeyspaceDiff(KeyspaceMetadata before,
 +                             KeyspaceMetadata after,
 +                             TablesDiff tables,
 +                             ViewsDiff views,
 +                             TypesDiff types,
 +                             FunctionsDiff<UDFunction> udfs,
 +                             FunctionsDiff<UDAggregate> udas)
 +        {
 +            this.before = before;
 +            this.after = after;
 +            this.tables = tables;
 +            this.views = views;
 +            this.types = types;
 +            this.udfs = udfs;
 +            this.udas = udas;
 +        }
 +
 +        private static Optional<KeyspaceDiff> diff(KeyspaceMetadata before, KeyspaceMetadata after)
 +        {
 +            if (before == after)
 +                return Optional.empty();
 +
 +            if (!before.name.equals(after.name))
 +            {
 +                String msg = String.format("Attempting to diff two keyspaces with different names ('%s' and '%s')", before.name, after.name);
 +                throw new IllegalArgumentException(msg);
 +            }
 +
 +            TablesDiff tables = Tables.diff(before.tables, after.tables);
 +            ViewsDiff views = Views.diff(before.views, after.views);
 +            TypesDiff types = Types.diff(before.types, after.types);
 +
 +            @SuppressWarnings("unchecked") FunctionsDiff<UDFunction>  udfs = FunctionsDiff.NONE;
 +            @SuppressWarnings("unchecked") FunctionsDiff<UDAggregate> udas = FunctionsDiff.NONE;
 +            if (before.functions != after.functions)
 +            {
 +                udfs = Functions.udfsDiff(before.functions, after.functions);
 +                udas = Functions.udasDiff(before.functions, after.functions);
 +            }
 +
 +            if (before.params.equals(after.params) && tables.isEmpty() && views.isEmpty() && types.isEmpty() && udfs.isEmpty() && udas.isEmpty())
 +                return Optional.empty();
 +
 +            return Optional.of(new KeyspaceDiff(before, after, tables, views, types, udfs, udas));
 +        }
      }
  }
diff --cc src/java/org/apache/cassandra/schema/TableMetadata.java
index 7880c2a,0000000..ab90564
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/schema/TableMetadata.java
+++ b/src/java/org/apache/cassandra/schema/TableMetadata.java
@@@ -1,1294 -1,0 +1,1294 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.schema;
 +
 +import java.nio.ByteBuffer;
 +import java.util.*;
 +import java.util.Map.Entry;
 +
 +import javax.annotation.Nullable;
 +
 +import com.google.common.base.MoreObjects;
 +import com.google.common.collect.*;
 +
 +import org.apache.cassandra.auth.DataResource;
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.cql3.ColumnIdentifier;
 +import org.apache.cassandra.cql3.CqlBuilder;
 +import org.apache.cassandra.cql3.SchemaElement;
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.marshal.*;
 +import org.apache.cassandra.dht.IPartitioner;
 +import org.apache.cassandra.exceptions.ConfigurationException;
 +import org.apache.cassandra.exceptions.InvalidRequestException;
 +import org.apache.cassandra.service.reads.SpeculativeRetryPolicy;
 +import org.apache.cassandra.utils.AbstractIterator;
 +import org.github.jamm.Unmetered;
 +
 +import static com.google.common.collect.Iterables.any;
 +import static com.google.common.collect.Iterables.transform;
 +import static java.lang.String.format;
 +import static java.util.stream.Collectors.toList;
 +import static java.util.stream.Collectors.toSet;
 +import static org.apache.cassandra.schema.IndexMetadata.isNameValid;
 +
 +@Unmetered
 +public final class TableMetadata implements SchemaElement
 +{
 +    public static final String COMPACT_STORAGE_HALT_MESSAGE =
 +            "Detected table %s.%s with COMPACT STORAGE flags (%s). " +
 +            "Compact Tables are not supported in Cassandra starting with version 4.0. " +
 +            "Use the `ALTER ... DROP COMPACT STORAGE` command supplied in 3.x/3.11 Cassandra " +
 +            "in order to migrate off Compact Storage before upgrading.";
 +
 +    // Please note that currently the only truly useful flag is COUNTER, as the rest of the flags were about
 +    // differentiating between CQL tables and the various types of COMPACT STORAGE tables (pre-4.0). As those "compact"
 +    // tables are not supported anymore, no tables should be either SUPER or DENSE, and they should all be COMPOUND.
 +    public enum Flag
 +    {
 +        // As mentioned above, all tables on 4.0+ will have the COMPOUND flag, making the flag of little value. However,
 +        // on upgrade from pre-4.0, we want to detect if a table does _not_ have this flag, in which case this would
 +        // be a compact table on which DROP COMPACT STORAGE has _not_ been used, and fail startup. This is also why we
 +        // still write this flag for all tables. Once we drop support for upgrading from pre-4.0 versions (and so are
 +        // sure all tables do have the flag), we can stop writing this flag and ignore it when present (deprecate it).
 +        // Later, we'll be able to drop the flag from this enum completely.
 +        COMPOUND,
 +        COUNTER,
 +        // The only reason we still have those is that on the first startup after an upgrade from pre-4.0, we cannot
 +        // guarantee some tables won't have those flags (users having forgotten to use DROP COMPACT STORAGE before
 +        // upgrading). So we still "deserialize" those flags correctly, but otherwise prevent startup if any table
 +        // has them. Once we drop support for upgrading from pre-4.0, we can remove those values.
 +        @Deprecated SUPER,
 +        @Deprecated DENSE;
 +
 +        static boolean isLegacyCompactTable(Set<Flag> flags)
 +        {
 +            return flags.contains(Flag.DENSE) || flags.contains(Flag.SUPER) || !flags.contains(Flag.COMPOUND);
 +        }
 +
 +        public static Set<Flag> fromStringSet(Set<String> strings)
 +        {
 +            return strings.stream().map(String::toUpperCase).map(Flag::valueOf).collect(toSet());
 +        }
 +
 +        public static Set<String> toStringSet(Set<Flag> flags)
 +        {
 +            return flags.stream().map(Flag::toString).map(String::toLowerCase).collect(toSet());
 +        }
 +    }
 +
 +    public enum Kind
 +    {
 +        REGULAR, INDEX, VIEW, VIRTUAL
 +    }
 +
 +    public final String keyspace;
 +    public final String name;
 +    public final TableId id;
 +
 +    public final IPartitioner partitioner;
 +    public final Kind kind;
 +    public final TableParams params;
 +    public final ImmutableSet<Flag> flags;
 +
 +    @Nullable
 +    private final String indexName; // derived from table name
 +
 +    /*
 +     * All CQL3 column definitions are stored in the columns map.
 +     * On top of that, we keep a separate collection of each kind of definition, to
 +     * 1) allow easy access to each kind and
 +     * 2) for the partition key and clustering key ones, those lists are ordered by the "component index" of the elements.
 +     */
 +    public final ImmutableMap<ByteBuffer, DroppedColumn> droppedColumns;
 +    final ImmutableMap<ByteBuffer, ColumnMetadata> columns;
 +
 +    private final ImmutableList<ColumnMetadata> partitionKeyColumns;
 +    private final ImmutableList<ColumnMetadata> clusteringColumns;
 +    private final RegularAndStaticColumns regularAndStaticColumns;
 +
 +    public final Indexes indexes;
 +    public final Triggers triggers;
 +
 +    // derived automatically from flags and columns
 +    public final AbstractType<?> partitionKeyType;
 +    public final ClusteringComparator comparator;
 +
 +    // performance hacks; TODO see if all are really necessary
 +    public final DataResource resource;
 +
 +    private TableMetadata(Builder builder)
 +    {
 +        if (Flag.isLegacyCompactTable(builder.flags))
 +            throw new IllegalStateException(format(COMPACT_STORAGE_HALT_MESSAGE,
 +                                                   builder.keyspace,
 +                                                   builder.name,
 +                                                   builder.flags));
 +
 +        flags = Sets.immutableEnumSet(builder.flags);
 +        keyspace = builder.keyspace;
 +        name = builder.name;
 +        id = builder.id;
 +
 +        partitioner = builder.partitioner;
 +        kind = builder.kind;
 +        params = builder.params.build();
 +
 +        indexName = kind == Kind.INDEX ? name.substring(name.indexOf('.') + 1) : null;
 +
 +        droppedColumns = ImmutableMap.copyOf(builder.droppedColumns);
 +        Collections.sort(builder.partitionKeyColumns);
 +        partitionKeyColumns = ImmutableList.copyOf(builder.partitionKeyColumns);
 +        Collections.sort(builder.clusteringColumns);
 +        clusteringColumns = ImmutableList.copyOf(builder.clusteringColumns);
 +        regularAndStaticColumns = RegularAndStaticColumns.builder().addAll(builder.regularAndStaticColumns).build();
 +        columns = ImmutableMap.copyOf(builder.columns);
 +
 +        indexes = builder.indexes;
 +        triggers = builder.triggers;
 +
 +        partitionKeyType = partitionKeyColumns.size() == 1
 +                         ? partitionKeyColumns.get(0).type
 +                         : CompositeType.getInstance(transform(partitionKeyColumns, t -> t.type));
 +
 +        comparator = new ClusteringComparator(transform(clusteringColumns, c -> c.type));
 +
 +        resource = DataResource.table(keyspace, name);
 +    }
 +
 +    public static Builder builder(String keyspace, String table)
 +    {
 +        return new Builder(keyspace, table);
 +    }
 +
 +    public static Builder builder(String keyspace, String table, TableId id)
 +    {
 +        return new Builder(keyspace, table, id);
 +    }
 +
 +    public Builder unbuild()
 +    {
 +        return builder(keyspace, name, id)
 +               .partitioner(partitioner)
 +               .kind(kind)
 +               .params(params)
 +               .flags(flags)
 +               .addColumns(columns())
 +               .droppedColumns(droppedColumns)
 +               .indexes(indexes)
 +               .triggers(triggers);
 +    }
 +
 +    public boolean isIndex()
 +    {
 +        return kind == Kind.INDEX;
 +    }
 +
 +    public TableMetadata withSwapped(TableParams params)
 +    {
 +        return unbuild().params(params).build();
 +    }
 +
 +    public TableMetadata withSwapped(Triggers triggers)
 +    {
 +        return unbuild().triggers(triggers).build();
 +    }
 +
 +    public TableMetadata withSwapped(Indexes indexes)
 +    {
 +        return unbuild().indexes(indexes).build();
 +    }
 +
 +    public boolean isView()
 +    {
 +        return kind == Kind.VIEW;
 +    }
 +
 +    public boolean isVirtual()
 +    {
 +        return kind == Kind.VIRTUAL;
 +    }
 +
 +    public Optional<String> indexName()
 +    {
 +        return Optional.ofNullable(indexName);
 +    }
 +
 +    public boolean isCounter()
 +    {
 +        return flags.contains(Flag.COUNTER);
 +    }
 +
 +    public ImmutableCollection<ColumnMetadata> columns()
 +    {
 +        return columns.values();
 +    }
 +
 +    public Iterable<ColumnMetadata> primaryKeyColumns()
 +    {
 +        return Iterables.concat(partitionKeyColumns, clusteringColumns);
 +    }
 +
 +    public ImmutableList<ColumnMetadata> partitionKeyColumns()
 +    {
 +        return partitionKeyColumns;
 +    }
 +
 +    public ImmutableList<ColumnMetadata> clusteringColumns()
 +    {
 +        return clusteringColumns;
 +    }
 +
 +    public RegularAndStaticColumns regularAndStaticColumns()
 +    {
 +        return regularAndStaticColumns;
 +    }
 +
 +    public Columns regularColumns()
 +    {
 +        return regularAndStaticColumns.regulars;
 +    }
 +
 +    public Columns staticColumns()
 +    {
 +        return regularAndStaticColumns.statics;
 +    }
 +
 +    /*
 +     * Returns an iterator over all column definitions that respects the order of a SELECT *.
 +     */
 +    public Iterator<ColumnMetadata> allColumnsInSelectOrder()
 +    {
 +        Iterator<ColumnMetadata> partitionKeyIter = partitionKeyColumns.iterator();
 +        Iterator<ColumnMetadata> clusteringIter = clusteringColumns.iterator();
 +        Iterator<ColumnMetadata> otherColumns = regularAndStaticColumns.selectOrderIterator();
 +
 +        return columnsIterator(partitionKeyIter, clusteringIter, otherColumns);
 +    }
 +
 +    /**
 +     * Returns an iterator over all column definitions that respect the order of the CREATE statement.
 +     */
 +    public Iterator<ColumnMetadata> allColumnsInCreateOrder()
 +    {
 +        Iterator<ColumnMetadata> partitionKeyIter = partitionKeyColumns.iterator();
 +        Iterator<ColumnMetadata> clusteringIter = clusteringColumns.iterator();
 +        Iterator<ColumnMetadata> otherColumns = regularAndStaticColumns.iterator();
 +
 +        return columnsIterator(partitionKeyIter, clusteringIter, otherColumns);
 +    }
 +
 +    private static Iterator<ColumnMetadata> columnsIterator(Iterator<ColumnMetadata> partitionKeys,
 +                                                            Iterator<ColumnMetadata> clusteringColumns,
 +                                                            Iterator<ColumnMetadata> otherColumns)
 +    {
 +        return new AbstractIterator<ColumnMetadata>()
 +        {
 +            protected ColumnMetadata computeNext()
 +            {
 +                if (partitionKeys.hasNext())
 +                    return partitionKeys.next();
 +
 +                if (clusteringColumns.hasNext())
 +                    return clusteringColumns.next();
 +
 +                return otherColumns.hasNext() ? otherColumns.next() : endOfData();
 +            }
 +        };
 +    }
 +
 +    /**
 +     * Returns the ColumnMetadata for {@code name}.
 +     */
 +    public ColumnMetadata getColumn(ColumnIdentifier name)
 +    {
 +        return columns.get(name.bytes);
 +    }
 +    /**
 +     * Returns the column of the provided name if it exists, but throws a user-visible exception if that column doesn't
 +     * exist.
 +     *
 +     * <p>This method is for finding columns from a name provided by the user, and as such it does _not_ return hidden
 +     * columns (throwing that the column is unknown instead).
 +     *
 +     * @param name the name of an existing non-hidden column of this table.
 +     * @return the column metadata corresponding to {@code name}.
 +     *
 +     * @throws InvalidRequestException if there is no non-hidden column named {@code name} in this table.
 +     */
 +    public ColumnMetadata getExistingColumn(ColumnIdentifier name)
 +    {
 +        ColumnMetadata def = getColumn(name);
 +        if (def == null)
 +            throw new InvalidRequestException(format("Undefined column name %s in table %s", name.toCQLString(), this));
 +        return def;
 +    }
 +    /*
 +     * In general it is preferable to work with ColumnIdentifier to make it
 +     * clear that we are talking about a CQL column, not a cell name, but there
 +     * are a few cases where all we have is a ByteBuffer (when dealing with IndexExpression
 +     * for instance) so...
 +     */
 +    public ColumnMetadata getColumn(ByteBuffer name)
 +    {
 +        return columns.get(name);
 +    }
 +
 +    public ColumnMetadata getDroppedColumn(ByteBuffer name)
 +    {
 +        DroppedColumn dropped = droppedColumns.get(name);
 +        return dropped == null ? null : dropped.column;
 +    }
 +
 +    /**
 +     * Returns a "fake" ColumnMetadata corresponding to the dropped column {@code name}
 +     * or {@code null} if there is no such dropped column.
 +     *
 +     * @param name - the column name
 +     * @param isStatic - whether the column was a static column, if known
 +     */
 +    public ColumnMetadata getDroppedColumn(ByteBuffer name, boolean isStatic)
 +    {
 +        DroppedColumn dropped = droppedColumns.get(name);
 +        if (dropped == null)
 +            return null;
 +
 +        if (isStatic && !dropped.column.isStatic())
 +            return ColumnMetadata.staticColumn(this, name, dropped.column.type);
 +
 +        return dropped.column;
 +    }
 +
 +    public boolean hasStaticColumns()
 +    {
 +        return !staticColumns().isEmpty();
 +    }
 +
 +    public void validate()
 +    {
 +        if (!isNameValid(keyspace))
 +            except("Keyspace name must not be empty, more than %s characters long, or contain non-alphanumeric-underscore characters (got \"%s\")", SchemaConstants.NAME_LENGTH, keyspace);
 +
 +        if (!isNameValid(name))
 +            except("Table name must not be empty, more than %s characters long, or contain non-alphanumeric-underscore characters (got \"%s\")", SchemaConstants.NAME_LENGTH, name);
 +
 +        params.validate();
 +
 +        if (partitionKeyColumns.stream().anyMatch(c -> c.type.isCounter()))
 +            except("PRIMARY KEY columns cannot contain counters");
 +
 +        // Mixing counter with non counter columns is not supported (#2614)
 +        if (isCounter())
 +        {
 +            for (ColumnMetadata column : regularAndStaticColumns)
 +                if (!(column.type.isCounter()) && !isSuperColumnMapColumnName(column.name))
 +                    except("Cannot have a non counter column (\"%s\") in a counter table", column.name);
 +        }
 +        else
 +        {
 +            for (ColumnMetadata column : regularAndStaticColumns)
 +                if (column.type.isCounter())
 +                    except("Cannot have a counter column (\"%s\") in a non counter column table", column.name);
 +        }
 +
 +        // All tables should have a partition key
 +        if (partitionKeyColumns.isEmpty())
 +            except("Missing partition keys for table %s", toString());
 +
 +        indexes.validate(this);
 +    }
 +
 +    /**
 +     * To support backward compatibility with thrift super columns in the C* 3.0+ storage engine, we encode said super
 +     * columns as a CQL {@code map<blob, blob>}. To ensure the name of this map did not conflict with any other user
 +     * defined columns, we used the empty name (which is otherwise not allowed for user created columns).
 +     * <p>
 +     * While all thrift-based tables must have been converted to "CQL" ones with "DROP COMPACT STORAGE" (before
 +     * upgrading to C* 4.0, which stops supporting non-CQL tables completely), a converted super-column table will still
 +     * have this map with an empty name. And the reason we still need to recognize it is that, for backward
 +     * compatibility, we need to support counters in values of this map while it's not supported in any other map.
 +     *
 +     * TODO: it's probably worth lifting the limitation of not allowing counters as map values. It works fully
 +     *   internally (since we had to support it for this special map) and doesn't feel particularly dangerous to
 +     *   support. Doing so would remove this special case, but would also let users that do have an upgraded super-column
 +     *   table with counters rename that weirdly named map to something more meaningful (it's not possible today
 +     *   as after renaming the validation in {@link #validate()} would trigger).
 +     */
 +    private static boolean isSuperColumnMapColumnName(ColumnIdentifier columnName)
 +    {
 +        return !columnName.bytes.hasRemaining();
 +    }
 +
 +    void validateCompatibility(TableMetadata previous)
 +    {
 +        if (isIndex())
 +            return;
 +
 +        if (!previous.keyspace.equals(keyspace))
 +            except("Keyspace mismatch (found %s; expected %s)", keyspace, previous.keyspace);
 +
 +        if (!previous.name.equals(name))
 +            except("Table mismatch (found %s; expected %s)", name, previous.name);
 +
 +        if (!previous.id.equals(id))
 +            except("Table ID mismatch (found %s; expected %s)", id, previous.id);
 +
 +        if (!previous.flags.equals(flags))
 +            except("Table type mismatch (found %s; expected %s)", flags, previous.flags);
 +
 +        if (previous.partitionKeyColumns.size() != partitionKeyColumns.size())
 +        {
 +            except("Partition keys of different length (found %s; expected %s)",
 +                   partitionKeyColumns.size(),
 +                   previous.partitionKeyColumns.size());
 +        }
 +
 +        for (int i = 0; i < partitionKeyColumns.size(); i++)
 +        {
 +            if (!partitionKeyColumns.get(i).type.isCompatibleWith(previous.partitionKeyColumns.get(i).type))
 +            {
 +                except("Partition key column mismatch (found %s; expected %s)",
 +                       partitionKeyColumns.get(i).type,
 +                       previous.partitionKeyColumns.get(i).type);
 +            }
 +        }
 +
 +        if (previous.clusteringColumns.size() != clusteringColumns.size())
 +        {
 +            except("Clustering columns of different length (found %s; expected %s)",
 +                   clusteringColumns.size(),
 +                   previous.clusteringColumns.size());
 +        }
 +
 +        for (int i = 0; i < clusteringColumns.size(); i++)
 +        {
 +            if (!clusteringColumns.get(i).type.isCompatibleWith(previous.clusteringColumns.get(i).type))
 +            {
 +                except("Clustering column mismatch (found %s; expected %s)",
 +                       clusteringColumns.get(i).type,
 +                       previous.clusteringColumns.get(i).type);
 +            }
 +        }
 +
 +        for (ColumnMetadata previousColumn : previous.regularAndStaticColumns)
 +        {
 +            ColumnMetadata column = getColumn(previousColumn.name);
 +            if (column != null && !column.type.isCompatibleWith(previousColumn.type))
 +                except("Column mismatch (found %s; expected %s)", column, previousColumn);
 +        }
 +    }
 +
 +    public ClusteringComparator partitionKeyAsClusteringComparator()
 +    {
 +        return new ClusteringComparator(partitionKeyColumns.stream().map(c -> c.type).collect(toList()));
 +    }
 +
 +    /**
 +     * Generate a table name for an index corresponding to the given column.
 +     * This is NOT the same as the index's name! This is only used in sstable filenames and is not exposed to users.
 +     *
 +     * @param info A definition of the column with index
 +     *
 +     * @return name of the index table
 +     */
 +    public String indexTableName(IndexMetadata info)
 +    {
 +        // TODO simplify this when info.index_name is guaranteed to be set
 +        return name + Directories.SECONDARY_INDEX_NAME_SEPARATOR + info.name;
 +    }
 +
 +    /**
 +     * @return true if the change as made impacts queries/updates on the table,
 +     *         e.g. any columns or indexes were added, removed, or altered; otherwise, false is returned.
 +     *         Used to determine whether prepared statements against this table need to be re-prepared.
 +     */
 +    boolean changeAffectsPreparedStatements(TableMetadata updated)
 +    {
 +        return !partitionKeyColumns.equals(updated.partitionKeyColumns)
 +            || !clusteringColumns.equals(updated.clusteringColumns)
 +            || !regularAndStaticColumns.equals(updated.regularAndStaticColumns)
 +            || !indexes.equals(updated.indexes)
 +            || params.defaultTimeToLive != updated.params.defaultTimeToLive
 +            || params.gcGraceSeconds != updated.params.gcGraceSeconds;
 +    }
 +
 +    /**
 +     * There are a couple of places in the code where we need a TableMetadata object and don't have one readily available
 +     * and know that only the keyspace and name matter. This creates such "fake" metadata. Use only if you know what
 +     * you're doing.
 +     */
 +    public static TableMetadata minimal(String keyspace, String name)
 +    {
 +        return TableMetadata.builder(keyspace, name)
 +                            .addPartitionKeyColumn("key", BytesType.instance)
 +                            .build();
 +    }
 +
 +    public TableMetadata updateIndexTableMetadata(TableParams baseTableParams)
 +    {
 +        TableParams.Builder builder = baseTableParams.unbuild().gcGraceSeconds(0);
 +
 +        // Depends on parent's cache setting, turn on its index table's cache.
 +        // Row caching is never enabled; see CASSANDRA-5732
 +        builder.caching(baseTableParams.caching.cacheKeys() ? CachingParams.CACHE_KEYS : CachingParams.CACHE_NOTHING);
 +
 +        return unbuild().params(builder.build()).build();
 +    }
 +
 +    boolean referencesUserType(ByteBuffer name)
 +    {
 +        return any(columns(), c -> c.type.referencesUserType(name));
 +    }
 +
 +    public TableMetadata withUpdatedUserType(UserType udt)
 +    {
 +        if (!referencesUserType(udt.name))
 +            return this;
 +
 +        Builder builder = unbuild();
 +        columns().forEach(c -> builder.alterColumnType(c.name, c.type.withUpdatedUserType(udt)));
 +
 +        return builder.build();
 +    }
 +
 +    private void except(String format, Object... args)
 +    {
 +        throw new ConfigurationException(keyspace + "." + name + ": " + format(format, args));
 +    }
 +
 +    @Override
 +    public boolean equals(Object o)
 +    {
 +        if (this == o)
 +            return true;
 +
 +        if (!(o instanceof TableMetadata))
 +            return false;
 +
 +        TableMetadata tm = (TableMetadata) o;
 +
 +        return equalsWithoutColumns(tm) && columns.equals(tm.columns);
 +    }
 +
 +    private boolean equalsWithoutColumns(TableMetadata tm)
 +    {
 +        return keyspace.equals(tm.keyspace)
 +            && name.equals(tm.name)
 +            && id.equals(tm.id)
 +            && partitioner.equals(tm.partitioner)
 +            && kind == tm.kind
 +            && params.equals(tm.params)
 +            && flags.equals(tm.flags)
 +            && droppedColumns.equals(tm.droppedColumns)
 +            && indexes.equals(tm.indexes)
 +            && triggers.equals(tm.triggers);
 +    }
 +
 +    Optional<Difference> compare(TableMetadata other)
 +    {
 +        return equalsWithoutColumns(other)
 +             ? compareColumns(other.columns)
 +             : Optional.of(Difference.SHALLOW);
 +    }
 +
 +    private Optional<Difference> compareColumns(Map<ByteBuffer, ColumnMetadata> other)
 +    {
 +        if (!columns.keySet().equals(other.keySet()))
 +            return Optional.of(Difference.SHALLOW);
 +
 +        boolean differsDeeply = false;
 +
 +        for (Map.Entry<ByteBuffer, ColumnMetadata> entry : columns.entrySet())
 +        {
 +            ColumnMetadata thisColumn = entry.getValue();
 +            ColumnMetadata thatColumn = other.get(entry.getKey());
 +
 +            Optional<Difference> difference = thisColumn.compare(thatColumn);
 +            if (difference.isPresent())
 +            {
 +                switch (difference.get())
 +                {
 +                    case SHALLOW:
 +                        return difference;
 +                    case DEEP:
 +                        differsDeeply = true;
 +                }
 +            }
 +        }
 +
 +        return differsDeeply ? Optional.of(Difference.DEEP) : Optional.empty();
 +    }
 +
 +    @Override
 +    public int hashCode()
 +    {
 +        return Objects.hash(keyspace, name, id, partitioner, kind, params, flags, columns, droppedColumns, indexes, triggers);
 +    }
 +
 +    @Override
 +    public String toString()
 +    {
 +        return format("%s.%s", ColumnIdentifier.maybeQuote(keyspace), ColumnIdentifier.maybeQuote(name));
 +    }
 +
 +    public String toDebugString()
 +    {
 +        return MoreObjects.toStringHelper(this)
 +                          .add("keyspace", keyspace)
 +                          .add("table", name)
 +                          .add("id", id)
 +                          .add("partitioner", partitioner)
 +                          .add("kind", kind)
 +                          .add("params", params)
 +                          .add("flags", flags)
 +                          .add("columns", columns())
 +                          .add("droppedColumns", droppedColumns.values())
 +                          .add("indexes", indexes)
 +                          .add("triggers", triggers)
 +                          .toString();
 +    }
 +
 +    public static final class Builder
 +    {
 +        final String keyspace;
 +        final String name;
 +
 +        private TableId id;
 +
 +        private IPartitioner partitioner;
 +        private Kind kind = Kind.REGULAR;
 +        private TableParams.Builder params = TableParams.builder();
 +
 +        // See the comment on Flag.COMPOUND definition for why we (still) unconditionally add this flag.
 +        private Set<Flag> flags = EnumSet.of(Flag.COMPOUND);
 +        private Triggers triggers = Triggers.none();
 +        private Indexes indexes = Indexes.none();
 +
 +        private final Map<ByteBuffer, DroppedColumn> droppedColumns = new HashMap<>();
 +        private final Map<ByteBuffer, ColumnMetadata> columns = new HashMap<>();
 +        private final List<ColumnMetadata> partitionKeyColumns = new ArrayList<>();
 +        private final List<ColumnMetadata> clusteringColumns = new ArrayList<>();
 +        private final List<ColumnMetadata> regularAndStaticColumns = new ArrayList<>();
 +
 +        private Builder(String keyspace, String name, TableId id)
 +        {
 +            this.keyspace = keyspace;
 +            this.name = name;
 +            this.id = id;
 +        }
 +
 +        private Builder(String keyspace, String name)
 +        {
 +            this.keyspace = keyspace;
 +            this.name = name;
 +        }
 +
 +        public TableMetadata build()
 +        {
 +            if (partitioner == null)
 +                partitioner = DatabaseDescriptor.getPartitioner();
 +
 +            if (id == null)
 +                id = TableId.generate();
 +
 +            return new TableMetadata(this);
 +        }
 +
 +        public Builder id(TableId val)
 +        {
 +            id = val;
 +            return this;
 +        }
 +
 +        public Builder partitioner(IPartitioner val)
 +        {
 +            partitioner = val;
 +            return this;
 +        }
 +
 +        public Builder kind(Kind val)
 +        {
 +            kind = val;
 +            return this;
 +        }
 +
 +        public Builder params(TableParams val)
 +        {
 +            params = val.unbuild();
 +            return this;
 +        }
 +
 +        public Builder bloomFilterFpChance(double val)
 +        {
 +            params.bloomFilterFpChance(val);
 +            return this;
 +        }
 +
 +        public Builder caching(CachingParams val)
 +        {
 +            params.caching(val);
 +            return this;
 +        }
 +
 +        public Builder comment(String val)
 +        {
 +            params.comment(val);
 +            return this;
 +        }
 +
 +        public Builder compaction(CompactionParams val)
 +        {
 +            params.compaction(val);
 +            return this;
 +        }
 +
 +        public Builder compression(CompressionParams val)
 +        {
 +            params.compression(val);
 +            return this;
 +        }
 +
 +        public Builder defaultTimeToLive(int val)
 +        {
 +            params.defaultTimeToLive(val);
 +            return this;
 +        }
 +
 +        public Builder gcGraceSeconds(int val)
 +        {
 +            params.gcGraceSeconds(val);
 +            return this;
 +        }
 +
 +        public Builder maxIndexInterval(int val)
 +        {
 +            params.maxIndexInterval(val);
 +            return this;
 +        }
 +
 +        public Builder memtableFlushPeriod(int val)
 +        {
 +            params.memtableFlushPeriodInMs(val);
 +            return this;
 +        }
 +
 +        public Builder minIndexInterval(int val)
 +        {
 +            params.minIndexInterval(val);
 +            return this;
 +        }
 +
 +        public Builder crcCheckChance(double val)
 +        {
 +            params.crcCheckChance(val);
 +            return this;
 +        }
 +
 +        public Builder speculativeRetry(SpeculativeRetryPolicy val)
 +        {
 +            params.speculativeRetry(val);
 +            return this;
 +        }
 +
 +        public Builder additionalWritePolicy(SpeculativeRetryPolicy val)
 +        {
 +            params.additionalWritePolicy(val);
 +            return this;
 +        }
 +
 +        public Builder extensions(Map<String, ByteBuffer> val)
 +        {
 +            params.extensions(val);
 +            return this;
 +        }
 +
 +        public Builder flags(Set<Flag> val)
 +        {
 +            flags = val;
 +            return this;
 +        }
 +
 +        public Builder isCounter(boolean val)
 +        {
 +            return flag(Flag.COUNTER, val);
 +        }
 +
 +        private Builder flag(Flag flag, boolean set)
 +        {
 +            if (set) flags.add(flag); else flags.remove(flag);
 +            return this;
 +        }
 +
 +        public Builder triggers(Triggers val)
 +        {
 +            triggers = val;
 +            return this;
 +        }
 +
 +        public Builder indexes(Indexes val)
 +        {
 +            indexes = val;
 +            return this;
 +        }
 +
 +        public Builder addPartitionKeyColumn(String name, AbstractType type)
 +        {
 +            return addPartitionKeyColumn(ColumnIdentifier.getInterned(name, false), type);
 +        }
 +
 +        public Builder addPartitionKeyColumn(ColumnIdentifier name, AbstractType type)
 +        {
 +            return addColumn(new ColumnMetadata(keyspace, this.name, name, type, partitionKeyColumns.size(), ColumnMetadata.Kind.PARTITION_KEY));
 +        }
 +
 +        public Builder addClusteringColumn(String name, AbstractType type)
 +        {
 +            return addClusteringColumn(ColumnIdentifier.getInterned(name, false), type);
 +        }
 +
 +        public Builder addClusteringColumn(ColumnIdentifier name, AbstractType type)
 +        {
 +            return addColumn(new ColumnMetadata(keyspace, this.name, name, type, clusteringColumns.size(), ColumnMetadata.Kind.CLUSTERING));
 +        }
 +
 +        public Builder addRegularColumn(String name, AbstractType type)
 +        {
 +            return addRegularColumn(ColumnIdentifier.getInterned(name, false), type);
 +        }
 +
 +        public Builder addRegularColumn(ColumnIdentifier name, AbstractType type)
 +        {
 +            return addColumn(new ColumnMetadata(keyspace, this.name, name, type, ColumnMetadata.NO_POSITION, ColumnMetadata.Kind.REGULAR));
 +        }
 +
 +        public Builder addStaticColumn(String name, AbstractType type)
 +        {
 +            return addStaticColumn(ColumnIdentifier.getInterned(name, false), type);
 +        }
 +
 +        public Builder addStaticColumn(ColumnIdentifier name, AbstractType type)
 +        {
 +            return addColumn(new ColumnMetadata(keyspace, this.name, name, type, ColumnMetadata.NO_POSITION, ColumnMetadata.Kind.STATIC));
 +        }
 +
 +        public Builder addColumn(ColumnMetadata column)
 +        {
 +            if (columns.containsKey(column.name.bytes))
 +                throw new IllegalArgumentException();
 +
 +            switch (column.kind)
 +            {
 +                case PARTITION_KEY:
 +                    partitionKeyColumns.add(column);
 +                    Collections.sort(partitionKeyColumns);
 +                    break;
 +                case CLUSTERING:
 +                    column.type.checkComparable();
 +                    clusteringColumns.add(column);
 +                    Collections.sort(clusteringColumns);
 +                    break;
 +                default:
 +                    regularAndStaticColumns.add(column);
 +            }
 +
 +            columns.put(column.name.bytes, column);
 +
 +            return this;
 +        }
 +
 +        public Builder addColumns(Iterable<ColumnMetadata> columns)
 +        {
 +            columns.forEach(this::addColumn);
 +            return this;
 +        }
 +
 +        public Builder droppedColumns(Map<ByteBuffer, DroppedColumn> droppedColumns)
 +        {
 +            this.droppedColumns.clear();
 +            this.droppedColumns.putAll(droppedColumns);
 +            return this;
 +        }
 +
 +        /**
 +         * Records a deprecated column for a system table.
 +         */
 +        public Builder recordDeprecatedSystemColumn(String name, AbstractType<?> type)
 +        {
 +            // As we play fast and loose with the removal timestamp, make sure this is not misused for a non-system table.
 +            assert SchemaConstants.isLocalSystemKeyspace(keyspace);
 +            recordColumnDrop(ColumnMetadata.regularColumn(keyspace, this.name, name, type), Long.MAX_VALUE);
 +            return this;
 +        }
 +
 +        public Builder recordColumnDrop(ColumnMetadata column, long timeMicros)
 +        {
 +            droppedColumns.put(column.name.bytes, new DroppedColumn(column.withNewType(column.type.expandUserTypes()), timeMicros));
 +            return this;
 +        }
 +
 +        public Iterable<ColumnMetadata> columns()
 +        {
 +            return columns.values();
 +        }
 +
 +        public Set<String> columnNames()
 +        {
 +            return columns.values().stream().map(c -> c.name.toString()).collect(toSet());
 +        }
 +
 +        public ColumnMetadata getColumn(ColumnIdentifier identifier)
 +        {
 +            return columns.get(identifier.bytes);
 +        }
 +
 +        public ColumnMetadata getColumn(ByteBuffer name)
 +        {
 +            return columns.get(name);
 +        }
 +
 +        public boolean hasRegularColumns()
 +        {
 +            return regularAndStaticColumns.stream().anyMatch(ColumnMetadata::isRegular);
 +        }
 +
 +        /*
 +         * The following methods all assume a Builder with a valid set of partition key, clustering, regular and static columns.
 +         */
 +
 +        public Builder removeRegularOrStaticColumn(ColumnIdentifier identifier)
 +        {
 +            ColumnMetadata column = columns.get(identifier.bytes);
 +            if (column == null || column.isPrimaryKeyColumn())
 +                throw new IllegalArgumentException();
 +
 +            columns.remove(identifier.bytes);
 +            regularAndStaticColumns.remove(column);
 +
 +            return this;
 +        }
 +
 +        public Builder renamePrimaryKeyColumn(ColumnIdentifier from, ColumnIdentifier to)
 +        {
 +            if (columns.containsKey(to.bytes))
 +                throw new IllegalArgumentException();
 +
 +            ColumnMetadata column = columns.get(from.bytes);
 +            if (column == null || !column.isPrimaryKeyColumn())
 +                throw new IllegalArgumentException();
 +
 +            ColumnMetadata newColumn = column.withNewName(to);
 +            if (column.isPartitionKey())
 +                partitionKeyColumns.set(column.position(), newColumn);
 +            else
 +                clusteringColumns.set(column.position(), newColumn);
 +
 +            columns.remove(from.bytes);
 +            columns.put(to.bytes, newColumn);
 +
 +            return this;
 +        }
 +
 +        Builder alterColumnType(ColumnIdentifier name, AbstractType<?> type)
 +        {
 +            ColumnMetadata column = columns.get(name.bytes);
 +            if (column == null)
 +                throw new IllegalArgumentException();
 +
 +            ColumnMetadata newColumn = column.withNewType(type);
 +
 +            switch (column.kind)
 +            {
 +                case PARTITION_KEY:
 +                    partitionKeyColumns.set(column.position(), newColumn);
 +                    break;
 +                case CLUSTERING:
 +                    clusteringColumns.set(column.position(), newColumn);
 +                    break;
 +                case REGULAR:
 +                case STATIC:
 +                    regularAndStaticColumns.remove(column);
 +                    regularAndStaticColumns.add(newColumn);
 +                    break;
 +            }
 +
 +            columns.put(column.name.bytes, newColumn);
 +
 +            return this;
 +        }
 +    }
 +    
 +    /**
 +     * A table with strict liveness filters/ignores rows without PK liveness info,
 +     * effectively tying the row liveness to its primary key liveness.
 +     *
 +     * Currently this is only used by views with normal base column as PK column
 +     * so updates to other columns do not make the row live when the base column
 +     * is not live. See CASSANDRA-11500.
 +     *
 +     * TODO: does not belong here, should be gone
 +     */
 +    public boolean enforceStrictLiveness()
 +    {
 +        return isView() && Keyspace.open(keyspace).viewManager.getByName(name).enforceStrictLiveness();
 +    }
 +
 +    /**
 +     * Returns the names of all the user types referenced by this table.
 +     *
 +     * @return the names of all the user types referenced by this table.
 +     */
 +    public Set<ByteBuffer> getReferencedUserTypes()
 +    {
 +        Set<ByteBuffer> types = new LinkedHashSet<>();
 +        columns().forEach(c -> addUserTypes(c.type, types));
 +        return types;
 +    }
 +
 +    /**
 +     * Find all user types used by the specified type and add them to the set.
 +     *
 +     * @param type the type to check for user types.
 +     * @param types the set of UDT names to which to add new user types found in {@code type}. Note that the
 +     * insertion ordering is important and ensures that if a user type A uses another user type B, then B will appear
 +     * before A in iteration order.
 +     */
 +    private static void addUserTypes(AbstractType<?> type, Set<ByteBuffer> types)
 +    {
 +        // Reach into subtypes first, so that if the type is a UDT, its dependencies are recreated first.
 +        type.subTypes().forEach(t -> addUserTypes(t, types));
 +
 +        if (type.isUDT())
 +            types.add(((UserType)type).name);
 +    }
 +
 +    @Override
 +    public SchemaElementType elementType()
 +    {
 +        return SchemaElementType.TABLE;
 +    }
 +
 +    @Override
 +    public String elementKeyspace()
 +    {
 +        return keyspace;
 +    }
 +
 +    @Override
 +    public String elementName()
 +    {
 +        return name;
 +    }
 +
 +    @Override
-     public String toCqlString(boolean withInternals)
++    public String toCqlString(boolean withInternals, boolean ifNotExists)
 +    {
 +        CqlBuilder builder = new CqlBuilder(2048);
-         appendCqlTo(builder, withInternals, withInternals, false);
++        appendCqlTo(builder, withInternals, withInternals, ifNotExists);
 +        return builder.toString();
 +    }
 +
 +    public String toCqlString(boolean includeDroppedColumns,
 +                              boolean internals,
 +                              boolean ifNotExists)
 +    {
 +        CqlBuilder builder = new CqlBuilder(2048);
 +        appendCqlTo(builder, includeDroppedColumns, internals, ifNotExists);
 +        return builder.toString();
 +    }
 +
 +    public void appendCqlTo(CqlBuilder builder,
 +                            boolean includeDroppedColumns,
 +                            boolean internals,
 +                            boolean ifNotExists)
 +    {
 +        assert !isView();
 +
 +        String createKeyword = "CREATE";
 +        if (isVirtual())
 +        {
 +            builder.append(String.format("/*\n" +
 +                    "Warning: Table %s is a virtual table and cannot be recreated with CQL.\n" +
 +                    "Structure, for reference:\n",
 +                                         toString()));
 +            createKeyword = "VIRTUAL";
 +        }
 +
 +        builder.append(createKeyword)
 +               .append(" TABLE ");
 +
 +        if (ifNotExists)
 +            builder.append("IF NOT EXISTS ");
 +
 +        builder.append(toString())
 +               .append(" (")
 +               .newLine()
 +               .increaseIndent();
 +
 +        boolean hasSingleColumnPrimaryKey = partitionKeyColumns.size() == 1 && clusteringColumns.isEmpty();
 +
 +        appendColumnDefinitions(builder, includeDroppedColumns, hasSingleColumnPrimaryKey);
 +
 +        if (!hasSingleColumnPrimaryKey)
 +            appendPrimaryKey(builder);
 +
 +        builder.decreaseIndent()
 +               .append(')');
 +
 +        appendTableOptions(builder, internals);
 +
 +        builder.decreaseIndent();
 +
 +        if (isVirtual())
 +        {
 +            builder.newLine()
 +                   .append("*/");
 +        }
 +
 +        if (includeDroppedColumns)
 +            appendDropColumns(builder);
 +    }
 +
 +    /**
 +     * Appends the column definitions of the table, one per line.
 +     * <p>
 +     * Current columns are written first, in the order returned by {@code allColumnsInCreateOrder()}. When
 +     * {@code includeDroppedColumns} is set, any current column that also has a dropped-column record is skipped
 +     * in that first pass, and the dropped definitions are then written after the current columns.
 +     *
 +     * @param builder the builder to append the definitions to
 +     * @param includeDroppedColumns whether the dropped columns must be listed among the definitions
 +     * @param hasSingleColumnPrimaryKey whether the primary key is made of a single partition key column, in which
 +     * case {@code PRIMARY KEY} is appended inline to that column and the last definition gets no trailing comma
 +     */
 +    private void appendColumnDefinitions(CqlBuilder builder,
 +                                         boolean includeDroppedColumns,
 +                                         boolean hasSingleColumnPrimaryKey)
 +    {
 +        Iterator<ColumnMetadata> iter = allColumnsInCreateOrder();
 +        while (iter.hasNext())
 +        {
 +            ColumnMetadata column = iter.next();
 +
 +            // If the column has been re-added after a drop, we don't include it right away. Instead, we'll add the
 +            // dropped one first below, then we'll issue the DROP and then the actual ADD for this column, thus
 +            // simulating the proper sequence of events.
 +            if (includeDroppedColumns && droppedColumns.containsKey(column.name.bytes))
 +                continue;
 +
 +            column.appendCqlTo(builder);
 +
 +            if (hasSingleColumnPrimaryKey && column.isPartitionKey())
 +                builder.append(" PRIMARY KEY");
 +
 +            // A comma is needed whenever something follows this definition: a PRIMARY KEY clause, dropped column
 +            // definitions, or another current column.
 +            if (!hasSingleColumnPrimaryKey || (includeDroppedColumns && !droppedColumns.isEmpty()) || iter.hasNext())
 +                builder.append(',');
 +
 +            builder.newLine();
 +        }
 +
 +        if (includeDroppedColumns)
 +        {
 +            Iterator<DroppedColumn> iterDropped = droppedColumns.values().iterator();
 +            while (iterDropped.hasNext())
 +            {
 +                DroppedColumn dropped = iterDropped.next();
 +                dropped.column.appendCqlTo(builder);
 +
 +                // The check must be made against the dropped columns iterator: 'iter' is exhausted at this point,
 +                // and testing it would leave consecutive dropped columns without a separating comma.
 +                if (!hasSingleColumnPrimaryKey || iterDropped.hasNext())
 +                    builder.append(',');
 +
 +                builder.newLine();
 +            }
 +        }
 +    }
 +
 +    void appendPrimaryKey(CqlBuilder builder)
 +    {
 +        List<ColumnMetadata> partitionKeyColumns = partitionKeyColumns();
 +        List<ColumnMetadata> clusteringColumns = clusteringColumns();
 +
 +        builder.append("PRIMARY KEY (");
 +        if (partitionKeyColumns.size() > 1)
 +        {
 +            builder.append('(')
 +                   .appendWithSeparators(partitionKeyColumns, (b, c) -> b.append(c.name), ", ")
 +                   .append(')');
 +        }
 +        else
 +        {
 +            builder.append(partitionKeyColumns.get(0).name);
 +        }
 +
 +        if (!clusteringColumns.isEmpty())
 +            builder.append(", ")
 +                   .appendWithSeparators(clusteringColumns, (b, c) -> b.append(c.name), ", ");
 +
 +        builder.append(')')
 +               .newLine();
 +    }
 +
 +    void appendTableOptions(CqlBuilder builder, boolean internals)
 +    {
 +        builder.append(" WITH ")
 +               .increaseIndent();
 +
 +        if (internals)
 +            builder.append("ID = ")
 +                   .append(id.toString())
 +                   .newLine()
 +                   .append("AND ");
 +
 +        List<ColumnMetadata> clusteringColumns = clusteringColumns();
 +        if (!clusteringColumns.isEmpty())
 +        {
 +            builder.append("CLUSTERING ORDER BY (")
 +                   .appendWithSeparators(clusteringColumns, (b, c) -> c.appendNameAndOrderTo(b), ", ")
 +                   .append(')')
 +                   .newLine()
 +                   .append("AND ");
 +        }
 +
 +        if (isVirtual())
 +        {
 +            builder.append("comment = ").appendWithSingleQuotes(params.comment);
 +        }
 +        else
 +        {
 +            params.appendCqlTo(builder);
 +        }
 +        builder.append(";");
 +    }
 +
 +    private void appendDropColumns(CqlBuilder builder)
 +    {
 +        for (Entry<ByteBuffer, DroppedColumn> entry : droppedColumns.entrySet())
 +        {
 +            DroppedColumn dropped = entry.getValue();
 +
 +            builder.newLine()
 +                   .append("ALTER TABLE ")
 +                   .append(toString())
 +                   .append(" DROP ")
 +                   .append(dropped.column.name)
 +                   .append(" USING TIMESTAMP ")
 +                   .append(dropped.droppedTime)
 +                   .append(';');
 +
 +            ColumnMetadata column = getColumn(entry.getKey());
 +            if (column != null)
 +            {
 +                builder.newLine()
 +                       .append("ALTER TABLE ")
 +                       .append(toString())
 +                       .append(" ADD ");
 +
 +                column.appendCqlTo(builder);
 +
 +                builder.append(';');
 +            }
 +        }
 +    }
 +}
diff --cc src/java/org/apache/cassandra/schema/ViewMetadata.java
index 0cccfb9,0000000..7b5af2d
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/schema/ViewMetadata.java
+++ b/src/java/org/apache/cassandra/schema/ViewMetadata.java
@@@ -1,235 -1,0 +1,227 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.schema;
 +
 +import java.nio.ByteBuffer;
 +import java.util.Optional;
 +
 +import org.apache.commons.lang3.builder.HashCodeBuilder;
 +import org.apache.commons.lang3.builder.ToStringBuilder;
 +
 +import org.apache.cassandra.cql3.*;
 +import org.apache.cassandra.db.marshal.UserType;
 +
 +public final class ViewMetadata implements SchemaElement
 +{
 +    public final TableId baseTableId;
 +    public final String baseTableName;
 +
 +    public final boolean includeAllColumns;
 +    public final TableMetadata metadata;
 +
 +    public final WhereClause whereClause;
 +
 +    /**
 +     * @param baseTableId       Internal ID of the table which this view is based off of
 +     * @param includeAllColumns Whether to include all columns or not
 +     */
 +    public ViewMetadata(TableId baseTableId,
 +                        String baseTableName,
 +                        boolean includeAllColumns,
 +                        WhereClause whereClause,
 +                        TableMetadata metadata)
 +    {
 +        this.baseTableId = baseTableId;
 +        this.baseTableName = baseTableName;
 +        this.includeAllColumns = includeAllColumns;
 +        this.whereClause = whereClause;
 +        this.metadata = metadata;
 +    }
 +
 +    public String keyspace()
 +    {
 +        return metadata.keyspace;
 +    }
 +
 +    public String name()
 +    {
 +        return metadata.name;
 +    }
 +
 +    /**
 +     * @return true if the view specified by this definition will include the column, false otherwise
 +     */
 +    public boolean includes(ColumnIdentifier column)
 +    {
 +        return metadata.getColumn(column) != null;
 +    }
 +
 +    public ViewMetadata copy(TableMetadata newMetadata)
 +    {
 +        return new ViewMetadata(baseTableId, baseTableName, includeAllColumns, whereClause, newMetadata);
 +    }
 +
 +    public TableMetadata baseTableMetadata()
 +    {
 +        return Schema.instance.getTableMetadata(baseTableId);
 +    }
 +
 +    @Override
 +    public boolean equals(Object o)
 +    {
 +        if (this == o)
 +            return true;
 +
 +        if (!(o instanceof ViewMetadata))
 +            return false;
 +
 +        ViewMetadata other = (ViewMetadata) o;
 +        return baseTableId.equals(other.baseTableId)
 +            && includeAllColumns == other.includeAllColumns
 +            && whereClause.equals(other.whereClause)
 +            && metadata.equals(other.metadata);
 +    }
 +
 +    Optional<Difference> compare(ViewMetadata other)
 +    {
 +        if (!baseTableId.equals(other.baseTableId) || includeAllColumns != other.includeAllColumns || !whereClause.equals(other.whereClause))
 +            return Optional.of(Difference.SHALLOW);
 +
 +        return metadata.compare(other.metadata);
 +    }
 +
 +    @Override
 +    public int hashCode()
 +    {
 +        return new HashCodeBuilder(29, 1597)
 +               .append(baseTableId)
 +               .append(includeAllColumns)
 +               .append(whereClause)
 +               .append(metadata)
 +               .toHashCode();
 +    }
 +
 +    @Override
 +    public String toString()
 +    {
 +        return new ToStringBuilder(this)
 +               .append("baseTableId", baseTableId)
 +               .append("baseTableName", baseTableName)
 +               .append("includeAllColumns", includeAllColumns)
 +               .append("whereClause", whereClause)
 +               .append("metadata", metadata)
 +               .toString();
 +    }
 +
 +    public boolean referencesUserType(ByteBuffer name)
 +    {
 +        return metadata.referencesUserType(name);
 +    }
 +
 +    public ViewMetadata withUpdatedUserType(UserType udt)
 +    {
 +        return referencesUserType(udt.name)
 +             ? copy(metadata.withUpdatedUserType(udt))
 +             : this;
 +    }
 +
 +    public ViewMetadata withRenamedPrimaryKeyColumn(ColumnIdentifier from, ColumnIdentifier to)
 +    {
 +        return new ViewMetadata(baseTableId,
 +                                baseTableName,
 +                                includeAllColumns,
 +                                whereClause.renameIdentifier(from, to),
 +                                metadata.unbuild().renamePrimaryKeyColumn(from, to).build());
 +    }
 +
 +    public ViewMetadata withAddedRegularColumn(ColumnMetadata column)
 +    {
 +        return new ViewMetadata(baseTableId,
 +                                baseTableName,
 +                                includeAllColumns,
 +                                whereClause,
 +                                metadata.unbuild().addColumn(column).build());
 +    }
 +
 +    public void appendCqlTo(CqlBuilder builder,
 +                            boolean internals,
 +                            boolean ifNotExists)
 +    {
 +        builder.append("CREATE MATERIALIZED VIEW ");
 +
 +        if (ifNotExists)
 +            builder.append("IF NOT EXISTS ");
 +
 +        builder.append(metadata.toString())
 +               .append(" AS")
 +               .newLine()
 +               .increaseIndent()
 +               .append("SELECT ");
 +
 +        if (includeAllColumns)
 +        {
 +            builder.append('*');
 +        }
 +        else
 +        {
 +            builder.appendWithSeparators(metadata.allColumnsInSelectOrder(), (b, c) -> b.append(c.name), ", ");
 +        }
 +
 +        builder.newLine()
 +               .append("FROM ")
 +               .appendQuotingIfNeeded(metadata.keyspace)
 +               .append('.')
 +               .appendQuotingIfNeeded(baseTableName)
 +               .newLine()
 +               .append("WHERE ")
 +               .append(whereClause.toString())
 +               .newLine();
 +
 +        metadata.appendPrimaryKey(builder);
 +
 +        builder.decreaseIndent();
 +
 +        metadata.appendTableOptions(builder, internals);
 +    }
 +
 +    @Override
 +    public SchemaElementType elementType()
 +    {
 +        return SchemaElementType.MATERIALIZED_VIEW;
 +    }
 +
 +    @Override
 +    public String elementKeyspace()
 +    {
 +        return keyspace();
 +    }
 +
 +    @Override
 +    public String elementName()
 +    {
 +        return name();
 +    }
 +
 +    @Override
-     public String toCqlString(boolean withInternals)
++    public String toCqlString(boolean withInternals, boolean ifNotExists)
 +    {
 +        CqlBuilder builder = new CqlBuilder(2048);
-         appendCqlTo(builder, withInternals, false);
-         return builder.toString();
-     }
- 
-     public String toCqlString(boolean internals,
-                               boolean ifNotExists)
-     {
-         CqlBuilder builder = new CqlBuilder(2048);
-         appendCqlTo(builder, internals, ifNotExists);
++        appendCqlTo(builder, withInternals, ifNotExists);
 +        return builder.toString();
 +    }
 +}
diff --cc test/unit/org/apache/cassandra/cql3/validation/entities/UFJavaTest.java
index 596e944,4d46b8b..47f1cbf
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/UFJavaTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/UFJavaTest.java
@@@ -19,6 -19,6 +19,7 @@@
  package org.apache.cassandra.cql3.validation.entities;
  
  import java.util.Arrays;
++import java.util.Collections;
  import java.util.List;
  import java.util.Map;
  import java.util.Set;
@@@ -33,13 -33,13 +34,18 @@@ import com.datastax.driver.core.Row
  import com.datastax.driver.core.TupleType;
  import com.datastax.driver.core.TupleValue;
  import com.datastax.driver.core.UDTValue;
 -import org.apache.cassandra.config.Schema;
  import org.apache.cassandra.cql3.CQL3Type;
  import org.apache.cassandra.cql3.CQLTester;
++import org.apache.cassandra.cql3.ColumnIdentifier;
  import org.apache.cassandra.cql3.UntypedResultSet;
  import org.apache.cassandra.cql3.functions.FunctionName;
++import org.apache.cassandra.cql3.functions.UDAggregate;
++import org.apache.cassandra.cql3.functions.UDFunction;
++import org.apache.cassandra.db.marshal.Int32Type;
++import org.apache.cassandra.db.marshal.UTF8Type;
  import org.apache.cassandra.exceptions.FunctionExecutionException;
  import org.apache.cassandra.exceptions.InvalidRequestException;
 +import org.apache.cassandra.schema.Schema;
  import org.apache.cassandra.transport.ProtocolVersion;
  
  public class UFJavaTest extends CQLTester
@@@ -56,7 -56,7 +62,7 @@@
                                        "RETURNS NULL ON NULL INPUT " +
                                        "RETURNS bigint " +
                                        "LANGUAGE JAVA\n" +
--                                      "AS '" +functionBody + "';");
++                                      "AS '" + functionBody + "';");
  
          assertRows(execute("SELECT language, body FROM system_schema.functions WHERE keyspace_name=? AND function_name=?",
                             KEYSPACE, parseFunctionName(fName).name),
@@@ -796,4 -794,4 +802,61 @@@
                             "AS 'return 0;'");
          }
      }
++
++    @Test
++    public void testUDFToCqlString()
++    {
++        UDFunction function = UDFunction.create(new FunctionName("my_ks", "my_function"),
++                                                Arrays.asList(ColumnIdentifier.getInterned("column", false)),
++                                                Arrays.asList(UTF8Type.instance),
++                                                Int32Type.instance,
++                                                false,
++                                                "java",
++                                                "return 0;");
++
++        Assert.assertTrue(function.toCqlString(true, true).contains("CREATE FUNCTION IF NOT EXISTS"));
++        Assert.assertFalse(function.toCqlString(true, false).contains("CREATE FUNCTION IF NOT EXISTS"));
++
++        Assert.assertEquals(function.toCqlString(true, true), function.toCqlString(false, true));
++        Assert.assertEquals(function.toCqlString(true, false), function.toCqlString(false, false));
++    }
++
++    @Test
++    public void testUDAToCqlString() throws Throwable
++    {
++        // we have to create this function in DB otherwise UDAggregate creation below fails
++        String stateFunctionName = createFunction(KEYSPACE, "int,int",
++                                                  "CREATE OR REPLACE FUNCTION %s(state int, val int)\n" +
++                                                  "    CALLED ON NULL INPUT\n" +
++                                                  "    RETURNS int\n" +
++                                                  "    LANGUAGE java\n" +
++                                                  "    AS $$\n" +
++                                                  "        return state + val;\n" +
++                                                  "    $$;");
++
++        // Java representation of state function so we can construct aggregate programmatically
++        UDFunction stateFunction = UDFunction.create(new FunctionName(KEYSPACE, stateFunctionName.split("\\.")[1]),
++                                                     Arrays.asList(ColumnIdentifier.getInterned("state", false),
++                                                                   ColumnIdentifier.getInterned("val", false)),
++                                                     Arrays.asList(Int32Type.instance, Int32Type.instance),
++                                                     Int32Type.instance,
++                                                     true,
++                                                     "java",
++                                                     "return state + val;");
++
++        UDAggregate aggregate = UDAggregate.create(Collections.singleton(stateFunction),
++                                                   new FunctionName(KEYSPACE, "my_aggregate"),
++                                                   Collections.singletonList(Int32Type.instance),
++                                                   Int32Type.instance,
++                                                   new FunctionName(KEYSPACE, stateFunctionName.split("\\.")[1]),
++                                                   null,
++                                                   Int32Type.instance,
++                                                   null);
++
++        Assert.assertTrue(aggregate.toCqlString(true, true).contains("CREATE AGGREGATE IF NOT EXISTS"));
++        Assert.assertFalse(aggregate.toCqlString(true, false).contains("CREATE AGGREGATE IF NOT EXISTS"));
++
++        Assert.assertEquals(aggregate.toCqlString(true, true), aggregate.toCqlString(false, true));
++        Assert.assertEquals(aggregate.toCqlString(true, false), aggregate.toCqlString(false, false));
++    }
  }
diff --cc test/unit/org/apache/cassandra/db/SchemaCQLHelperTest.java
index 6857dd3,0000000..c09a1f1
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/db/SchemaCQLHelperTest.java
+++ b/test/unit/org/apache/cassandra/db/SchemaCQLHelperTest.java
@@@ -1,468 -1,0 +1,473 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.db;
 +
 +import com.google.common.collect.ImmutableList;
 +import com.google.common.collect.ImmutableMap;
 +import com.google.common.io.Files;
 +
 +import org.junit.Assert;
 +import org.junit.Before;
 +import org.junit.Test;
 +
 +import org.apache.cassandra.*;
 +import org.apache.cassandra.cql3.*;
 +import org.apache.cassandra.cql3.statements.schema.IndexTarget;
 +import org.apache.cassandra.db.marshal.*;
 +import org.apache.cassandra.exceptions.ConfigurationException;
 +import org.apache.cassandra.index.sasi.SASIIndex;
 +import org.apache.cassandra.schema.*;
 +import org.apache.cassandra.service.reads.SpeculativeRetryPolicy;
 +import org.apache.cassandra.utils.ByteBufferUtil;
 +import org.apache.cassandra.utils.FBUtilities;
 +import org.json.simple.JSONArray;
 +import org.json.simple.JSONObject;
 +import org.json.simple.parser.JSONParser;
 +
 +import java.io.FileReader;
 +import java.nio.ByteBuffer;
 +import java.nio.charset.Charset;
 +import java.util.Arrays;
 +import java.util.Collections;
 +import java.util.stream.Collectors;
 +
 +import static org.hamcrest.CoreMatchers.allOf;
 +import static org.hamcrest.CoreMatchers.containsString;
 +import static org.hamcrest.CoreMatchers.startsWith;
 +import static org.junit.Assert.assertEquals;
 +import static org.junit.Assert.assertThat;
 +
 +public class SchemaCQLHelperTest extends CQLTester
 +{
 +    @Before
 +    public void defineSchema() throws ConfigurationException
 +    {
 +        SchemaLoader.prepareServer();
 +    }
 +
 +    @Test
 +    public void testUserTypesCQL()
 +    {
 +        String keyspace = "cql_test_keyspace_user_types";
 +        String table = "test_table_user_types";
 +
 +        UserType typeA = new UserType(keyspace, ByteBufferUtil.bytes("a"),
 +                                      Arrays.asList(FieldIdentifier.forUnquoted("a1"),
 +                                                    FieldIdentifier.forUnquoted("a2"),
 +                                                    FieldIdentifier.forUnquoted("a3")),
 +                                      Arrays.asList(IntegerType.instance,
 +                                                    IntegerType.instance,
 +                                                    IntegerType.instance),
 +                                      true);
 +
 +        UserType typeB = new UserType(keyspace, ByteBufferUtil.bytes("b"),
 +                                      Arrays.asList(FieldIdentifier.forUnquoted("b1"),
 +                                                    FieldIdentifier.forUnquoted("b2"),
 +                                                    FieldIdentifier.forUnquoted("b3")),
 +                                      Arrays.asList(typeA,
 +                                                    typeA,
 +                                                    typeA),
 +                                      true);
 +
 +        UserType typeC = new UserType(keyspace, ByteBufferUtil.bytes("c"),
 +                                      Arrays.asList(FieldIdentifier.forUnquoted("c1"),
 +                                                    FieldIdentifier.forUnquoted("c2"),
 +                                                    FieldIdentifier.forUnquoted("c3")),
 +                                      Arrays.asList(typeB,
 +                                                    typeB,
 +                                                    typeB),
 +                                      true);
 +
 +        TableMetadata cfm =
 +        TableMetadata.builder(keyspace, table)
 +                     .addPartitionKeyColumn("pk1", IntegerType.instance)
 +                     .addClusteringColumn("ck1", IntegerType.instance)
 +                     .addRegularColumn("reg1", typeC.freeze())
 +                     .addRegularColumn("reg2", ListType.getInstance(IntegerType.instance, false))
 +                     .addRegularColumn("reg3", MapType.getInstance(AsciiType.instance, IntegerType.instance, true))
 +                     .build();
 +
 +        SchemaLoader.createKeyspace(keyspace, KeyspaceParams.simple(1), Tables.of(cfm), Types.of(typeA, typeB, typeC));
 +
 +        ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(table);
 +
 +        assertEquals(ImmutableList.of("CREATE TYPE cql_test_keyspace_user_types.a (\n" +
 +                                      "    a1 varint,\n" +
 +                                      "    a2 varint,\n" +
 +                                      "    a3 varint\n" +
 +                                      ");",
 +                                      "CREATE TYPE cql_test_keyspace_user_types.b (\n" +
 +                                      "    b1 a,\n" +
 +                                      "    b2 a,\n" +
 +                                      "    b3 a\n" +
 +                                      ");",
 +                                      "CREATE TYPE cql_test_keyspace_user_types.c (\n" +
 +                                      "    c1 b,\n" +
 +                                      "    c2 b,\n" +
 +                                      "    c3 b\n" +
 +                                      ");"),
-                      SchemaCQLHelper.getUserTypesAsCQL(cfs.metadata(), cfs.keyspace.getMetadata().types).collect(Collectors.toList()));
++                     SchemaCQLHelper.getUserTypesAsCQL(cfs.metadata(), cfs.keyspace.getMetadata().types, false).collect(Collectors.toList()));
 +    }
 +
 +    @Test
 +    public void testDroppedColumnsCQL()
 +    {
 +        String keyspace = "cql_test_keyspace_dropped_columns";
 +        String table = "test_table_dropped_columns";
 +
 +        TableMetadata.Builder builder =
 +        TableMetadata.builder(keyspace, table)
 +                     .addPartitionKeyColumn("pk1", IntegerType.instance)
 +                     .addClusteringColumn("ck1", IntegerType.instance)
 +                     .addStaticColumn("st1", IntegerType.instance)
 +                     .addRegularColumn("reg1", IntegerType.instance)
 +                     .addRegularColumn("reg2", IntegerType.instance)
 +                     .addRegularColumn("reg3", IntegerType.instance);
 +
 +        ColumnMetadata st1 = builder.getColumn(ByteBufferUtil.bytes("st1"));
 +        ColumnMetadata reg1 = builder.getColumn(ByteBufferUtil.bytes("reg1"));
 +        ColumnMetadata reg2 = builder.getColumn(ByteBufferUtil.bytes("reg2"));
 +        ColumnMetadata reg3 = builder.getColumn(ByteBufferUtil.bytes("reg3"));
 +
 +        builder.removeRegularOrStaticColumn(st1.name)
 +               .removeRegularOrStaticColumn(reg1.name)
 +               .removeRegularOrStaticColumn(reg2.name)
 +               .removeRegularOrStaticColumn(reg3.name);
 +
 +        builder.recordColumnDrop(st1, 5000)
 +               .recordColumnDrop(reg1, 10000)
 +               .recordColumnDrop(reg2, 20000)
 +               .recordColumnDrop(reg3, 30000);
 +
 +        SchemaLoader.createKeyspace(keyspace, KeyspaceParams.simple(1), builder);
 +
 +        ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(table);
 +
 +        String expected = "CREATE TABLE IF NOT EXISTS cql_test_keyspace_dropped_columns.test_table_dropped_columns (\n" +
 +                          "    pk1 varint,\n" +
 +                          "    ck1 varint,\n" +
 +                          "    reg1 varint,\n" +
 +                          "    reg3 varint,\n" +
 +                          "    reg2 varint,\n" +
 +                          "    st1 varint static,\n" +
 +                          "    PRIMARY KEY (pk1, ck1)\n) WITH ID =";
 +        String actual = SchemaCQLHelper.getTableMetadataAsCQL(cfs.metadata(), true, true, true);
 +
 +        assertThat(actual,
 +                   allOf(startsWith(expected),
 +                         containsString("ALTER TABLE cql_test_keyspace_dropped_columns.test_table_dropped_columns DROP reg1 USING TIMESTAMP 10000;"),
 +                         containsString("ALTER TABLE cql_test_keyspace_dropped_columns.test_table_dropped_columns DROP reg3 USING TIMESTAMP 30000;"),
 +                         containsString("ALTER TABLE cql_test_keyspace_dropped_columns.test_table_dropped_columns DROP reg2 USING TIMESTAMP 20000;"),
 +                         containsString("ALTER TABLE cql_test_keyspace_dropped_columns.test_table_dropped_columns DROP st1 USING TIMESTAMP 5000;")));
 +    }
 +
 +    @Test
 +    public void testReaddedColumns()
 +    {
 +        String keyspace = "cql_test_keyspace_readded_columns";
 +        String table = "test_table_readded_columns";
 +
 +        TableMetadata.Builder builder =
 +        TableMetadata.builder(keyspace, table)
 +                     .addPartitionKeyColumn("pk1", IntegerType.instance)
 +                     .addClusteringColumn("ck1", IntegerType.instance)
 +                     .addRegularColumn("reg1", IntegerType.instance)
 +                     .addStaticColumn("st1", IntegerType.instance)
 +                     .addRegularColumn("reg2", IntegerType.instance);
 +
 +        ColumnMetadata reg1 = builder.getColumn(ByteBufferUtil.bytes("reg1"));
 +        ColumnMetadata st1 = builder.getColumn(ByteBufferUtil.bytes("st1"));
 +
 +        builder.removeRegularOrStaticColumn(reg1.name);
 +        builder.removeRegularOrStaticColumn(st1.name);
 +
 +        builder.recordColumnDrop(reg1, 10000);
 +        builder.recordColumnDrop(st1, 20000);
 +
 +        builder.addColumn(reg1);
 +        builder.addColumn(st1);
 +
 +        SchemaLoader.createKeyspace(keyspace, KeyspaceParams.simple(1), builder);
 +
 +        ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(table);
 +
 +        // when re-adding, column is present as both column and as dropped column record.
 +        String actual = SchemaCQLHelper.getTableMetadataAsCQL(cfs.metadata(), true, true, true);
 +        String expected = "CREATE TABLE IF NOT EXISTS cql_test_keyspace_readded_columns.test_table_readded_columns (\n" +
 +                          "    pk1 varint,\n" +
 +                          "    ck1 varint,\n" +
 +                          "    reg2 varint,\n" +
 +                          "    reg1 varint,\n" +
 +                          "    st1 varint static,\n" +
 +                          "    PRIMARY KEY (pk1, ck1)\n" +
 +                          ") WITH ID";
 +
 +        assertThat(actual,
 +                   allOf(startsWith(expected),
 +                         containsString("ALTER TABLE cql_test_keyspace_readded_columns.test_table_readded_columns DROP reg1 USING TIMESTAMP 10000;"),
 +                         containsString("ALTER TABLE cql_test_keyspace_readded_columns.test_table_readded_columns ADD reg1 varint;"),
 +                         containsString("ALTER TABLE cql_test_keyspace_readded_columns.test_table_readded_columns DROP st1 USING TIMESTAMP 20000;"),
 +                         containsString("ALTER TABLE cql_test_keyspace_readded_columns.test_table_readded_columns ADD st1 varint static;")));
 +    }
 +
 +    @Test
 +    public void testCfmColumnsCQL()
 +    {
 +        String keyspace = "cql_test_keyspace_create_table";
 +        String table = "test_table_create_table";
 +
 +        TableMetadata.Builder metadata =
 +        TableMetadata.builder(keyspace, table)
 +                     .addPartitionKeyColumn("pk1", IntegerType.instance)
 +                     .addPartitionKeyColumn("pk2", AsciiType.instance)
 +                     .addClusteringColumn("ck1", ReversedType.getInstance(IntegerType.instance))
 +                     .addClusteringColumn("ck2", IntegerType.instance)
 +                     .addStaticColumn("st1", AsciiType.instance)
 +                     .addRegularColumn("reg1", AsciiType.instance)
 +                     .addRegularColumn("reg2", ListType.getInstance(IntegerType.instance, false))
 +                     .addRegularColumn("reg3", MapType.getInstance(AsciiType.instance, IntegerType.instance, true));
 +
 +        SchemaLoader.createKeyspace(keyspace, KeyspaceParams.simple(1), metadata);
 +
 +        ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(table);
 +
 +        assertThat(SchemaCQLHelper.getTableMetadataAsCQL(cfs.metadata(), true, true, true),
 +                   startsWith(
 +                   "CREATE TABLE IF NOT EXISTS cql_test_keyspace_create_table.test_table_create_table (\n" +
 +                   "    pk1 varint,\n" +
 +                   "    pk2 ascii,\n" +
 +                   "    ck1 varint,\n" +
 +                   "    ck2 varint,\n" +
 +                   "    st1 ascii static,\n" +
 +                   "    reg1 ascii,\n" +
 +                   "    reg2 frozen<list<varint>>,\n" +
 +                   "    reg3 map<ascii, varint>,\n" +
 +                   "    PRIMARY KEY ((pk1, pk2), ck1, ck2)\n" +
 +                   ") WITH ID = " + cfs.metadata.id + "\n" +
 +                   "    AND CLUSTERING ORDER BY (ck1 DESC, ck2 ASC)"));
 +    }
 +
 +    @Test
 +    public void testCfmOptionsCQL()
 +    {
 +        String keyspace = "cql_test_keyspace_options";
 +        String table = "test_table_options";
 +
 +        TableMetadata.Builder builder = TableMetadata.builder(keyspace, table);
 +        long droppedTimestamp = FBUtilities.timestampMicros();
 +        builder.addPartitionKeyColumn("pk1", IntegerType.instance)
 +               .addClusteringColumn("cl1", IntegerType.instance)
 +               .addRegularColumn("reg1", AsciiType.instance)
 +               .bloomFilterFpChance(1.0)
 +               .comment("comment")
 +               .compaction(CompactionParams.lcs(Collections.singletonMap("sstable_size_in_mb", "1")))
 +               .compression(CompressionParams.lz4(1 << 16, 1 << 15))
 +               .crcCheckChance(0.3)
 +               .defaultTimeToLive(4)
 +               .gcGraceSeconds(5)
 +               .minIndexInterval(6)
 +               .maxIndexInterval(7)
 +               .memtableFlushPeriod(8)
 +               .speculativeRetry(SpeculativeRetryPolicy.fromString("always"))
 +               .additionalWritePolicy(SpeculativeRetryPolicy.fromString("always"))
 +               .extensions(ImmutableMap.of("ext1", ByteBuffer.wrap("val1".getBytes())))
 +               .recordColumnDrop(ColumnMetadata.regularColumn(keyspace, table, "reg1", AsciiType.instance),
 +                                 droppedTimestamp);
 +
 +        SchemaLoader.createKeyspace(keyspace, KeyspaceParams.simple(1), builder);
 +
 +        ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(table);
 +
 +        assertThat(SchemaCQLHelper.getTableMetadataAsCQL(cfs.metadata(), true, true, true),
 +                   containsString("CLUSTERING ORDER BY (cl1 ASC)\n" +
 +                            "    AND additional_write_policy = 'ALWAYS'\n" +
 +                            "    AND bloom_filter_fp_chance = 1.0\n" +
 +                            "    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}\n" +
 +                            "    AND cdc = false\n" +
 +                            "    AND comment = 'comment'\n" +
 +                            "    AND compaction = {'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4', 'sstable_size_in_mb': '1'}\n" +
 +                            "    AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor', 'min_compress_ratio': '2.0'}\n" +
 +                            "    AND crc_check_chance = 0.3\n" +
 +                            "    AND default_time_to_live = 4\n" +
 +                            "    AND extensions = {'ext1': 0x76616c31}\n" +
 +                            "    AND gc_grace_seconds = 5\n" +
 +                            "    AND max_index_interval = 7\n" +
 +                            "    AND memtable_flush_period_in_ms = 8\n" +
 +                            "    AND min_index_interval = 6\n" +
 +                            "    AND read_repair = 'BLOCKING'\n" +
 +                            "    AND speculative_retry = 'ALWAYS';"
 +                   ));
 +    }
 +
 +    @Test
 +    public void testCfmIndexJson()
 +    {
 +        String keyspace = "cql_test_keyspace_3";
 +        String table = "test_table_3";
 +
 +        TableMetadata.Builder builder =
 +        TableMetadata.builder(keyspace, table)
 +                     .addPartitionKeyColumn("pk1", IntegerType.instance)
 +                     .addClusteringColumn("cl1", IntegerType.instance)
 +                     .addRegularColumn("reg1", AsciiType.instance);
 +
 +        ColumnIdentifier reg1 = ColumnIdentifier.getInterned("reg1", true);
 +
 +        builder.indexes(
 +        Indexes.of(IndexMetadata.fromIndexTargets(
 +        Collections.singletonList(new IndexTarget(reg1, IndexTarget.Type.VALUES)),
 +        "indexName",
 +        IndexMetadata.Kind.COMPOSITES,
 +        Collections.emptyMap()),
 +                   IndexMetadata.fromIndexTargets(
 +                   Collections.singletonList(new IndexTarget(reg1, IndexTarget.Type.KEYS)),
 +                   "indexName2",
 +                   IndexMetadata.Kind.COMPOSITES,
 +                   Collections.emptyMap()),
 +                   IndexMetadata.fromIndexTargets(
 +                   Collections.singletonList(new IndexTarget(reg1, IndexTarget.Type.KEYS_AND_VALUES)),
 +                   "indexName3",
 +                   IndexMetadata.Kind.COMPOSITES,
 +                   Collections.emptyMap()),
 +                   IndexMetadata.fromIndexTargets(
 +                   Collections.singletonList(new IndexTarget(reg1, IndexTarget.Type.KEYS_AND_VALUES)),
 +                   "indexName4",
 +                   IndexMetadata.Kind.CUSTOM,
 +                   Collections.singletonMap(IndexTarget.CUSTOM_INDEX_OPTION_NAME, SASIIndex.class.getName()))));
 +
 +
 +        SchemaLoader.createKeyspace(keyspace, KeyspaceParams.simple(1), builder);
 +
 +        ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(table);
 +
 +        assertEquals(ImmutableList.of("CREATE INDEX \"indexName\" ON cql_test_keyspace_3.test_table_3 (values(reg1));",
 +                                      "CREATE INDEX \"indexName2\" ON cql_test_keyspace_3.test_table_3 (keys(reg1));",
 +                                      "CREATE INDEX \"indexName3\" ON cql_test_keyspace_3.test_table_3 (entries(reg1));",
 +                                      "CREATE CUSTOM INDEX \"indexName4\" ON cql_test_keyspace_3.test_table_3 (entries(reg1)) USING 'org.apache.cassandra.index.sasi.SASIIndex';"),
-                      SchemaCQLHelper.getIndexesAsCQL(cfs.metadata()).collect(Collectors.toList()));
++                     SchemaCQLHelper.getIndexesAsCQL(cfs.metadata(), false).collect(Collectors.toList()));
 +    }
 +
 +    private final static String SNAPSHOT = "testsnapshot";
 +
 +    @Test
 +    public void testSnapshot() throws Throwable
 +    {
 +        String typeA = createType("CREATE TYPE %s (a1 varint, a2 varint, a3 varint);");
 +        String typeB = createType("CREATE TYPE %s (b1 frozen<" + typeA + ">, b2 frozen<" + typeA + ">, b3 frozen<" + typeA + ">);");
 +        String typeC = createType("CREATE TYPE %s (c1 frozen<" + typeB + ">, c2 frozen<" + typeB + ">, c3 frozen<" + typeB + ">);");
 +
 +        String tableName = createTable("CREATE TABLE IF NOT EXISTS %s (" +
 +                                       "pk1 varint," +
 +                                       "pk2 ascii," +
 +                                       "ck1 varint," +
 +                                       "ck2 varint," +
 +                                       "reg1 " + typeC + "," +
 +                                       "reg2 int," +
 +                                       "reg3 int," +
 +                                       "PRIMARY KEY ((pk1, pk2), ck1, ck2)) WITH " +
 +                                       "CLUSTERING ORDER BY (ck1 ASC, ck2 DESC);");
 +
 +        alterTable("ALTER TABLE %s DROP reg3 USING TIMESTAMP 10000;");
 +        alterTable("ALTER TABLE %s ADD reg3 int;");
++        // an index created without a name gets the default name <table>_<column>_idx (e.g. CREATE INDEX def_name_idx ON abc.def (name);)
++        createIndex("CREATE INDEX ON %s(reg2)");
 +
 +        for (int i = 0; i < 10; i++)
 +            execute("INSERT INTO %s (pk1, pk2, ck1, ck2, reg1, reg2) VALUES (?, ?, ?, ?, ?, ?)", i, i + 1, i + 2, i + 3, null, i + 5);
 +
 +        ColumnFamilyStore cfs = Keyspace.open(keyspace()).getColumnFamilyStore(tableName);
 +        cfs.snapshot(SNAPSHOT);
 +
 +        String schema = Files.toString(cfs.getDirectories().getSnapshotSchemaFile(SNAPSHOT), Charset.defaultCharset());
 +        assertThat(schema,
-                    allOf(containsString(String.format("CREATE TYPE %s.%s (\n" +
++                   allOf(containsString(String.format("CREATE TYPE IF NOT EXISTS %s.%s (\n" +
 +                                                      "    a1 varint,\n" +
 +                                                      "    a2 varint,\n" +
 +                                                      "    a3 varint\n" +
 +                                                      ");", keyspace(), typeA)),
-                          containsString(String.format("CREATE TYPE %s.%s (\n" +
++                         containsString(String.format("CREATE TYPE IF NOT EXISTS %s.%s (\n" +
 +                                                      "    a1 varint,\n" +
 +                                                      "    a2 varint,\n" +
 +                                                      "    a3 varint\n" +
 +                                                      ");", keyspace(), typeA)),
-                          containsString(String.format("CREATE TYPE %s.%s (\n" +
++                         containsString(String.format("CREATE TYPE IF NOT EXISTS %s.%s (\n" +
 +                                                      "    b1 frozen<%s>,\n" +
 +                                                      "    b2 frozen<%s>,\n" +
 +                                                      "    b3 frozen<%s>\n" +
 +                                                      ");", keyspace(), typeB, typeA, typeA, typeA)),
-                          containsString(String.format("CREATE TYPE %s.%s (\n" +
++                         containsString(String.format("CREATE TYPE IF NOT EXISTS %s.%s (\n" +
 +                                                      "    c1 frozen<%s>,\n" +
 +                                                      "    c2 frozen<%s>,\n" +
 +                                                      "    c3 frozen<%s>\n" +
 +                                                      ");", keyspace(), typeC, typeB, typeB, typeB))));
 +
 +        schema = schema.substring(schema.indexOf("CREATE TABLE")); // trim to ensure order
 +        String expected = "CREATE TABLE IF NOT EXISTS " + keyspace() + "." + tableName + " (\n" +
 +                          "    pk1 varint,\n" +
 +                          "    pk2 ascii,\n" +
 +                          "    ck1 varint,\n" +
 +                          "    ck2 varint,\n" +
 +                          "    reg2 int,\n" +
 +                          "    reg1 " + typeC+ ",\n" +
 +                          "    reg3 int,\n" +
 +                          "    PRIMARY KEY ((pk1, pk2), ck1, ck2)\n" +
 +                          ") WITH ID = " + cfs.metadata.id + "\n" +
 +                          "    AND CLUSTERING ORDER BY (ck1 ASC, ck2 DESC)";
 +
 +        assertThat(schema,
 +                   allOf(startsWith(expected),
 +                         containsString("ALTER TABLE " + keyspace() + "." + tableName + " DROP reg3 USING TIMESTAMP 10000;"),
 +                         containsString("ALTER TABLE " + keyspace() + "." + tableName + " ADD reg3 int;")));
 +
++        assertThat(schema, containsString("CREATE INDEX IF NOT EXISTS " + tableName + "_reg2_idx ON " + keyspace() + '.' + tableName + " (reg2);"));
++
 +        JSONObject manifest = (JSONObject) new JSONParser().parse(new FileReader(cfs.getDirectories().getSnapshotManifestFile(SNAPSHOT)));
 +        JSONArray files = (JSONArray) manifest.get("files");
-         Assert.assertEquals(1, files.size());
++        // two files, the second is index
++        Assert.assertEquals(2, files.size());
 +    }
 +
 +    @Test
 +    public void testSystemKsSnapshot()
 +    {
 +        ColumnFamilyStore cfs = Keyspace.open("system").getColumnFamilyStore("peers");
 +        cfs.snapshot(SNAPSHOT);
 +
 +        Assert.assertTrue(cfs.getDirectories().getSnapshotManifestFile(SNAPSHOT).exists());
 +        Assert.assertFalse(cfs.getDirectories().getSnapshotSchemaFile(SNAPSHOT).exists());
 +    }
 +
 +    @Test
 +    public void testBooleanCompositeKey() throws Throwable
 +    {
 +        createTable("CREATE TABLE %s (t_id boolean, id boolean, ck boolean, nk boolean, PRIMARY KEY ((t_id, id), ck))");
 +
 +        execute("insert into %s (t_id, id, ck, nk) VALUES (true, false, false, true)");
 +        assertRows(execute("select * from %s"), row(true, false, false, true));
 +
 +        // CASSANDRA-14752 -
 +        // a problem with composite boolean types meant that calling this would
 +        // prevent any boolean values from being inserted afterwards
 +        ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
 +        cfs.getSSTablesForKey("false:true");
 +
 +        execute("insert into %s (t_id, id, ck, nk) VALUES (true, true, false, true)");
 +        assertRows(execute("select t_id, id, ck, nk from %s"), row(true, false, false, true), row(true, true, false, true));
 +    }
 +}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org