You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by bl...@apache.org on 2020/06/29 16:05:53 UTC

[cassandra] branch trunk updated: Add support for server side DESCRIBE statements

This is an automated email from the ASF dual-hosted git repository.

blerer pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4e47bfb  Add support for server side DESCRIBE statements
4e47bfb is described below

commit 4e47bfb3a1abb8074fb9a24f98a97dbf25806522
Author: Robert Stupp <sn...@snazy.de>
AuthorDate: Mon Sep 9 22:28:54 2019 +0200

    Add support for server side DESCRIBE statements
    
    patch by Robert Stupp; reviewed by Dinesh Joshi and Benjamin Lerer for CASSANDRA-14825
---
 CHANGES.txt                                        |   1 +
 NEWS.txt                                           |   1 +
 bin/cqlsh.py                                       | 349 +++------
 doc/source/cql/ddl.rst                             | 322 ++++++++
 pylib/cqlshlib/cqlshhandling.py                    |   8 +-
 pylib/cqlshlib/test/test_cqlsh_output.py           |   9 +-
 src/antlr/Lexer.g                                  |  10 +-
 src/antlr/Parser.g                                 |  48 ++
 .../apache/cassandra/audit/AuditLogEntryType.java  |   1 +
 src/java/org/apache/cassandra/cql3/CQL3Type.java   |  14 +-
 src/java/org/apache/cassandra/cql3/CqlBuilder.java | 226 ++++++
 src/java/org/apache/cassandra/cql3/ResultSet.java  |   9 +-
 .../org/apache/cassandra/cql3/SchemaElement.java   |  96 +++
 .../cassandra/cql3/functions/AbstractFunction.java |  48 +-
 .../cassandra/cql3/functions/FunctionName.java     |  21 +
 .../cassandra/cql3/functions/UDAggregate.java      |  41 +-
 .../cassandra/cql3/functions/UDFunction.java       |  51 +-
 .../cql3/statements/DescribeStatement.java         | 750 +++++++++++++++++++
 .../cql3/statements/RequestValidations.java        |  18 +
 .../cql3/statements/schema/AlterTypeStatement.java |  10 +-
 .../org/apache/cassandra/db/ColumnFamilyStore.java |   5 +-
 .../org/apache/cassandra/db/SchemaCQLHelper.java   | 158 ++++
 .../org/apache/cassandra/db/TableCQLHelper.java    | 430 -----------
 .../apache/cassandra/db/marshal/AbstractType.java  |  21 +-
 .../org/apache/cassandra/db/marshal/ListType.java  |   6 +
 .../org/apache/cassandra/db/marshal/MapType.java   |   6 +
 .../org/apache/cassandra/db/marshal/SetType.java   |   6 +
 .../org/apache/cassandra/db/marshal/TupleType.java |   6 +
 .../org/apache/cassandra/db/marshal/UserType.java  |  54 +-
 .../cassandra/db/virtual/SystemViewsKeyspace.java  |  24 +-
 .../db/virtual/VirtualSchemaKeyspace.java          |   6 +-
 .../exceptions/InvalidRequestException.java        |   5 +
 .../apache/cassandra/locator/EndpointsByRange.java |   2 -
 .../apache/cassandra/schema/ColumnMetadata.java    |  21 +-
 .../org/apache/cassandra/schema/Functions.java     |  31 +
 .../org/apache/cassandra/schema/IndexMetadata.java |  45 +-
 .../apache/cassandra/schema/KeyspaceMetadata.java  |  59 +-
 .../apache/cassandra/schema/ReplicationParams.java |  18 +
 src/java/org/apache/cassandra/schema/Schema.java   |   5 +
 .../apache/cassandra/schema/SchemaConstants.java   |  24 +
 .../org/apache/cassandra/schema/TableMetadata.java | 334 ++++++++-
 .../org/apache/cassandra/schema/TableParams.java   |  45 ++
 src/java/org/apache/cassandra/schema/Tables.java   |   7 +
 src/java/org/apache/cassandra/schema/Types.java    |  48 ++
 .../org/apache/cassandra/schema/ViewMetadata.java  |  77 +-
 src/java/org/apache/cassandra/schema/Views.java    |  12 +
 .../org/apache/cassandra/utils/FBUtilities.java    |   3 +
 test/unit/org/apache/cassandra/cql3/CQLTester.java |  21 +-
 .../cql3/statements/DescribeStatementTest.java     | 815 +++++++++++++++++++++
 .../apache/cassandra/db/SchemaCQLHelperTest.java   | 468 ++++++++++++
 .../apache/cassandra/db/TableCQLHelperTest.java    | 446 -----------
 .../apache/cassandra/utils/FBUtilitiesTest.java    |   1 -
 52 files changed, 4013 insertions(+), 1229 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 30d111e..9feaa5e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0-alpha5
+ * Add support for server side DESCRIBE statements (CASSANDRA-14825)
  * Fail startup if -Xmn is set when the G1 garbage collector is used (CASSANDRA-15839)
  * generateSplits method replaced the generateRandomTokens for ReplicationAwareTokenAllocator. (CASSANDRA-15877)
  * Several mbeans are not unregistered when dropping a keyspace and table (CASSANDRA-14888)
diff --git a/NEWS.txt b/NEWS.txt
index 351f80d..4d2252a 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -113,6 +113,7 @@ New features
     - Added Python 3 support so cqlsh and cqlshlib is now compatible with Python 2.7 and Python 3.6.
       Added --python option to cqlsh so users can specify the path to their chosen Python interpreter.
       See CASSANDRA-10190 for details.
+    - Support for server side DESCRIBE statements has been added. See CASSANDRA-14825
 
 Upgrading
 ---------
diff --git a/bin/cqlsh.py b/bin/cqlsh.py
index c4f38b6..3c250be 100644
--- a/bin/cqlsh.py
+++ b/bin/cqlsh.py
@@ -1244,208 +1244,16 @@ class Shell(cmd.Cmd):
         if valstr is not None:
             return cqlruleset.dequote_value(valstr)
 
-    def print_recreate_keyspace(self, ksdef, out):
-        out.write(ksdef.export_as_string())
-        out.write("\n")
-
-    def print_recreate_columnfamily(self, ksname, cfname, out):
-        """
-        Output CQL commands which should be pasteable back into a CQL session
-        to recreate the given table.
-
-        Writes output to the given out stream.
-        """
-        out.write(self.get_table_meta(ksname, cfname).export_as_string())
-        out.write("\n")
-
-    def print_recreate_index(self, ksname, idxname, out):
-        """
-        Output CQL commands which should be pasteable back into a CQL session
-        to recreate the given index.
-
-        Writes output to the given out stream.
-        """
-        out.write(self.get_index_meta(ksname, idxname).export_as_string())
-        out.write("\n")
-
-    def print_recreate_materialized_view(self, ksname, viewname, out):
-        """
-        Output CQL commands which should be pasteable back into a CQL session
-        to recreate the given materialized view.
-
-        Writes output to the given out stream.
-        """
-        out.write(self.get_view_meta(ksname, viewname).export_as_string())
-        out.write("\n")
-
-    def print_recreate_object(self, ks, name, out):
-        """
-        Output CQL commands which should be pasteable back into a CQL session
-        to recreate the given object (ks, table or index).
-
-        Writes output to the given out stream.
-        """
-        out.write(self.get_object_meta(ks, name).export_as_string())
-        out.write("\n")
-
-    def describe_keyspaces(self):
-        print('')
-        cmd.Cmd.columnize(self, protect_names(self.get_keyspace_names()))
-        print('')
-
-    def describe_keyspace(self, ksname):
-        print('')
-        self.print_recreate_keyspace(self.get_keyspace_meta(ksname), sys.stdout)
-        print('')
-
-    def describe_columnfamily(self, ksname, cfname):
-        if ksname is None:
-            ksname = self.current_keyspace
-        if ksname is None:
-            raise NoKeyspaceError("No keyspace specified and no current keyspace")
-        print('')
-        self.print_recreate_columnfamily(ksname, cfname, sys.stdout)
-        print('')
-
-    def describe_index(self, ksname, idxname):
-        print('')
-        self.print_recreate_index(ksname, idxname, sys.stdout)
-        print('')
-
-    def describe_materialized_view(self, ksname, viewname):
-        if ksname is None:
-            ksname = self.current_keyspace
-        if ksname is None:
-            raise NoKeyspaceError("No keyspace specified and no current keyspace")
-        print('')
-        self.print_recreate_materialized_view(ksname, viewname, sys.stdout)
-        print('')
-
-    def describe_object(self, ks, name):
-        print('')
-        self.print_recreate_object(ks, name, sys.stdout)
-        print('')
-
-    def describe_columnfamilies(self, ksname):
-        print('')
-        if ksname is None:
-            for k in self.get_keyspaces():
-                name = protect_name(k.name)
-                print('Keyspace %s' % (name,))
-                print('---------%s' % ('-' * len(name)))
-                cmd.Cmd.columnize(self, protect_names(self.get_columnfamily_names(k.name)))
-                print('')
-        else:
-            cmd.Cmd.columnize(self, protect_names(self.get_columnfamily_names(ksname)))
-            print('')
-
-    def describe_functions(self, ksname):
-        print('')
-        if ksname is None:
-            for ksmeta in self.get_keyspaces():
-                name = protect_name(ksmeta.name)
-                print('Keyspace %s' % (name,))
-                print('---------%s' % ('-' * len(name)))
-                self._columnize_unicode(list(ksmeta.functions.keys()))
-        else:
-            ksmeta = self.get_keyspace_meta(ksname)
-            self._columnize_unicode(list(ksmeta.functions.keys()))
-
-    def describe_function(self, ksname, functionname):
-        if ksname is None:
-            ksname = self.current_keyspace
-        if ksname is None:
-            raise NoKeyspaceError("No keyspace specified and no current keyspace")
-        print('')
-        ksmeta = self.get_keyspace_meta(ksname)
-        functions = [f for f in list(ksmeta.functions.values()) if f.name == functionname]
-        if len(functions) == 0:
-            raise FunctionNotFound("User defined function {} not found".format(functionname))
-        print("\n\n".join(func.export_as_string() for func in functions))
-        print('')
-
-    def describe_aggregates(self, ksname):
-        print('')
-        if ksname is None:
-            for ksmeta in self.get_keyspaces():
-                name = protect_name(ksmeta.name)
-                print('Keyspace %s' % (name,))
-                print('---------%s' % ('-' * len(name)))
-                self._columnize_unicode(list(ksmeta.aggregates.keys()))
-        else:
-            ksmeta = self.get_keyspace_meta(ksname)
-            self._columnize_unicode(list(ksmeta.aggregates.keys()))
-
-    def describe_aggregate(self, ksname, aggregatename):
-        if ksname is None:
-            ksname = self.current_keyspace
-        if ksname is None:
-            raise NoKeyspaceError("No keyspace specified and no current keyspace")
-        print('')
-        ksmeta = self.get_keyspace_meta(ksname)
-        aggregates = [f for f in list(ksmeta.aggregates.values()) if f.name == aggregatename]
-        if len(aggregates) == 0:
-            raise FunctionNotFound("User defined aggregate {} not found".format(aggregatename))
-        print("\n\n".join(aggr.export_as_string() for aggr in aggregates))
-        print('')
-
-    def describe_usertypes(self, ksname):
-        print('')
-        if ksname is None:
-            for ksmeta in self.get_keyspaces():
-                name = protect_name(ksmeta.name)
-                print('Keyspace %s' % (name,))
-                print('---------%s' % ('-' * len(name)))
-                self._columnize_unicode(list(ksmeta.user_types.keys()), quote=True)
-        else:
-            ksmeta = self.get_keyspace_meta(ksname)
-            self._columnize_unicode(list(ksmeta.user_types.keys()), quote=True)
-
-    def describe_usertype(self, ksname, typename):
-        if ksname is None:
-            ksname = self.current_keyspace
-        if ksname is None:
-            raise NoKeyspaceError("No keyspace specified and no current keyspace")
-        print('')
-        ksmeta = self.get_keyspace_meta(ksname)
-        try:
-            usertype = ksmeta.user_types[typename]
-        except KeyError:
-            raise UserTypeNotFound("User type {} not found".format(typename))
-        print(usertype.export_as_string())
-
-    def _columnize_unicode(self, name_list, quote=False):
+    def _columnize_unicode(self, name_list):
         """
         Used when columnizing identifiers that may contain unicode
         """
         names = [n for n in name_list]
-        if quote:
-            names = protect_names(names)
         cmd.Cmd.columnize(self, names)
         print('')
 
-    def describe_cluster(self):
-        print('\nCluster: %s' % self.get_cluster_name())
-        p = trim_if_present(self.get_partitioner(), 'org.apache.cassandra.dht.')
-        print('Partitioner: %s\n' % p)
-        # TODO: snitch?
-        # snitch = trim_if_present(self.get_snitch(), 'org.apache.cassandra.locator.')
-        # print 'Snitch: %s\n' % snitch
-        if self.current_keyspace is not None and self.current_keyspace != 'system':
-            print("Range ownership:")
-            ring = self.get_ring(self.current_keyspace)
-            for entry in list(ring.items()):
-                print(' %39s  [%s]' % (str(entry[0].value), ', '.join([host.address for host in entry[1]])))
-            print('')
-
-    def describe_schema(self, include_system=False):
-        print('')
-        for k in self.get_keyspaces():
-            if include_system or k.name not in cql3handling.SYSTEM_KEYSPACES:
-                self.print_recreate_keyspace(k, sys.stdout)
-                print('')
-
     def do_describe(self, parsed):
+
         """
         DESCRIBE [cqlsh only]
 
@@ -1464,7 +1272,6 @@ class Shell(cmd.Cmd):
           and the objects in it (such as tables, types, functions, etc.).
           In some cases, as the CQL interface matures, there will be some metadata
           about a keyspace that is not representable with CQL. That metadata will not be shown.
-
           The '<keyspacename>' argument may be omitted, in which case the current
           keyspace will be described.
 
@@ -1536,66 +1343,102 @@ class Shell(cmd.Cmd):
           Output CQL commands that could be used to recreate the entire object schema,
           where object can be either a keyspace or a table or an index or a materialized
           view (in this order).
-  """
-        what = parsed.matched[1][1].lower()
-        if what == 'functions':
-            self.describe_functions(self.current_keyspace)
-        elif what == 'function':
-            ksname = self.cql_unprotect_name(parsed.get_binding('ksname', None))
-            functionname = self.cql_unprotect_name(parsed.get_binding('udfname'))
-            self.describe_function(ksname, functionname)
-        elif what == 'aggregates':
-            self.describe_aggregates(self.current_keyspace)
-        elif what == 'aggregate':
-            ksname = self.cql_unprotect_name(parsed.get_binding('ksname', None))
-            aggregatename = self.cql_unprotect_name(parsed.get_binding('udaname'))
-            self.describe_aggregate(ksname, aggregatename)
-        elif what == 'keyspaces':
-            self.describe_keyspaces()
-        elif what == 'keyspace':
-            ksname = self.cql_unprotect_name(parsed.get_binding('ksname', ''))
-            if not ksname:
-                ksname = self.current_keyspace
-                if ksname is None:
-                    self.printerr('Not in any keyspace.')
-                    return
-            self.describe_keyspace(ksname)
-        elif what in ('columnfamily', 'table'):
-            ks = self.cql_unprotect_name(parsed.get_binding('ksname', None))
-            cf = self.cql_unprotect_name(parsed.get_binding('cfname'))
-            self.describe_columnfamily(ks, cf)
-        elif what == 'index':
-            ks = self.cql_unprotect_name(parsed.get_binding('ksname', None))
-            idx = self.cql_unprotect_name(parsed.get_binding('idxname', None))
-            self.describe_index(ks, idx)
-        elif what == 'materialized' and parsed.matched[2][1].lower() == 'view':
-            ks = self.cql_unprotect_name(parsed.get_binding('ksname', None))
-            mv = self.cql_unprotect_name(parsed.get_binding('mvname'))
-            self.describe_materialized_view(ks, mv)
-        elif what in ('columnfamilies', 'tables'):
-            self.describe_columnfamilies(self.current_keyspace)
-        elif what == 'types':
-            self.describe_usertypes(self.current_keyspace)
-        elif what == 'type':
-            ks = self.cql_unprotect_name(parsed.get_binding('ksname', None))
-            ut = self.cql_unprotect_name(parsed.get_binding('utname'))
-            self.describe_usertype(ks, ut)
-        elif what == 'cluster':
-            self.describe_cluster()
-        elif what == 'schema':
-            self.describe_schema(False)
-        elif what == 'full' and parsed.matched[2][1].lower() == 'schema':
-            self.describe_schema(True)
-        elif what:
-            ks = self.cql_unprotect_name(parsed.get_binding('ksname', None))
-            name = self.cql_unprotect_name(parsed.get_binding('cfname'))
-            if not name:
-                name = self.cql_unprotect_name(parsed.get_binding('idxname', None))
-            if not name:
-                name = self.cql_unprotect_name(parsed.get_binding('mvname', None))
-            self.describe_object(ks, name)
+        """
+        stmt = SimpleStatement(parsed.extract_orig(), consistency_level=cassandra.ConsistencyLevel.LOCAL_ONE, fetch_size=self.page_size if self.use_paging else None)
+        future = self.session.execute_async(stmt)
+
+        try:
+            result = future.result()
+
+            what = parsed.matched[1][1].lower()
+
+            if what in ('columnfamilies', 'tables', 'types', 'functions', 'aggregates'):
+                self.describe_list(result)
+            elif what == 'keyspaces':
+                self.describe_keyspaces(result)
+            elif what == 'cluster':
+                self.describe_cluster(result)
+            elif what:
+                self.describe_element(result)
+
+        except CQL_ERRORS as err:
+            err_msg = ensure_text(err.message if hasattr(err, 'message') else str(err))
+            self.printerr(err_msg.partition("message=")[2].strip('"'))
+        except Exception:
+            import traceback
+            self.printerr(traceback.format_exc())
+
+        if future:
+            if future.warnings:
+                self.print_warnings(future.warnings)
+
     do_desc = do_describe
 
+    def describe_keyspaces(self, rows):
+        """
+        Print the output for a DESCRIBE KEYSPACES query
+        """
+        names = list()
+        for row in rows:
+            names.append(str(row['name']))
+
+        print('')
+        cmd.Cmd.columnize(self, names)
+        print('')
+
+    def describe_list(self, rows):
+        """
+        Print the output for all the DESCRIBE queries for element names (e.g. DESCRIBE TABLES, DESCRIBE FUNCTIONS ...)
+        """
+        keyspace = None
+        names = list()
+        for row in rows:
+            if row['keyspace_name'] != keyspace:
+                if keyspace is not None:
+                    self.print_keyspace_element_names(keyspace, names)
+
+                keyspace = row['keyspace_name']
+                names = list()
+
+            names.append(str(row['name']))
+
+        if keyspace is not None:
+            self.print_keyspace_element_names(keyspace, names)
+            print('')
+
+    def print_keyspace_element_names(self, keyspace, names):
+        print('')
+        if self.current_keyspace is None:
+            print('Keyspace %s' % (keyspace))
+            print('---------%s' % ('-' * len(keyspace)))
+        cmd.Cmd.columnize(self, names)
+
+    def describe_element(self, rows):
+        """
+        Print the output for all the DESCRIBE queries where an element name has been specified (e.g. DESCRIBE TABLE, DESCRIBE INDEX ...)
+        """
+        for row in rows:
+            print('')
+            self.query_out.write(row['create_statement'])
+            print('')
+
+    def describe_cluster(self, rows):
+        """
+        Print the output for a DESCRIBE CLUSTER query.
+
+        If a specified keyspace was in use, the returned ResultSet will contain a 'range_ownership' column,
+        otherwise not.
+        """
+        for row in rows:
+            print('\nCluster: %s' % row['cluster'])
+            print('Partitioner: %s' % row['partitioner'])
+            print('Snitch: %s\n' % row['snitch'])
+            if 'range_ownership' in row:
+                print("Range ownership:")
+                for entry in list(row['range_ownership'].items()):
+                    print(' %39s  [%s]' % (entry[0], ', '.join([host for host in entry[1]])))
+                print('')
+
     def do_copy(self, parsed):
         r"""
         COPY [cqlsh only]
diff --git a/doc/source/cql/ddl.rst b/doc/source/cql/ddl.rst
index 88df05b..bd578ca 100644
--- a/doc/source/cql/ddl.rst
+++ b/doc/source/cql/ddl.rst
@@ -850,3 +850,325 @@ Note that ``TRUNCATE TABLE foo`` is allowed for consistency with other DDL state
 that can be truncated currently and so the ``TABLE`` keyword can be omitted.
 
 Truncating a table permanently removes all existing data from the table, but without removing the table itself.
+
+.. _describe-statements:
+
+DESCRIBE
+^^^^^^^^
+
+Statements used to output information about the connected Cassandra cluster,
+or about the data objects stored in the cluster.
+
+.. warning:: Describe statement result sets that exceed the page size are paged. If a schema change is detected between
+   two pages the query will fail with an ``InvalidRequestException``. It is safe to retry the whole ``DESCRIBE``
+   statement after such an error.
+ 
+DESCRIBE KEYSPACES
+""""""""""""""""""
+
+Output the names of all keyspaces.
+
+.. productionlist::
+   describe_keyspaces_statement: DESCRIBE KEYSPACES
+
+Returned columns:
+
+======================== ========= ====================================================================================
+ Columns                  Type      Description
+======================== ========= ====================================================================================
+ keyspace_name                text  The keyspace name
+ type                         text  The schema element type
+ name                         text  The schema element name
+======================== ========= ====================================================================================
+
+DESCRIBE KEYSPACE
+"""""""""""""""""
+
+Output CQL commands that could be used to recreate the given keyspace, and the objects in it 
+(such as tables, types, functions, etc.).
+
+.. productionlist::
+   describe_keyspace_statement: DESCRIBE [ONLY] KEYSPACE [`keyspace_name`] [WITH INTERNALS]
+
+The ``keyspace_name`` argument may be omitted, in which case the current keyspace will be described.
+
+If ``WITH INTERNALS`` is specified, the output contains the table IDs and is adapted to represent the DDL necessary 
+to "re-create" dropped columns.
+
+If ``ONLY`` is specified, only the DDL to recreate the keyspace will be created. All keyspace elements, like tables,
+types, functions, etc will be omitted.
+
+Returned columns:
+
+======================== ========= ====================================================================================
+ Columns                  Type      Description
+======================== ========= ====================================================================================
+ keyspace_name                text  The keyspace name
+ type                         text  The schema element type
+ name                         text  The schema element name
+ create_statement             text  The CQL statement to use to recreate the schema element
+======================== ========= ====================================================================================
+
+DESCRIBE TABLES
+"""""""""""""""
+
+Output the names of all tables in the current keyspace, or in all keyspaces if there is no current keyspace.
+
+.. productionlist::
+   describe_tables_statement: DESCRIBE TABLES
+
+Returned columns:
+
+======================== ========= ====================================================================================
+ Columns                  Type      Description
+======================== ========= ====================================================================================
+ keyspace_name                text  The keyspace name
+ type                         text  The schema element type
+ name                         text  The schema element name
+======================== ========= ====================================================================================
+
+DESCRIBE TABLE
+""""""""""""""
+
+Output CQL commands that could be used to recreate the given table.
+
+.. productionlist::
+   describe_table_statement: DESCRIBE TABLE [`keyspace_name`.]`table_name` [WITH INTERNALS]
+
+If `WITH INTERNALS` is specified, the output contains the table ID and is adapted to represent the DDL necessary
+to "re-create" dropped columns.
+
+Returned columns:
+
+======================== ========= ====================================================================================
+ Columns                  Type      Description
+======================== ========= ====================================================================================
+ keyspace_name                text  The keyspace name
+ type                         text  The schema element type
+ name                         text  The schema element name
+ create_statement             text  The CQL statement to use to recreate the schema element
+======================== ========= ====================================================================================
+
+DESCRIBE INDEX
+""""""""""""""
+
+Output the CQL command that could be used to recreate the given index.
+
+.. productionlist::
+   describe_index_statement: DESCRIBE INDEX [`keyspace_name`.]`index_name`
+
+Returned columns:
+
+======================== ========= ====================================================================================
+ Columns                  Type      Description
+======================== ========= ====================================================================================
+ keyspace_name                text  The keyspace name
+ type                         text  The schema element type
+ name                         text  The schema element name
+ create_statement             text  The CQL statement to use to recreate the schema element
+======================== ========= ====================================================================================
+
+
+DESCRIBE MATERIALIZED VIEW
+""""""""""""""""""""""""""
+
+Output the CQL command that could be used to recreate the given materialized view.
+
+.. productionlist::
+   describe_materialized_view_statement: DESCRIBE MATERIALIZED VIEW [`keyspace_name`.]`view_name`
+
+Returned columns:
+
+======================== ========= ====================================================================================
+ Columns                  Type      Description
+======================== ========= ====================================================================================
+ keyspace_name                text  The keyspace name
+ type                         text  The schema element type
+ name                         text  The schema element name
+ create_statement             text  The CQL statement to use to recreate the schema element
+======================== ========= ====================================================================================
+
+DESCRIBE CLUSTER
+""""""""""""""""
+
+Output information about the connected Cassandra cluster, such as the cluster name, and the partitioner and snitch
+in use. When you are connected to a non-system keyspace, also shows endpoint-range ownership information for
+the Cassandra ring.
+
+.. productionlist::
+   describe_cluster_statement: DESCRIBE CLUSTER
+
+Returned columns:
+
+======================== ====================== ========================================================================
+ Columns                  Type                   Description
+======================== ====================== ========================================================================
+ cluster                                   text  The cluster name
+ partitioner                               text  The partitioner being used by the cluster
+ snitch                                    text  The snitch being used by the cluster
+ range_ownership          map<text, list<text>>  The token ranges and the endpoints that own them (only present when
+                                                 a non-system keyspace is in use)
+======================== ====================== ========================================================================
+
+DESCRIBE SCHEMA
+"""""""""""""""
+
+Output CQL commands that could be used to recreate the entire (non-system) schema.
+Works as though "DESCRIBE KEYSPACE k" was invoked for each non-system keyspace.
+
+.. productionlist::
+   describe_schema_statement: DESCRIBE [FULL] SCHEMA [WITH INTERNALS]
+
+Use ``DESCRIBE FULL SCHEMA`` to include the system keyspaces.
+
+If ``WITH INTERNALS`` is specified, the output contains the table IDs and is adapted to represent the DDL necessary
+to "re-create" dropped columns.
+
+Returned columns:
+
+======================== ========= ====================================================================================
+ Columns                  Type      Description
+======================== ========= ====================================================================================
+ keyspace_name                text  The keyspace name
+ type                         text  The schema element type
+ name                         text  The schema element name
+ create_statement             text  The CQL statement to use to recreate the schema element
+======================== ========= ====================================================================================
+
+DESCRIBE TYPES
+""""""""""""""
+
+Output the names of all user-defined-types in the current keyspace, or in all keyspaces if there is no current keyspace.
+
+.. productionlist::
+   describe_types_statement: DESCRIBE TYPES
+
+Returned columns:
+
+======================== ========= ====================================================================================
+ Columns                  Type      Description
+======================== ========= ====================================================================================
+ keyspace_name                text  The keyspace name
+ type                         text  The schema element type
+ name                         text  The schema element name
+======================== ========= ====================================================================================
+
+DESCRIBE TYPE
+"""""""""""""
+
+Output the CQL command that could be used to recreate the given user-defined-type.
+
+.. productionlist::
+   describe_type_statement: DESCRIBE TYPE [`keyspace_name`.]`type_name`
+
+Returned columns:
+
+======================== ========= ====================================================================================
+ Columns                  Type      Description
+======================== ========= ====================================================================================
+ keyspace_name                text  The keyspace name
+ type                         text  The schema element type
+ name                         text  The schema element name
+ create_statement             text  The CQL statement to use to recreate the schema element
+======================== ========= ====================================================================================
+
+DESCRIBE FUNCTIONS
+""""""""""""""""""
+
+Output the names of all user-defined-functions in the current keyspace, or in all keyspaces if there is no current
+keyspace.
+
+.. productionlist::
+   describe_functions_statement: DESCRIBE FUNCTIONS
+
+Returned columns:
+
+======================== ========= ====================================================================================
+ Columns                  Type      Description
+======================== ========= ====================================================================================
+ keyspace_name                text  The keyspace name
+ type                         text  The schema element type
+ name                         text  The schema element name
+======================== ========= ====================================================================================
+
+DESCRIBE FUNCTION
+"""""""""""""""""
+
+Output the CQL command that could be used to recreate the given user-defined-function.
+
+.. productionlist::
+   describe_function_statement: DESCRIBE FUNCTION [`keyspace_name`.]`function_name`
+
+Returned columns:
+
+======================== ========= ====================================================================================
+ Columns                  Type      Description
+======================== ========= ====================================================================================
+ keyspace_name                text  The keyspace name
+ type                         text  The schema element type
+ name                         text  The schema element name
+ create_statement             text  The CQL statement to use to recreate the schema element
+======================== ========= ====================================================================================
+
+DESCRIBE AGGREGATES
+"""""""""""""""""""
+
+Output the names of all user-defined-aggregates in the current keyspace, or in all keyspaces if there is no current
+keyspace.
+
+.. productionlist::
+   describe_aggregates_statement: DESCRIBE AGGREGATES
+
+Returned columns:
+
+======================== ========= ====================================================================================
+ Columns                  Type      Description
+======================== ========= ====================================================================================
+ keyspace_name                text  The keyspace name
+ type                         text  The schema element type
+ name                         text  The schema element name
+======================== ========= ====================================================================================
+
+DESCRIBE AGGREGATE
+""""""""""""""""""
+
+Output the CQL command that could be used to recreate the given user-defined-aggregate.
+
+.. productionlist::
+   describe_aggregate_statement: DESCRIBE AGGREGATE [`keyspace_name`.]`aggregate_name`
+
+Returned columns:
+
+======================== ========= ====================================================================================
+ Columns                  Type      Description
+======================== ========= ====================================================================================
+ keyspace_name                text  The keyspace name
+ type                         text  The schema element type
+ name                         text  The schema element name
+ create_statement             text  The CQL statement to use to recreate the schema element
+======================== ========= ====================================================================================
+
+DESCRIBE object
+"""""""""""""""
+
+Output CQL commands that could be used to recreate the entire object schema, where object can be either a keyspace,
+a table, an index or a materialized view (the lookup is attempted in that order).
+
+.. productionlist::
+   describe_object_statement: DESCRIBE `object_name` [WITH INTERNALS]
+
+If ``WITH INTERNALS`` is specified and ``object_name`` represents a keyspace or table, the output contains the table IDs
+and is adapted to represent the DDL necessary to "re-create" dropped columns.
+
+``object_name`` cannot be any of the "describe what" qualifiers like "cluster", "table", etc.
+
+Returned columns:
+
+======================== ========= ====================================================================================
+ Columns                  Type      Description
+======================== ========= ====================================================================================
+ keyspace_name                text  The keyspace name
+ type                         text  The schema element type
+ name                         text  The schema element name
+ create_statement             text  The CQL statement to use to recreate the schema element
+======================== ========= ====================================================================================
+
diff --git a/pylib/cqlshlib/cqlshhandling.py b/pylib/cqlshlib/cqlshhandling.py
index 193ca88..aa1fbc0 100644
--- a/pylib/cqlshlib/cqlshhandling.py
+++ b/pylib/cqlshlib/cqlshhandling.py
@@ -78,12 +78,12 @@ cqlsh_special_cmd_command_syntax_rules = r'''
 
 cqlsh_describe_cmd_syntax_rules = r'''
 <describeCommand> ::= ( "DESCRIBE" | "DESC" )
-                                  ( "FUNCTIONS"
+                                ( ( "FUNCTIONS"
                                   | "FUNCTION" udf=<anyFunctionName>
                                   | "AGGREGATES"
                                   | "AGGREGATE" uda=<userAggregateName>
                                   | "KEYSPACES"
-                                  | "KEYSPACE" ksname=<keyspaceName>?
+                                  | "ONLY"? "KEYSPACE" ksname=<keyspaceName>?
                                   | ( "COLUMNFAMILY" | "TABLE" ) cf=<columnFamilyName>
                                   | "INDEX" idx=<indexName>
                                   | "MATERIALIZED" "VIEW" mv=<materializedViewName>
@@ -92,7 +92,9 @@ cqlsh_describe_cmd_syntax_rules = r'''
                                   | "CLUSTER"
                                   | "TYPES"
                                   | "TYPE" ut=<userTypeName>
-                                  | (ksname=<keyspaceName> | cf=<columnFamilyName> | idx=<indexName> | mv=<materializedViewName>))
+                                  | (ksname=<keyspaceName> | cf=<columnFamilyName> | idx=<indexName> | mv=<materializedViewName>)
+                                  ) ("WITH" "INTERNALS")?
+                                )
                     ;
 '''
 
diff --git a/pylib/cqlshlib/test/test_cqlsh_output.py b/pylib/cqlshlib/test/test_cqlsh_output.py
index 1182bd6..accf3ed 100644
--- a/pylib/cqlshlib/test/test_cqlsh_output.py
+++ b/pylib/cqlshlib/test/test_cqlsh_output.py
@@ -637,7 +637,7 @@ class TestCqlshOutput(BaseTestCase):
     def test_describe_columnfamily_output(self):
         # we can change these to regular expressions if/when it makes sense
         # to do so; these will likely be subject to lots of adjustments.
-        
+
         # note columns are now comparator-ordered instead of original-order.
         table_desc3 = dedent("""
             CREATE TABLE %s.has_all_types (
@@ -666,13 +666,13 @@ class TestCqlshOutput(BaseTestCase):
                 AND compression = {'chunk_length_in_kb': '16', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
                 AND crc_check_chance = 1.0
                 AND default_time_to_live = 0
+                AND extensions = {}
                 AND gc_grace_seconds = 864000
                 AND max_index_interval = 2048
                 AND memtable_flush_period_in_ms = 0
                 AND min_index_interval = 128
                 AND read_repair = 'BLOCKING'
-                AND speculative_retry = '99p';
-            """ % quote_name(get_keyspace()))
+                AND speculative_retry = '99p';""" % quote_name(get_keyspace()))
 
         with testrun_cqlsh(tty=True, env=self.default_env) as c:
             for cmdword in ('describe table', 'desc columnfamily'):
@@ -730,13 +730,14 @@ class TestCqlshOutput(BaseTestCase):
             \n
             Cluster: [ ] (?P<clustername> .* ) \n
             Partitioner: [ ] (?P<partitionername> .* ) \n
+            Snitch: [ ] (?P<snitchname> .* ) \n
             \n
         '''
 
         ringinfo_re = r'''
             Range[ ]ownership: \n
             (
-              [ ] .*? [ ][ ] \[ ( \d+ \. ){3} \d+ \] \n
+              [ ] .*? [ ][ ] \[ ( \d+ \. ){3} \d+ : \d+ \] \n
             )+
             \n
         '''
diff --git a/src/antlr/Lexer.g b/src/antlr/Lexer.g
index b9d4c1e..e9ca5eb 100644
--- a/src/antlr/Lexer.g
+++ b/src/antlr/Lexer.g
@@ -86,11 +86,14 @@ K_TRUNCATE:    T R U N C A T E;
 K_DELETE:      D E L E T E;
 K_IN:          I N;
 K_CREATE:      C R E A T E;
+K_SCHEMA:      S C H E M A;
 K_KEYSPACE:    ( K E Y S P A C E
-                 | S C H E M A );
+                 | K_SCHEMA );
 K_KEYSPACES:   K E Y S P A C E S;
 K_COLUMNFAMILY:( C O L U M N F A M I L Y
                  | T A B L E );
+K_TABLES:      ( C O L U M N F A M I L I E S
+                 | T A B L E S );
 K_MATERIALIZED:M A T E R I A L I Z E D;
 K_VIEW:        V I E W;
 K_INDEX:       I N D E X;
@@ -108,6 +111,7 @@ K_ALTER:       A L T E R;
 K_RENAME:      R E N A M E;
 K_ADD:         A D D;
 K_TYPE:        T Y P E;
+K_TYPES:       T Y P E S;
 K_COMPACT:     C O M P A C T;
 K_STORAGE:     S T O R A G E;
 K_ORDER:       O R D E R;
@@ -120,6 +124,9 @@ K_IF:          I F;
 K_IS:          I S;
 K_CONTAINS:    C O N T A I N S;
 K_GROUP:       G R O U P;
+K_CLUSTER:     C L U S T E R;
+K_INTERNALS:   I N T E R N A L S;
+K_ONLY:        O N L Y;
 
 K_GRANT:       G R A N T;
 K_ALL:         A L L;
@@ -191,6 +198,7 @@ K_FROZEN:      F R O Z E N;
 K_FUNCTION:    F U N C T I O N;
 K_FUNCTIONS:   F U N C T I O N S;
 K_AGGREGATE:   A G G R E G A T E;
+K_AGGREGATES:  A G G R E G A T E S;
 K_SFUNC:       S F U N C;
 K_STYPE:       S T Y P E;
 K_FINALFUNC:   F I N A L F U N C;
diff --git a/src/antlr/Parser.g b/src/antlr/Parser.g
index 1cfc7d1..4b7e5bf 100644
--- a/src/antlr/Parser.g
+++ b/src/antlr/Parser.g
@@ -246,6 +246,7 @@ cqlStatement returns [CQLStatement.Raw stmt]
     | st38=createMaterializedViewStatement { $stmt = st38; }
     | st39=dropMaterializedViewStatement   { $stmt = st39; }
     | st40=alterMaterializedViewStatement  { $stmt = st40; }
+    | st41=describeStatement               { $stmt = st41; }
     ;
 
 /*
@@ -1299,6 +1300,47 @@ userPassword[RoleOptions opts]
     :  K_PASSWORD v=STRING_LITERAL { opts.setOption(IRoleManager.Option.PASSWORD, $v.text); }
     ;
 
+/**
+ * DESCRIBE statement(s)
+ *
+ * Must be in sync with the javadoc for org.apache.cassandra.cql3.statements.DescribeStatement and the
+ * cqlsh syntax definition for cqlsh_describe_cmd_syntax_rules in pylib/cqlshlib/cqlshhandling.py.
+ */
+describeStatement returns [DescribeStatement stmt]
+    @init {
+        boolean fullSchema = false;
+        boolean pending = false;
+        boolean config = false;
+        boolean only = false;
+        QualifiedName gen = new QualifiedName();
+    }
+    : ( K_DESCRIBE | K_DESC )
+    ( (K_CLUSTER)=> K_CLUSTER                     { $stmt = DescribeStatement.cluster(); }
+    | (K_FULL { fullSchema=true; })? K_SCHEMA     { $stmt = DescribeStatement.schema(fullSchema); }
+    | (K_KEYSPACES)=> K_KEYSPACES                 { $stmt = DescribeStatement.keyspaces(); }
+    | (K_ONLY { only=true; })? K_KEYSPACE ( ks=keyspaceName )?
+                                                  { $stmt = DescribeStatement.keyspace(ks, only); }
+    | (K_TABLES) => K_TABLES                      { $stmt = DescribeStatement.tables(); }
+    | K_COLUMNFAMILY cf=columnFamilyName          { $stmt = DescribeStatement.table(cf.getKeyspace(), cf.getName()); }
+    | K_INDEX idx=columnFamilyName                { $stmt = DescribeStatement.index(idx.getKeyspace(), idx.getName()); }
+    | K_MATERIALIZED K_VIEW view=columnFamilyName { $stmt = DescribeStatement.view(view.getKeyspace(), view.getName()); }
+    | (K_TYPES) => K_TYPES                        { $stmt = DescribeStatement.types(); }
+    | K_TYPE tn=userTypeName                      { $stmt = DescribeStatement.type(tn.getKeyspace(), tn.getStringTypeName()); }
+    | (K_FUNCTIONS) => K_FUNCTIONS                { $stmt = DescribeStatement.functions(); }
+    | K_FUNCTION fn=functionName                  { $stmt = DescribeStatement.function(fn.keyspace, fn.name); }
+    | (K_AGGREGATES) => K_AGGREGATES              { $stmt = DescribeStatement.aggregates(); }
+    | K_AGGREGATE ag=functionName                 { $stmt = DescribeStatement.aggregate(ag.keyspace, ag.name); }
+    | ( ( ksT=IDENT                       { gen.setKeyspace($ksT.text, false);}
+          | ksT=QUOTED_NAME                 { gen.setKeyspace($ksT.text, true);}
+          | ksK=unreserved_keyword          { gen.setKeyspace(ksK, false);} ) '.' )?
+        ( tT=IDENT                          { gen.setName($tT.text, false);}
+        | tT=QUOTED_NAME                    { gen.setName($tT.text, true);}
+        | tK=unreserved_keyword             { gen.setName(tK, false);} )
+                                                    { $stmt = DescribeStatement.generic(gen.getKeyspace(), gen.getName()); }
+    )
+    ( K_WITH K_INTERNALS { $stmt.withInternalDetails(); } )?
+    ;
+
 /** DEFINITIONS **/
 
 // Column Identifiers.  These need to be treated differently from other
@@ -1812,10 +1854,13 @@ unreserved_function_keyword returns [String str]
 basic_unreserved_keyword returns [String str]
     : k=( K_KEYS
         | K_AS
+        | K_CLUSTER
         | K_CLUSTERING
         | K_COMPACT
         | K_STORAGE
+        | K_TABLES
         | K_TYPE
+        | K_TYPES
         | K_VALUES
         | K_MAP
         | K_LIST
@@ -1838,12 +1883,15 @@ basic_unreserved_keyword returns [String str]
         | K_CUSTOM
         | K_TRIGGER
         | K_CONTAINS
+        | K_INTERNALS
+        | K_ONLY
         | K_STATIC
         | K_FROZEN
         | K_TUPLE
         | K_FUNCTION
         | K_FUNCTIONS
         | K_AGGREGATE
+        | K_AGGREGATES
         | K_SFUNC
         | K_STYPE
         | K_FINALFUNC
diff --git a/src/java/org/apache/cassandra/audit/AuditLogEntryType.java b/src/java/org/apache/cassandra/audit/AuditLogEntryType.java
index 4eb112b..ccf0169 100644
--- a/src/java/org/apache/cassandra/audit/AuditLogEntryType.java
+++ b/src/java/org/apache/cassandra/audit/AuditLogEntryType.java
@@ -59,6 +59,7 @@ public enum AuditLogEntryType
     ALTER_TYPE(AuditLogEntryCategory.DDL),
     CREATE_ROLE(AuditLogEntryCategory.DCL),
     USE_KEYSPACE(AuditLogEntryCategory.OTHER),
+    DESCRIBE(AuditLogEntryCategory.OTHER),
 
     /*
      * Common Audit Log Entry Types
diff --git a/src/java/org/apache/cassandra/cql3/CQL3Type.java b/src/java/org/apache/cassandra/cql3/CQL3Type.java
index ee2db68..8f8df42 100644
--- a/src/java/org/apache/cassandra/cql3/CQL3Type.java
+++ b/src/java/org/apache/cassandra/cql3/CQL3Type.java
@@ -467,15 +467,25 @@ public interface CQL3Type
         @Override
         public String toString()
         {
+            return toString(true);
+        }
+
+        public String toString(boolean withFrozen)
+        {
             StringBuilder sb = new StringBuilder();
-            sb.append("frozen<tuple<");
+            if (withFrozen)
+                sb.append("frozen<");
+            sb.append("tuple<");
             for (int i = 0; i < type.size(); i++)
             {
                 if (i > 0)
                     sb.append(", ");
                 sb.append(type.type(i).asCQL3Type());
             }
-            sb.append(">>");
+            sb.append('>');
+            if (withFrozen)
+                sb.append('>');
+
             return sb.toString();
         }
     }
diff --git a/src/java/org/apache/cassandra/cql3/CqlBuilder.java b/src/java/org/apache/cassandra/cql3/CqlBuilder.java
new file mode 100644
index 0000000..d3ca1da
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/CqlBuilder.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+
+import org.apache.cassandra.cql3.functions.FunctionName;
+import org.apache.cassandra.db.marshal.AbstractType;
+
+/**
+ * Utility class to facilitate the creation of the CQL representation of {@code SchemaElements}.
+ */
+public final class CqlBuilder
+{
+    @FunctionalInterface
+    public static interface Appender<T>
+    {
+        public void appendTo(CqlBuilder builder, T obj);
+    }
+
+    /**
+     * The new line character
+     */
+    private static final char NEW_LINE = '\n';
+
+    private static final String INDENTATION = "    ";
+
+    private final StringBuilder builder;
+
+    private int indent;
+
+    private boolean isNewLine = false;
+
+    public CqlBuilder()
+    {
+        this(64);
+    }
+
+    public CqlBuilder(int capacity)
+    {
+        builder = new StringBuilder(capacity);
+    }
+
+    public CqlBuilder append(String str)
+    {
+        indentIfNeeded();
+        builder.append(str);
+        return this;
+    }
+
+    public CqlBuilder appendQuotingIfNeeded(String str)
+    {
+        return append(ColumnIdentifier.maybeQuote(str));
+    }
+
+    public CqlBuilder appendWithSingleQuotes(String str)
+    {
+        indentIfNeeded();
+
+        builder.append('\'')
+               .append(str.replaceAll("'", "''"))
+               .append('\'');
+
+        return this;
+    }
+
+    public CqlBuilder append(char c)
+    {
+        indentIfNeeded();
+        builder.append(c);
+        return this;
+    }
+
+    public CqlBuilder append(boolean b)
+    {
+        indentIfNeeded();
+        builder.append(b);
+        return this;
+    }
+
+    public CqlBuilder append(int i)
+    {
+        indentIfNeeded();
+        builder.append(i);
+        return this;
+    }
+
+    public CqlBuilder append(long l)
+    {
+        indentIfNeeded();
+        builder.append(l);
+        return this;
+    }
+
+    public CqlBuilder append(float f)
+    {
+        indentIfNeeded();
+        builder.append(f);
+        return this;
+    }
+
+    public CqlBuilder append(double d)
+    {
+        indentIfNeeded();
+        builder.append(d);
+        return this;
+    }
+
+    public CqlBuilder newLine()
+    {
+        builder.append(NEW_LINE);
+        isNewLine = true;
+        return this;
+    }
+
+    public CqlBuilder append(AbstractType<?> type)
+    {
+        return append(type.asCQL3Type().toString());
+    }
+
+    public CqlBuilder append(ColumnIdentifier column)
+    {
+        return append(column.toCQLString());
+    }
+
+    public CqlBuilder append(FunctionName name)
+    {
+        name.appendCqlTo(this);
+        return this;
+    }
+
+    public CqlBuilder append(Map<String, String> map)
+    {
+        return append(map, true);
+    }
+
+    public CqlBuilder append(Map<String, String> map, boolean quoteValue)
+    {
+        indentIfNeeded();
+
+        builder.append('{');
+
+        Iterator<Entry<String, String>> iter = new TreeMap<>(map).entrySet()
+                                                                 .iterator();
+        while(iter.hasNext())
+        {
+            Entry<String, String> e = iter.next();
+            appendWithSingleQuotes(e.getKey());
+            builder.append(": ");
+            if (quoteValue)
+                appendWithSingleQuotes(e.getValue());
+            else
+                builder.append(e.getValue());
+
+            if (iter.hasNext())
+                builder.append(", ");
+        }
+        builder.append('}');
+        return this;
+    }
+
+    public <T> CqlBuilder appendWithSeparators(Iterable<T> iterable, Appender<T> appender, String separator)
+    {
+        return appendWithSeparators(iterable.iterator(), appender, separator);
+    }
+
+    public <T> CqlBuilder appendWithSeparators(Iterator<T> iter, Appender<T> appender, String separator)
+    {
+        while (iter.hasNext())
+        {
+            appender.appendTo(this, iter.next());
+            if (iter.hasNext())
+            {
+                append(separator);
+            }
+        }
+        return this;
+    }
+
+    public CqlBuilder increaseIndent()
+    {
+        indent++;
+        return this;
+    }
+
+    public CqlBuilder decreaseIndent()
+    {
+        if (indent > 0)
+            indent--;
+
+        return this;
+    }
+
+    private void indentIfNeeded()
+    {
+        if (isNewLine)
+        {
+            for (int i = 0; i < indent; i++)
+                builder.append(INDENTATION);
+            isNewLine = false;
+        }
+    }
+
+    public String toString()
+    {
+        return builder.toString();
+    }
+}
diff --git a/src/java/org/apache/cassandra/cql3/ResultSet.java b/src/java/org/apache/cassandra/cql3/ResultSet.java
index 7972061..4f60445 100644
--- a/src/java/org/apache/cassandra/cql3/ResultSet.java
+++ b/src/java/org/apache/cassandra/cql3/ResultSet.java
@@ -216,7 +216,14 @@ public class ResultSet
         // when re-preparing we create the intermediate object
         public ResultMetadata(List<ColumnSpecification> names)
         {
-            this(computeResultMetadataId(names), EnumSet.noneOf(Flag.class), names, names.size(), null);
+            this(names, null);
+        }
+
+        // Problem is that we compute the metadata from the columns on creation;
+        // when re-preparing we create the intermediate object
+        public ResultMetadata(List<ColumnSpecification> names, PagingState pagingState)
+        {
+            this(computeResultMetadataId(names), EnumSet.noneOf(Flag.class), names, names.size(), pagingState);
             if (!names.isEmpty() && ColumnSpecification.allInSameTable(names))
                 flags.add(Flag.GLOBAL_TABLES_SPEC);
         }
diff --git a/src/java/org/apache/cassandra/cql3/SchemaElement.java b/src/java/org/apache/cassandra/cql3/SchemaElement.java
new file mode 100644
index 0000000..ec0dbee
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/SchemaElement.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3;
+
+import java.util.Comparator;
+import java.util.Locale;
+
+/**
+ * A schema element (keyspace, udt, udf, uda, table, index, view).
+ */
+public interface SchemaElement
+{
+    /**
+     * Comparator used to sort {@code SchemaElement} instances by name.
+     */
+    Comparator<SchemaElement> NAME_COMPARATOR = (o1, o2) -> o1.elementName().compareToIgnoreCase(o2.elementName());
+
+    enum SchemaElementType
+    {
+        KEYSPACE,
+        TYPE,
+        FUNCTION,
+        AGGREGATE,
+        TABLE,
+        INDEX,
+        MATERIALIZED_VIEW;
+
+        @Override
+        public String toString()
+        {
+            return super.toString().toLowerCase(Locale.US);
+        }
+    }
+
+    /**
+     * Return the schema element type
+     *
+     * @return the schema element type
+     */
+    SchemaElementType elementType();
+
+    /**
+     * Returns the CQL name of the keyspace to which this schema element belong.
+     *
+     * @return the keyspace name.
+     */
+    String elementKeyspace();
+
+    /**
+     * Returns the CQL name of this schema element.
+     *
+     * @return the name of this schema element.
+     */
+    String elementName();
+
+    default String elementNameQuotedIfNeeded()
+    {
+        String name = elementName();
+        if (elementType() == SchemaElementType.FUNCTION
+                || elementType() == SchemaElementType.AGGREGATE)
+        {
+            int index = name.indexOf('(');
+            return ColumnIdentifier.maybeQuote(name.substring(0, index)) + name.substring(index);
+        }
+
+        return ColumnIdentifier.maybeQuote(name);
+    }
+
+    default String elementKeyspaceQuotedIfNeeded()
+    {
+        return ColumnIdentifier.maybeQuote(elementKeyspace());
+    }
+
+    /**
+     * Returns a CQL representation of this element
+     *
+     * @param withInternals if the internals part of the CQL should be exposed.
+     * @return a CQL representation of this element
+     */
+    String toCqlString(boolean withInternals);
+}
diff --git a/src/java/org/apache/cassandra/cql3/functions/AbstractFunction.java b/src/java/org/apache/cassandra/cql3/functions/AbstractFunction.java
index aea0d01..940f0a4 100644
--- a/src/java/org/apache/cassandra/cql3/functions/AbstractFunction.java
+++ b/src/java/org/apache/cassandra/cql3/functions/AbstractFunction.java
@@ -24,7 +24,10 @@ import com.google.common.base.Objects;
 
 import org.apache.cassandra.cql3.AssignmentTestable;
 import org.apache.cassandra.cql3.CQL3Type;
+import org.apache.cassandra.cql3.CQL3Type.Tuple;
 import org.apache.cassandra.cql3.ColumnSpecification;
+import org.apache.cassandra.cql3.CqlBuilder;
+import org.apache.cassandra.cql3.CqlBuilder.Appender;
 import org.apache.cassandra.db.marshal.AbstractType;
 
 import org.apache.commons.lang3.text.StrBuilder;
@@ -118,16 +121,41 @@ public abstract class AbstractFunction implements Function
     @Override
     public String toString()
     {
-        StringBuilder sb = new StringBuilder();
-        sb.append(name).append(" : (");
-        for (int i = 0; i < argTypes.size(); i++)
-        {
-            if (i > 0)
-                sb.append(", ");
-            sb.append(argTypes.get(i).asCQL3Type());
-        }
-        sb.append(") -> ").append(returnType.asCQL3Type());
-        return sb.toString();
+        return new CqlBuilder().append(name)
+                              .append(" : (")
+                              .appendWithSeparators(argTypes, (b, t) -> b.append(toCqlString(t)), ", ")
+                              .append(") -> ")
+                              .append(returnType)
+                              .toString();
+    }
+
+    public String elementKeyspace()
+    {
+        return name.keyspace;
+    }
+
+    public String elementName()
+    {
+        return new CqlBuilder().append(name.name)
+                               .append('(')
+                               .appendWithSeparators(argTypes, (b, t) -> b.append(toCqlString(t)), ", ")
+                               .append(')')
+                               .toString();
+    }
+
+    /**
+     * Converts the specified type into its CQL representation.
+     *
+     * <p>For user functions and aggregates, tuples need to be handled in a special way as they are frozen by nature
+     * but the frozen keyword should not appear in their CQL definition.</p>
+     *
+     * @param type the type
+     * @return the CQL representation of the specified type
+     */
+    protected String toCqlString(AbstractType<?> type)
+    {
+        return type.isTuple() ? ((Tuple) type.asCQL3Type()).toString(false)
+                              : type.asCQL3Type().toString();
     }
 
     @Override
diff --git a/src/java/org/apache/cassandra/cql3/functions/FunctionName.java b/src/java/org/apache/cassandra/cql3/functions/FunctionName.java
index be71d52..472a7ff 100644
--- a/src/java/org/apache/cassandra/cql3/functions/FunctionName.java
+++ b/src/java/org/apache/cassandra/cql3/functions/FunctionName.java
@@ -19,10 +19,14 @@ package org.apache.cassandra.cql3.functions;
 
 import com.google.common.base.Objects;
 
+import org.apache.cassandra.cql3.CqlBuilder;
 import org.apache.cassandra.schema.SchemaConstants;
 
 public final class FunctionName
 {
+    // We special case the token function because that's the only function which name is a reserved keyword
+    private static final FunctionName TOKEN_FUNCTION_NAME = FunctionName.nativeFunction("token");
+
     public final String keyspace;
     public final String name;
 
@@ -79,4 +83,21 @@ public final class FunctionName
     {
         return keyspace == null ? name : keyspace + "." + name;
     }
+
+    public void appendCqlTo(CqlBuilder builder)
+    {
+        if (equalsNativeFunction(TOKEN_FUNCTION_NAME))
+        {
+            builder.append(name);
+        }
+        else
+        {
+            if (keyspace != null)
+            {
+                builder.appendQuotingIfNeeded(keyspace)
+                       .append('.');
+            }
+            builder.appendQuotingIfNeeded(name);
+        }
+    }
 }
diff --git a/src/java/org/apache/cassandra/cql3/functions/UDAggregate.java b/src/java/org/apache/cassandra/cql3/functions/UDAggregate.java
index 96cc556..db5859f 100644
--- a/src/java/org/apache/cassandra/cql3/functions/UDAggregate.java
+++ b/src/java/org/apache/cassandra/cql3/functions/UDAggregate.java
@@ -25,6 +25,8 @@ import com.google.common.collect.Lists;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.cql3.CqlBuilder;
+import org.apache.cassandra.cql3.SchemaElement;
 import org.apache.cassandra.cql3.functions.types.TypeCodec;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.UserType;
@@ -41,7 +43,7 @@ import static com.google.common.collect.Iterables.transform;
 /**
  * Base class for user-defined-aggregates.
  */
-public class UDAggregate extends AbstractFunction implements AggregateFunction
+public class UDAggregate extends AbstractFunction implements AggregateFunction, SchemaElement
 {
     protected static final Logger logger = LoggerFactory.getLogger(UDAggregate.class);
 
@@ -322,4 +324,41 @@ public class UDAggregate extends AbstractFunction implements AggregateFunction
     {
         return Objects.hashCode(name, Functions.typeHashCode(argTypes), Functions.typeHashCode(returnType), stateFunction, finalFunction, stateType, initcond);
     }
+
+    @Override
+    public SchemaElementType elementType()
+    {
+        return SchemaElementType.AGGREGATE;
+    }
+
+    @Override
+    public String toCqlString(boolean withInternals)
+    {
+        CqlBuilder builder = new CqlBuilder();
+        builder.append("CREATE AGGREGATE ")
+               .append(name())
+               .append('(')
+               .appendWithSeparators(argTypes, (b, t) -> b.append(toCqlString(t)), ", ")
+               .append(')')
+               .newLine()
+               .increaseIndent()
+               .append("SFUNC ")
+               .append(stateFunction().name().name)
+               .newLine()
+               .append("STYPE ")
+               .append(toCqlString(stateType()));
+
+        if (finalFunction() != null)
+            builder.newLine()
+                   .append("FINALFUNC ")
+                   .append(finalFunction().name().name);
+
+        if (initialCondition() != null)
+            builder.newLine()
+                   .append("INITCOND ")
+                   .append(stateType().asCQL3Type().toCQLLiteral(initialCondition(), ProtocolVersion.CURRENT));
+
+        return builder.append(";")
+                      .toString();
+    }
 }
diff --git a/src/java/org/apache/cassandra/cql3/functions/UDFunction.java b/src/java/org/apache/cassandra/cql3/functions/UDFunction.java
index f0fd8d9..bceb085 100644
--- a/src/java/org/apache/cassandra/cql3/functions/UDFunction.java
+++ b/src/java/org/apache/cassandra/cql3/functions/UDFunction.java
@@ -45,16 +45,15 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.cql3.CqlBuilder;
+import org.apache.cassandra.cql3.SchemaElement;
 import org.apache.cassandra.cql3.functions.types.DataType;
 import org.apache.cassandra.cql3.functions.types.TypeCodec;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.UserType;
 import org.apache.cassandra.exceptions.FunctionExecutionException;
 import org.apache.cassandra.exceptions.InvalidRequestException;
-import org.apache.cassandra.schema.Difference;
-import org.apache.cassandra.schema.Functions;
-import org.apache.cassandra.schema.KeyspaceMetadata;
-import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.*;
 import org.apache.cassandra.service.ClientWarn;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.transport.ProtocolVersion;
@@ -66,7 +65,7 @@ import static com.google.common.collect.Iterables.transform;
 /**
  * Base class for User Defined Functions.
  */
-public abstract class UDFunction extends AbstractFunction implements ScalarFunction
+public abstract class UDFunction extends AbstractFunction implements ScalarFunction, SchemaElement
 {
     protected static final Logger logger = LoggerFactory.getLogger(UDFunction.class);
 
@@ -303,6 +302,48 @@ public abstract class UDFunction extends AbstractFunction implements ScalarFunct
         };
     }
 
+    @Override
+    public SchemaElementType elementType()
+    {
+        return SchemaElementType.FUNCTION;
+    }
+
+    @Override
+    public String toCqlString(boolean withInternals)
+    {
+        CqlBuilder builder = new CqlBuilder();
+        builder.append("CREATE FUNCTION ")
+               .append(name())
+               .append("(");
+
+        for (int i = 0, m = argNames().size(); i < m; i++)
+        {
+            if (i > 0)
+                builder.append(", ");
+            builder.append(argNames().get(i))
+                   .append(' ')
+                   .append(toCqlString(argTypes().get(i)));
+        }
+
+        builder.append(')')
+               .newLine()
+               .increaseIndent()
+               .append(isCalledOnNullInput() ? "CALLED" : "RETURNS NULL")
+               .append(" ON NULL INPUT")
+               .newLine()
+               .append("RETURNS ")
+               .append(toCqlString(returnType()))
+               .newLine()
+               .append("LANGUAGE ")
+               .append(language())
+               .newLine()
+               .append("AS $$")
+               .append(body())
+               .append("$$;");
+
+        return builder.toString();
+    }
+
     public final ByteBuffer execute(ProtocolVersion protocolVersion, List<ByteBuffer> parameters)
     {
         assertUdfsEnabled(language);
diff --git a/src/java/org/apache/cassandra/cql3/statements/DescribeStatement.java b/src/java/org/apache/cassandra/cql3/statements/DescribeStatement.java
new file mode 100644
index 0000000..48b4160
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/statements/DescribeStatement.java
@@ -0,0 +1,750 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.statements;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import com.google.common.collect.ImmutableList;
+
+import org.apache.cassandra.audit.AuditLogContext;
+import org.apache.cassandra.audit.AuditLogEntryType;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.cql3.functions.FunctionName;
+import org.apache.cassandra.db.KeyspaceNotDefinedException;
+import org.apache.cassandra.db.marshal.ListType;
+import org.apache.cassandra.db.marshal.MapType;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.db.virtual.VirtualKeyspaceRegistry;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.exceptions.RequestValidationException;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.schema.*;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.pager.PagingState;
+import org.apache.cassandra.transport.messages.ResultMessage;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.UUIDGen;
+
+import static java.lang.String.format;
+import static org.apache.cassandra.cql3.statements.RequestValidations.checkNotEmpty;
+import static org.apache.cassandra.cql3.statements.RequestValidations.checkNotNull;
+import static org.apache.cassandra.cql3.statements.RequestValidations.checkTrue;
+import static org.apache.cassandra.cql3.statements.RequestValidations.invalidRequest;
+import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+
+/**
+ * The different <code>DESCRIBE</code> statements parsed from a CQL statement.
+ */
+public abstract class DescribeStatement<T> extends CQLStatement.Raw implements CQLStatement
+{
+    private static final String KS = "system";
+    private static final String CF = "describe";
+
+    /**
+     * The columns returned by the describe queries that only list element names (e.g. DESCRIBE KEYSPACES, DESCRIBE TABLES...)
+     */
+    private static final List<ColumnSpecification> LIST_METADATA = 
+            ImmutableList.of(new ColumnSpecification(KS, CF, new ColumnIdentifier("keyspace_name", true), UTF8Type.instance),
+                             new ColumnSpecification(KS, CF, new ColumnIdentifier("type", true), UTF8Type.instance),
+                             new ColumnSpecification(KS, CF, new ColumnIdentifier("name", true), UTF8Type.instance));
+
+    /**
+     * The columns returned by the describe queries that return the CREATE statement for the different elements (e.g. DESCRIBE KEYSPACE, DESCRIBE TABLE ...)
+     */
+    private static final List<ColumnSpecification> ELEMENT_METADATA = 
+            ImmutableList.<ColumnSpecification>builder().addAll(LIST_METADATA)
+                                                        .add(new ColumnSpecification(KS, CF, new ColumnIdentifier("create_statement", true), UTF8Type.instance))
+                                                        .build();
+
+    /**
+     * "Magic version" for the paging state.
+     */
+    private static final int PAGING_STATE_VERSION = 0x0001;
+
+    static final String SCHEMA_CHANGED_WHILE_PAGING_MESSAGE = "The schema has changed since the previous page of the DESCRIBE statement result. " +
+                                                              "Please retry the DESCRIBE statement.";
+
+    private boolean includeInternalDetails;
+
+    public final void withInternalDetails()
+    {
+        this.includeInternalDetails = true;
+    }
+
+    @Override
+    public final CQLStatement prepare(ClientState clientState) throws RequestValidationException
+    {
+        return this;
+    }
+
+    public final List<ColumnSpecification> getBindVariables()
+    {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public final void authorize(ClientState state)
+    {
+    }
+
+    @Override
+    public final void validate(ClientState state)
+    {
+    }
+
+    public final AuditLogContext getAuditLogContext()
+    {
+        return new AuditLogContext(AuditLogEntryType.DESCRIBE);
+    }
+
+    @Override
+    public final ResultMessage execute(QueryState state, QueryOptions options, long queryStartNanoTime) throws RequestValidationException, RequestExecutionException
+    {
+        return executeLocally(state, options);
+    }
+
+    @Override
+    public ResultMessage executeLocally(QueryState state, QueryOptions options)
+    {
+        Keyspaces keyspaces = Schema.instance.snapshot();
+        UUID schemaVersion = Schema.instance.getVersion();
+
+        keyspaces = Keyspaces.builder()
+                             .add(keyspaces)
+                             .add(VirtualKeyspaceRegistry.instance.virtualKeyspacesMetadata())
+                             .build();
+
+        PagingState pagingState = options.getPagingState();
+
+        // The paging implemented here uses some arbitrary row number as the partition-key for paging,
+        // which is used to skip/limit the result from the Java Stream. This works well enough for
+        // reasonably sized schemas. Even a 'DESCRIBE SCHEMA' for an abnormally large schema with 10000 tables
+        // completes within a few seconds. This seems good enough for now. Once Cassandra actually supports
+        // more than a few hundred tables, the implementation here should be reconsidered.
+        //
+        // Paging is only supported on row-level.
+        //
+        // The "partition key" in the paging-state contains a serialized object:
+        //   (short) version, currently 0x0001
+        //   (long) row offset
+        //   (vint bytes) serialized schema hash (currently the result of Keyspaces.hashCode())
+        //
+
+        long offset = getOffset(pagingState, schemaVersion);
+        int pageSize = options.getPageSize();
+
+        Stream<? extends T> stream = describe(state.getClientState(), keyspaces);
+
+        if (offset > 0L)
+            stream = stream.skip(offset);
+        if (pageSize > 0)
+            stream = stream.limit(pageSize);
+
+        List<List<ByteBuffer>> rows = stream.map(e -> toRow(e, includeInternalDetails))
+                                            .collect(Collectors.toList());
+
+        ResultSet.ResultMetadata resultMetadata = new ResultSet.ResultMetadata(metadata(state.getClientState()));
+        ResultSet result = new ResultSet(resultMetadata, rows);
+
+        if (pageSize > 0 && rows.size() == pageSize)
+        {
+            result.metadata.setHasMorePages(getPagingState(offset + pageSize, schemaVersion));
+        }
+
+        return new ResultMessage.Rows(result);
+    }
+
+    /**
+     * Returns the columns of the {@code ResultMetadata}
+     */
+    protected abstract List<ColumnSpecification> metadata(ClientState state);
+
+    private PagingState getPagingState(long nextPageOffset, UUID schemaVersion)
+    {
+        try (DataOutputBuffer out = new DataOutputBuffer())
+        {
+            out.writeShort(PAGING_STATE_VERSION);
+            out.writeUTF(FBUtilities.getReleaseVersionString());
+            out.write(UUIDGen.decompose(schemaVersion));
+            out.writeLong(nextPageOffset);
+
+            return new PagingState(out.asNewBuffer(),
+                                   null,
+                                   Integer.MAX_VALUE,
+                                   Integer.MAX_VALUE);
+        }
+        catch (IOException e)
+        {
+            throw new InvalidRequestException("Invalid paging state.", e);
+        }
+    }
+
+    private long getOffset(PagingState pagingState, UUID schemaVersion)
+    {
+        if (pagingState == null)
+            return 0L;
+
+        try (DataInputBuffer in = new DataInputBuffer(pagingState.partitionKey, false))
+        {
+            checkTrue(in.readShort() == PAGING_STATE_VERSION, "Incompatible paging state");
+
+            final String pagingStateServerVersion = in.readUTF();
+            final String releaseVersion = FBUtilities.getReleaseVersionString();
+            checkTrue(pagingStateServerVersion.equals(releaseVersion),
+                      "The server version of the paging state %s is different from the one of the server %s",
+                      pagingStateServerVersion,
+                      releaseVersion);
+
+            byte[] bytes = new byte[UUIDGen.UUID_LEN];
+            in.read(bytes);
+            UUID version = UUIDGen.getUUID(ByteBuffer.wrap(bytes));
+            checkTrue(schemaVersion.equals(version), SCHEMA_CHANGED_WHILE_PAGING_MESSAGE);
+
+            return in.readLong();
+        }
+        catch (IOException e)
+        {
+            throw new InvalidRequestException("Invalid paging state.", e);
+        }
+    }
+
+    protected abstract List<ByteBuffer> toRow(T element, boolean withInternals);
+
+    /**
+     * Returns the schema elements that must be part of the output.
+     */
+    protected abstract Stream<? extends T> describe(ClientState state, Keyspaces keyspaces);
+
+    /**
+     * Returns the metadata for the given keyspace or throws a {@link KeyspaceNotDefinedException} exception.
+     */
+    private static KeyspaceMetadata validateKeyspace(String ks, Keyspaces keyspaces)
+    {
+        return keyspaces.get(ks)
+                        .orElseThrow(() -> new KeyspaceNotDefinedException(format("'%s' not found in keyspaces", ks)));
+    }
+
+    /**
+     * {@code DescribeStatement} implementation used for describe queries that only list elements names.
+     */
+    public static final class Listing extends DescribeStatement<SchemaElement>
+    {
+        private final java.util.function.Function<KeyspaceMetadata, Stream<? extends SchemaElement>> elementsProvider;
+
+        public Listing(java.util.function.Function<KeyspaceMetadata, Stream<? extends SchemaElement>> elementsProvider)
+        {
+            this.elementsProvider = elementsProvider;
+        }
+
+        @Override
+        protected Stream<? extends SchemaElement> describe(ClientState state, Keyspaces keyspaces)
+        {
+            String keyspace = state.getRawKeyspace();
+            Stream<KeyspaceMetadata> stream = keyspace == null ? keyspaces.stream().sorted(SchemaElement.NAME_COMPARATOR)
+                                                               : Stream.of(validateKeyspace(keyspace, keyspaces));
+
+            return stream.flatMap(k -> elementsProvider.apply(k).sorted(SchemaElement.NAME_COMPARATOR));
+        }
+
+        @Override
+        protected List<ColumnSpecification> metadata(ClientState state)
+        {
+            return LIST_METADATA;
+        }
+
+        @Override
+        protected List<ByteBuffer> toRow(SchemaElement element, boolean withInternals)
+        {
+            return ImmutableList.of(bytes(element.elementKeyspaceQuotedIfNeeded()),
+                                    bytes(element.elementType().toString()),
+                                    bytes(element.elementNameQuotedIfNeeded()));
+        }
+    }
+
+    /**
+     * Creates a {@link DescribeStatement} for {@code DESCRIBE TABLES}.
+     */
+    public static DescribeStatement<SchemaElement> tables()
+    {
+        return new Listing(ks -> ks.tables.stream());
+    }
+
+    /**
+     * Creates a {@link DescribeStatement} for {@code DESCRIBE TYPES}.
+     */
+    public static DescribeStatement<SchemaElement> types()
+    {
+        return new Listing(ks -> ks.types.stream());
+    }
+
+    /**
+     * Creates a {@link DescribeStatement} for {@code DESCRIBE FUNCTIONS}.
+     */
+    public static DescribeStatement<SchemaElement> functions()
+    {
+        return new Listing(ks -> ks.functions.udfs());
+    }
+
+    /**
+     * Creates a {@link DescribeStatement} for {@code DESCRIBE AGGREGATES}.
+     */
+    public static DescribeStatement<SchemaElement> aggregates()
+    {
+        return new Listing(ks -> ks.functions.udas());
+    }
+
+    /**
+     * Creates a {@link DescribeStatement} for {@code DESCRIBE KEYSPACES}.
+     */
+    public static DescribeStatement<SchemaElement> keyspaces()
+    {
+        return new DescribeStatement<SchemaElement>()
+        {
+            @Override
+            protected Stream<? extends SchemaElement> describe(ClientState state, Keyspaces keyspaces)
+            {
+                return keyspaces.stream().sorted(SchemaElement.NAME_COMPARATOR);
+            }
+
+            @Override
+            protected List<ColumnSpecification> metadata(ClientState state)
+            {
+                return LIST_METADATA;
+            }
+
+            @Override
+            protected List<ByteBuffer> toRow(SchemaElement element, boolean withInternals)
+            {
+                return ImmutableList.of(bytes(element.elementKeyspaceQuotedIfNeeded()),
+                                        bytes(element.elementType().toString()),
+                                        bytes(element.elementNameQuotedIfNeeded()));
+            }
+        };
+    }
+
+    /**
+     * Creates a {@link DescribeStatement} for {@code DESCRIBE [FULL] SCHEMA}.
+     */
+    public static DescribeStatement<SchemaElement> schema(boolean includeSystemKeyspaces)
+    {
+        return new DescribeStatement<SchemaElement>()
+        {
+            @Override
+            protected Stream<? extends SchemaElement> describe(ClientState state, Keyspaces keyspaces)
+            {
+                return keyspaces.stream()
+                                .filter(ks -> includeSystemKeyspaces || !SchemaConstants.isSystemKeyspace(ks.name))
+                                .sorted(SchemaElement.NAME_COMPARATOR)
+                                .flatMap(ks -> getKeyspaceElements(ks, false));
+            }
+
+            @Override
+            protected List<ColumnSpecification> metadata(ClientState state)
+            {
+                return ELEMENT_METADATA;
+            }
+
+            @Override
+            protected List<ByteBuffer> toRow(SchemaElement element, boolean withInternals)
+            {
+                return ImmutableList.of(bytes(element.elementKeyspaceQuotedIfNeeded()),
+                                        bytes(element.elementType().toString()),
+                                        bytes(element.elementNameQuotedIfNeeded()),
+                                        bytes(element.toCqlString(withInternals)));
+            }
+        };
+    }
+
+    /**
+     * {@code DescribeStatement} implementation used for describe queries for a single schema element.
+     */
+    public static class Element extends DescribeStatement<SchemaElement>
+    {
+        /**
+         * The keyspace name 
+         */
+        private final String keyspace;
+
+        /**
+         * The element name
+         */
+        private final String name;
+
+        private final BiFunction<KeyspaceMetadata, String, Stream<? extends SchemaElement>> elementsProvider;
+
+        public Element(String keyspace, String name, BiFunction<KeyspaceMetadata, String, Stream<? extends SchemaElement>> elementsProvider)
+        {
+            this.keyspace = keyspace;
+            this.name = name;
+            this.elementsProvider = elementsProvider;
+        }
+
+        @Override
+        protected Stream<? extends SchemaElement> describe(ClientState state, Keyspaces keyspaces)
+        {
+            String ks = keyspace == null ? checkNotNull(state.getRawKeyspace(), "No keyspace specified and no current keyspace")
+                                         : keyspace;
+
+            return elementsProvider.apply(validateKeyspace(ks, keyspaces), name);
+        }
+
+        @Override
+        protected List<ColumnSpecification> metadata(ClientState state)
+        {
+            return ELEMENT_METADATA;
+        }
+
+        @Override
+        protected List<ByteBuffer> toRow(SchemaElement element, boolean withInternals)
+        {
+            return ImmutableList.of(bytes(element.elementKeyspaceQuotedIfNeeded()),
+                                    bytes(element.elementType().toString()),
+                                    bytes(element.elementNameQuotedIfNeeded()),
+                                    bytes(element.toCqlString(withInternals)));
+        }
+    }
+
+    /**
+     * Creates a {@link DescribeStatement} for {@code DESCRIBE KEYSPACE}.
+     */
+    public static DescribeStatement<SchemaElement> keyspace(String keyspace, boolean onlyKeyspaceDefinition)
+    {
+        return new Element(keyspace, null, (ks, t) -> getKeyspaceElements(ks, onlyKeyspaceDefinition));
+    }
+
+    private static Stream<? extends SchemaElement> getKeyspaceElements(KeyspaceMetadata ks, boolean onlyKeyspace)
+    {
+        Stream<? extends SchemaElement> s = Stream.of(ks);
+
+        if (!onlyKeyspace)
+        {
+            s = Stream.concat(s, ks.types.sortedStream());
+            s = Stream.concat(s, ks.functions.udfs().sorted(SchemaElement.NAME_COMPARATOR));
+            s = Stream.concat(s, ks.functions.udas().sorted(SchemaElement.NAME_COMPARATOR));
+            s = Stream.concat(s, ks.tables.stream().sorted(SchemaElement.NAME_COMPARATOR)
+                                                   .flatMap(tm -> getTableElements(ks, tm)));
+        }
+
+        return s;
+    }
+
+    private static Stream<? extends SchemaElement> getTableElements(KeyspaceMetadata ks, TableMetadata table)
+    {
+        Stream<? extends SchemaElement> s = Stream.of(table);
+        s = Stream.concat(s, table.indexes.stream()
+                                          .map(i -> toDescribable(table, i))
+                                          .sorted(SchemaElement.NAME_COMPARATOR));
+        s = Stream.concat(s, ks.views.stream(table.id)
+                                     .sorted(SchemaElement.NAME_COMPARATOR));
+        return s;
+    }
+
+    /**
+     * Creates a {@link DescribeStatement} for {@code DESCRIBE TABLE}.
+     */
+    public static DescribeStatement<SchemaElement> table(String keyspace, String name)
+    {
+        return new Element(keyspace, name, (ks, t) -> {
+
+            TableMetadata table = checkNotNull(ks.getTableOrViewNullable(t),
+                                               "Table '%s' not found in keyspace '%s'", t, ks.name);
+
+            return Stream.concat(Stream.of(table), table.indexes.stream()
+                                                                .map(index -> toDescribable(table, index))
+                                                                .sorted(SchemaElement.NAME_COMPARATOR));
+        });
+    }
+
+    /**
+     * Creates a {@link DescribeStatement} for {@code DESCRIBE INDEX}.
+     */
+    public static DescribeStatement<SchemaElement> index(String keyspace, String name)
+    {
+        return new Element(keyspace, name, (ks, index) -> {
+
+            TableMetadata tm = ks.findIndexedTable(index)
+                                 .orElseThrow(() -> invalidRequest("Table for existing index '%s' not found in '%s'",
+                                                                   index,
+                                                                   ks.name));
+            return tm.indexes.get(index)
+                             .map(i -> toDescribable(tm, i))
+                             .map(Stream::of)
+                             .orElseThrow(() -> invalidRequest("Index '%s' not found in '%s'", index, ks.name));
+        });
+    }
+
+    /**
+     * Creates a {@link DescribeStatement} for {@code DESCRIBE MATERIALIZED VIEW}.
+     */
+    public static DescribeStatement<SchemaElement> view(String keyspace, String name)
+    {
+        return new Element(keyspace, name, (ks, view) -> {
+
+            return ks.views.get(view)
+                           .map(Stream::of)
+                           .orElseThrow(() -> invalidRequest("Materialized view '%s' not found in '%s'", view, ks.name));
+        });
+    }
+
+    /**
+     * Creates a {@link DescribeStatement} for {@code DESCRIBE TYPE}.
+     */
+    public static DescribeStatement<SchemaElement> type(String keyspace, String name)
+    {
+        return new Element(keyspace, name, (ks, type) -> {
+
+            return ks.types.get(ByteBufferUtil.bytes(type))
+                           .map(Stream::of)
+                           .orElseThrow(() -> invalidRequest("User defined type '%s' not found in '%s'",
+                                                             type,
+                                                             ks.name));
+        });
+    }
+
+    /**
+     * Creates a {@link DescribeStatement} for {@code DESCRIBE FUNCTION}.
+     */
+    public static DescribeStatement<SchemaElement> function(String keyspace, String name)
+    {
+        return new Element(keyspace, name, (ks, n) -> {
+
+            return checkNotEmpty(ks.functions.getUdfs(new FunctionName(ks.name, n)),
+                                 "User defined function '%s' not found in '%s'", n, ks.name).stream()
+                                                                                             .sorted(SchemaElement.NAME_COMPARATOR);
+        });
+    }
+
+    /**
+     * Creates a {@link DescribeStatement} for {@code DESCRIBE FUNCTION}.
+     */
+    public static DescribeStatement<SchemaElement> aggregate(String keyspace, String name)
+    {
+        return new Element(keyspace, name, (ks, n) -> {
+
+            return checkNotEmpty(ks.functions.getUdas(new FunctionName(ks.name, n)),
+                                 "User defined aggregate '%s' not found in '%s'", n, ks.name).stream()
+                                                                                              .sorted(SchemaElement.NAME_COMPARATOR);
+        });
+    }
+
+    private static SchemaElement toDescribable(TableMetadata table, IndexMetadata index)
+    {
+        return new SchemaElement()
+                {
+                    @Override
+                    public SchemaElementType elementType()
+                    {
+                        return SchemaElementType.INDEX;
+                    }
+
+                    @Override
+                    public String elementKeyspace()
+                    {
+                        return table.keyspace;
+                    }
+
+                    @Override
+                    public String elementName()
+                    {
+                        return index.name;
+                    }
+
+                    @Override
+                    public String toCqlString(boolean withInternals)
+                    {
+                        return index.toCqlString(table);
+                    }
+                };
+    }
+
+    /**
+     * Creates a {@link DescribeStatement} for the generic {@code DESCRIBE ...}.
+     */
+    public static DescribeStatement<SchemaElement> generic(String keyspace, String name)
+    {
+        return new DescribeStatement<SchemaElement>()
+        {
+            private DescribeStatement<SchemaElement> delegate;
+
+            private DescribeStatement<SchemaElement> resolve(ClientState state, Keyspaces keyspaces)
+            {
+                String ks = keyspace;
+
+                // from cqlsh help: "keyspace or a table or an index or a materialized view (in this order)."
+                if (keyspace == null)
+                {
+                    if (keyspaces.containsKeyspace(name))
+                        return keyspace(name, false);
+
+                    String rawKeyspace = state.getRawKeyspace();
+                    ks = rawKeyspace == null ? name : rawKeyspace;
+                }
+
+                KeyspaceMetadata keyspaceMetadata = validateKeyspace(ks, keyspaces);
+
+                if (keyspaceMetadata.tables.getNullable(name) != null)
+                    return table(ks, name);
+
+                Optional<TableMetadata> indexed = keyspaceMetadata.findIndexedTable(name);
+                if (indexed.isPresent())
+                {
+                    Optional<IndexMetadata> index = indexed.get().indexes.get(name);
+                    if (index.isPresent())
+                        return index(ks, name);
+                }
+
+                if (keyspaceMetadata.views.getNullable(name) != null)
+                    return view(ks, name);
+
+                throw invalidRequest("'%s' not found in keyspace '%s'", name, ks);
+            }
+
+            @Override
+            protected Stream<? extends SchemaElement> describe(ClientState state, Keyspaces keyspaces)
+            {
+                delegate = resolve(state, keyspaces);
+                return delegate.describe(state, keyspaces);
+            }
+
+            @Override
+            protected List<ColumnSpecification> metadata(ClientState state)
+            {
+                return delegate.metadata(state);
+            }
+
+            @Override
+            protected List<ByteBuffer> toRow(SchemaElement element, boolean withInternals)
+            {
+                return delegate.toRow(element, withInternals);
+            }
+        };
+    }
+
+    /**
+     * Creates a {@link DescribeStatement} for {@code DESCRIBE CLUSTER}.
+     */
+    public static DescribeStatement<List<Object>> cluster()
+    {
+        return new DescribeStatement<List<Object>>()
+        {
+            /**
+             * The column index of the cluster name
+             */
+            private static final int CLUSTER_NAME_INDEX = 0;
+
+            /**
+             * The column index of the partitioner name
+             */
+            private static final int PARTITIONER_NAME_INDEX = 1;
+
+            /**
+             * The column index of the snitch class
+             */
+            private static final int SNITCH_CLASS_INDEX = 2;
+
+            /**
+             * The range ownerships index
+             */
+            private static final int RANGE_OWNERSHIPS_INDEX = 3;
+
+            @Override
+            protected Stream<List<Object>> describe(ClientState state, Keyspaces keyspaces)
+            {
+                List<Object> list = new ArrayList<Object>();
+                list.add(DatabaseDescriptor.getClusterName());
+                list.add(trimIfPresent(DatabaseDescriptor.getPartitionerName(), "org.apache.cassandra.dht."));
+                list.add(trimIfPresent(DatabaseDescriptor.getEndpointSnitch().getClass().getName(),
+                                            "org.apache.cassandra.locator."));
+ 
+                String useKs = state.getRawKeyspace();
+                if (mustReturnsRangeOwnerships(useKs))
+                {
+                    list.add(StorageService.instance.getRangeToAddressMap(useKs)
+                                                    .entrySet()
+                                                    .stream()
+                                                    .sorted(Comparator.comparing(Map.Entry::getKey))
+                                                    .collect(Collectors.toMap(e -> e.getKey().right.toString(),
+                                                                              e -> e.getValue()
+                                                                                    .stream()
+                                                                                    .map(r -> r.endpoint().toString())
+                                                                                    .collect(Collectors.toList()))));
+                }
+                return Stream.of(list);
+            }
+
+            private boolean mustReturnsRangeOwnerships(String useKs)
+            {
+                return useKs != null && !SchemaConstants.isLocalSystemKeyspace(useKs) && !SchemaConstants.isSystemKeyspace(useKs);
+            }
+
+            @Override
+            protected List<ColumnSpecification> metadata(ClientState state)
+            {
+                ImmutableList.Builder<ColumnSpecification> builder = ImmutableList.builder();
+                builder.add(new ColumnSpecification(KS, CF, new ColumnIdentifier("cluster", true), UTF8Type.instance),
+                                        new ColumnSpecification(KS, CF, new ColumnIdentifier("partitioner", true), UTF8Type.instance),
+                                        new ColumnSpecification(KS, CF, new ColumnIdentifier("snitch", true), UTF8Type.instance));
+
+                if (mustReturnsRangeOwnerships(state.getRawKeyspace()))
+                    builder.add(new ColumnSpecification(KS, CF, new ColumnIdentifier("range_ownership", true), MapType.getInstance(UTF8Type.instance,
+                                                                                                                                   ListType.getInstance(UTF8Type.instance, false), false)));
+
+                return builder.build();
+            }
+
+            @Override
+            protected List<ByteBuffer> toRow(List<Object> elements, boolean withInternals)
+            {
+                ImmutableList.Builder<ByteBuffer> builder = ImmutableList.builder(); 
+
+                builder.add(UTF8Type.instance.decompose((String) elements.get(CLUSTER_NAME_INDEX)),
+                            UTF8Type.instance.decompose((String) elements.get(PARTITIONER_NAME_INDEX)),
+                            UTF8Type.instance.decompose((String) elements.get(SNITCH_CLASS_INDEX)));
+
+                if (elements.size() > 3)
+                {
+                    MapType<String, List<String>> rangeOwnershipType = MapType.getInstance(UTF8Type.instance,
+                                                                                           ListType.getInstance(UTF8Type.instance, false),
+                                                                                           false);
+
+                    builder.add(rangeOwnershipType.decompose((Map<String, List<String>>) elements.get(RANGE_OWNERSHIPS_INDEX)));
+                }
+
+                return builder.build();
+            }
+
+            private String trimIfPresent(String src, String begin)
+            {
+                if (src.startsWith(begin))
+                    return src.substring(begin.length());
+                return src;
+            }
+        };
+    }
+}
diff --git a/src/java/org/apache/cassandra/cql3/statements/RequestValidations.java b/src/java/org/apache/cassandra/cql3/statements/RequestValidations.java
index fc07878..f351788 100644
--- a/src/java/org/apache/cassandra/cql3/statements/RequestValidations.java
+++ b/src/java/org/apache/cassandra/cql3/statements/RequestValidations.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.cql3.statements;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 
@@ -142,6 +143,23 @@ public final class RequestValidations
     }
 
     /**
+     * Checks that the specified collection is NOT <code>empty</code>.
+     * If it is empty, an <code>InvalidRequestException</code> will be thrown.
+     *
+     * @param collection the collection to test
+     * @param messageTemplate the template used to build the error message
+     * @param messageArgs the message arguments
+     * @return the collection
+     * @throws InvalidRequestException if the specified collection is <code>empty</code>.
+     */
+    public static <T extends Collection<E>, E> T checkNotEmpty(T collection, String messageTemplate, Object... messageArgs)
+            throws InvalidRequestException
+    {
+        checkTrue(!collection.isEmpty(), messageTemplate, messageArgs);
+        return collection;
+    }
+
+    /**
      * Checks that the specified bind marker value is set to a meaningful value.
      * If it is not a <code>InvalidRequestException</code> will be thrown.
      *
diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/AlterTypeStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/AlterTypeStatement.java
index 6eab2ba..f883038 100644
--- a/src/java/org/apache/cassandra/cql3/statements/schema/AlterTypeStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/schema/AlterTypeStatement.java
@@ -103,11 +103,11 @@ public abstract class AlterTypeStatement extends AlterSchemaStatement
         UserType apply(KeyspaceMetadata keyspace, UserType userType)
         {
             if (userType.fieldPosition(fieldName) >= 0)
-                throw ire("Cannot add field %s to type %s: a field with name %s already exists", fieldName, userType.toCQLString(), fieldName);
+                throw ire("Cannot add field %s to type %s: a field with name %s already exists", fieldName, userType.getCqlTypeName(), fieldName);
 
             AbstractType<?> fieldType = type.prepare(keyspaceName, keyspace.types).getType();
             if (fieldType.referencesUserType(userType.name))
-                throw ire("Cannot add new field %s of type %s to user type %s as it would create a circular reference", fieldName, type, userType.toCQLString());
+                throw ire("Cannot add new field %s of type %s to user type %s as it would create a circular reference", fieldName, type, userType.getCqlTypeName());
 
             List<FieldIdentifier> fieldNames = new ArrayList<>(userType.fieldNames()); fieldNames.add(fieldName);
             List<AbstractType<?>> fieldTypes = new ArrayList<>(userType.fieldTypes()); fieldTypes.add(fieldType);
@@ -138,7 +138,7 @@ public abstract class AlterTypeStatement extends AlterSchemaStatement
             if (!dependentAggregates.isEmpty())
             {
                 throw ire("Cannot alter user type %s as it is still used in INITCOND by aggregates %s",
-                          userType.toCQLString(),
+                          userType.getCqlTypeName(),
                           join(", ", dependentAggregates));
             }
 
@@ -148,14 +148,14 @@ public abstract class AlterTypeStatement extends AlterSchemaStatement
             {
                 int idx = userType.fieldPosition(oldName);
                 if (idx < 0)
-                    throw ire("Unkown field %s in user type %s", oldName, keyspaceName, userType.toCQLString());
+                    throw ire("Unknown field %s in user type %s", oldName, userType.getCqlTypeName());
                 fieldNames.set(idx, newName);
             });
 
             fieldNames.forEach(name ->
             {
                 if (fieldNames.stream().filter(isEqual(name)).count() > 1)
-                    throw ire("Duplicate field name %s in type %s", name, keyspaceName, userType.toCQLString());
+                    throw ire("Duplicate field name %s in type %s", name, userType.getCqlTypeName());
             });
 
             return new UserType(keyspaceName, userType.name, fieldNames, userType.fieldTypes(), true);
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index e6bb877..3be3706 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -1779,8 +1779,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
             try (PrintStream out = new PrintStream(schemaFile))
             {
-                for (String s: TableCQLHelper.dumpReCreateStatements(metadata()))
-                    out.println(s);
+                SchemaCQLHelper.reCreateStatementsForSchemaCql(metadata(),
+                                                               keyspace.getMetadata().types)
+                               .forEach(out::println);
             }
         }
         catch (IOException e)
diff --git a/src/java/org/apache/cassandra/db/SchemaCQLHelper.java b/src/java/org/apache/cassandra/db/SchemaCQLHelper.java
new file mode 100644
index 0000000..6f9e526
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/SchemaCQLHelper.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.nio.ByteBuffer;
+import java.util.stream.Stream;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.db.marshal.UserType;
+import org.apache.cassandra.schema.*;
+
+/**
+ * Helper methods to represent TableMetadata and related objects in CQL format
+ */
+public class SchemaCQLHelper
+{
+    /**
+     * Generates the DDL statement for a {@code schema.cql} snapshot file.
+     */
+    public static Stream<String> reCreateStatementsForSchemaCql(TableMetadata metadata, Types types)
+    {
+        // Types come first, as table can't be created without them
+        Stream<String> udts = SchemaCQLHelper.getUserTypesAsCQL(metadata, types);
+
+        return Stream.concat(udts,
+                             reCreateStatements(metadata,
+                                                true,
+                                                true,
+                                                true,
+                                                true));
+    }
+
+    public static Stream<String> reCreateStatements(TableMetadata metadata,
+                                                    boolean includeDroppedColumns,
+                                                    boolean internals,
+                                                    boolean ifNotExists,
+                                                    boolean includeIndexes)
+    {
+        // Record re-create schema statements
+        Stream<String> r = Stream.of(metadata)
+                                         .map((tm) -> SchemaCQLHelper.getTableMetadataAsCQL(tm,
+                                                                                            includeDroppedColumns,
+                                                                                            internals,
+                                                                                            ifNotExists));
+
+        if (includeIndexes)
+        {
+            // Indexes applied as last, since otherwise they may interfere with column drops / re-additions
+            r = Stream.concat(r, SchemaCQLHelper.getIndexesAsCQL(metadata));
+        }
+
+        return r;
+    }
+
+    /**
+     * Build a CQL String representation of Column Family Metadata.
+     *
+     * *Note*: this is _only_ visible for testing; you generally shouldn't re-create a single table in isolation as
+     * that will not contain everything needed for user types.
+     */
+    @VisibleForTesting
+    public static String getTableMetadataAsCQL(TableMetadata metadata,
+                                               boolean includeDroppedColumns,
+                                               boolean internals,
+                                               boolean ifNotExists)
+    {
+        if (metadata.isView())
+        {
+            KeyspaceMetadata keyspaceMetadata = Schema.instance.getKeyspaceMetadata(metadata.keyspace);
+            ViewMetadata viewMetadata = keyspaceMetadata.views.get(metadata.name).orElse(null);
+            assert viewMetadata != null;
+            return viewMetadata.toCqlString(internals, ifNotExists);
+        }
+
+        return metadata.toCqlString(includeDroppedColumns, internals, ifNotExists);
+    }
+
+    /**
+     * Build a CQL String representation of User Types used in the given table.
+     *
+     * Type order is ensured as types are built incrementally: from the innermost (most nested)
+     * to the outermost.
+     *
+     * @param metadata the table for which to extract the user types CQL statements.
+     * @param types the user types defined in the keyspace of the dumped table (which will thus contain any user type
+     * used by {@code metadata}).
+     * @return a list of {@code CREATE TYPE} statements corresponding to all the types used in {@code metadata}.
+     */
+    @VisibleForTesting
+    public static Stream<String> getUserTypesAsCQL(TableMetadata metadata, Types types)
+    {
+        /*
+         * Implementation note: at first approximation, it may seem like we don't need the Types argument and instead
+         * directly extract the user types from the provided TableMetadata. Indeed, full user types definitions are
+         * contained in UserType instances.
+         *
+         * However, the UserType instance found within the TableMetadata may have been frozen in such a way that makes
+         * it challenging.
+         *
+         * Consider the user has created:
+         *   CREATE TYPE inner (a set<int>);
+         *   CREATE TYPE outer (b inner);
+         *   CREATE TABLE t (k int PRIMARY KEY, c1 frozen<outer>, c2 set<frozen<inner>>)
+         * The corresponding TableMetadata would have, as types (where 'mc=true' means that the type has its isMultiCell
+         * set to true):
+         *   c1: UserType(mc=false, "outer", b->UserType(mc=false, "inner", a->SetType(mc=false, Int32Type)))
+         *   c2: SetType(mc=true, UserType(mc=false, "inner", a->SetType(mc=false, Int32Type)))
+         * From which, it's impossible to decide if we should dump the types above, or instead:
+         *   CREATE TYPE inner (a frozen<set<int>>);
+         *   CREATE TYPE outer (b frozen<inner>);
+         * or anything in-between.
+         *
+         * And while, as of the current limitation around multi-cell types (that are only supported non-frozen at
+         * top-level), any of the generated definitions would kind of "work", 1) this could confuse users and 2) this
+         * would break if we do lift the limitation, which wouldn't be future proof.
+         */
+        return metadata.getReferencedUserTypes()
+                       .stream()
+                       .map(name -> getType(metadata, types, name).toCqlString(false));
+    }
+
+    /**
+     * Build a CQL String representation of Indexes on columns in the given Column Family
+     */
+    @VisibleForTesting
+    public static Stream<String> getIndexesAsCQL(TableMetadata metadata)
+    {
+        return metadata.indexes
+                .stream()
+                .map(indexMetadata -> indexMetadata.toCqlString(metadata));
+    }
+
+    private static UserType getType(TableMetadata metadata, Types types, ByteBuffer name)
+    {
+        return types.get(name)
+                    .orElseThrow(() -> new IllegalStateException(String.format("user type %s is part of table %s definition but its definition was missing",
+                                                                              UTF8Type.instance.getString(name),
+                                                                              metadata)));
+    }
+}
diff --git a/src/java/org/apache/cassandra/db/TableCQLHelper.java b/src/java/org/apache/cassandra/db/TableCQLHelper.java
deleted file mode 100644
index 79e2c13..0000000
--- a/src/java/org/apache/cassandra/db/TableCQLHelper.java
+++ /dev/null
@@ -1,430 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.db;
-
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.atomic.*;
-import java.util.function.*;
-import java.util.regex.Pattern;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Iterables;
-
-import org.apache.cassandra.cql3.statements.schema.IndexTarget;
-import org.apache.cassandra.db.marshal.*;
-import org.apache.cassandra.schema.*;
-import org.apache.cassandra.utils.*;
-
-/**
- * Helper methods to represent TableMetadata and related objects in CQL format
- */
-public class TableCQLHelper
-{
-    private static final Pattern singleQuotePattern = Pattern.compile("'");
-
-    public static List<String> dumpReCreateStatements(TableMetadata metadata)
-    {
-        List<String> l = new ArrayList<>();
-        // Types come first, as table can't be created without them
-        l.addAll(TableCQLHelper.getUserTypesAsCQL(metadata));
-        // Record re-create schema statements
-        l.add(TableCQLHelper.getTableMetadataAsCQL(metadata, true));
-        // Dropped columns (and re-additions)
-        l.addAll(TableCQLHelper.getDroppedColumnsAsCQL(metadata));
-        // Indexes applied as last, since otherwise they may interfere with column drops / re-additions
-        l.addAll(TableCQLHelper.getIndexesAsCQL(metadata));
-        return l;
-    }
-
-    private static List<ColumnMetadata> getClusteringColumns(TableMetadata metadata)
-    {
-        List<ColumnMetadata> cds = new ArrayList<>(metadata.clusteringColumns().size());
-
-        if (!metadata.isStaticCompactTable())
-            for (ColumnMetadata cd : metadata.clusteringColumns())
-                cds.add(cd);
-
-        return cds;
-    }
-
-    private static List<ColumnMetadata> getPartitionColumns(TableMetadata metadata)
-    {
-        List<ColumnMetadata> cds = new ArrayList<>(metadata.regularAndStaticColumns().size());
-
-        for (ColumnMetadata cd : metadata.staticColumns())
-            cds.add(cd);
-
-        if (metadata.isDense())
-        {
-            // remove an empty type
-            for (ColumnMetadata cd : metadata.regularColumns())
-                if (!cd.type.equals(EmptyType.instance))
-                    cds.add(cd);
-        }
-        // "regular" columns are not exposed for static compact tables
-        else if (!metadata.isStaticCompactTable())
-        {
-            for (ColumnMetadata cd : metadata.regularColumns())
-                cds.add(cd);
-        }
-
-        return cds;
-    }
-
-    /**
-     * Build a CQL String representation of Table Metadata
-     */
-    @VisibleForTesting
-    public static String getTableMetadataAsCQL(TableMetadata metadata, boolean includeDroppedColumns)
-    {
-        StringBuilder sb = new StringBuilder();
-        if (!isCqlCompatible(metadata))
-        {
-            sb.append(String.format("/*\nWarning: Table %s omitted because it has constructs not compatible with CQL (was created via legacy API).\n",
-                                    metadata.toString()));
-            sb.append("\nApproximate structure, for reference:");
-            sb.append("\n(this should not be used to reproduce this schema)\n\n");
-        }
-
-        sb.append("CREATE TABLE IF NOT EXISTS ");
-        sb.append(metadata.toString()).append(" (");
-
-        List<ColumnMetadata> partitionKeyColumns = metadata.partitionKeyColumns();
-        List<ColumnMetadata> clusteringColumns = getClusteringColumns(metadata);
-        List<ColumnMetadata> partitionColumns = getPartitionColumns(metadata);
-
-        Consumer<StringBuilder> cdCommaAppender = commaAppender("\n\t");
-        sb.append("\n\t");
-        for (ColumnMetadata cfd: partitionKeyColumns)
-        {
-            cdCommaAppender.accept(sb);
-            sb.append(toCQL(cfd));
-            if (partitionKeyColumns.size() == 1 && clusteringColumns.size() == 0)
-                sb.append(" PRIMARY KEY");
-        }
-
-        for (ColumnMetadata cfd: clusteringColumns)
-        {
-            cdCommaAppender.accept(sb);
-            sb.append(toCQL(cfd));
-        }
-
-        for (ColumnMetadata cfd: partitionColumns)
-        {
-            cdCommaAppender.accept(sb);
-            sb.append(toCQL(cfd, metadata.isStaticCompactTable()));
-        }
-
-        if (includeDroppedColumns)
-        {
-            for (Map.Entry<ByteBuffer, DroppedColumn> entry: metadata.droppedColumns.entrySet())
-            {
-                if (metadata.getColumn(entry.getKey()) != null)
-                    continue;
-
-                DroppedColumn droppedColumn = entry.getValue();
-                cdCommaAppender.accept(sb);
-                sb.append(droppedColumn.column.name.toCQLString());
-                sb.append(' ');
-                sb.append(droppedColumn.column.type.asCQL3Type().toString());
-            }
-        }
-
-        if (clusteringColumns.size() > 0 || partitionKeyColumns.size() > 1)
-        {
-            sb.append(",\n\tPRIMARY KEY (");
-            if (partitionKeyColumns.size() > 1)
-            {
-                sb.append("(");
-                Consumer<StringBuilder> pkCommaAppender = commaAppender(" ");
-                for (ColumnMetadata cfd : partitionKeyColumns)
-                {
-                    pkCommaAppender.accept(sb);
-                    sb.append(cfd.name.toCQLString());
-                }
-                sb.append(")");
-            }
-            else
-            {
-                sb.append(partitionKeyColumns.get(0).name.toCQLString());
-            }
-
-            for (ColumnMetadata cfd : metadata.clusteringColumns())
-                sb.append(", ").append(cfd.name.toCQLString());
-
-            sb.append(')');
-        }
-        sb.append(")\n\t");
-        sb.append("WITH ");
-
-        sb.append("ID = ").append(metadata.id).append("\n\tAND ");
-
-        if (metadata.isCompactTable())
-            sb.append("COMPACT STORAGE\n\tAND ");
-
-        if (clusteringColumns.size() > 0)
-        {
-            sb.append("CLUSTERING ORDER BY (");
-
-            Consumer<StringBuilder> cOrderCommaAppender = commaAppender(" ");
-            for (ColumnMetadata cd : clusteringColumns)
-            {
-                cOrderCommaAppender.accept(sb);
-                sb.append(cd.name.toCQLString()).append(' ').append(cd.clusteringOrder().toString());
-            }
-            sb.append(")\n\tAND ");
-        }
-
-        sb.append(toCQL(metadata.params));
-        sb.append(";");
-
-        if (!isCqlCompatible(metadata))
-        {
-            sb.append("\n*/");
-        }
-        return sb.toString();
-    }
-
-    /**
-     * Build a CQL String representation of User Types used in the given Table.
-     *
-     * Type order is ensured as types are built incrementally: from the innermost (most nested)
-     * to the outermost.
-     */
-    @VisibleForTesting
-    public static List<String> getUserTypesAsCQL(TableMetadata metadata)
-    {
-        List<AbstractType> types = new ArrayList<>();
-        Set<AbstractType> typeSet = new HashSet<>();
-        for (ColumnMetadata cd: Iterables.concat(metadata.partitionKeyColumns(), metadata.clusteringColumns(), metadata.regularAndStaticColumns()))
-        {
-            AbstractType type = cd.type;
-            if (type.isUDT())
-                resolveUserType((UserType) type, typeSet, types);
-        }
-
-        List<String> typeStrings = new ArrayList<>(types.size());
-        for (AbstractType type: types)
-            typeStrings.add(toCQL((UserType) type));
-        return typeStrings;
-    }
-
-    /**
-     * Build a CQL String representation of Dropped Columns in the given Table.
-     *
-     * If the column was dropped once, but is now re-created `ADD` will be appended accordingly.
-     */
-    @VisibleForTesting
-    public static List<String> getDroppedColumnsAsCQL(TableMetadata metadata)
-    {
-        List<String> droppedColumns = new ArrayList<>();
-
-        for (Map.Entry<ByteBuffer, DroppedColumn> entry: metadata.droppedColumns.entrySet())
-        {
-            DroppedColumn column = entry.getValue();
-            droppedColumns.add(toCQLDrop(metadata, column));
-            if (metadata.getColumn(entry.getKey()) != null)
-                droppedColumns.add(toCQLAdd(metadata, metadata.getColumn(entry.getKey())));
-        }
-
-        return droppedColumns;
-    }
-
-    /**
-     * Build a CQL String representation of Indexes on columns in the given Table
-     */
-    @VisibleForTesting
-    public static List<String> getIndexesAsCQL(TableMetadata metadata)
-    {
-        List<String> indexes = new ArrayList<>(metadata.indexes.size());
-        for (IndexMetadata indexMetadata: metadata.indexes)
-            indexes.add(toCQL(metadata, indexMetadata));
-        return indexes;
-    }
-
-    private static String toCQL(TableMetadata baseTable, IndexMetadata indexMetadata)
-    {
-        if (indexMetadata.isCustom())
-        {
-            Map<String, String> options = new HashMap<>();
-            indexMetadata.options.forEach((k, v) -> {
-                if (!k.equals(IndexTarget.TARGET_OPTION_NAME) && !k.equals(IndexTarget.CUSTOM_INDEX_OPTION_NAME))
-                    options.put(k, v);
-            });
-
-            return String.format("CREATE CUSTOM INDEX %s ON %s (%s) USING '%s'%s;",
-                                 indexMetadata.toCQLString(),
-                                 baseTable.toString(),
-                                 indexMetadata.options.get(IndexTarget.TARGET_OPTION_NAME),
-                                 indexMetadata.options.get(IndexTarget.CUSTOM_INDEX_OPTION_NAME),
-                                 options.isEmpty() ? "" : " WITH OPTIONS " + toCQL(options));
-        }
-        else
-        {
-            return String.format("CREATE INDEX %s ON %s (%s);",
-                                 indexMetadata.toCQLString(),
-                                 baseTable.toString(),
-                                 indexMetadata.options.get(IndexTarget.TARGET_OPTION_NAME));
-        }
-    }
-    private static String toCQL(UserType userType)
-    {
-        StringBuilder sb = new StringBuilder();
-        sb.append("CREATE TYPE ").append(userType.toCQLString()).append(" (");
-
-        Consumer<StringBuilder> commaAppender = commaAppender(" ");
-        for (int i = 0; i < userType.size(); i++)
-        {
-            commaAppender.accept(sb);
-            sb.append(String.format("%s %s",
-                                    userType.fieldNameAsString(i),
-                                    userType.fieldType(i).asCQL3Type()));
-        }
-        sb.append(");");
-        return sb.toString();
-    }
-
-    private static String toCQL(TableParams tableParams)
-    {
-        StringBuilder builder = new StringBuilder();
-
-        builder.append("bloom_filter_fp_chance = ").append(tableParams.bloomFilterFpChance);
-        builder.append("\n\tAND crc_check_chance = ").append(tableParams.crcCheckChance);
-        builder.append("\n\tAND default_time_to_live = ").append(tableParams.defaultTimeToLive);
-        builder.append("\n\tAND gc_grace_seconds = ").append(tableParams.gcGraceSeconds);
-        builder.append("\n\tAND min_index_interval = ").append(tableParams.minIndexInterval);
-        builder.append("\n\tAND max_index_interval = ").append(tableParams.maxIndexInterval);
-        builder.append("\n\tAND memtable_flush_period_in_ms = ").append(tableParams.memtableFlushPeriodInMs);
-        builder.append("\n\tAND speculative_retry = '").append(tableParams.speculativeRetry).append("'");
-        builder.append("\n\tAND additional_write_policy = '").append(tableParams.additionalWritePolicy).append("'");
-        builder.append("\n\tAND comment = ").append(singleQuote(tableParams.comment));
-        builder.append("\n\tAND caching = ").append(toCQL(tableParams.caching.asMap()));
-        builder.append("\n\tAND compaction = ").append(toCQL(tableParams.compaction.asMap()));
-        builder.append("\n\tAND compression = ").append(toCQL(tableParams.compression.asMap()));
-        builder.append("\n\tAND cdc = ").append(tableParams.cdc);
-
-        builder.append("\n\tAND extensions = { ");
-        for (Map.Entry<String, ByteBuffer> entry : tableParams.extensions.entrySet())
-        {
-            builder.append(singleQuote(entry.getKey()));
-            builder.append(": ");
-            builder.append("0x").append(ByteBufferUtil.bytesToHex(entry.getValue()));
-        }
-        builder.append(" }");
-        return builder.toString();
-    }
-
-    private static String toCQL(Map<?, ?> map)
-    {
-        StringBuilder builder = new StringBuilder("{ ");
-
-        boolean isFirst = true;
-        for (Map.Entry entry: map.entrySet())
-        {
-            if (isFirst)
-                isFirst = false;
-            else
-                builder.append(", ");
-            builder.append(singleQuote(entry.getKey().toString()));
-            builder.append(": ");
-            builder.append(singleQuote(entry.getValue().toString()));
-        }
-
-        builder.append(" }");
-        return builder.toString();
-    }
-
-    private static String toCQL(ColumnMetadata cd)
-    {
-        return toCQL(cd, false);
-    }
-
-    private static String toCQL(ColumnMetadata cd, boolean isStaticCompactTable)
-    {
-        return String.format("%s %s%s",
-                             cd.name.toCQLString(),
-                             cd.type.asCQL3Type().toString(),
-                             cd.isStatic() && !isStaticCompactTable ? " static" : "");
-    }
-
-    private static String toCQLAdd(TableMetadata table, ColumnMetadata cd)
-    {
-        return String.format("ALTER TABLE %s ADD %s %s%s;",
-                             table.toString(),
-                             cd.name.toCQLString(),
-                             cd.type.asCQL3Type().toString(),
-                             cd.isStatic() ? " static" : "");
-    }
-
-    private static String toCQLDrop(TableMetadata table, DroppedColumn droppedColumn)
-    {
-        return String.format("ALTER TABLE %s DROP %s USING TIMESTAMP %s;",
-                             table.toString(),
-                             droppedColumn.column.name.toCQLString(),
-                             droppedColumn.droppedTime);
-    }
-
-    private static void resolveUserType(UserType type, Set<AbstractType> typeSet, List<AbstractType> types)
-    {
-        for (AbstractType subType: type.fieldTypes())
-            if (!typeSet.contains(subType) && subType.isUDT())
-                resolveUserType((UserType) subType, typeSet, types);
-
-        if (!typeSet.contains(type))
-        {
-            typeSet.add(type);
-            types.add(type);
-        }
-    }
-
-    private static String singleQuote(String s)
-    {
-        return String.format("'%s'", singleQuotePattern.matcher(s).replaceAll("''"));
-    }
-
-    private static Consumer<StringBuilder> commaAppender(String afterComma)
-    {
-        AtomicBoolean isFirst = new AtomicBoolean(true);
-        return new Consumer<StringBuilder>()
-        {
-            public void accept(StringBuilder stringBuilder)
-            {
-                if (!isFirst.getAndSet(false))
-                    stringBuilder.append(',').append(afterComma);
-            }
-        };
-    }
-
-    /**
-     * Whether or not the given metadata is compatible / representable with CQL Language
-     */
-    public static boolean isCqlCompatible(TableMetadata metaData)
-    {
-        if (metaData.isSuper())
-            return false;
-
-        if (metaData.isCompactTable()
-            && metaData.regularColumns().size() > 1
-            && metaData.clusteringColumns().size() >= 1)
-            return false;
-
-        return true;
-    }
-}
diff --git a/src/java/org/apache/cassandra/db/marshal/AbstractType.java b/src/java/org/apache/cassandra/db/marshal/AbstractType.java
index f305313..b65a1c1 100644
--- a/src/java/org/apache/cassandra/db/marshal/AbstractType.java
+++ b/src/java/org/apache/cassandra/db/marshal/AbstractType.java
@@ -20,12 +20,7 @@ package org.apache.cassandra.db.marshal;
 import java.io.IOException;
 import java.lang.reflect.Method;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,15 +31,14 @@ import org.apache.cassandra.cql3.ColumnSpecification;
 import org.apache.cassandra.cql3.Term;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.exceptions.SyntaxException;
-import org.apache.cassandra.serializers.TypeSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.serializers.MarshalException;
-
+import org.apache.cassandra.serializers.TypeSerializer;
 import org.apache.cassandra.transport.ProtocolVersion;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FastByteOperations;
 import org.github.jamm.Unmetered;
-import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.utils.ByteBufferUtil;
 
 import static org.apache.cassandra.db.marshal.AbstractType.ComparisonType.CUSTOM;
 
@@ -348,6 +342,11 @@ public abstract class AbstractType<T> implements Comparator<ByteBuffer>, Assignm
         return this;
     }
 
+    public List<AbstractType<?>> subTypes()
+    {
+        return Collections.emptyList();
+    }
+
     /**
      * Returns an AbstractType instance that is equivalent to this one, but with all nested UDTs and collections
      * explicitly frozen.
diff --git a/src/java/org/apache/cassandra/db/marshal/ListType.java b/src/java/org/apache/cassandra/db/marshal/ListType.java
index 3dbf4a3..c6e0262 100644
--- a/src/java/org/apache/cassandra/db/marshal/ListType.java
+++ b/src/java/org/apache/cassandra/db/marshal/ListType.java
@@ -139,6 +139,12 @@ public class ListType<T> extends CollectionType<List<T>>
     }
 
     @Override
+    public List<AbstractType<?>> subTypes()
+    {
+        return Collections.singletonList(elements);
+    }
+
+    @Override
     public boolean isMultiCell()
     {
         return isMultiCell;
diff --git a/src/java/org/apache/cassandra/db/marshal/MapType.java b/src/java/org/apache/cassandra/db/marshal/MapType.java
index bab25d3..6abe388 100644
--- a/src/java/org/apache/cassandra/db/marshal/MapType.java
+++ b/src/java/org/apache/cassandra/db/marshal/MapType.java
@@ -129,6 +129,12 @@ public class MapType<K, V> extends CollectionType<Map<K, V>>
     }
 
     @Override
+    public List<AbstractType<?>> subTypes()
+    {
+        return Arrays.asList(keys, values);
+    }
+
+    @Override
     public AbstractType<?> freeze()
     {
         if (isMultiCell)
diff --git a/src/java/org/apache/cassandra/db/marshal/SetType.java b/src/java/org/apache/cassandra/db/marshal/SetType.java
index ae9e0c0..6d17b67 100644
--- a/src/java/org/apache/cassandra/db/marshal/SetType.java
+++ b/src/java/org/apache/cassandra/db/marshal/SetType.java
@@ -121,6 +121,12 @@ public class SetType<T> extends CollectionType<Set<T>>
     }
 
     @Override
+    public List<AbstractType<?>> subTypes()
+    {
+        return Collections.singletonList(elements);
+    }
+
+    @Override
     public AbstractType<?> freezeNestedMulticellTypes()
     {
         if (!isMultiCell())
diff --git a/src/java/org/apache/cassandra/db/marshal/TupleType.java b/src/java/org/apache/cassandra/db/marshal/TupleType.java
index 00f4d24..dfdb8c2 100644
--- a/src/java/org/apache/cassandra/db/marshal/TupleType.java
+++ b/src/java/org/apache/cassandra/db/marshal/TupleType.java
@@ -126,6 +126,12 @@ public class TupleType extends AbstractType<ByteBuffer>
         return types.size();
     }
 
+    @Override
+    public List<AbstractType<?>> subTypes()
+    {
+        return types;
+    }
+
     public List<AbstractType<?>> allTypes()
     {
         return types;
diff --git a/src/java/org/apache/cassandra/db/marshal/UserType.java b/src/java/org/apache/cassandra/db/marshal/UserType.java
index 01e6a3f..3c023b7 100644
--- a/src/java/org/apache/cassandra/db/marshal/UserType.java
+++ b/src/java/org/apache/cassandra/db/marshal/UserType.java
@@ -29,9 +29,9 @@ import org.apache.cassandra.db.rows.Cell;
 import org.apache.cassandra.db.rows.CellPath;
 import org.apache.cassandra.schema.Difference;
 import org.apache.cassandra.serializers.MarshalException;
-import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.serializers.TypeSerializer;
 import org.apache.cassandra.serializers.UserTypeSerializer;
+import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.Pair;
 
@@ -43,7 +43,7 @@ import static com.google.common.collect.Iterables.transform;
  *
  * A user type is really just a tuple type on steroids.
  */
-public class UserType extends TupleType
+public class UserType extends TupleType implements SchemaElement
 {
     public final String keyspace;
     public final ByteBuffer name;
@@ -432,7 +432,7 @@ public class UserType extends TupleType
         return sb.toString();
     }
 
-    public String toCQLString()
+    public String getCqlTypeName()
     {
         return String.format("%s.%s", ColumnIdentifier.maybeQuote(keyspace), ColumnIdentifier.maybeQuote(getNameAsString()));
     }
@@ -442,4 +442,52 @@ public class UserType extends TupleType
     {
         return serializer;
     }
+
+    @Override
+    public SchemaElementType elementType()
+    {
+        return SchemaElementType.TYPE;
+    }
+
+    @Override
+    public String elementKeyspace()
+    {
+        return keyspace;
+    }
+
+    @Override
+    public String elementName()
+    {
+        return getNameAsString();
+    }
+
+    @Override
+    public String toCqlString(boolean withInternals)
+    {
+        CqlBuilder builder = new CqlBuilder();
+        builder.append("CREATE TYPE ")
+               .appendQuotingIfNeeded(keyspace)
+               .append('.')
+               .appendQuotingIfNeeded(getNameAsString())
+               .append(" (")
+               .newLine()
+               .increaseIndent();
+
+        for (int i = 0; i < size(); i++)
+        {
+            if (i > 0)
+                builder.append(",")
+                       .newLine();
+
+            builder.append(fieldNameAsString(i))
+                   .append(' ')
+                   .append(fieldType(i));
+        }
+
+        builder.newLine()
+               .decreaseIndent()
+               .append(");");
+
+        return builder.toString();
+    }
 }
diff --git a/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java b/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java
index 0805fcf..92da4af 100644
--- a/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java
+++ b/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java
@@ -19,24 +19,24 @@ package org.apache.cassandra.db.virtual;
 
 import com.google.common.collect.ImmutableList;
 
+import static org.apache.cassandra.schema.SchemaConstants.VIRTUAL_VIEWS;
+
 public final class SystemViewsKeyspace extends VirtualKeyspace
 {
-    private static final String NAME = "system_views";
-
     public static SystemViewsKeyspace instance = new SystemViewsKeyspace();
 
     private SystemViewsKeyspace()
     {
-        super(NAME, new ImmutableList.Builder<VirtualTable>()
-                    .add(new CachesTable(NAME))
-                    .add(new ClientsTable(NAME))
-                    .add(new SettingsTable(NAME))
-                    .add(new SystemPropertiesTable(NAME))
-                    .add(new SSTableTasksTable(NAME))
-                    .add(new ThreadPoolsTable(NAME))
-                    .add(new InternodeOutboundTable(NAME))
-                    .add(new InternodeInboundTable(NAME))
-                    .addAll(TableMetricTables.getAll(NAME))
+        super(VIRTUAL_VIEWS, new ImmutableList.Builder<VirtualTable>()
+                    .add(new CachesTable(VIRTUAL_VIEWS))
+                    .add(new ClientsTable(VIRTUAL_VIEWS))
+                    .add(new SettingsTable(VIRTUAL_VIEWS))
+                    .add(new SystemPropertiesTable(VIRTUAL_VIEWS))
+                    .add(new SSTableTasksTable(VIRTUAL_VIEWS))
+                    .add(new ThreadPoolsTable(VIRTUAL_VIEWS))
+                    .add(new InternodeOutboundTable(VIRTUAL_VIEWS))
+                    .add(new InternodeInboundTable(VIRTUAL_VIEWS))
+                    .addAll(TableMetricTables.getAll(VIRTUAL_VIEWS))
                     .build());
     }
 }
diff --git a/src/java/org/apache/cassandra/db/virtual/VirtualSchemaKeyspace.java b/src/java/org/apache/cassandra/db/virtual/VirtualSchemaKeyspace.java
index 4255e2d..bb5a430 100644
--- a/src/java/org/apache/cassandra/db/virtual/VirtualSchemaKeyspace.java
+++ b/src/java/org/apache/cassandra/db/virtual/VirtualSchemaKeyspace.java
@@ -25,20 +25,18 @@ import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.dht.LocalPartitioner;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.KeyspaceMetadata;
-import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableMetadata;
 
+import static org.apache.cassandra.schema.SchemaConstants.VIRTUAL_SCHEMA;
 import static org.apache.cassandra.schema.TableMetadata.builder;
 
 public final class VirtualSchemaKeyspace extends VirtualKeyspace
 {
-    private static final String NAME = "system_virtual_schema";
-
     public static final VirtualSchemaKeyspace instance = new VirtualSchemaKeyspace();
 
     private VirtualSchemaKeyspace()
     {
-        super(NAME, ImmutableList.of(new VirtualKeyspaces(NAME), new VirtualTables(NAME), new VirtualColumns(NAME)));
+        super(VIRTUAL_SCHEMA, ImmutableList.of(new VirtualKeyspaces(VIRTUAL_SCHEMA), new VirtualTables(VIRTUAL_SCHEMA), new VirtualColumns(VIRTUAL_SCHEMA)));
     }
 
     private static final class VirtualKeyspaces extends AbstractVirtualTable
diff --git a/src/java/org/apache/cassandra/exceptions/InvalidRequestException.java b/src/java/org/apache/cassandra/exceptions/InvalidRequestException.java
index 4259d1a..846c38a 100644
--- a/src/java/org/apache/cassandra/exceptions/InvalidRequestException.java
+++ b/src/java/org/apache/cassandra/exceptions/InvalidRequestException.java
@@ -23,4 +23,9 @@ public class InvalidRequestException extends RequestValidationException
     {
         super(ExceptionCode.INVALID, msg);
     }
+
+    public InvalidRequestException(String msg, Throwable t)
+    {
+        super(ExceptionCode.INVALID, msg, t);
+    }
 }
diff --git a/src/java/org/apache/cassandra/locator/EndpointsByRange.java b/src/java/org/apache/cassandra/locator/EndpointsByRange.java
index 2d1cde6..a1b03b3 100644
--- a/src/java/org/apache/cassandra/locator/EndpointsByRange.java
+++ b/src/java/org/apache/cassandra/locator/EndpointsByRange.java
@@ -25,8 +25,6 @@ import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.locator.ReplicaCollection.Builder.Conflict;
 
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.Map;
 
 public class EndpointsByRange extends ReplicaMultimap<Range<Token>, EndpointsForRange>
diff --git a/src/java/org/apache/cassandra/schema/ColumnMetadata.java b/src/java/org/apache/cassandra/schema/ColumnMetadata.java
index e6547d8..ee34be5 100644
--- a/src/java/org/apache/cassandra/schema/ColumnMetadata.java
+++ b/src/java/org/apache/cassandra/schema/ColumnMetadata.java
@@ -426,6 +426,16 @@ public final class ColumnMetadata extends ColumnSpecification implements Selecta
             ((UserType)type).nameComparator().validate(path.get(0));
     }
 
+    public void appendCqlTo(CqlBuilder builder, boolean ignoreStatic)
+    {
+        builder.append(name)
+               .append(' ')
+               .append(type);
+
+        if (isStatic() && !ignoreStatic)
+            builder.append(" static");
+    }
+
     public static String toCQLString(Iterable<ColumnMetadata> defs)
     {
         return toCQLString(defs.iterator());
@@ -443,6 +453,14 @@ public final class ColumnMetadata extends ColumnSpecification implements Selecta
         return sb.toString();
     }
 
+
+    public void appendNameAndOrderTo(CqlBuilder builder)
+    {
+        builder.append(name.toCQLString())
+               .append(' ')
+               .append(clusteringOrder().toString());
+    }
+
     /**
      * The type of the cell values for cell belonging to this column.
      *
@@ -648,7 +666,4 @@ public final class ColumnMetadata extends ColumnSpecification implements Selecta
             }
         }
     }
-
-
-
 }
diff --git a/src/java/org/apache/cassandra/schema/Functions.java b/src/java/org/apache/cassandra/schema/Functions.java
index 2a0111c..c5de3b8 100644
--- a/src/java/org/apache/cassandra/schema/Functions.java
+++ b/src/java/org/apache/cassandra/schema/Functions.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.schema;
 import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.function.Predicate;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import com.google.common.collect.*;
@@ -142,6 +143,36 @@ public final class Functions implements Iterable<Function>
         return functions.get(name);
     }
 
+    /**
+     * Get all UDFs overloads with the specified name
+     *
+     * @param name fully qualified function name
+     * @return an empty list if the function name is not found; a non-empty collection of {@link UDFunction} otherwise
+     */
+    public Collection<UDFunction> getUdfs(FunctionName name)
+    {
+        return functions.get(name)
+                        .stream()
+                        .filter(Filter.UDF)
+                        .map(f -> (UDFunction) f)
+                        .collect(Collectors.toList());
+    }
+
+    /**
+     * Get all UDAs overloads with the specified name
+     *
+     * @param name fully qualified function name
+     * @return an empty list if the function name is not found; a non-empty collection of {@link UDAggregate} otherwise
+     */
+    public Collection<UDAggregate> getUdas(FunctionName name)
+    {
+        return functions.get(name)
+                        .stream()
+                        .filter(Filter.UDA)
+                        .map(f -> (UDAggregate) f)
+                        .collect(Collectors.toList());
+    }
+
     public Optional<Function> find(FunctionName name, List<AbstractType<?>> argTypes)
     {
         return find(name, argTypes, Filter.ALL);
diff --git a/src/java/org/apache/cassandra/schema/IndexMetadata.java b/src/java/org/apache/cassandra/schema/IndexMetadata.java
index 3020793..81f48ff 100644
--- a/src/java/org/apache/cassandra/schema/IndexMetadata.java
+++ b/src/java/org/apache/cassandra/schema/IndexMetadata.java
@@ -32,6 +32,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.cql3.CqlBuilder;
 import org.apache.cassandra.cql3.statements.schema.IndexTarget;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.UnknownIndexException;
@@ -224,9 +225,49 @@ public final class IndexMetadata
                .build();
     }
 
-    public String toCQLString()
+    public String toCqlString(TableMetadata table)
     {
-        return ColumnIdentifier.maybeQuote(name);
+        CqlBuilder builder = new CqlBuilder();
+        appendCqlTo(builder, table);
+        return builder.toString();
+    }
+
+    /**
+     * Appends to the specified builder the CQL used to create this index.
+     *
+     * @param builder the builder to which the CQL must be appended
+     * @param table the parent table
+     */
+    public void appendCqlTo(CqlBuilder builder, TableMetadata table)
+    {
+        if (isCustom())
+        {
+            Map<String, String> copyOptions = new HashMap<>(options);
+
+            builder.append("CREATE CUSTOM INDEX ")
+                   .appendQuotingIfNeeded(name)
+                   .append(" ON ")
+                   .append(table.toString())
+                   .append(" (")
+                   .append(copyOptions.remove(IndexTarget.TARGET_OPTION_NAME))
+                   .append(") USING ")
+                   .appendWithSingleQuotes(copyOptions.remove(IndexTarget.CUSTOM_INDEX_OPTION_NAME));
+
+            if (!copyOptions.isEmpty())
+                builder.append(" WITH OPTIONS = ")
+                       .append(options);
+        }
+        else
+        {
+            builder.append("CREATE INDEX ")
+                   .appendQuotingIfNeeded(name)
+                   .append(" ON ")
+                   .append(table.toString())
+                   .append(" (")
+                   .append(options.get(IndexTarget.TARGET_OPTION_NAME))
+                   .append(')');
+        }
+        builder.append(';');
     }
 
     public static class Serializer
diff --git a/src/java/org/apache/cassandra/schema/KeyspaceMetadata.java b/src/java/org/apache/cassandra/schema/KeyspaceMetadata.java
index aacd962..23c931f 100644
--- a/src/java/org/apache/cassandra/schema/KeyspaceMetadata.java
+++ b/src/java/org/apache/cassandra/schema/KeyspaceMetadata.java
@@ -28,6 +28,8 @@ import com.google.common.base.Objects;
 import com.google.common.collect.Iterables;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.CqlBuilder;
+import org.apache.cassandra.cql3.SchemaElement;
 import org.apache.cassandra.cql3.functions.UDAggregate;
 import org.apache.cassandra.cql3.functions.UDFunction;
 import org.apache.cassandra.db.marshal.UserType;
@@ -46,7 +48,7 @@ import static com.google.common.collect.Iterables.any;
 /**
  * An immutable representation of keyspace metadata (name, params, tables, types, and functions).
  */
-public final class KeyspaceMetadata
+public final class KeyspaceMetadata implements SchemaElement
 {
     public enum Kind
     {
@@ -230,6 +232,61 @@ public final class KeyspaceMetadata
                           .toString();
     }
 
+    @Override
+    public SchemaElementType elementType()
+    {
+        return SchemaElementType.KEYSPACE;
+    }
+
+    @Override
+    public String elementKeyspace()
+    {
+        return name;
+    }
+
+    @Override
+    public String elementName()
+    {
+        return name;
+    }
+
+    @Override
+    public String toCqlString(boolean withInternals)
+    {
+        CqlBuilder builder = new CqlBuilder();
+        if (isVirtual())
+        {
+            builder.append("/*")
+                   .newLine()
+                   .append("Warning: Keyspace ")
+                   .appendQuotingIfNeeded(name)
+                   .append(" is a virtual keyspace and cannot be recreated with CQL.")
+                   .newLine()
+                   .append("Structure, for reference:")
+                   .newLine()
+                   .append("VIRTUAL KEYSPACE ")
+                   .appendQuotingIfNeeded(name)
+                   .append(';')
+                   .newLine()
+                   .append("*/")
+                   .toString();
+        }
+        else
+        {
+            builder.append("CREATE KEYSPACE ")
+                   .appendQuotingIfNeeded(name)
+                   .append(" WITH replication = ");
+
+            params.replication.appendCqlTo(builder);
+
+            builder.append("  AND durable_writes = ")
+                   .append(params.durableWrites)
+                   .append(';')
+                   .toString();
+        }
+        return builder.toString();
+    }
+
     public void validate()
     {
         if (!SchemaConstants.isValidName(name))
diff --git a/src/java/org/apache/cassandra/schema/ReplicationParams.java b/src/java/org/apache/cassandra/schema/ReplicationParams.java
index 5f744f8..048b4ed 100644
--- a/src/java/org/apache/cassandra/schema/ReplicationParams.java
+++ b/src/java/org/apache/cassandra/schema/ReplicationParams.java
@@ -25,6 +25,7 @@ import com.google.common.base.Objects;
 import com.google.common.collect.ImmutableMap;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.CqlBuilder;
 import org.apache.cassandra.locator.*;
 import org.apache.cassandra.service.StorageService;
 
@@ -128,4 +129,21 @@ public final class ReplicationParams
             helper.add(entry.getKey(), entry.getValue());
         return helper.toString();
     }
+
+    public void appendCqlTo(CqlBuilder builder)
+    {
+        String classname = "org.apache.cassandra.locator".equals(klass.getPackage().getName()) ? klass.getSimpleName()
+                                                                                               : klass.getName();
+        builder.append("{'class': ")
+               .appendWithSingleQuotes(classname);
+
+        options.forEach((k, v) -> {
+            builder.append(", ")
+                   .appendWithSingleQuotes(k)
+                   .append(": ")
+                   .appendWithSingleQuotes(v);
+        });
+
+        builder.append('}');
+    }
 }
diff --git a/src/java/org/apache/cassandra/schema/Schema.java b/src/java/org/apache/cassandra/schema/Schema.java
index 2c2d444..e2be6ee 100644
--- a/src/java/org/apache/cassandra/schema/Schema.java
+++ b/src/java/org/apache/cassandra/schema/Schema.java
@@ -239,6 +239,11 @@ public final class Schema
         return keyspaceInstances.remove(keyspaceName);
     }
 
+    public Keyspaces snapshot()
+    {
+        return keyspaces;
+    }
+
     /**
      * Remove keyspace definition from system
      *
diff --git a/src/java/org/apache/cassandra/schema/SchemaConstants.java b/src/java/org/apache/cassandra/schema/SchemaConstants.java
index 82bd2cb..7b6b7de 100644
--- a/src/java/org/apache/cassandra/schema/SchemaConstants.java
+++ b/src/java/org/apache/cassandra/schema/SchemaConstants.java
@@ -39,6 +39,10 @@ public final class SchemaConstants
     public static final String AUTH_KEYSPACE_NAME = "system_auth";
     public static final String DISTRIBUTED_KEYSPACE_NAME = "system_distributed";
 
+    public static final String VIRTUAL_SCHEMA = "system_virtual_schema";
+
+    public static final String VIRTUAL_VIEWS = "system_views";
+
     /* system keyspace names (the ones with LocalStrategy replication strategy) */
     public static final Set<String> LOCAL_SYSTEM_KEYSPACE_NAMES =
         ImmutableSet.of(SYSTEM_KEYSPACE_NAME, SCHEMA_KEYSPACE_NAME);
@@ -84,4 +88,24 @@ public final class SchemaConstants
     {
         return REPLICATED_SYSTEM_KEYSPACE_NAMES.contains(keyspaceName.toLowerCase());
     }
+
+    /**
+     * Checks if the keyspace is a virtual system keyspace.
+     * @return {@code true} if the keyspace is a virtual system keyspace, {@code false} otherwise.
+     */
+    public static boolean isVirtualSystemKeyspace(String keyspaceName)
+    {
+        return VIRTUAL_SCHEMA.equals(keyspaceName.toLowerCase()) || VIRTUAL_VIEWS.equals(keyspaceName.toLowerCase());
+    }
+
+    /**
+     * Checks if the keyspace is a system keyspace (local replicated or virtual).
+     * @return {@code true} if the keyspace is a system keyspace, {@code false} otherwise.
+     */
+    public static boolean isSystemKeyspace(String keyspaceName)
+    {
+        return isLocalSystemKeyspace(keyspaceName)
+                || isReplicatedSystemKeyspace(keyspaceName)
+                || isVirtualSystemKeyspace(keyspaceName);
+    }
 }
diff --git a/src/java/org/apache/cassandra/schema/TableMetadata.java b/src/java/org/apache/cassandra/schema/TableMetadata.java
index 02b851b..bebb65e 100644
--- a/src/java/org/apache/cassandra/schema/TableMetadata.java
+++ b/src/java/org/apache/cassandra/schema/TableMetadata.java
@@ -19,7 +19,7 @@ package org.apache.cassandra.schema;
 
 import java.nio.ByteBuffer;
 import java.util.*;
-import java.util.Objects;
+import java.util.Map.Entry;
 
 import javax.annotation.Nullable;
 
@@ -32,6 +32,8 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.auth.DataResource;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.cql3.CqlBuilder;
+import org.apache.cassandra.cql3.SchemaElement;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.dht.IPartitioner;
@@ -40,16 +42,15 @@ import org.apache.cassandra.service.reads.SpeculativeRetryPolicy;
 import org.apache.cassandra.utils.AbstractIterator;
 import org.github.jamm.Unmetered;
 
+import static com.google.common.collect.Iterables.any;
+import static com.google.common.collect.Iterables.transform;
 import static java.lang.String.format;
 import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toSet;
-
-import static com.google.common.collect.Iterables.any;
-import static com.google.common.collect.Iterables.transform;
 import static org.apache.cassandra.schema.IndexMetadata.isNameValid;
 
 @Unmetered
-public final class TableMetadata
+public final class TableMetadata implements SchemaElement
 {
     private static final Logger logger = LoggerFactory.getLogger(TableMetadata.class);
     private static final ImmutableSet<Flag> DEFAULT_CQL_FLAGS = ImmutableSet.of(Flag.COMPOUND);
@@ -295,6 +296,11 @@ public final class TableMetadata
         return clusteringColumns;
     }
 
+    public ImmutableList<ColumnMetadata> createStatementClusteringColumns()
+    {
+        return isStaticCompactTable() ? ImmutableList.of() : clusteringColumns;
+    }
+
     public RegularAndStaticColumns regularAndStaticColumns()
     {
         return regularAndStaticColumns;
@@ -317,27 +323,53 @@ public final class TableMetadata
      */
     public Iterator<ColumnMetadata> allColumnsInSelectOrder()
     {
-        final boolean isStaticCompactTable = isStaticCompactTable();
-        final boolean noNonPkColumns = isCompactTable() && CompactTables.hasEmptyCompactValue(this);
+        boolean isStaticCompactTable = isStaticCompactTable();
+        boolean noNonPkColumns = isCompactTable() && CompactTables.hasEmptyCompactValue(this);
 
-        return new AbstractIterator<ColumnMetadata>()
-        {
-            private final Iterator<ColumnMetadata> partitionKeyIter = partitionKeyColumns.iterator();
-            private final Iterator<ColumnMetadata> clusteringIter =
+        Iterator<ColumnMetadata> partitionKeyIter = partitionKeyColumns.iterator();
+        Iterator<ColumnMetadata> clusteringIter =
                 isStaticCompactTable ? Collections.emptyIterator() : clusteringColumns.iterator();
-            private final Iterator<ColumnMetadata> otherColumns =
+        Iterator<ColumnMetadata> otherColumns =
                 noNonPkColumns
-              ? Collections.emptyIterator()
-              : (isStaticCompactTable ? staticColumns().selectOrderIterator()
-                                      : regularAndStaticColumns.selectOrderIterator());
+                      ? Collections.emptyIterator()
+                      : (isStaticCompactTable ? staticColumns().selectOrderIterator()
+                                              : regularAndStaticColumns.selectOrderIterator());
+
+        return columnsIterator(partitionKeyIter, clusteringIter, otherColumns);
+    }
+
+    /**
+     * Returns an iterator over all column definitions that respect the order of the CREATE statement.
+     */
+    public Iterator<ColumnMetadata> allColumnsInCreateOrder()
+    {
+        boolean isStaticCompactTable = isStaticCompactTable();
+        boolean noNonPkColumns = isCompactTable() && CompactTables.hasEmptyCompactValue(this);
 
+        Iterator<ColumnMetadata> partitionKeyIter = partitionKeyColumns.iterator();
+        Iterator<ColumnMetadata> clusteringIter = createStatementClusteringColumns().iterator();
+        Iterator<ColumnMetadata> otherColumns =
+                noNonPkColumns
+                      ? Collections.emptyIterator()
+                      : (isStaticCompactTable ? staticColumns().iterator()
+                                              : regularAndStaticColumns.iterator());
+
+        return columnsIterator(partitionKeyIter, clusteringIter, otherColumns);
+    }
+
+    private static Iterator<ColumnMetadata> columnsIterator(Iterator<ColumnMetadata> partitionKeys,
+                                                            Iterator<ColumnMetadata> clusteringColumns,
+                                                            Iterator<ColumnMetadata> otherColumns)
+    {
+        return new AbstractIterator<ColumnMetadata>()
+        {
             protected ColumnMetadata computeNext()
             {
-                if (partitionKeyIter.hasNext())
-                    return partitionKeyIter.next();
+                if (partitionKeys.hasNext())
+                    return partitionKeys.next();
 
-                if (clusteringIter.hasNext())
-                    return clusteringIter.next();
+                if (clusteringColumns.hasNext())
+                    return clusteringColumns.next();
 
                 return otherColumns.hasNext() ? otherColumns.next() : endOfData();
             }
@@ -1090,4 +1122,268 @@ public final class TableMetadata
     {
         return isView() && Keyspace.open(keyspace).viewManager.getByName(name).enforceStrictLiveness();
     }
+
+    /**
+     * Returns the names of all the user types referenced by this table.
+     *
+     * @return the names of all the user types referenced by this table.
+     */
+    public Set<ByteBuffer> getReferencedUserTypes()
+    {
+        Set<ByteBuffer> types = new LinkedHashSet<>();
+        columns().forEach(c -> addUserTypes(c.type, types));
+        return types;
+    }
+
+    /**
+     * Find all user types used by the specified type and add them to the set.
+     *
+     * @param type the type to check for user types.
+     * @param types the set of UDT names to which to add new user types found in {@code type}. Note that the
+     * insertion ordering is important and ensures that if a user type A uses another user type B, then B will appear
+     * before A in iteration order.
+     */
+    private static void addUserTypes(AbstractType<?> type, Set<ByteBuffer> types)
+    {
+        // Reach into subtypes first, so that if the type is a UDT, its dependencies are recreated first.
+        type.subTypes().forEach(t -> addUserTypes(t, types));
+
+        if (type.isUDT())
+            types.add(((UserType)type).name);
+    }
+
+    @Override
+    public SchemaElementType elementType()
+    {
+        return SchemaElementType.TABLE;
+    }
+
+    @Override
+    public String elementKeyspace()
+    {
+        return keyspace;
+    }
+
+    @Override
+    public String elementName()
+    {
+        return name;
+    }
+
+    @Override
+    public String toCqlString(boolean withInternals)
+    {
+        CqlBuilder builder = new CqlBuilder(2048);
+        appendCqlTo(builder, withInternals, withInternals, false);
+        return builder.toString();
+    }
+
+    public String toCqlString(boolean includeDroppedColumns,
+                              boolean internals,
+                              boolean ifNotExists)
+    {
+        CqlBuilder builder = new CqlBuilder(2048);
+        appendCqlTo(builder, includeDroppedColumns, internals, ifNotExists);
+        return builder.toString();
+    }
+
+    public void appendCqlTo(CqlBuilder builder,
+                            boolean includeDroppedColumns,
+                            boolean internals,
+                            boolean ifNotExists)
+    {
+        assert !isView();
+
+        String createKeyword = "CREATE";
+        if (!isCQLTable())
+        {
+            builder.append("/*")
+                   .newLine()
+                   .append("Warning: Table ")
+                   .append(toString())
+                   .append(" omitted because it has constructs not compatible with CQL (was created via legacy API).")
+                   .newLine()
+                   .append("Approximate structure, for reference:")
+                   .newLine()
+                   .append("(this should not be used to reproduce this schema)")
+                   .newLine()
+                   .newLine();
+        }
+        else if (isVirtual())
+        {
+            builder.append(String.format("/*\n" +
+                    "Warning: Table %s is a virtual table and cannot be recreated with CQL.\n" +
+                    "Structure, for reference:\n",
+                                         toString()));
+            createKeyword = "VIRTUAL";
+        }
+
+        builder.append(createKeyword)
+               .append(" TABLE ");
+
+        if (ifNotExists)
+            builder.append("IF NOT EXISTS ");
+
+        builder.append(toString())
+               .append(" (")
+               .newLine()
+               .increaseIndent();
+
+        boolean hasSingleColumnPrimaryKey = partitionKeyColumns.size() == 1 && clusteringColumns.isEmpty();
+
+        appendColumnDefinitions(builder, includeDroppedColumns, hasSingleColumnPrimaryKey);
+
+        if (!hasSingleColumnPrimaryKey)
+            appendPrimaryKey(builder);
+
+        builder.decreaseIndent()
+               .append(')');
+
+        appendTableOptions(builder, internals);
+
+        builder.decreaseIndent();
+
+        if (!isCQLTable() || isVirtual())
+        {
+            builder.newLine()
+                   .append("*/");
+        }
+
+        if (includeDroppedColumns)
+            appendDropColumns(builder);
+    }
+
+    /**
+     * Appends the column definitions of the CREATE TABLE statement to {@code builder}, one per line.
+     *
+     * @param builder the builder receiving the CQL output
+     * @param includeDroppedColumns if {@code true}, the dropped columns are emitted as well, so that the
+     * ALTER TABLE DROP/ADD statements appended later can replay the proper sequence of events
+     * @param hasSingleColumnPrimaryKey if {@code true}, the primary key is emitted inline on its column
+     * (a trailing {@code PRIMARY KEY}) instead of as a separate PRIMARY KEY clause
+     */
+    private void appendColumnDefinitions(CqlBuilder builder,
+                                         boolean includeDroppedColumns,
+                                         boolean hasSingleColumnPrimaryKey)
+    {
+        Iterator<ColumnMetadata> iter = allColumnsInCreateOrder();
+        while (iter.hasNext())
+        {
+            ColumnMetadata column = iter.next();
+
+            // If the column has been re-added after a drop, we don't include it right away. Instead, we'll add the
+            // dropped one first below, then we'll issue the DROP and then the actual ADD for this column, thus
+            // simulating the proper sequence of events.
+            if (includeDroppedColumns && droppedColumns.containsKey(column.name.bytes))
+                continue;
+
+            column.appendCqlTo(builder, isStaticCompactTable());
+
+            if (hasSingleColumnPrimaryKey && column.isPartitionKey())
+                builder.append(" PRIMARY KEY");
+
+            // A trailing comma is needed when more columns follow, when dropped columns will be appended
+            // below, or when a separate PRIMARY KEY clause follows.
+            if (!hasSingleColumnPrimaryKey || (includeDroppedColumns && !droppedColumns.isEmpty()) || iter.hasNext())
+                builder.append(',');
+
+            builder.newLine();
+        }
+
+        if (includeDroppedColumns)
+        {
+            Iterator<DroppedColumn> iterDropped = droppedColumns.values().iterator();
+            while (iterDropped.hasNext())
+            {
+                DroppedColumn dropped = iterDropped.next();
+                dropped.column.appendCqlTo(builder, isStaticCompactTable());
+
+                // Must check iterDropped here, not the exhausted main iterator: otherwise, with a
+                // single-column primary key and several dropped columns, consecutive dropped column
+                // definitions would be emitted without a separating comma.
+                if (!hasSingleColumnPrimaryKey || iterDropped.hasNext())
+                    builder.append(',');
+
+                builder.newLine();
+            }
+        }
+    }
+
+    /**
+     * Appends the PRIMARY KEY clause of the CREATE statement, e.g.
+     * {@code PRIMARY KEY ((pk1, pk2), ck1, ck2)}, followed by a new line.
+     */
+    void appendPrimaryKey(CqlBuilder builder)
+    {
+        List<ColumnMetadata> partitionKeys = partitionKeyColumns();
+        List<ColumnMetadata> clusterings = createStatementClusteringColumns();
+
+        builder.append("PRIMARY KEY (");
+
+        // A composite partition key gets its own pair of parentheses.
+        if (partitionKeys.size() == 1)
+        {
+            builder.append(partitionKeys.get(0).name);
+        }
+        else
+        {
+            builder.append('(');
+            builder.appendWithSeparators(partitionKeys, (b, c) -> b.append(c.name), ", ");
+            builder.append(')');
+        }
+
+        if (!clusterings.isEmpty())
+        {
+            builder.append(", ");
+            builder.appendWithSeparators(clusterings, (b, c) -> b.append(c.name), ", ");
+        }
+
+        builder.append(')');
+        builder.newLine();
+    }
+
+    /**
+     * Appends the WITH clause of the CREATE statement (table id, COMPACT STORAGE, clustering order and
+     * the table parameters), terminated by a semicolon.
+     *
+     * @param builder the builder receiving the CQL output
+     * @param internals if {@code true}, the internal table ID is emitted so that the table can be
+     * recreated with the exact same id
+     */
+    void appendTableOptions(CqlBuilder builder, boolean internals)
+    {
+        // NOTE(review): the indent raised here is lowered by the caller (appendCqlTo), not locally.
+        builder.append(" WITH ")
+               .increaseIndent();
+
+        if (internals)
+            builder.append("ID = ")
+                   .append(id.toString())
+                   .newLine()
+                   .append("AND ");
+
+        if (isCompactTable())
+            builder.append("COMPACT STORAGE")
+                   .newLine()
+                   .append("AND ");
+
+        List<ColumnMetadata> clusteringColumns = createStatementClusteringColumns();
+        if (!clusteringColumns.isEmpty())
+        {
+            builder.append("CLUSTERING ORDER BY (")
+                   .appendWithSeparators(clusteringColumns, (b, c) -> c.appendNameAndOrderTo(b), ", ")
+                   .append(')')
+                   .newLine()
+                   .append("AND ");
+        }
+
+        // Virtual tables only expose their comment; regular tables emit the full parameter list.
+        if (isVirtual())
+        {
+            builder.append("comment = ").appendWithSingleQuotes(params.comment);
+        }
+        else
+        {
+            params.appendCqlTo(builder);
+        }
+        builder.append(";");
+    }
+
+    /**
+     * Appends, for every dropped column, the ALTER TABLE ... DROP ... USING TIMESTAMP statement that
+     * replays the drop, followed by an ALTER TABLE ... ADD statement when the column has been re-added
+     * since the drop.
+     */
+    private void appendDropColumns(CqlBuilder builder)
+    {
+        for (Entry<ByteBuffer, DroppedColumn> entry : droppedColumns.entrySet())
+        {
+            DroppedColumn dropped = entry.getValue();
+
+            builder.newLine();
+            builder.append("ALTER TABLE ");
+            builder.append(toString());
+            builder.append(" DROP ");
+            builder.append(dropped.column.name);
+            builder.append(" USING TIMESTAMP ");
+            builder.append(dropped.droppedTime);
+            builder.append(';');
+
+            // A live column under the same name means the column was re-added after the drop.
+            ColumnMetadata readded = getColumn(entry.getKey());
+            if (readded != null)
+            {
+                builder.newLine();
+                builder.append("ALTER TABLE ");
+                builder.append(toString());
+                builder.append(" ADD ");
+                readded.appendCqlTo(builder, false);
+                builder.append(';');
+            }
+        }
+    }
 }
diff --git a/src/java/org/apache/cassandra/schema/TableParams.java b/src/java/org/apache/cassandra/schema/TableParams.java
index 7c3eaa2..c99a2f3 100644
--- a/src/java/org/apache/cassandra/schema/TableParams.java
+++ b/src/java/org/apache/cassandra/schema/TableParams.java
@@ -19,19 +19,23 @@ package org.apache.cassandra.schema;
 
 import java.nio.ByteBuffer;
 import java.util.Map;
+import java.util.Map.Entry;
 
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Objects;
 import com.google.common.collect.ImmutableMap;
 
 import org.apache.cassandra.cql3.Attributes;
+import org.apache.cassandra.cql3.CqlBuilder;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.service.reads.PercentileSpeculativeRetryPolicy;
 import org.apache.cassandra.service.reads.SpeculativeRetryPolicy;
 import org.apache.cassandra.service.reads.repair.ReadRepairStrategy;
 import org.apache.cassandra.utils.BloomCalculations;
+import org.apache.cassandra.utils.ByteBufferUtil;
 
 import static java.lang.String.format;
+import static java.util.stream.Collectors.toMap;
 
 public final class TableParams
 {
@@ -253,6 +257,47 @@ public final class TableParams
                           .toString();
     }
 
+    /**
+     * Appends all table parameters to {@code builder} as a CQL WITH-clause fragment
+     * ({@code opt1 = ... AND opt2 = ...}), without a trailing semicolon.
+     *
+     * @param builder the builder receiving the CQL output
+     */
+    public void appendCqlTo(CqlBuilder builder)
+    {
+        // option names should be in alphabetical order
+        builder.append("additional_write_policy = ").appendWithSingleQuotes(additionalWritePolicy.toString())
+               .newLine()
+               .append("AND bloom_filter_fp_chance = ").append(bloomFilterFpChance)
+               .newLine()
+               .append("AND caching = ").append(caching.asMap())
+               .newLine()
+               .append("AND cdc = ").append(cdc)
+               .newLine()
+               .append("AND comment = ").appendWithSingleQuotes(comment)
+               .newLine()
+               .append("AND compaction = ").append(compaction.asMap())
+               .newLine()
+               .append("AND compression = ").append(compression.asMap())
+               .newLine()
+               .append("AND crc_check_chance = ").append(crcCheckChance)
+               .newLine()
+               .append("AND default_time_to_live = ").append(defaultTimeToLive)
+               .newLine()
+               // Extension values are raw bytes; render them as 0x-prefixed hex literals.
+               .append("AND extensions = ").append(extensions.entrySet()
+                                                             .stream()
+                                                             .collect(toMap(Entry::getKey,
+                                                                             e -> "0x" + ByteBufferUtil.bytesToHex(e.getValue()))),
+                                                   false)
+               .newLine()
+               .append("AND gc_grace_seconds = ").append(gcGraceSeconds)
+               .newLine()
+               .append("AND max_index_interval = ").append(maxIndexInterval)
+               .newLine()
+               .append("AND memtable_flush_period_in_ms = ").append(memtableFlushPeriodInMs)
+               .newLine()
+               .append("AND min_index_interval = ").append(minIndexInterval)
+               .newLine()
+               .append("AND read_repair = ").appendWithSingleQuotes(readRepair.toString())
+               .newLine()
+               .append("AND speculative_retry = ").appendWithSingleQuotes(speculativeRetry.toString());
+
+    }
+
     public static final class Builder
     {
         private String comment = "";
diff --git a/src/java/org/apache/cassandra/schema/Tables.java b/src/java/org/apache/cassandra/schema/Tables.java
index 0320440..33ed1b8 100644
--- a/src/java/org/apache/cassandra/schema/Tables.java
+++ b/src/java/org/apache/cassandra/schema/Tables.java
@@ -23,6 +23,8 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.Optional;
 import java.util.function.Predicate;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
 
 import javax.annotation.Nullable;
 
@@ -77,6 +79,11 @@ public final class Tables implements Iterable<TableMetadata>
         return tables.values().iterator();
     }
 
+    /** Returns a sequential stream over the tables, in the same order as iteration. */
+    public Stream<TableMetadata> stream()
+    {
+        return tables.values().stream();
+    }
+
     public Iterable<TableMetadata> referencingUserType(ByteBuffer name)
     {
         return Iterables.filter(tables.values(), t -> t.referencesUserType(name));
diff --git a/src/java/org/apache/cassandra/schema/Types.java b/src/java/org/apache/cassandra/schema/Types.java
index 64aeead..76694cc 100644
--- a/src/java/org/apache/cassandra/schema/Types.java
+++ b/src/java/org/apache/cassandra/schema/Types.java
@@ -20,6 +20,8 @@ package org.apache.cassandra.schema;
 import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.function.Predicate;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
 
 import javax.annotation.Nullable;
 
@@ -86,6 +88,22 @@ public final class Types implements Iterable<UserType>
         return types.values().iterator();
     }
 
+    /** Returns a sequential stream over the user types, in the same order as iteration. */
+    public Stream<UserType> stream()
+    {
+        return types.values().stream();
+    }
+
+    /**
+     * Returns a stream of user types sorted by dependencies, so that any type appears after the
+     * types it depends on.
+     *
+     * @return a stream of user types sorted by dependencies
+     */
+    public Stream<UserType> sortedStream()
+    {
+        // Delegate to sortByDependencies rather than duplicating its topological-sort logic.
+        return sortByDependencies(types.values()).stream().map(types::get);
+    }
+
     public Iterable<UserType> referencingUserType(ByteBuffer name)
     {
         return Iterables.filter(types.values(), t -> t.referencesUserType(name) && !t.name.equals(name));
@@ -201,6 +219,36 @@ public final class Types implements Iterable<UserType>
         return types.values().toString();
     }
 
+    /**
+     * Sorts the types by dependencies.
+     *
+     * @param types the types to sort
+     * @return the type names, ordered so that any type appears after the types it depends on
+     */
+    private static Set<ByteBuffer> sortByDependencies(Collection<UserType> types)
+    {
+        Set<ByteBuffer> sorted = new LinkedHashSet<>();
+        for (UserType type : types)
+            addUserTypes(type, sorted);
+        return sorted;
+    }
+
+    /**
+     * Find all user types used by the specified type and add them to the set.
+     *
+     * @param type the type to check for user types.
+     * @param types the set of UDT names to which to add new user types found in {@code type}. Note that the
+     * insertion ordering is important and ensures that if a user type A uses another user type B, then B will appear
+     * before A in iteration order.
+     */
+    private static void addUserTypes(AbstractType<?> type, Set<ByteBuffer> types)
+    {
+        // Reach into subtypes first, so that if the type is a UDT, its dependencies are added first.
+        type.subTypes().forEach(t -> addUserTypes(t, types));
+
+        if (type.isUDT())
+            types.add(((UserType) type).name);
+    }
+
     public static final class Builder
     {
         final ImmutableSortedMap.Builder<ByteBuffer, UserType> types = ImmutableSortedMap.naturalOrder();
diff --git a/src/java/org/apache/cassandra/schema/ViewMetadata.java b/src/java/org/apache/cassandra/schema/ViewMetadata.java
index 66360bf..3fbf728 100644
--- a/src/java/org/apache/cassandra/schema/ViewMetadata.java
+++ b/src/java/org/apache/cassandra/schema/ViewMetadata.java
@@ -26,7 +26,7 @@ import org.apache.commons.lang3.builder.ToStringBuilder;
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.db.marshal.UserType;
 
-public final class ViewMetadata
+public final class ViewMetadata implements SchemaElement
 {
     public final TableId baseTableId;
     public final String baseTableName;
@@ -161,4 +161,79 @@ public final class ViewMetadata
                                 whereClause,
                                 metadata.unbuild().addColumn(column).build());
     }
+
+    /**
+     * Appends to {@code builder} the CREATE MATERIALIZED VIEW statement recreating this view.
+     *
+     * @param builder the builder receiving the CQL output
+     * @param internals if {@code true}, internal details (like the table ID) are included
+     * @param ifNotExists if {@code true}, an IF NOT EXISTS clause is emitted
+     */
+    public void appendCqlTo(CqlBuilder builder,
+                            boolean internals,
+                            boolean ifNotExists)
+    {
+        builder.append("CREATE MATERIALIZED VIEW ");
+
+        if (ifNotExists)
+            builder.append("IF NOT EXISTS ");
+
+        builder.append(metadata.toString())
+               .append(" AS")
+               .newLine()
+               .increaseIndent()
+               .append("SELECT ");
+
+        // Either the view selects every base-table column, or we list the selected columns explicitly.
+        if (includeAllColumns)
+        {
+            builder.append('*');
+        }
+        else
+        {
+            builder.appendWithSeparators(metadata.allColumnsInSelectOrder(), (b, c) -> b.append(c.name), ", ");
+        }
+
+        builder.newLine()
+               .append("FROM ")
+               .appendQuotingIfNeeded(metadata.keyspace)
+               .append('.')
+               .appendQuotingIfNeeded(baseTableName)
+               .newLine()
+               .append("WHERE ")
+               .append(whereClause.toString())
+               .newLine();
+
+        metadata.appendPrimaryKey(builder);
+
+        builder.decreaseIndent();
+
+        // The view shares its table options (clustering order, params, ...) with its backing metadata.
+        metadata.appendTableOptions(builder, internals);
+    }
+
+    // SchemaElement implementation: identifies this view for server-side DESCRIBE output.
+
+    @Override
+    public SchemaElementType elementType()
+    {
+        return SchemaElementType.MATERIALIZED_VIEW;
+    }
+
+    @Override
+    public String elementKeyspace()
+    {
+        return keyspace();
+    }
+
+    @Override
+    public String elementName()
+    {
+        return name();
+    }
+
+    /**
+     * Returns the CREATE MATERIALIZED VIEW statement recreating this view.
+     *
+     * @param withInternals if {@code true}, internal details (like the table ID) are included
+     * @return the CQL statement recreating this view
+     */
+    @Override
+    public String toCqlString(boolean withInternals)
+    {
+        // Delegate to the two-argument overload to avoid duplicating the builder logic.
+        return toCqlString(withInternals, false);
+    }
+
+    /**
+     * Returns the CREATE MATERIALIZED VIEW statement recreating this view.
+     *
+     * @param internals if {@code true}, internal details (like the table ID) are included
+     * @param ifNotExists if {@code true}, an IF NOT EXISTS clause is emitted
+     * @return the CQL statement recreating this view
+     */
+    public String toCqlString(boolean internals,
+                              boolean ifNotExists)
+    {
+        CqlBuilder builder = new CqlBuilder(2048);
+        appendCqlTo(builder, internals, ifNotExists);
+        return builder.toString();
+    }
 }
diff --git a/src/java/org/apache/cassandra/schema/Views.java b/src/java/org/apache/cassandra/schema/Views.java
index 07cd8f2..f926c07 100644
--- a/src/java/org/apache/cassandra/schema/Views.java
+++ b/src/java/org/apache/cassandra/schema/Views.java
@@ -22,6 +22,8 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.Optional;
 import java.util.function.Predicate;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
 
 import javax.annotation.Nullable;
 
@@ -83,6 +85,16 @@ public final class Views implements Iterable<ViewMetadata>
         return Iterables.filter(this, v -> v.baseTableId.equals(tableId));
     }
 
+    /** Returns a sequential stream over all materialized views. */
+    public Stream<ViewMetadata> stream()
+    {
+        return StreamSupport.stream(spliterator(), false);
+    }
+
+    /** Returns a sequential stream over the materialized views based on the given table. */
+    public Stream<ViewMetadata> stream(TableId tableId)
+    {
+        return stream().filter(v -> v.baseTableId.equals(tableId));
+    }
+
     /**
      * Get the materialized view with the specified name
      *
diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java
index f82df04..6db50a4 100644
--- a/src/java/org/apache/cassandra/utils/FBUtilities.java
+++ b/src/java/org/apache/cassandra/utils/FBUtilities.java
@@ -26,6 +26,9 @@ import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.*;
 import java.util.concurrent.*;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
 import java.util.zip.CRC32;
 import java.util.zip.Checksum;
 
diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java
index d580923..619fdad 100644
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@ -604,7 +604,7 @@ public abstract class CQLTester
 
     protected String createType(String query)
     {
-        String typeName = "type_" + seqNumber.getAndIncrement();
+        String typeName = String.format("type_%02d", seqNumber.getAndIncrement());
         String fullQuery = String.format(query, KEYSPACE + "." + typeName);
         types.add(typeName);
         logger.info(fullQuery);
@@ -614,7 +614,7 @@ public abstract class CQLTester
 
     protected String createFunction(String keyspace, String argTypes, String query) throws Throwable
     {
-        String functionName = keyspace + ".function_" + seqNumber.getAndIncrement();
+        String functionName = String.format("%s.function_%02d", keyspace, seqNumber.getAndIncrement());
         createFunctionOverload(functionName, argTypes, query);
         return functionName;
     }
@@ -629,7 +629,7 @@ public abstract class CQLTester
 
     protected String createAggregate(String keyspace, String argTypes, String query) throws Throwable
     {
-        String aggregateName = keyspace + "." + "aggregate_" + seqNumber.getAndIncrement();
+        String aggregateName = String.format("%s.aggregate_%02d", keyspace, seqNumber.getAndIncrement());
         createAggregateOverload(aggregateName, argTypes, query);
         return aggregateName;
     }
@@ -667,7 +667,7 @@ public abstract class CQLTester
     
     protected String createKeyspaceName()
     {
-        String currentKeyspace = "keyspace_" + seqNumber.getAndIncrement();
+        String currentKeyspace = String.format("keyspace_%02d", seqNumber.getAndIncrement());
         keyspaces.add(currentKeyspace);
         return currentKeyspace;
     }
@@ -688,7 +688,7 @@ public abstract class CQLTester
 
     protected String createTableName()
     {
-        String currentTable = "table_" + seqNumber.getAndIncrement();
+        String currentTable = String.format("table_%02d", seqNumber.getAndIncrement());
         tables.add(currentTable);
         return currentTable;
     }
@@ -893,6 +893,11 @@ public abstract class CQLTester
         return sessionNet().execute(formatQuery(query), values);
     }
 
+    /** Executes the given driver statement over the network session using the specified protocol version. */
+    protected com.datastax.driver.core.ResultSet executeNet(ProtocolVersion protocolVersion, Statement statement)
+    {
+        return sessionNet(protocolVersion).execute(statement);
+    }
+
     protected com.datastax.driver.core.ResultSet executeNetWithPaging(ProtocolVersion version, String query, int pageSize) throws Throwable
     {
         return sessionNet(version).execute(new SimpleStatement(formatQuery(query)).setFetchSize(pageSize));
@@ -1079,16 +1084,18 @@ public abstract class CQLTester
                 ByteBuffer expectedByteValue = makeByteBuffer(expected == null ? null : expected[j], column.type);
                 ByteBuffer actualValue = actual.getBytes(column.name.toString());
 
+                if (expectedByteValue != null)
+                    expectedByteValue = expectedByteValue.duplicate();
                 if (!Objects.equal(expectedByteValue, actualValue))
                 {
                     Object actualValueDecoded = actualValue == null ? null : column.type.getSerializer().deserialize(actualValue);
-                    if (!Objects.equal(expected[j], actualValueDecoded))
+                    if (!Objects.equal(expected != null ? expected[j] : null, actualValueDecoded))
                         Assert.fail(String.format("Invalid value for row %d column %d (%s of type %s), expected <%s> but got <%s>",
                                                   i,
                                                   j,
                                                   column.name,
                                                   column.type.asCQL3Type(),
-                                                  formatValue(expectedByteValue, column.type),
+                                                  formatValue(expectedByteValue != null ? expectedByteValue.duplicate() : null, column.type),
                                                   formatValue(actualValue, column.type)));
                 }
             }
diff --git a/test/unit/org/apache/cassandra/cql3/statements/DescribeStatementTest.java b/test/unit/org/apache/cassandra/cql3/statements/DescribeStatementTest.java
new file mode 100644
index 0000000..0b6d8a3
--- /dev/null
+++ b/test/unit/org/apache/cassandra/cql3/statements/DescribeStatementTest.java
@@ -0,0 +1,815 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.statements;
+
+import java.util.Iterator;
+import java.util.Optional;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import org.junit.Test;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.SimpleStatement;
+import com.datastax.driver.core.exceptions.InvalidQueryException;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.transport.ProtocolVersion;
+
+import static java.lang.String.format;
+import static org.apache.cassandra.schema.SchemaConstants.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class DescribeStatementTest extends CQLTester
+{
+    @Test
+    public void testSchemaChangeDuringPaging()
+    {
+        // A fetch size of one forces a second page fetch after the schema has changed underneath us.
+        SimpleStatement stmt = new SimpleStatement("DESCRIBE KEYSPACES");
+        stmt.setFetchSize(1);
+
+        ResultSet rs = executeNet(ProtocolVersion.CURRENT, stmt);
+        Iterator<Row> rows = rs.iterator();
+        assertTrue(rows.hasNext());
+        rows.next();
+
+        createKeyspace("CREATE KEYSPACE %s WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor' : 1};");
+
+        // Fetching the next page must now fail, as the schema version no longer matches.
+        try
+        {
+            rows.next();
+            fail("Expected InvalidQueryException");
+        }
+        catch (InvalidQueryException e)
+        {
+            assertEquals(DescribeStatement.SCHEMA_CHANGED_WHILE_PAGING_MESSAGE, e.getMessage());
+        }
+    }
+
+    /**
+     * Verifies DESCRIBE FUNCTION/FUNCTIONS and DESCRIBE AGGREGATE/AGGREGATES output, for both the
+     * DESCRIBE and DESC keywords, including overloaded function names.
+     */
+    @Test
+    public void testDescribeFunctionAndAggregate() throws Throwable
+    {
+        String fNonOverloaded = createFunction(KEYSPACE,
+                                               "",
+                                               "CREATE OR REPLACE FUNCTION %s() " +
+                                               "CALLED ON NULL INPUT " +
+                                               "RETURNS int " +
+                                               "LANGUAGE java " +
+                                               "AS 'throw new RuntimeException();';");
+
+        String fOverloaded = createFunction(KEYSPACE,
+                                            "int, ascii",
+                                            "CREATE FUNCTION %s (input int, other_in ascii) " +
+                                            "RETURNS NULL ON NULL INPUT " +
+                                            "RETURNS text " +
+                                            "LANGUAGE java " +
+                                            "AS 'return \"Hello World\";'");
+        createFunctionOverload(fOverloaded,
+                               "text, ascii",
+                               "CREATE FUNCTION %s (input text, other_in ascii) " +
+                               "RETURNS NULL ON NULL INPUT " +
+                               "RETURNS text " +
+                               "LANGUAGE java " +
+                               "AS 'return \"Hello World\";'");
+
+        for (String describeKeyword : new String[]{"DESCRIBE", "DESC"})
+        {
+            assertRowsNet(executeDescribeNet(describeKeyword + " FUNCTION " + fNonOverloaded),
+                          row(KEYSPACE,
+                              "function",
+                              shortFunctionName(fNonOverloaded) + "()",
+                              "CREATE FUNCTION " + fNonOverloaded + "()\n" +
+                                      "    CALLED ON NULL INPUT\n" +
+                                      "    RETURNS int\n" +
+                                      "    LANGUAGE java\n" +
+                                  "    AS $$throw new RuntimeException();$$;"));
+
+            // Describing an overloaded name must return one row per overload.
+            assertRowsNet(executeDescribeNet(describeKeyword + " FUNCTION " + fOverloaded),
+                          row(KEYSPACE,
+                              "function",
+                              shortFunctionName(fOverloaded) + "(int, ascii)",
+                              "CREATE FUNCTION " + fOverloaded + "(input int, other_in ascii)\n" +
+                                      "    RETURNS NULL ON NULL INPUT\n" +
+                                      "    RETURNS text\n" +
+                                      "    LANGUAGE java\n" +
+                                  "    AS $$return \"Hello World\";$$;"),
+                          row(KEYSPACE,
+                              "function",
+                              shortFunctionName(fOverloaded) + "(text, ascii)",
+                              "CREATE FUNCTION " + fOverloaded + "(input text, other_in ascii)\n" +
+                                      "    RETURNS NULL ON NULL INPUT\n" +
+                                      "    RETURNS text\n" +
+                                      "    LANGUAGE java\n" +
+                                  "    AS $$return \"Hello World\";$$;"));
+
+            assertRowsNet(executeDescribeNet(describeKeyword + " FUNCTIONS"),
+                          row(KEYSPACE,
+                              "function",
+                              shortFunctionName(fNonOverloaded) + "()"),
+                          row(KEYSPACE,
+                              "function",
+                              shortFunctionName(fOverloaded) + "(int, ascii)"),
+                          row(KEYSPACE,
+                              "function",
+                              shortFunctionName(fOverloaded) + "(text, ascii)"));
+        }
+
+        // State and final functions used as the building blocks of the aggregates below.
+        String fIntState = createFunction(KEYSPACE,
+                                          "int, int",
+                                          "CREATE FUNCTION %s (state int, add_to int) " +
+                                          "CALLED ON NULL INPUT " +
+                                          "RETURNS int " +
+                                          "LANGUAGE java " +
+                                          "AS 'return state + add_to;'");
+        String fFinal = createFunction(KEYSPACE,
+                                       "int",
+                                       "CREATE FUNCTION %s (state int) " +
+                                       "RETURNS NULL ON NULL INPUT " +
+                                       "RETURNS int " +
+                                       "LANGUAGE java " +
+                                       "AS 'return state;'");
+
+        String aNonDeterministic = createAggregate(KEYSPACE,
+                                                   "int",
+                                                   format("CREATE AGGREGATE %%s(int) " +
+                                                                 "SFUNC %s " +
+                                                                 "STYPE int " +
+                                                                 "INITCOND 42",
+                                                                 shortFunctionName(fIntState)));
+        String aDeterministic = createAggregate(KEYSPACE,
+                                                "int",
+                                                format("CREATE AGGREGATE %%s(int) " +
+                                                              "SFUNC %s " +
+                                                              "STYPE int " +
+                                                              "FINALFUNC %s ",
+                                                              shortFunctionName(fIntState),
+                                                              shortFunctionName(fFinal)));
+
+        for (String describeKeyword : new String[]{"DESCRIBE", "DESC"})
+        {
+            assertRowsNet(executeDescribeNet(describeKeyword + " AGGREGATE " + aNonDeterministic),
+                          row(KEYSPACE,
+                              "aggregate",
+                              shortFunctionName(aNonDeterministic) + "(int)",
+                              "CREATE AGGREGATE " + aNonDeterministic + "(int)\n" +
+                                      "    SFUNC " + shortFunctionName(fIntState) + "\n" +
+                                      "    STYPE int\n" +
+                                  "    INITCOND 42;"));
+            assertRowsNet(executeDescribeNet(describeKeyword + " AGGREGATE " + aDeterministic),
+                          row(KEYSPACE,
+                              "aggregate",
+                              shortFunctionName(aDeterministic) + "(int)",
+                              "CREATE AGGREGATE " + aDeterministic + "(int)\n" +
+                                      "    SFUNC " + shortFunctionName(fIntState) + "\n" +
+                                      "    STYPE int\n" +
+                                      "    FINALFUNC " + shortFunctionName(fFinal) + ";"));
+            assertRowsNet(executeDescribeNet(describeKeyword + " AGGREGATES"),
+                          row(KEYSPACE,
+                              "aggregate",
+                              shortFunctionName(aNonDeterministic) + "(int)"),
+                          row(KEYSPACE,
+                              "aggregate",
+                              shortFunctionName(aDeterministic) + "(int)"));
+        }
+    }
+
+    @Test
+    public void testDescribeFunctionWithTuples() throws Throwable
+    {
+        String function = createFunction(KEYSPACE,
+                                         "tuple<int>, list<frozen<tuple<int, text>>>, tuple<frozen<tuple<int, text>>, text>",
+                                         "CREATE OR REPLACE FUNCTION %s(t tuple<int>, l list<frozen<tuple<int, text>>>, nt tuple<frozen<tuple<int, text>>, text>) " +
+                                         "CALLED ON NULL INPUT " +
+                                         "RETURNS tuple<int, text> " +
+                                         "LANGUAGE java " +
+                                         "AS 'throw new RuntimeException();';");
+
+        // DESCRIBE FUNCTION must render tuple and nested-tuple argument types correctly.
+        assertRowsNet(executeDescribeNet("DESCRIBE FUNCTION " + function),
+                      row(KEYSPACE,
+                          "function",
+                          shortFunctionName(function) + "(tuple<int>, list<frozen<tuple<int, text>>>, tuple<frozen<tuple<int, text>>, text>)",
+                          "CREATE FUNCTION " + function + "(t tuple<int>, l list<frozen<tuple<int, text>>>, nt tuple<frozen<tuple<int, text>>, text>)\n" +
+                          "    CALLED ON NULL INPUT\n" +
+                          "    RETURNS tuple<int, text>\n" +
+                          "    LANGUAGE java\n" +
+                          "    AS $$throw new RuntimeException();$$;"));
+    }
+
+    /**
+     * Verifies that describing virtual schema elements emits a commented-out
+     * "for reference" structure with a warning that they cannot be recreated
+     * with CQL.
+     *
+     * NOTE(review): the method name says "MaterializedView" but the body tests
+     * DESCRIBE on a virtual keyspace and a virtual table — consider renaming.
+     */
+    @Test
+    public void testDescribeMaterializedView() throws Throwable
+    {
+        assertRowsNet(executeDescribeNet("DESCRIBE ONLY KEYSPACE system_virtual_schema;"), 
+                      row("system_virtual_schema",
+                          "keyspace",
+                          "system_virtual_schema",
+                          "/*\n" + 
+                          "Warning: Keyspace system_virtual_schema is a virtual keyspace and cannot be recreated with CQL.\n" +
+                          "Structure, for reference:\n" +
+                          "VIRTUAL KEYSPACE system_virtual_schema;\n" +
+                          "*/"));
+
+        assertRowsNet(executeDescribeNet("DESCRIBE TABLE system_virtual_schema.columns;"), 
+                      row("system_virtual_schema",
+                          "table",
+                          "columns",
+                          "/*\n" + 
+                          "Warning: Table system_virtual_schema.columns is a virtual table and cannot be recreated with CQL.\n" +
+                          "Structure, for reference:\n" +
+                          "VIRTUAL TABLE system_virtual_schema.columns (\n" +
+                          "    keyspace_name text,\n" +
+                          "    table_name text,\n" +
+                          "    column_name text,\n" +
+                          "    clustering_order text,\n" +
+                          "    column_name_bytes blob,\n" +
+                          "    kind text,\n" +
+                          "    position int,\n" +
+                          "    type text,\n" +
+                          "    PRIMARY KEY (keyspace_name, table_name, column_name)\n" +
+                          ") WITH CLUSTERING ORDER BY (table_name ASC, column_name ASC)\n" +
+                          "    AND comment = 'virtual column definitions';\n" +
+                          "*/"));
+    }
+
+    /**
+     * End-to-end coverage of the DESCRIBE statement family against a freshly
+     * created "test" keyspace: SCHEMA, KEYSPACES/KEYSPACE, ONLY KEYSPACE,
+     * TABLES/TABLE (including COLUMNFAMILY alias), INDEX and MATERIALIZED VIEW,
+     * for both the DESCRIBE and DESC keywords, with and without a "USE" session
+     * keyspace. Also checks the error messages for unknown elements.
+     * The keyspace is dropped in the finally block so other tests are unaffected.
+     */
+    @Test
+    public void testDescribe() throws Throwable
+    {
+        try
+        {
+            execute("CREATE KEYSPACE test WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor' : 1};");
+            execute("CREATE TABLE test.users ( userid text PRIMARY KEY, firstname text, lastname text, age int);");
+            execute("CREATE INDEX myindex ON test.users (age);");
+            execute("CREATE TABLE test.\"Test\" (id int, col int, val text, PRIMARY KEY(id, col));");
+            execute("CREATE INDEX ON test.\"Test\" (col);");
+            execute("CREATE INDEX ON test.\"Test\" (val)");
+            execute("CREATE TABLE test.users_mv (username varchar, password varchar, gender varchar, session_token varchar, " +
+                    "state varchar, birth_year bigint, PRIMARY KEY (username));");
+            execute("CREATE MATERIALIZED VIEW test.users_by_state AS SELECT * FROM test.users_mv " +
+                    "WHERE STATE IS NOT NULL AND username IS NOT NULL PRIMARY KEY (state, username)");
+            execute(allTypesTable());
+
+            // Test describe schema
+
+            Object[][] testSchemaOutput = rows(
+                          row(KEYSPACE, "keyspace", KEYSPACE,
+                              "CREATE KEYSPACE " + KEYSPACE +
+                                  " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}" +
+                                  "  AND durable_writes = true;"),
+                          row(KEYSPACE_PER_TEST, "keyspace", KEYSPACE_PER_TEST,
+                              "CREATE KEYSPACE " + KEYSPACE_PER_TEST +
+                                  " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}" +
+                                  "  AND durable_writes = true;"),
+                          row("test", "keyspace", "test", keyspaceOutput()),
+                          row("test", "table", "has_all_types", allTypesTable()),
+                          row("test", "table", "\"Test\"", testTableOutput()),
+                          row("test", "index", "\"Test_col_idx\"", indexOutput("\"Test_col_idx\"", "\"Test\"", "col")),
+                          row("test", "index", "\"Test_val_idx\"", indexOutput("\"Test_val_idx\"", "\"Test\"", "val")),
+                          row("test", "table", "users", userTableOutput()),
+                          row("test", "index", "myindex", indexOutput("myindex", "users", "age")),
+                          row("test", "table", "users_mv", usersMvTableOutput()),
+                          row("test", "materialized_view", "users_by_state", usersByStateMvOutput()));
+
+            assertRowsNet(executeDescribeNet("DESCRIBE SCHEMA"), testSchemaOutput);
+            assertRowsNet(executeDescribeNet("DESC SCHEMA"), testSchemaOutput);
+
+            // Test describe keyspaces/keyspace
+
+            Object[][] testKeyspacesOutput = rows(row(KEYSPACE, "keyspace", KEYSPACE),
+                                                  row(KEYSPACE_PER_TEST, "keyspace", KEYSPACE_PER_TEST),
+                                                  row(SYSTEM_KEYSPACE_NAME, "keyspace", SYSTEM_KEYSPACE_NAME),
+                                                  row(AUTH_KEYSPACE_NAME, "keyspace", AUTH_KEYSPACE_NAME),
+                                                  row(DISTRIBUTED_KEYSPACE_NAME, "keyspace", DISTRIBUTED_KEYSPACE_NAME),
+                                                  row(SCHEMA_KEYSPACE_NAME, "keyspace", SCHEMA_KEYSPACE_NAME),
+                                                  row(TRACE_KEYSPACE_NAME, "keyspace", TRACE_KEYSPACE_NAME),
+                                                  row(VIRTUAL_SCHEMA, "keyspace", VIRTUAL_SCHEMA),
+                                                  row("test", "keyspace", "test"));
+
+            for (String describeKeyword : new String[]{"DESCRIBE", "DESC"})
+            {
+                assertRowsNet(executeDescribeNet(describeKeyword + " KEYSPACES"), testKeyspacesOutput);
+                assertRowsNet(executeDescribeNet("test", describeKeyword + " KEYSPACES"), testKeyspacesOutput);
+
+                // ONLY KEYSPACE describes the keyspace itself without its contained elements
+                assertRowsNet(executeDescribeNet(describeKeyword + " ONLY KEYSPACE test"),
+                              row("test", "keyspace", "test", keyspaceOutput()));
+            }
+
+            Object[][] testKeyspaceOutput = rows(row("test", "keyspace", "test", keyspaceOutput()),
+                                                 row("test", "table", "has_all_types", allTypesTable()),
+                                                 row("test", "table", "\"Test\"", testTableOutput()),
+                                                 row("test", "index", "\"Test_col_idx\"", indexOutput("\"Test_col_idx\"", "\"Test\"", "col")),
+                                                 row("test", "index", "\"Test_val_idx\"", indexOutput("\"Test_val_idx\"", "\"Test\"", "val")),
+                                                 row("test", "table", "users", userTableOutput()),
+                                                 row("test", "index", "myindex", indexOutput("myindex", "users", "age")),
+                                                 row("test", "table", "users_mv", usersMvTableOutput()),
+                                                 row("test", "materialized_view", "users_by_state", usersByStateMvOutput()));
+
+            for (String describeKeyword : new String[]{"DESCRIBE", "DESC"})
+            {
+                assertRowsNet(executeDescribeNet(describeKeyword + " KEYSPACE test"), testKeyspaceOutput);
+                assertRowsNet(executeDescribeNet(describeKeyword + " test"), testKeyspaceOutput);
+
+                describeError(describeKeyword + " test2", "'test2' not found in keyspaces");
+            }
+
+            // Test describe tables/table
+            for (String cmd : new String[]{"describe TABLES", "DESC tables"})
+                assertRowsNet(executeDescribeNet("test", cmd),
+                              row("test", "table", "has_all_types"),
+                              row("test", "table", "\"Test\""),
+                              row("test", "table", "users"),
+                              row("test", "table", "users_mv"));
+
+            testDescribeTable("test", "has_all_types", row("test", "table", "has_all_types", allTypesTable()));
+
+            // Describing a table also lists its indexes
+            testDescribeTable("test", "\"Test\"",
+                              row("test", "table", "\"Test\"", testTableOutput()),
+                              row("test", "index", "\"Test_col_idx\"", indexOutput("\"Test_col_idx\"", "\"Test\"", "col")),
+                              row("test", "index", "\"Test_val_idx\"", indexOutput("\"Test_val_idx\"", "\"Test\"", "val")));
+
+            testDescribeTable("test", "users", row("test", "table", "users", userTableOutput()),
+                                               row("test", "index", "myindex", indexOutput("myindex", "users", "age")));
+
+            describeError("test", "DESCRIBE users2", "'users2' not found in keyspace 'test'");
+            describeError("DESCRIBE test.users2", "'users2' not found in keyspace 'test'");
+
+            // Test describe index
+
+            testDescribeIndex("test", "myindex", row("test", "index", "myindex", indexOutput("myindex", "users", "age")));
+            testDescribeIndex("test", "\"Test_col_idx\"", row("test", "index", "\"Test_col_idx\"", indexOutput("\"Test_col_idx\"", "\"Test\"", "col")));
+            testDescribeIndex("test", "\"Test_val_idx\"", row("test", "index", "\"Test_val_idx\"", indexOutput("\"Test_val_idx\"", "\"Test\"", "val")));
+
+            describeError("DESCRIBE test.myindex2", "'myindex2' not found in keyspace 'test'");
+            describeError("test", "DESCRIBE myindex2", "'myindex2' not found in keyspace 'test'");
+
+            // Test describe materialized view
+
+            testDescribeMaterializedView("test", "users_by_state", row("test", "materialized_view", "users_by_state", usersByStateMvOutput()));
+        }
+        finally
+        {
+            execute("DROP KEYSPACE IF EXISTS test");
+        }
+    }
+
+    /**
+     * Asserts that describing {@code keyspace.table} returns {@code rows} for every
+     * spelling variant: DESCRIBE/DESC, with the TABLE or COLUMNFAMILY keyword or
+     * bare name, and with the keyspace given either inline or via "USE".
+     */
+    private void testDescribeTable(String keyspace, String table, Object[]... rows) throws Throwable
+    {
+        for (String describeKeyword : new String[]{"describe", "desc"})
+        {
+            for (String cmd : new String[]{describeKeyword + " table " + keyspace + "." + table,
+                                           describeKeyword + " columnfamily " + keyspace + "." + table,
+                                           describeKeyword + " " + keyspace + "." + table})
+            {
+                assertRowsNet(executeDescribeNet(cmd), rows);
+            }
+
+            // Same variants, but relying on the session's current keyspace
+            for (String cmd : new String[]{describeKeyword + " table " + table,
+                                           describeKeyword + " columnfamily " + table,
+                                           describeKeyword + " " + table})
+            {
+                assertRowsNet(executeDescribeNet(keyspace, cmd), rows);
+            }
+        }
+    }
+
+    /**
+     * Asserts that describing the index {@code keyspace.index} returns {@code rows}
+     * for every spelling variant: DESCRIBE/DESC, with or without the INDEX keyword,
+     * and with the keyspace given either inline or via "USE".
+     */
+    private void testDescribeIndex(String keyspace, String index, Object[]... rows) throws Throwable
+    {
+        for (String describeKeyword : new String[]{"describe", "desc"})
+        {
+            for (String cmd : new String[]{describeKeyword + " index " + keyspace + "." + index,
+                                           describeKeyword + " " + keyspace + "." + index})
+            {
+                assertRowsNet(executeDescribeNet(cmd), rows);
+            }
+
+            // Same variants, but relying on the session's current keyspace
+            for (String cmd : new String[]{describeKeyword + " index " + index,
+                                           describeKeyword + " " + index})
+            {
+                assertRowsNet(executeDescribeNet(keyspace, cmd), rows);
+            }
+        }
+    }
+
+    /**
+     * Asserts that describing the materialized view {@code keyspace.view} returns
+     * {@code rows} for every spelling variant: DESCRIBE/DESC, with or without the
+     * MATERIALIZED VIEW keywords, and with the keyspace inline or via "USE".
+     */
+    private void testDescribeMaterializedView(String keyspace, String view, Object[]... rows) throws Throwable
+    {
+        for (String describeKeyword : new String[]{"describe", "desc"})
+        {
+            for (String cmd : new String[]{describeKeyword + " materialized view " + keyspace + "." + view,
+                                           describeKeyword + " " + keyspace + "." + view})
+            {
+                assertRowsNet(executeDescribeNet(cmd), rows);
+            }
+
+            // Same variants, but relying on the session's current keyspace
+            for (String cmd : new String[]{describeKeyword + " materialized view " + view,
+                                           describeKeyword + " " + view})
+            {
+                assertRowsNet(executeDescribeNet(keyspace, cmd), rows);
+            }
+        }
+    }
+
+    /**
+     * Verifies DESCRIBE CLUSTER output: cluster name, partitioner and snitch,
+     * plus — only when a "USE keyspace" is in effect (the KEYSPACE session
+     * below) — a fourth column mapping tokens to endpoints.
+     */
+    @Test
+    public void testDescribeCluster() throws Throwable
+    {
+        for (String describeKeyword : new String[]{"DESCRIBE", "DESC"})
+        {
+            assertRowsNet(executeDescribeNet(describeKeyword + " CLUSTER"),
+                         row("Test Cluster",
+                             "ByteOrderedPartitioner",
+                             DatabaseDescriptor.getEndpointSnitch().getClass().getName()));
+
+            assertRowsNet(executeDescribeNet("system_virtual_schema", describeKeyword + " CLUSTER"),
+                          row("Test Cluster",
+                              "ByteOrderedPartitioner",
+                              DatabaseDescriptor.getEndpointSnitch().getClass().getName()));
+        }
+
+        TokenMetadata tokenMetadata = StorageService.instance.getTokenMetadata();
+        Token token = tokenMetadata.sortedTokens().get(0);
+        InetAddressAndPort addressAndPort = tokenMetadata.getAllEndpoints().iterator().next();
+
+        // With a non-virtual keyspace in use, DESCRIBE CLUSTER adds the token-to-endpoint map
+        assertRowsNet(executeDescribeNet(KEYSPACE, "DESCRIBE CLUSTER"),
+                      row("Test Cluster",
+                          "ByteOrderedPartitioner",
+                          DatabaseDescriptor.getEndpointSnitch().getClass().getName(),
+                          ImmutableMap.of(token.toString(), ImmutableList.of(addressAndPort.toString()))));
+    }
+
+    /**
+     * Verifies that DESCRIBE TABLE ... WITH INTERNALS includes the table ID and,
+     * after columns are dropped/re-added, appends the corresponding
+     * ALTER TABLE ... DROP/ADD statements to the create statement.
+     */
+    @Test
+    public void testDescribeTableWithInternals() throws Throwable
+    {
+        String table = createTable("CREATE TABLE %s (pk1 text, pk2 int, c int, s decimal static, v1 text, v2 int, v3 int, PRIMARY KEY ((pk1, pk2), c ))");
+
+        TableId id = Schema.instance.getTableMetadata(KEYSPACE, table).id;
+
+        String tableCreateStatement = "CREATE TABLE " + KEYSPACE + "." + table + " (\n" +
+                                      "    pk1 text,\n" +
+                                      "    pk2 int,\n" +
+                                      "    c int,\n" +
+                                      "    s decimal static,\n" +
+                                      "    v1 text,\n" +
+                                      "    v2 int,\n" +
+                                      "    v3 int,\n" +
+                                      "    PRIMARY KEY ((pk1, pk2), c)\n" +
+                                      ") WITH ID = " + id + "\n" +
+                                      "    AND CLUSTERING ORDER BY (c ASC)\n" +
+                                      "    AND " + tableParametersCql();
+
+        assertRowsNet(executeDescribeNet("DESCRIBE TABLE " + KEYSPACE + "." + table + " WITH INTERNALS"),
+                      row(KEYSPACE,
+                          "table",
+                          table,
+                          tableCreateStatement));
+
+        String dropStatement = "ALTER TABLE " + KEYSPACE + "." + table + " DROP v3 USING TIMESTAMP 1589286942065000;";
+
+        execute(dropStatement);
+
+        // After a column drop, WITH INTERNALS appends the ALTER ... DROP statement
+        assertRowsNet(executeDescribeNet("DESCRIBE TABLE " + KEYSPACE + "." + table + " WITH INTERNALS"),
+                      row(KEYSPACE,
+                          "table",
+                          table,
+                          tableCreateStatement + "\n" +
+                          dropStatement));
+
+        String addStatement = "ALTER TABLE " + KEYSPACE + "." + table + " ADD v3 int;";
+
+        execute(addStatement);
+
+        // Re-adding the column appends the ALTER ... ADD statement as well
+        assertRowsNet(executeDescribeNet("DESCRIBE TABLE " + KEYSPACE + "." + table + " WITH INTERNALS"),
+                      row(KEYSPACE,
+                          "table",
+                          table,
+                          tableCreateStatement + "\n" +
+                          dropStatement + "\n" +
+                          addStatement));
+    }
+
+    /**
+     * Verifies the inline "pk text PRIMARY KEY" rendering for a single-column
+     * primary key, both WITH INTERNALS (table ID and dropped-column history kept)
+     * and without (dropped column and ID omitted from the create statement).
+     */
+    @Test
+    public void testPrimaryKeyPositionWithAndWithoutInternals() throws Throwable
+    {
+        String table = createTable("CREATE TABLE %s (pk text, v1 text, v2 int, v3 int, PRIMARY KEY (pk))");
+
+        TableId id = Schema.instance.getTableMetadata(KEYSPACE, table).id;
+
+        String tableCreateStatement = "CREATE TABLE " + KEYSPACE + "." + table + " (\n" +
+                                      "    pk text PRIMARY KEY,\n" +
+                                      "    v1 text,\n" +
+                                      "    v2 int,\n" +
+                                      "    v3 int\n" +
+                                      ") WITH ID = " + id + "\n" +
+                                      "    AND " + tableParametersCql();
+
+        assertRowsNet(executeDescribeNet("DESCRIBE TABLE " + KEYSPACE + "." + table + " WITH INTERNALS"),
+                      row(KEYSPACE,
+                          "table",
+                          table,
+                          tableCreateStatement));
+
+        String dropStatement = "ALTER TABLE " + KEYSPACE + "." + table + " DROP v3 USING TIMESTAMP 1589286942065000;";
+
+        execute(dropStatement);
+
+        // WITH INTERNALS: dropped column still reported via the appended ALTER statement
+        assertRowsNet(executeDescribeNet("DESCRIBE TABLE " + KEYSPACE + "." + table + " WITH INTERNALS"),
+                      row(KEYSPACE,
+                          "table",
+                          table,
+                          tableCreateStatement + "\n" +
+                          dropStatement));
+
+        // Without INTERNALS: no table ID and the dropped column disappears entirely
+        String tableCreateStatementWithoutDroppedColumn = "CREATE TABLE " + KEYSPACE + "." + table + " (\n" +
+                                                          "    pk text PRIMARY KEY,\n" +
+                                                          "    v1 text,\n" +
+                                                          "    v2 int\n" +
+                                                          ") WITH " + tableParametersCql();
+
+        assertRowsNet(executeDescribeNet("DESCRIBE TABLE " + KEYSPACE + "." + table),
+                      row(KEYSPACE,
+                          "table",
+                          table,
+                          tableCreateStatementWithoutDroppedColumn));
+    }
+
+    
+    /**
+     * Verifies that describing an unqualified element fails with
+     * "No keyspace specified and no current keyspace" when no "USE" is in effect.
+     */
+    @Test
+    public void testDescribeMissingKeyspace() throws Throwable
+    {
+        describeError("DESCRIBE TABLE foop",
+                      "No keyspace specified and no current keyspace");
+        describeError("DESCRIBE MATERIALIZED VIEW foop",
+                      "No keyspace specified and no current keyspace");
+        describeError("DESCRIBE INDEX foop",
+                      "No keyspace specified and no current keyspace");
+        describeError("DESCRIBE TYPE foop",
+                      "No keyspace specified and no current keyspace");
+        describeError("DESCRIBE FUNCTION foop",
+                      "No keyspace specified and no current keyspace");
+        describeError("DESCRIBE AGGREGATE foop",
+                      "No keyspace specified and no current keyspace");
+    }
+
+    /**
+     * Verifies the "not found" error messages for non-existent aggregates,
+     * functions, keyspace elements and keyspaces.
+     */
+    @Test
+    public void testDescribeNotFound() throws Throwable
+    {
+        describeError(format("DESCRIBE AGGREGATE %s.%s", KEYSPACE, "aggr_foo"),
+                      format("User defined aggregate '%s' not found in '%s'", "aggr_foo", KEYSPACE));
+
+        describeError(format("DESCRIBE FUNCTION %s.%s", KEYSPACE, "func_foo"),
+                      format("User defined function '%s' not found in '%s'", "func_foo", KEYSPACE));
+
+        describeError(format("DESCRIBE %s.%s", KEYSPACE, "func_foo"),
+                      format("'%s' not found in keyspace '%s'", "func_foo", KEYSPACE));
+
+        describeError(format("DESCRIBE %s", "foo"),
+                      format("'%s' not found in keyspaces", "foo"));
+    }
+
+    /**
+     * Verifies DESCRIBE TYPES / DESCRIBE TYPE output for user-defined types,
+     * including a type altered to reference another type, and that
+     * DESCRIBE KEYSPACE lists types in dependency order (referenced types
+     * before the types that use them). Types are dropped in the finally block.
+     */
+    @Test
+    public void testDescribeTypes() throws Throwable
+    {
+        String type1 = createType("CREATE TYPE %s (a int)");
+        String type2 = createType("CREATE TYPE %s (x text, y text)");
+        String type3 = createType("CREATE TYPE %s (a text, b frozen<" + type2 + ">)");
+        execute("ALTER TYPE " + KEYSPACE + "." + type1 + " ADD b frozen<" + type3 + ">");
+
+        try
+        {
+            assertRowsNet(executeDescribeNet(KEYSPACE, "DESCRIBE TYPES"),
+                          row(KEYSPACE, "type", type1),
+                          row(KEYSPACE, "type", type2),
+                          row(KEYSPACE, "type", type3));
+
+            assertRowsNet(executeDescribeNet(KEYSPACE, "DESCRIBE TYPE " + type2),
+                          row(KEYSPACE, "type", type2, "CREATE TYPE " + KEYSPACE + "." + type2 + " (\n" +
+                                                       "    x text,\n" + 
+                                                       "    y text\n" +
+                                                       ");"));
+            assertRowsNet(executeDescribeNet(KEYSPACE, "DESCRIBE TYPE " + type1),
+                          row(KEYSPACE, "type", type1, "CREATE TYPE " + KEYSPACE + "." + type1 + " (\n" +
+                                                       "    a int,\n" + 
+                                                       "    b frozen<" + type3 + ">\n" +
+                                                       ");"));
+
+            // DESCRIBE KEYSPACE lists type2 and type3 before type1, which depends on them
+            assertRowsNet(executeDescribeNet(KEYSPACE, "DESCRIBE KEYSPACE " + KEYSPACE),
+                          row(KEYSPACE, "keyspace", KEYSPACE, "CREATE KEYSPACE " + KEYSPACE +
+                                                          " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}" +
+                                                          "  AND durable_writes = true;"),
+                          row(KEYSPACE, "type", type2, "CREATE TYPE " + KEYSPACE + "." + type2 + " (\n" +
+                                                       "    x text,\n" + 
+                                                       "    y text\n" +
+                                                       ");"),
+                          row(KEYSPACE, "type", type3, "CREATE TYPE " + KEYSPACE + "." + type3 + " (\n" +
+                                                       "    a text,\n" + 
+                                                       "    b frozen<" + type2 + ">\n" +
+                                                       ");"),
+                          row(KEYSPACE, "type", type1, "CREATE TYPE " + KEYSPACE + "." + type1 + " (\n" +
+                                                       "    a int,\n" + 
+                                                       "    b frozen<" + type3 + ">\n" +
+                                                       ");"));
+        }
+        finally
+        {
+            execute("DROP TYPE " + KEYSPACE + "." + type1);
+            execute("DROP TYPE " + KEYSPACE + "." + type3);
+            execute("DROP TYPE " + KEYSPACE + "." + type2);
+        }
+    }
+
+    /**
+     * Tests for the error reported in CASSANDRA-9064 by:
+     *
+     * - creating the table described in the bug report, using LCS,
+     * - DESCRIBE-ing that table via the server-side DESCRIBE statement, then DROPping it,
+     * - running the output of the DESCRIBE statement as a CREATE TABLE statement, and
+     * - inserting a value into the table.
+     *
+     * The final two steps of the test should not fall down. If one does, that
+     * indicates the output of DESCRIBE is not a correct CREATE TABLE statement.
+     * Both the plain and the WITH INTERNALS output are round-tripped.
+     */
+    @Test
+    public void testDescribeRoundtrip() throws Throwable
+    {
+        for (String withInternals : new String[]{"", " WITH INTERNALS"})
+        {
+            String table = createTable("CREATE TABLE %s (key int PRIMARY KEY) WITH compaction = {'class': 'LeveledCompactionStrategy'}");
+
+            String output = executeDescribeNet(KEYSPACE, "DESCRIBE TABLE " + table + withInternals).all().get(0).getString("create_statement");
+
+            execute("DROP TABLE %s");
+
+            // Re-create the table from the DESCRIBE output ...
+            executeNet(output);
+
+            // ... and check that describing it again yields the identical statement
+            String output2 = executeDescribeNet(KEYSPACE, "DESCRIBE TABLE " + table + withInternals).all().get(0).getString("create_statement");
+            assertEquals(output, output2);
+
+            execute("INSERT INTO %s (key) VALUES (1)");
+        }
+    }
+
+    // CREATE TABLE statement (and expected DESCRIBE output) for a table covering
+    // most CQL data types, including frozen collections and nested tuples.
+    private static String allTypesTable()
+    {
+        return "CREATE TABLE test.has_all_types (\n" +
+               "    num int PRIMARY KEY,\n" +
+               "    asciicol ascii,\n" +
+               "    bigintcol bigint,\n" +
+               "    blobcol blob,\n" +
+               "    booleancol boolean,\n" +
+               "    decimalcol decimal,\n" +
+               "    doublecol double,\n" +
+               "    durationcol duration,\n" +
+               "    floatcol float,\n" +
+               "    frozenlistcol frozen<list<text>>,\n" +
+               "    frozenmapcol frozen<map<timestamp, timeuuid>>,\n" +
+               "    frozensetcol frozen<set<bigint>>,\n" +
+               "    intcol int,\n" +
+               "    smallintcol smallint,\n" +
+               "    textcol text,\n" +
+               "    timestampcol timestamp,\n" +
+               "    tinyintcol tinyint,\n" +
+               "    tuplecol frozen<tuple<text, int, frozen<tuple<timestamp>>>>,\n" +
+               "    uuidcol uuid,\n" +
+               "    varcharcol text,\n" +
+               "    varintcol varint,\n" +
+               "    listcol list<decimal>,\n" +
+               "    mapcol map<timestamp, timeuuid>,\n" +
+               "    setcol set<tinyint>\n" +
+               ") WITH " + tableParametersCql();
+    }
+
+    // Expected DESCRIBE output for the test.users_by_state materialized view.
+    private static String usersByStateMvOutput()
+    {
+        return "CREATE MATERIALIZED VIEW test.users_by_state AS\n" +
+               "    SELECT *\n" +
+               "    FROM test.users_mv\n" +
+               "    WHERE state IS NOT NULL AND username IS NOT NULL\n" +
+               "    PRIMARY KEY (state, username)\n" +
+               " WITH CLUSTERING ORDER BY (username ASC)\n" +
+               "    AND " + tableParametersCql();
+    }
+
+    // Expected DESCRIBE output for an index on the given table/column in keyspace "test".
+    private static String indexOutput(String index, String table, String col)
+    {
+        return format("CREATE INDEX %s ON %s.%s (%s);", index, "test", table, col);
+    }
+
+    // Expected DESCRIBE output for the test.users_mv base table
+    // (note: varchar columns are rendered as text).
+    private static String usersMvTableOutput()
+    {
+        return "CREATE TABLE test.users_mv (\n" +
+               "    username text PRIMARY KEY,\n" +
+               "    birth_year bigint,\n" +
+               "    gender text,\n" +
+               "    password text,\n" +
+               "    session_token text,\n" +
+               "    state text\n" +
+               ") WITH " + tableParametersCql();
+    }
+
+    // Expected DESCRIBE output for the test.users table
+    // (regular columns are rendered in alphabetical order).
+    private static String userTableOutput()
+    {
+        return "CREATE TABLE test.users (\n" +
+               "    userid text PRIMARY KEY,\n" +
+               "    age int,\n" +
+               "    firstname text,\n" +
+               "    lastname text\n" +
+               ") WITH " + tableParametersCql();
+    }
+
+    // Expected DESCRIBE output for the quoted-identifier table test."Test".
+    private static String testTableOutput()
+    {
+        return "CREATE TABLE test.\"Test\" (\n" +
+                   "    id int,\n" +
+                   "    col int,\n" +
+                   "    val text,\n"  +
+                   "    PRIMARY KEY (id, col)\n" +
+                   ") WITH CLUSTERING ORDER BY (col ASC)\n" +
+                   "    AND " + tableParametersCql();
+    }
+
+    // The default table parameters as rendered by DESCRIBE; shared tail of all
+    // expected CREATE TABLE / CREATE MATERIALIZED VIEW statements above.
+    private static String tableParametersCql()
+    {
+        return "additional_write_policy = '99p'\n" +
+               "    AND bloom_filter_fp_chance = 0.01\n" +
+               "    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}\n" +
+               "    AND cdc = false\n" +
+               "    AND comment = ''\n" +
+               "    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}\n" +
+               "    AND compression = {'chunk_length_in_kb': '16', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}\n" +
+               "    AND crc_check_chance = 1.0\n" +
+               "    AND default_time_to_live = 0\n" +
+               "    AND extensions = {}\n" +
+               "    AND gc_grace_seconds = 864000\n" +
+               "    AND max_index_interval = 2048\n" +
+               "    AND memtable_flush_period_in_ms = 0\n" +
+               "    AND min_index_interval = 128\n" +
+               "    AND read_repair = 'BLOCKING'\n" +
+               "    AND speculative_retry = '99p';";
+    }
+
+    // Expected DESCRIBE output for the "test" keyspace created in testDescribe().
+    private static String keyspaceOutput()
+    {
+        return "CREATE KEYSPACE test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}  AND durable_writes = true;";
+    }
+
+    // Asserts that executing the DESCRIBE statement fails with the given message,
+    // without any "USE keyspace" in effect.
+    private void describeError(String cql, String msg) throws Throwable
+    {
+        describeError(null, cql, msg);
+    }
+
+    // Asserts that the DESCRIBE statement fails with an InvalidQueryException
+    // carrying the given message; useKs (nullable) selects the session keyspace.
+    private void describeError(String useKs, String cql, String msg) throws Throwable
+    {
+        assertInvalidThrowMessage(Optional.of(getProtocolVersion(useKs)), msg, InvalidQueryException.class, cql, ArrayUtils.EMPTY_OBJECT_ARRAY);
+    }
+
+    // Executes a DESCRIBE statement over the native protocol with no session keyspace.
+    private ResultSet executeDescribeNet(String cql) throws Throwable
+    {
+        return executeDescribeNet(null, cql);
+    }
+
+    // Executes a DESCRIBE statement over the native protocol with a small page
+    // size (3) so that paging across DESCRIBE results is exercised as well.
+    private ResultSet executeDescribeNet(String useKs, String cql) throws Throwable
+    {
+        return executeNetWithPaging(getProtocolVersion(useKs), cql, 3);
+    }
+
+    /**
+     * Returns the protocol version to run a query with, issuing a "USE keyspace"
+     * first when {@code useKs} is non-null.
+     */
+    private ProtocolVersion getProtocolVersion(String useKs) throws Throwable
+    {
+        // We're using a trick here to distinguish driver sessions with a "USE keyspace" and without:
+        // As different ProtocolVersions use different driver instances, we use different ProtocolVersions
+        // for the with and without "USE keyspace" cases.
+
+        ProtocolVersion v = useKs != null ? ProtocolVersion.CURRENT : ProtocolVersion.V5;
+
+        if (useKs != null)
+            executeNet(v, "USE " + useKs);
+        return v;
+    }
+
+    // Varargs-to-array helper for readable expected-row lists.
+    private static Object[][] rows(Object[]... rows)
+    {
+        return rows;
+    }
+}
diff --git a/test/unit/org/apache/cassandra/db/SchemaCQLHelperTest.java b/test/unit/org/apache/cassandra/db/SchemaCQLHelperTest.java
new file mode 100644
index 0000000..6857dd3
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/SchemaCQLHelperTest.java
@@ -0,0 +1,468 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.io.Files;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.cassandra.*;
+import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.cql3.statements.schema.IndexTarget;
+import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.index.sasi.SASIIndex;
+import org.apache.cassandra.schema.*;
+import org.apache.cassandra.service.reads.SpeculativeRetryPolicy;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+
+import java.io.FileReader;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.CoreMatchers.allOf;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.startsWith;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+public class SchemaCQLHelperTest extends CQLTester
+{
+    @Before
+    public void defineSchema() throws ConfigurationException
+    {
+        SchemaLoader.prepareServer();
+    }
+
+    @Test
+    public void testUserTypesCQL()
+    {
+        String keyspace = "cql_test_keyspace_user_types";
+        String table = "test_table_user_types";
+
+        UserType typeA = new UserType(keyspace, ByteBufferUtil.bytes("a"),
+                                      Arrays.asList(FieldIdentifier.forUnquoted("a1"),
+                                                    FieldIdentifier.forUnquoted("a2"),
+                                                    FieldIdentifier.forUnquoted("a3")),
+                                      Arrays.asList(IntegerType.instance,
+                                                    IntegerType.instance,
+                                                    IntegerType.instance),
+                                      true);
+
+        UserType typeB = new UserType(keyspace, ByteBufferUtil.bytes("b"),
+                                      Arrays.asList(FieldIdentifier.forUnquoted("b1"),
+                                                    FieldIdentifier.forUnquoted("b2"),
+                                                    FieldIdentifier.forUnquoted("b3")),
+                                      Arrays.asList(typeA,
+                                                    typeA,
+                                                    typeA),
+                                      true);
+
+        UserType typeC = new UserType(keyspace, ByteBufferUtil.bytes("c"),
+                                      Arrays.asList(FieldIdentifier.forUnquoted("c1"),
+                                                    FieldIdentifier.forUnquoted("c2"),
+                                                    FieldIdentifier.forUnquoted("c3")),
+                                      Arrays.asList(typeB,
+                                                    typeB,
+                                                    typeB),
+                                      true);
+
+        TableMetadata cfm =
+        TableMetadata.builder(keyspace, table)
+                     .addPartitionKeyColumn("pk1", IntegerType.instance)
+                     .addClusteringColumn("ck1", IntegerType.instance)
+                     .addRegularColumn("reg1", typeC.freeze())
+                     .addRegularColumn("reg2", ListType.getInstance(IntegerType.instance, false))
+                     .addRegularColumn("reg3", MapType.getInstance(AsciiType.instance, IntegerType.instance, true))
+                     .build();
+
+        SchemaLoader.createKeyspace(keyspace, KeyspaceParams.simple(1), Tables.of(cfm), Types.of(typeA, typeB, typeC));
+
+        ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(table);
+
+        assertEquals(ImmutableList.of("CREATE TYPE cql_test_keyspace_user_types.a (\n" +
+                                      "    a1 varint,\n" +
+                                      "    a2 varint,\n" +
+                                      "    a3 varint\n" +
+                                      ");",
+                                      "CREATE TYPE cql_test_keyspace_user_types.b (\n" +
+                                      "    b1 a,\n" +
+                                      "    b2 a,\n" +
+                                      "    b3 a\n" +
+                                      ");",
+                                      "CREATE TYPE cql_test_keyspace_user_types.c (\n" +
+                                      "    c1 b,\n" +
+                                      "    c2 b,\n" +
+                                      "    c3 b\n" +
+                                      ");"),
+                     SchemaCQLHelper.getUserTypesAsCQL(cfs.metadata(), cfs.keyspace.getMetadata().types).collect(Collectors.toList()));
+    }
+
+    @Test
+    public void testDroppedColumnsCQL()
+    {
+        String keyspace = "cql_test_keyspace_dropped_columns";
+        String table = "test_table_dropped_columns";
+
+        TableMetadata.Builder builder =
+        TableMetadata.builder(keyspace, table)
+                     .addPartitionKeyColumn("pk1", IntegerType.instance)
+                     .addClusteringColumn("ck1", IntegerType.instance)
+                     .addStaticColumn("st1", IntegerType.instance)
+                     .addRegularColumn("reg1", IntegerType.instance)
+                     .addRegularColumn("reg2", IntegerType.instance)
+                     .addRegularColumn("reg3", IntegerType.instance);
+
+        ColumnMetadata st1 = builder.getColumn(ByteBufferUtil.bytes("st1"));
+        ColumnMetadata reg1 = builder.getColumn(ByteBufferUtil.bytes("reg1"));
+        ColumnMetadata reg2 = builder.getColumn(ByteBufferUtil.bytes("reg2"));
+        ColumnMetadata reg3 = builder.getColumn(ByteBufferUtil.bytes("reg3"));
+
+        builder.removeRegularOrStaticColumn(st1.name)
+               .removeRegularOrStaticColumn(reg1.name)
+               .removeRegularOrStaticColumn(reg2.name)
+               .removeRegularOrStaticColumn(reg3.name);
+
+        builder.recordColumnDrop(st1, 5000)
+               .recordColumnDrop(reg1, 10000)
+               .recordColumnDrop(reg2, 20000)
+               .recordColumnDrop(reg3, 30000);
+
+        SchemaLoader.createKeyspace(keyspace, KeyspaceParams.simple(1), builder);
+
+        ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(table);
+
+        String expected = "CREATE TABLE IF NOT EXISTS cql_test_keyspace_dropped_columns.test_table_dropped_columns (\n" +
+                          "    pk1 varint,\n" +
+                          "    ck1 varint,\n" +
+                          "    reg1 varint,\n" +
+                          "    reg3 varint,\n" +
+                          "    reg2 varint,\n" +
+                          "    st1 varint static,\n" +
+                          "    PRIMARY KEY (pk1, ck1)\n) WITH ID =";
+        String actual = SchemaCQLHelper.getTableMetadataAsCQL(cfs.metadata(), true, true, true);
+
+        assertThat(actual,
+                   allOf(startsWith(expected),
+                         containsString("ALTER TABLE cql_test_keyspace_dropped_columns.test_table_dropped_columns DROP reg1 USING TIMESTAMP 10000;"),
+                         containsString("ALTER TABLE cql_test_keyspace_dropped_columns.test_table_dropped_columns DROP reg3 USING TIMESTAMP 30000;"),
+                         containsString("ALTER TABLE cql_test_keyspace_dropped_columns.test_table_dropped_columns DROP reg2 USING TIMESTAMP 20000;"),
+                         containsString("ALTER TABLE cql_test_keyspace_dropped_columns.test_table_dropped_columns DROP st1 USING TIMESTAMP 5000;")));
+    }
+
+    @Test
+    public void testReaddedColumns()
+    {
+        String keyspace = "cql_test_keyspace_readded_columns";
+        String table = "test_table_readded_columns";
+
+        TableMetadata.Builder builder =
+        TableMetadata.builder(keyspace, table)
+                     .addPartitionKeyColumn("pk1", IntegerType.instance)
+                     .addClusteringColumn("ck1", IntegerType.instance)
+                     .addRegularColumn("reg1", IntegerType.instance)
+                     .addStaticColumn("st1", IntegerType.instance)
+                     .addRegularColumn("reg2", IntegerType.instance);
+
+        ColumnMetadata reg1 = builder.getColumn(ByteBufferUtil.bytes("reg1"));
+        ColumnMetadata st1 = builder.getColumn(ByteBufferUtil.bytes("st1"));
+
+        builder.removeRegularOrStaticColumn(reg1.name);
+        builder.removeRegularOrStaticColumn(st1.name);
+
+        builder.recordColumnDrop(reg1, 10000);
+        builder.recordColumnDrop(st1, 20000);
+
+        builder.addColumn(reg1);
+        builder.addColumn(st1);
+
+        SchemaLoader.createKeyspace(keyspace, KeyspaceParams.simple(1), builder);
+
+        ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(table);
+
+        // When re-adding, the column is present both as a regular column and as a dropped-column record.
+        String actual = SchemaCQLHelper.getTableMetadataAsCQL(cfs.metadata(), true, true, true);
+        String expected = "CREATE TABLE IF NOT EXISTS cql_test_keyspace_readded_columns.test_table_readded_columns (\n" +
+                          "    pk1 varint,\n" +
+                          "    ck1 varint,\n" +
+                          "    reg2 varint,\n" +
+                          "    reg1 varint,\n" +
+                          "    st1 varint static,\n" +
+                          "    PRIMARY KEY (pk1, ck1)\n" +
+                          ") WITH ID";
+
+        assertThat(actual,
+                   allOf(startsWith(expected),
+                         containsString("ALTER TABLE cql_test_keyspace_readded_columns.test_table_readded_columns DROP reg1 USING TIMESTAMP 10000;"),
+                         containsString("ALTER TABLE cql_test_keyspace_readded_columns.test_table_readded_columns ADD reg1 varint;"),
+                         containsString("ALTER TABLE cql_test_keyspace_readded_columns.test_table_readded_columns DROP st1 USING TIMESTAMP 20000;"),
+                         containsString("ALTER TABLE cql_test_keyspace_readded_columns.test_table_readded_columns ADD st1 varint static;")));
+    }
+
+    @Test
+    public void testCfmColumnsCQL()
+    {
+        String keyspace = "cql_test_keyspace_create_table";
+        String table = "test_table_create_table";
+
+        TableMetadata.Builder metadata =
+        TableMetadata.builder(keyspace, table)
+                     .addPartitionKeyColumn("pk1", IntegerType.instance)
+                     .addPartitionKeyColumn("pk2", AsciiType.instance)
+                     .addClusteringColumn("ck1", ReversedType.getInstance(IntegerType.instance))
+                     .addClusteringColumn("ck2", IntegerType.instance)
+                     .addStaticColumn("st1", AsciiType.instance)
+                     .addRegularColumn("reg1", AsciiType.instance)
+                     .addRegularColumn("reg2", ListType.getInstance(IntegerType.instance, false))
+                     .addRegularColumn("reg3", MapType.getInstance(AsciiType.instance, IntegerType.instance, true));
+
+        SchemaLoader.createKeyspace(keyspace, KeyspaceParams.simple(1), metadata);
+
+        ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(table);
+
+        assertThat(SchemaCQLHelper.getTableMetadataAsCQL(cfs.metadata(), true, true, true),
+                   startsWith(
+                   "CREATE TABLE IF NOT EXISTS cql_test_keyspace_create_table.test_table_create_table (\n" +
+                   "    pk1 varint,\n" +
+                   "    pk2 ascii,\n" +
+                   "    ck1 varint,\n" +
+                   "    ck2 varint,\n" +
+                   "    st1 ascii static,\n" +
+                   "    reg1 ascii,\n" +
+                   "    reg2 frozen<list<varint>>,\n" +
+                   "    reg3 map<ascii, varint>,\n" +
+                   "    PRIMARY KEY ((pk1, pk2), ck1, ck2)\n" +
+                   ") WITH ID = " + cfs.metadata.id + "\n" +
+                   "    AND CLUSTERING ORDER BY (ck1 DESC, ck2 ASC)"));
+    }
+
+    @Test
+    public void testCfmOptionsCQL()
+    {
+        String keyspace = "cql_test_keyspace_options";
+        String table = "test_table_options";
+
+        TableMetadata.Builder builder = TableMetadata.builder(keyspace, table);
+        long droppedTimestamp = FBUtilities.timestampMicros();
+        builder.addPartitionKeyColumn("pk1", IntegerType.instance)
+               .addClusteringColumn("cl1", IntegerType.instance)
+               .addRegularColumn("reg1", AsciiType.instance)
+               .bloomFilterFpChance(1.0)
+               .comment("comment")
+               .compaction(CompactionParams.lcs(Collections.singletonMap("sstable_size_in_mb", "1")))
+               .compression(CompressionParams.lz4(1 << 16, 1 << 15))
+               .crcCheckChance(0.3)
+               .defaultTimeToLive(4)
+               .gcGraceSeconds(5)
+               .minIndexInterval(6)
+               .maxIndexInterval(7)
+               .memtableFlushPeriod(8)
+               .speculativeRetry(SpeculativeRetryPolicy.fromString("always"))
+               .additionalWritePolicy(SpeculativeRetryPolicy.fromString("always"))
+               .extensions(ImmutableMap.of("ext1", ByteBuffer.wrap("val1".getBytes())))
+               .recordColumnDrop(ColumnMetadata.regularColumn(keyspace, table, "reg1", AsciiType.instance),
+                                 droppedTimestamp);
+
+        SchemaLoader.createKeyspace(keyspace, KeyspaceParams.simple(1), builder);
+
+        ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(table);
+
+        assertThat(SchemaCQLHelper.getTableMetadataAsCQL(cfs.metadata(), true, true, true),
+                   containsString("CLUSTERING ORDER BY (cl1 ASC)\n" +
+                            "    AND additional_write_policy = 'ALWAYS'\n" +
+                            "    AND bloom_filter_fp_chance = 1.0\n" +
+                            "    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}\n" +
+                            "    AND cdc = false\n" +
+                            "    AND comment = 'comment'\n" +
+                            "    AND compaction = {'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4', 'sstable_size_in_mb': '1'}\n" +
+                            "    AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor', 'min_compress_ratio': '2.0'}\n" +
+                            "    AND crc_check_chance = 0.3\n" +
+                            "    AND default_time_to_live = 4\n" +
+                            "    AND extensions = {'ext1': 0x76616c31}\n" +
+                            "    AND gc_grace_seconds = 5\n" +
+                            "    AND max_index_interval = 7\n" +
+                            "    AND memtable_flush_period_in_ms = 8\n" +
+                            "    AND min_index_interval = 6\n" +
+                            "    AND read_repair = 'BLOCKING'\n" +
+                            "    AND speculative_retry = 'ALWAYS';"
+                   ));
+    }
+
+    @Test
+    public void testCfmIndexJson()
+    {
+        String keyspace = "cql_test_keyspace_3";
+        String table = "test_table_3";
+
+        TableMetadata.Builder builder =
+        TableMetadata.builder(keyspace, table)
+                     .addPartitionKeyColumn("pk1", IntegerType.instance)
+                     .addClusteringColumn("cl1", IntegerType.instance)
+                     .addRegularColumn("reg1", AsciiType.instance);
+
+        ColumnIdentifier reg1 = ColumnIdentifier.getInterned("reg1", true);
+
+        builder.indexes(
+        Indexes.of(IndexMetadata.fromIndexTargets(
+        Collections.singletonList(new IndexTarget(reg1, IndexTarget.Type.VALUES)),
+        "indexName",
+        IndexMetadata.Kind.COMPOSITES,
+        Collections.emptyMap()),
+                   IndexMetadata.fromIndexTargets(
+                   Collections.singletonList(new IndexTarget(reg1, IndexTarget.Type.KEYS)),
+                   "indexName2",
+                   IndexMetadata.Kind.COMPOSITES,
+                   Collections.emptyMap()),
+                   IndexMetadata.fromIndexTargets(
+                   Collections.singletonList(new IndexTarget(reg1, IndexTarget.Type.KEYS_AND_VALUES)),
+                   "indexName3",
+                   IndexMetadata.Kind.COMPOSITES,
+                   Collections.emptyMap()),
+                   IndexMetadata.fromIndexTargets(
+                   Collections.singletonList(new IndexTarget(reg1, IndexTarget.Type.KEYS_AND_VALUES)),
+                   "indexName4",
+                   IndexMetadata.Kind.CUSTOM,
+                   Collections.singletonMap(IndexTarget.CUSTOM_INDEX_OPTION_NAME, SASIIndex.class.getName()))));
+
+
+        SchemaLoader.createKeyspace(keyspace, KeyspaceParams.simple(1), builder);
+
+        ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(table);
+
+        assertEquals(ImmutableList.of("CREATE INDEX \"indexName\" ON cql_test_keyspace_3.test_table_3 (values(reg1));",
+                                      "CREATE INDEX \"indexName2\" ON cql_test_keyspace_3.test_table_3 (keys(reg1));",
+                                      "CREATE INDEX \"indexName3\" ON cql_test_keyspace_3.test_table_3 (entries(reg1));",
+                                      "CREATE CUSTOM INDEX \"indexName4\" ON cql_test_keyspace_3.test_table_3 (entries(reg1)) USING 'org.apache.cassandra.index.sasi.SASIIndex';"),
+                     SchemaCQLHelper.getIndexesAsCQL(cfs.metadata()).collect(Collectors.toList()));
+    }
+
+    private final static String SNAPSHOT = "testsnapshot";
+
+    @Test
+    public void testSnapshot() throws Throwable
+    {
+        String typeA = createType("CREATE TYPE %s (a1 varint, a2 varint, a3 varint);");
+        String typeB = createType("CREATE TYPE %s (b1 frozen<" + typeA + ">, b2 frozen<" + typeA + ">, b3 frozen<" + typeA + ">);");
+        String typeC = createType("CREATE TYPE %s (c1 frozen<" + typeB + ">, c2 frozen<" + typeB + ">, c3 frozen<" + typeB + ">);");
+
+        String tableName = createTable("CREATE TABLE IF NOT EXISTS %s (" +
+                                       "pk1 varint," +
+                                       "pk2 ascii," +
+                                       "ck1 varint," +
+                                       "ck2 varint," +
+                                       "reg1 " + typeC + "," +
+                                       "reg2 int," +
+                                       "reg3 int," +
+                                       "PRIMARY KEY ((pk1, pk2), ck1, ck2)) WITH " +
+                                       "CLUSTERING ORDER BY (ck1 ASC, ck2 DESC);");
+
+        alterTable("ALTER TABLE %s DROP reg3 USING TIMESTAMP 10000;");
+        alterTable("ALTER TABLE %s ADD reg3 int;");
+
+        for (int i = 0; i < 10; i++)
+            execute("INSERT INTO %s (pk1, pk2, ck1, ck2, reg1, reg2) VALUES (?, ?, ?, ?, ?, ?)", i, i + 1, i + 2, i + 3, null, i + 5);
+
+        ColumnFamilyStore cfs = Keyspace.open(keyspace()).getColumnFamilyStore(tableName);
+        cfs.snapshot(SNAPSHOT);
+
+        String schema = Files.toString(cfs.getDirectories().getSnapshotSchemaFile(SNAPSHOT), Charset.defaultCharset());
+        assertThat(schema,
+                   allOf(containsString(String.format("CREATE TYPE %s.%s (\n" +
+                                                      "    a1 varint,\n" +
+                                                      "    a2 varint,\n" +
+                                                      "    a3 varint\n" +
+                                                      ");", keyspace(), typeA)),
+                         containsString(String.format("CREATE TYPE %s.%s (\n" +
+                                                      "    a1 varint,\n" +
+                                                      "    a2 varint,\n" +
+                                                      "    a3 varint\n" +
+                                                      ");", keyspace(), typeA)),
+                         containsString(String.format("CREATE TYPE %s.%s (\n" +
+                                                      "    b1 frozen<%s>,\n" +
+                                                      "    b2 frozen<%s>,\n" +
+                                                      "    b3 frozen<%s>\n" +
+                                                      ");", keyspace(), typeB, typeA, typeA, typeA)),
+                         containsString(String.format("CREATE TYPE %s.%s (\n" +
+                                                      "    c1 frozen<%s>,\n" +
+                                                      "    c2 frozen<%s>,\n" +
+                                                      "    c3 frozen<%s>\n" +
+                                                      ");", keyspace(), typeC, typeB, typeB, typeB))));
+
+        schema = schema.substring(schema.indexOf("CREATE TABLE")); // trim to ensure order
+        String expected = "CREATE TABLE IF NOT EXISTS " + keyspace() + "." + tableName + " (\n" +
+                          "    pk1 varint,\n" +
+                          "    pk2 ascii,\n" +
+                          "    ck1 varint,\n" +
+                          "    ck2 varint,\n" +
+                          "    reg2 int,\n" +
+                          "    reg1 " + typeC+ ",\n" +
+                          "    reg3 int,\n" +
+                          "    PRIMARY KEY ((pk1, pk2), ck1, ck2)\n" +
+                          ") WITH ID = " + cfs.metadata.id + "\n" +
+                          "    AND CLUSTERING ORDER BY (ck1 ASC, ck2 DESC)";
+
+        assertThat(schema,
+                   allOf(startsWith(expected),
+                         containsString("ALTER TABLE " + keyspace() + "." + tableName + " DROP reg3 USING TIMESTAMP 10000;"),
+                         containsString("ALTER TABLE " + keyspace() + "." + tableName + " ADD reg3 int;")));
+
+        JSONObject manifest = (JSONObject) new JSONParser().parse(new FileReader(cfs.getDirectories().getSnapshotManifestFile(SNAPSHOT)));
+        JSONArray files = (JSONArray) manifest.get("files");
+        Assert.assertEquals(1, files.size());
+    }
+
+    @Test
+    public void testSystemKsSnapshot()
+    {
+        ColumnFamilyStore cfs = Keyspace.open("system").getColumnFamilyStore("peers");
+        cfs.snapshot(SNAPSHOT);
+
+        Assert.assertTrue(cfs.getDirectories().getSnapshotManifestFile(SNAPSHOT).exists());
+        Assert.assertFalse(cfs.getDirectories().getSnapshotSchemaFile(SNAPSHOT).exists());
+    }
+
+    @Test
+    public void testBooleanCompositeKey() throws Throwable
+    {
+        createTable("CREATE TABLE %s (t_id boolean, id boolean, ck boolean, nk boolean, PRIMARY KEY ((t_id, id), ck))");
+
+        execute("insert into %s (t_id, id, ck, nk) VALUES (true, false, false, true)");
+        assertRows(execute("select * from %s"), row(true, false, false, true));
+
+        // CASSANDRA-14752 -
+        // a problem with composite boolean types meant that calling this would
+        // prevent any boolean values from being inserted afterwards
+        ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+        cfs.getSSTablesForKey("false:true");
+
+        execute("insert into %s (t_id, id, ck, nk) VALUES (true, true, false, true)");
+        assertRows(execute("select t_id, id, ck, nk from %s"), row(true, false, false, true), row(true, true, false, true));
+    }
+}
diff --git a/test/unit/org/apache/cassandra/db/TableCQLHelperTest.java b/test/unit/org/apache/cassandra/db/TableCQLHelperTest.java
deleted file mode 100644
index a1cae54..0000000
--- a/test/unit/org/apache/cassandra/db/TableCQLHelperTest.java
+++ /dev/null
@@ -1,446 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.db;
-
-import java.io.FileReader;
-import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
-import java.util.*;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.io.Files;
-import org.apache.cassandra.service.reads.NeverSpeculativeRetryPolicy;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import org.apache.cassandra.*;
-import org.apache.cassandra.cql3.*;
-import org.apache.cassandra.cql3.statements.schema.IndexTarget;
-import org.apache.cassandra.db.marshal.*;
-import org.apache.cassandra.exceptions.*;
-import org.apache.cassandra.index.sasi.*;
-import org.apache.cassandra.schema.*;
-import org.apache.cassandra.service.reads.AlwaysSpeculativeRetryPolicy;
-import org.apache.cassandra.utils.*;
-import org.json.simple.JSONArray;
-import org.json.simple.JSONObject;
-import org.json.simple.parser.JSONParser;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public class TableCQLHelperTest extends CQLTester
-{
-    @Before
-    public void defineSchema() throws ConfigurationException
-    {
-        SchemaLoader.prepareServer();
-    }
-
-    @Test
-    public void testUserTypesCQL()
-    {
-        String keyspace = "cql_test_keyspace_user_types";
-        String table = "test_table_user_types";
-
-        UserType typeA = new UserType(keyspace, ByteBufferUtil.bytes("a"),
-                                      Arrays.asList(FieldIdentifier.forUnquoted("a1"),
-                                                    FieldIdentifier.forUnquoted("a2"),
-                                                    FieldIdentifier.forUnquoted("a3")),
-                                      Arrays.asList(IntegerType.instance,
-                                                    IntegerType.instance,
-                                                    IntegerType.instance),
-                                      true);
-
-        UserType typeB = new UserType(keyspace, ByteBufferUtil.bytes("b"),
-                                      Arrays.asList(FieldIdentifier.forUnquoted("b1"),
-                                                    FieldIdentifier.forUnquoted("b2"),
-                                                    FieldIdentifier.forUnquoted("b3")),
-                                      Arrays.asList(typeA,
-                                                    typeA,
-                                                    typeA),
-                                      true);
-
-        UserType typeC = new UserType(keyspace, ByteBufferUtil.bytes("c"),
-                                      Arrays.asList(FieldIdentifier.forUnquoted("c1"),
-                                                    FieldIdentifier.forUnquoted("c2"),
-                                                    FieldIdentifier.forUnquoted("c3")),
-                                      Arrays.asList(typeB,
-                                                    typeB,
-                                                    typeB),
-                                      true);
-
-        TableMetadata cfm =
-            TableMetadata.builder(keyspace, table)
-                         .addPartitionKeyColumn("pk1", IntegerType.instance)
-                         .addClusteringColumn("ck1", IntegerType.instance)
-                         .addRegularColumn("reg1", typeC)
-                         .addRegularColumn("reg2", ListType.getInstance(IntegerType.instance, false))
-                         .addRegularColumn("reg3", MapType.getInstance(AsciiType.instance, IntegerType.instance, true))
-                         .build();
-
-        SchemaLoader.createKeyspace(keyspace, KeyspaceParams.simple(1), Tables.of(cfm), Types.of(typeA, typeB, typeC));
-
-        ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(table);
-
-        assertEquals(ImmutableList.of("CREATE TYPE cql_test_keyspace_user_types.a (a1 varint, a2 varint, a3 varint);",
-                                      "CREATE TYPE cql_test_keyspace_user_types.b (b1 a, b2 a, b3 a);",
-                                      "CREATE TYPE cql_test_keyspace_user_types.c (c1 b, c2 b, c3 b);"),
-                     TableCQLHelper.getUserTypesAsCQL(cfs.metadata()));
-    }
-
-    @Test
-    public void testDroppedColumnsCQL()
-    {
-        String keyspace = "cql_test_keyspace_dropped_columns";
-        String table = "test_table_dropped_columns";
-
-        TableMetadata.Builder builder =
-            TableMetadata.builder(keyspace, table)
-                         .addPartitionKeyColumn("pk1", IntegerType.instance)
-                         .addClusteringColumn("ck1", IntegerType.instance)
-                         .addRegularColumn("reg1", IntegerType.instance)
-                         .addRegularColumn("reg2", IntegerType.instance)
-                         .addRegularColumn("reg3", IntegerType.instance);
-
-        ColumnMetadata reg1 = builder.getColumn(ByteBufferUtil.bytes("reg1"));
-        ColumnMetadata reg2 = builder.getColumn(ByteBufferUtil.bytes("reg2"));
-        ColumnMetadata reg3 = builder.getColumn(ByteBufferUtil.bytes("reg3"));
-
-        builder.removeRegularOrStaticColumn(reg1.name)
-               .removeRegularOrStaticColumn(reg2.name)
-               .removeRegularOrStaticColumn(reg3.name);
-
-        builder.recordColumnDrop(reg1, 10000)
-               .recordColumnDrop(reg2, 20000)
-               .recordColumnDrop(reg3, 30000);
-
-        SchemaLoader.createKeyspace(keyspace, KeyspaceParams.simple(1), builder);
-
-        ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(table);
-
-        assertEquals(ImmutableList.of("ALTER TABLE cql_test_keyspace_dropped_columns.test_table_dropped_columns DROP reg1 USING TIMESTAMP 10000;",
-                                      "ALTER TABLE cql_test_keyspace_dropped_columns.test_table_dropped_columns DROP reg3 USING TIMESTAMP 30000;",
-                                      "ALTER TABLE cql_test_keyspace_dropped_columns.test_table_dropped_columns DROP reg2 USING TIMESTAMP 20000;"),
-                     TableCQLHelper.getDroppedColumnsAsCQL(cfs.metadata()));
-
-        assertTrue(TableCQLHelper.getTableMetadataAsCQL(cfs.metadata(), true).startsWith(
-        "CREATE TABLE IF NOT EXISTS cql_test_keyspace_dropped_columns.test_table_dropped_columns (\n" +
-        "\tpk1 varint,\n" +
-        "\tck1 varint,\n" +
-        "\treg1 varint,\n" +
-        "\treg3 varint,\n" +
-        "\treg2 varint,\n" +
-        "\tPRIMARY KEY (pk1, ck1))"));
-    }
-
-    @Test
-    public void testReaddedColumns()
-    {
-        String keyspace = "cql_test_keyspace_readded_columns";
-        String table = "test_table_readded_columns";
-
-        TableMetadata.Builder builder =
-            TableMetadata.builder(keyspace, table)
-                         .addPartitionKeyColumn("pk1", IntegerType.instance)
-                         .addClusteringColumn("ck1", IntegerType.instance)
-                         .addRegularColumn("reg1", IntegerType.instance)
-                         .addStaticColumn("reg2", IntegerType.instance)
-                         .addRegularColumn("reg3", IntegerType.instance);
-
-        ColumnMetadata reg1 = builder.getColumn(ByteBufferUtil.bytes("reg1"));
-        ColumnMetadata reg2 = builder.getColumn(ByteBufferUtil.bytes("reg2"));
-
-        builder.removeRegularOrStaticColumn(reg1.name);
-        builder.removeRegularOrStaticColumn(reg2.name);
-
-        builder.recordColumnDrop(reg1, 10000);
-        builder.recordColumnDrop(reg2, 20000);
-
-        builder.addColumn(reg1);
-        builder.addColumn(reg2);
-
-        SchemaLoader.createKeyspace(keyspace, KeyspaceParams.simple(1), builder);
-
-        ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(table);
-
-        // when re-adding, column is present in CREATE, then in DROP and then in ADD again, to record DROP with a proper timestamp
-        assertTrue(TableCQLHelper.getTableMetadataAsCQL(cfs.metadata(), true).startsWith(
-        "CREATE TABLE IF NOT EXISTS cql_test_keyspace_readded_columns.test_table_readded_columns (\n" +
-        "\tpk1 varint,\n" +
-        "\tck1 varint,\n" +
-        "\treg2 varint static,\n" +
-        "\treg1 varint,\n" +
-        "\treg3 varint,\n" +
-        "\tPRIMARY KEY (pk1, ck1))"));
-
-        assertEquals(ImmutableList.of("ALTER TABLE cql_test_keyspace_readded_columns.test_table_readded_columns DROP reg1 USING TIMESTAMP 10000;",
-                                      "ALTER TABLE cql_test_keyspace_readded_columns.test_table_readded_columns ADD reg1 varint;",
-                                      "ALTER TABLE cql_test_keyspace_readded_columns.test_table_readded_columns DROP reg2 USING TIMESTAMP 20000;",
-                                      "ALTER TABLE cql_test_keyspace_readded_columns.test_table_readded_columns ADD reg2 varint static;"),
-                     TableCQLHelper.getDroppedColumnsAsCQL(cfs.metadata()));
-    }
-
-    @Test
-    public void testCfmColumnsCQL()
-    {
-        String keyspace = "cql_test_keyspace_create_table";
-        String table = "test_table_create_table";
-
-        TableMetadata.Builder metadata =
-            TableMetadata.builder(keyspace, table)
-                         .addPartitionKeyColumn("pk1", IntegerType.instance)
-                         .addPartitionKeyColumn("pk2", AsciiType.instance)
-                         .addClusteringColumn("ck1", ReversedType.getInstance(IntegerType.instance))
-                         .addClusteringColumn("ck2", IntegerType.instance)
-                         .addStaticColumn("st1", AsciiType.instance)
-                         .addRegularColumn("reg1", AsciiType.instance)
-                         .addRegularColumn("reg2", ListType.getInstance(IntegerType.instance, false))
-                         .addRegularColumn("reg3", MapType.getInstance(AsciiType.instance, IntegerType.instance, true));
-
-        SchemaLoader.createKeyspace(keyspace, KeyspaceParams.simple(1), metadata);
-
-        ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(table);
-
-        assertTrue(TableCQLHelper.getTableMetadataAsCQL(cfs.metadata(), true).startsWith(
-        "CREATE TABLE IF NOT EXISTS cql_test_keyspace_create_table.test_table_create_table (\n" +
-        "\tpk1 varint,\n" +
-        "\tpk2 ascii,\n" +
-        "\tck1 varint,\n" +
-        "\tck2 varint,\n" +
-        "\tst1 ascii static,\n" +
-        "\treg1 ascii,\n" +
-        "\treg2 frozen<list<varint>>,\n" +
-        "\treg3 map<ascii, varint>,\n" +
-        "\tPRIMARY KEY ((pk1, pk2), ck1, ck2))\n" +
-        "\tWITH ID = " + cfs.metadata.id + "\n" +
-        "\tAND CLUSTERING ORDER BY (ck1 DESC, ck2 ASC)"));
-    }
-
-    @Test
-    public void testCfmOptionsCQL()
-    {
-        String keyspace = "cql_test_keyspace_options";
-        String table = "test_table_options";
-
-        TableMetadata.Builder builder = TableMetadata.builder(keyspace, table);
-        builder.addPartitionKeyColumn("pk1", IntegerType.instance)
-               .addClusteringColumn("cl1", IntegerType.instance)
-               .addRegularColumn("reg1", AsciiType.instance)
-               .bloomFilterFpChance(1.0)
-               .comment("comment")
-               .compaction(CompactionParams.lcs(Collections.singletonMap("sstable_size_in_mb", "1")))
-               .compression(CompressionParams.lz4(1 << 16, 1 << 15))
-               .crcCheckChance(0.3)
-               .defaultTimeToLive(4)
-               .gcGraceSeconds(5)
-               .minIndexInterval(6)
-               .maxIndexInterval(7)
-               .memtableFlushPeriod(8)
-               .speculativeRetry(AlwaysSpeculativeRetryPolicy.INSTANCE)
-               .additionalWritePolicy(NeverSpeculativeRetryPolicy.INSTANCE)
-               .extensions(ImmutableMap.of("ext1", ByteBuffer.wrap("val1".getBytes())))
-               .recordColumnDrop(ColumnMetadata.regularColumn(keyspace, table, "reg1", AsciiType.instance),
-                                 FBUtilities.timestampMicros());
-
-        SchemaLoader.createKeyspace(keyspace, KeyspaceParams.simple(1), builder);
-
-        ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(table);
-
-        assertTrue(TableCQLHelper.getTableMetadataAsCQL(cfs.metadata(), true).endsWith(
-        "AND bloom_filter_fp_chance = 1.0\n" +
-        "\tAND crc_check_chance = 0.3\n" +
-        "\tAND default_time_to_live = 4\n" +
-        "\tAND gc_grace_seconds = 5\n" +
-        "\tAND min_index_interval = 6\n" +
-        "\tAND max_index_interval = 7\n" +
-        "\tAND memtable_flush_period_in_ms = 8\n" +
-        "\tAND speculative_retry = 'ALWAYS'\n" +
-        "\tAND additional_write_policy = 'NEVER'\n" +
-        "\tAND comment = 'comment'\n" +
-        "\tAND caching = { 'keys': 'ALL', 'rows_per_partition': 'NONE' }\n" +
-        "\tAND compaction = { 'max_threshold': '32', 'min_threshold': '4', 'sstable_size_in_mb': '1', 'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy' }\n" +
-        "\tAND compression = { 'chunk_length_in_kb': '64', 'min_compress_ratio': '2.0', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor' }\n" +
-        "\tAND cdc = false\n" +
-        "\tAND extensions = { 'ext1': 0x76616c31 };"
-        ));
-    }
-
-    @Test
-    public void testCfmIndexJson()
-    {
-        String keyspace = "cql_test_keyspace_3";
-        String table = "test_table_3";
-
-        TableMetadata.Builder builder =
-            TableMetadata.builder(keyspace, table)
-                         .addPartitionKeyColumn("pk1", IntegerType.instance)
-                         .addClusteringColumn("cl1", IntegerType.instance)
-                         .addRegularColumn("reg1", AsciiType.instance);
-
-        ColumnIdentifier reg1 = ColumnIdentifier.getInterned("reg1", true);
-
-        builder.indexes(
-            Indexes.of(IndexMetadata.fromIndexTargets(
-            Collections.singletonList(new IndexTarget(reg1, IndexTarget.Type.VALUES)),
-                                                      "indexName",
-                                                      IndexMetadata.Kind.COMPOSITES,
-                                                      Collections.emptyMap()),
-                       IndexMetadata.fromIndexTargets(
-                       Collections.singletonList(new IndexTarget(reg1, IndexTarget.Type.KEYS)),
-                                                      "indexName2",
-                                                      IndexMetadata.Kind.COMPOSITES,
-                                                      Collections.emptyMap()),
-                       IndexMetadata.fromIndexTargets(
-                       Collections.singletonList(new IndexTarget(reg1, IndexTarget.Type.KEYS_AND_VALUES)),
-                                                      "indexName3",
-                                                      IndexMetadata.Kind.COMPOSITES,
-                                                      Collections.emptyMap()),
-                       IndexMetadata.fromIndexTargets(
-                       Collections.singletonList(new IndexTarget(reg1, IndexTarget.Type.KEYS_AND_VALUES)),
-                                                      "indexName4",
-                                                      IndexMetadata.Kind.CUSTOM,
-                                                      Collections.singletonMap(IndexTarget.CUSTOM_INDEX_OPTION_NAME, SASIIndex.class.getName()))));
-
-
-        SchemaLoader.createKeyspace(keyspace, KeyspaceParams.simple(1), builder);
-
-        ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(table);
-
-        assertEquals(ImmutableList.of("CREATE INDEX \"indexName\" ON cql_test_keyspace_3.test_table_3 (values(reg1));",
-                                      "CREATE INDEX \"indexName2\" ON cql_test_keyspace_3.test_table_3 (keys(reg1));",
-                                      "CREATE INDEX \"indexName3\" ON cql_test_keyspace_3.test_table_3 (entries(reg1));",
-                                      "CREATE CUSTOM INDEX \"indexName4\" ON cql_test_keyspace_3.test_table_3 (entries(reg1)) USING 'org.apache.cassandra.index.sasi.SASIIndex';"),
-                     TableCQLHelper.getIndexesAsCQL(cfs.metadata()));
-    }
-
-    private final static String SNAPSHOT = "testsnapshot";
-
-    @Test
-    public void testSnapshot() throws Throwable
-    {
-        String typeA = createType("CREATE TYPE %s (a1 varint, a2 varint, a3 varint);");
-        String typeB = createType("CREATE TYPE %s (b1 frozen<" + typeA + ">, b2 frozen<" + typeA + ">, b3 frozen<" + typeA + ">);");
-        String typeC = createType("CREATE TYPE %s (c1 frozen<" + typeB + ">, c2 frozen<" + typeB + ">, c3 frozen<" + typeB + ">);");
-
-        String tableName = createTable("CREATE TABLE IF NOT EXISTS %s (" +
-                                       "pk1 varint," +
-                                       "pk2 ascii," +
-                                       "ck1 varint," +
-                                       "ck2 varint," +
-                                       "reg1 " + typeC + "," +
-                                       "reg2 int," +
-                                       "reg3 int," +
-                                       "PRIMARY KEY ((pk1, pk2), ck1, ck2)) WITH " +
-                                       "CLUSTERING ORDER BY (ck1 ASC, ck2 DESC);");
-
-        alterTable("ALTER TABLE %s DROP reg3 USING TIMESTAMP 10000;");
-        alterTable("ALTER TABLE %s ADD reg3 int;");
-
-        for (int i = 0; i < 10; i++)
-            execute("INSERT INTO %s (pk1, pk2, ck1, ck2, reg1, reg2) VALUES (?, ?, ?, ?, ?, ?)", i, i + 1, i + 2, i + 3, null, i + 5);
-
-        ColumnFamilyStore cfs = Keyspace.open(keyspace()).getColumnFamilyStore(tableName);
-        cfs.snapshot(SNAPSHOT);
-
-        String schema = Files.toString(cfs.getDirectories().getSnapshotSchemaFile(SNAPSHOT), Charset.defaultCharset());
-        assertTrue(schema.contains(String.format("CREATE TYPE %s.%s (a1 varint, a2 varint, a3 varint);", keyspace(), typeA)));
-        assertTrue(schema.contains(String.format("CREATE TYPE %s.%s (a1 varint, a2 varint, a3 varint);", keyspace(), typeA)));
-        assertTrue(schema.contains(String.format("CREATE TYPE %s.%s (b1 frozen<%s>, b2 frozen<%s>, b3 frozen<%s>);", keyspace(), typeB, typeA, typeA, typeA)));
-        assertTrue(schema.contains(String.format("CREATE TYPE %s.%s (c1 frozen<%s>, c2 frozen<%s>, c3 frozen<%s>);", keyspace(), typeC, typeB, typeB, typeB)));
-
-        schema = schema.substring(schema.indexOf("CREATE TABLE")); // trim to ensure order
-
-        assertTrue(schema.startsWith("CREATE TABLE IF NOT EXISTS " + keyspace() + "." + tableName + " (\n" +
-                                     "\tpk1 varint,\n" +
-                                     "\tpk2 ascii,\n" +
-                                     "\tck1 varint,\n" +
-                                     "\tck2 varint,\n" +
-                                     "\treg2 int,\n" +
-                                     "\treg3 int,\n" +
-                                     "\treg1 " + typeC + ",\n" +
-                                     "\tPRIMARY KEY ((pk1, pk2), ck1, ck2))\n" +
-                                     "\tWITH ID = " + cfs.metadata.id + "\n" +
-                                     "\tAND CLUSTERING ORDER BY (ck1 ASC, ck2 DESC)"));
-
-        schema = schema.substring(schema.indexOf("ALTER"));
-        assertTrue(schema.startsWith(String.format("ALTER TABLE %s.%s DROP reg3 USING TIMESTAMP 10000;", keyspace(), tableName)));
-        assertTrue(schema.contains(String.format("ALTER TABLE %s.%s ADD reg3 int;", keyspace(), tableName)));
-
-        JSONObject manifest = (JSONObject) new JSONParser().parse(new FileReader(cfs.getDirectories().getSnapshotManifestFile(SNAPSHOT)));
-        JSONArray files = (JSONArray) manifest.get("files");
-        Assert.assertEquals(1, files.size());
-    }
-
-    @Test
-    public void testSystemKsSnapshot() throws Throwable
-    {
-        ColumnFamilyStore cfs = Keyspace.open("system").getColumnFamilyStore("peers");
-        cfs.snapshot(SNAPSHOT);
-
-        Assert.assertTrue(cfs.getDirectories().getSnapshotManifestFile(SNAPSHOT).exists());
-        Assert.assertFalse(cfs.getDirectories().getSnapshotSchemaFile(SNAPSHOT).exists());
-    }
-
-    @Test
-    public void testDroppedType() throws Throwable
-    {
-        String typeA = createType("CREATE TYPE %s (a1 varint, a2 varint, a3 varint);");
-        String typeB = createType("CREATE TYPE %s (b1 frozen<" + typeA + ">, b2 frozen<" + typeA + ">, b3 frozen<" + typeA + ">);");
-
-        String tableName = createTable("CREATE TABLE IF NOT EXISTS %s (" +
-                                       "pk1 varint," +
-                                       "ck1 varint," +
-                                       "reg1 frozen<" + typeB + ">," +
-                                       "reg2 varint," +
-                                       "PRIMARY KEY (pk1, ck1));");
-
-        alterTable("ALTER TABLE %s DROP reg1 USING TIMESTAMP 10000;");
-
-        Runnable validate = () -> {
-            try
-            {
-                ColumnFamilyStore cfs = Keyspace.open(keyspace()).getColumnFamilyStore(tableName);
-                cfs.snapshot(SNAPSHOT);
-                String schema = Files.toString(cfs.getDirectories().getSnapshotSchemaFile(SNAPSHOT), Charset.defaultCharset());
-
-                // When both column and it's type are dropped, the type in column definition gets substituted with a tuple
-                assertTrue(schema.startsWith("CREATE TABLE IF NOT EXISTS " + keyspace() + "." + tableName + " (\n" +
-                                             "\tpk1 varint,\n" +
-                                             "\tck1 varint,\n" +
-                                             "\treg2 varint,\n" +
-                                             "\treg1 frozen<tuple<frozen<tuple<varint, varint, varint>>, frozen<tuple<varint, varint, varint>>, frozen<tuple<varint, varint, varint>>>>,\n" +
-                                             "\tPRIMARY KEY (pk1, ck1))"));
-                assertTrue(schema.contains("ALTER TABLE " + keyspace() + "." + tableName + " DROP reg1 USING TIMESTAMP 10000;"));
-            }
-            catch (Exception e)
-            {
-                throw new RuntimeException(e);
-            }
-        };
-
-        // Validate before and after the type drop
-        validate.run();
-        schemaChange("DROP TYPE " + keyspace() + "." + typeB);
-        schemaChange("DROP TYPE " + keyspace() + "." + typeA);
-        validate.run();
-    }
-}
diff --git a/test/unit/org/apache/cassandra/utils/FBUtilitiesTest.java b/test/unit/org/apache/cassandra/utils/FBUtilitiesTest.java
index f5f94aa..833f2e4 100644
--- a/test/unit/org/apache/cassandra/utils/FBUtilitiesTest.java
+++ b/test/unit/org/apache/cassandra/utils/FBUtilitiesTest.java
@@ -227,5 +227,4 @@ public class FBUtilitiesTest
             executor.shutdown();
         }
     }
-
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org