You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2013/11/04 16:08:16 UTC

git commit: Add user-defined types to CQL3

Updated Branches:
  refs/heads/trunk 6c9efb0e1 -> a552b305f


Add user-defined types to CQL3

patch by slebresne; reviewed by iamaleskey for CASSANDRA-5590


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a552b305
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a552b305
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a552b305

Branch: refs/heads/trunk
Commit: a552b305f3d1b17e394744b18efd7f40599f3c2e
Parents: 6c9efb0
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Thu Sep 19 09:20:13 2013 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Mon Nov 4 16:07:40 2013 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../org/apache/cassandra/config/CFMetaData.java |   7 +
 .../cassandra/config/DatabaseDescriptor.java    |   1 +
 .../org/apache/cassandra/config/KSMetaData.java |   3 +-
 .../org/apache/cassandra/config/Schema.java     |  24 +-
 .../org/apache/cassandra/config/UTMetaData.java | 151 ++++++++
 .../org/apache/cassandra/cql3/CQL3Type.java     |  88 +++++
 src/java/org/apache/cassandra/cql3/Cql.g        |  98 +++++-
 .../apache/cassandra/cql3/QueryProcessor.java   |   7 +-
 .../org/apache/cassandra/cql3/UserTypes.java    | 177 ++++++++++
 .../cql3/statements/AlterTypeStatement.java     | 352 +++++++++++++++++++
 .../cql3/statements/CreateTypeStatement.java    | 117 ++++++
 .../cql3/statements/DropTypeStatement.java      | 131 +++++++
 .../cassandra/cql3/statements/Selectable.java   |  18 +
 .../cassandra/cql3/statements/Selection.java    |  89 ++++-
 .../org/apache/cassandra/db/DefsTables.java     |  58 ++-
 .../org/apache/cassandra/db/SystemKeyspace.java |  19 +-
 .../cassandra/db/marshal/CompositeType.java     |  19 +-
 .../apache/cassandra/db/marshal/TypeParser.java |  81 ++++-
 .../apache/cassandra/db/marshal/UserType.java   |  91 +++++
 .../cassandra/service/MigrationManager.java     |  24 +-
 21 files changed, 1510 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a552b305/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e55c8f7..4231b0b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,8 @@
  * Remove leveled json manifest migration code (CASSANDRA-5996)
  * Remove CFDefinition (CASSANDRA-6253)
  * Use AtomicIntegerFieldUpdater in RefCountedMemory (CASSANDRA-6278)
+ * User-defined types for CQL3 (CASSANDRA-5590)
+
 
 2.0.3
  * Avoid flushing compaction_history after each operation (CASSANDRA-6287)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a552b305/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index b3162a1..d04dc25 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -168,6 +168,13 @@ public final class CFMetaData
                                                               + "PRIMARY KEY (keyspace_name, columnfamily_name, trigger_name)"
                                                               + ") WITH COMMENT='triggers metadata table'");
 
+    public static final CFMetaData SchemaUserTypesCf = compile("CREATE TABLE " + SystemKeyspace.SCHEMA_USER_TYPES_CF + " ("
+                                                               + "type_name text,"
+                                                               + "column_names list<text>,"
+                                                               + "column_types list<text>,"
+                                                               + "PRIMARY KEY (type_name)"
+                                                               + ") WITH COMMENT='Defined user types' AND gc_grace_seconds=8640");
+
     public static final CFMetaData HintsCf = compile("CREATE TABLE " + SystemKeyspace.HINTS_CF + " ("
                                                      + "target_id uuid,"
                                                      + "hint_id timeuuid,"

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a552b305/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index f417f41..bd6e4c1 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -465,6 +465,7 @@ public class DatabaseDescriptor
         assert systemKeyspaces.size() == Schema.systemKeyspaceNames.size();
         for (KSMetaData ksmd : systemKeyspaces)
             Schema.instance.load(ksmd);
+        Schema.instance.loadUserTypes();
 
         /* Load the seeds for node contact points */
         if (conf.seed_provider == null)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a552b305/src/java/org/apache/cassandra/config/KSMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/KSMetaData.java b/src/java/org/apache/cassandra/config/KSMetaData.java
index 0a32f5c..71f81c7 100644
--- a/src/java/org/apache/cassandra/config/KSMetaData.java
+++ b/src/java/org/apache/cassandra/config/KSMetaData.java
@@ -84,11 +84,12 @@ public final class KSMetaData
                                                 CFMetaData.PeerEventsCf,
                                                 CFMetaData.HintsCf,
                                                 CFMetaData.IndexCf,
-                                                CFMetaData.SchemaTriggersCf,
                                                 CFMetaData.CounterIdCf,
                                                 CFMetaData.SchemaKeyspacesCf,
                                                 CFMetaData.SchemaColumnFamiliesCf,
                                                 CFMetaData.SchemaColumnsCf,
+                                                CFMetaData.SchemaTriggersCf,
+                                                CFMetaData.SchemaUserTypesCf,
                                                 CFMetaData.CompactionLogCf,
                                                 CFMetaData.CompactionHistoryCf,
                                                 CFMetaData.PaxosCf,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a552b305/src/java/org/apache/cassandra/config/Schema.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Schema.java b/src/java/org/apache/cassandra/config/Schema.java
index 112dc87..146b82b 100644
--- a/src/java/org/apache/cassandra/config/Schema.java
+++ b/src/java/org/apache/cassandra/config/Schema.java
@@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.UserType;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.service.MigrationManager;
 import org.apache.cassandra.tracing.Tracing;
@@ -60,6 +61,8 @@ public class Schema
     /* metadata map for faster ColumnFamily lookup */
     private final BiMap<Pair<String, String>, UUID> cfIdMap = HashBiMap.create();
 
+    public final UTMetaData userTypes = new UTMetaData();
+
     private volatile UUID version;
 
     // 59adb24e-f3cd-3e02-97f0-5b395827453f
@@ -116,6 +119,24 @@ public class Schema
         return this;
     }
 
+    public Schema loadUserTypes()
+    {
+        userTypes.addAll(UTMetaData.fromSchema(SystemKeyspace.serializedSchema(SystemKeyspace.SCHEMA_USER_TYPES_CF)));
+        return this;
+    }
+
+    public Schema loadType(UserType newType)
+    {
+        userTypes.addType(newType);
+        return this;
+    }
+
+    public Schema dropType(UserType droppedType)
+    {
+        userTypes.removeType(droppedType);
+        return this;
+    }
+
     /**
      * Get keyspace instance by name
      *
@@ -400,7 +421,8 @@ public class Schema
     {
         try
         {
-            return systemKeyspaceNames.contains(ByteBufferUtil.string(row.key.key));
+            return !row.cf.metadata().cfName.equals(SystemKeyspace.SCHEMA_USER_TYPES_CF)
+                && systemKeyspaceNames.contains(ByteBufferUtil.string(row.key.key));
         }
         catch (CharacterCodingException e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a552b305/src/java/org/apache/cassandra/config/UTMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/UTMetaData.java b/src/java/org/apache/cassandra/config/UTMetaData.java
new file mode 100644
index 0000000..108e106
--- /dev/null
+++ b/src/java/org/apache/cassandra/config/UTMetaData.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.config;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.exceptions.RequestValidationException;
+import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+/**
+ * Defined (and loaded) user types.
+ *
+ * In practice, because user types are global, there is only one instance of
+ * this class, which is retrieved through the Schema class.
+ */
+public final class UTMetaData
+{
+    private final Map<ByteBuffer, UserType> userTypes = new HashMap<>();
+
+    // Only for Schema. You should generally not create an instance of this, but rather use
+    // the global reference Schema.instance.userTypes.
+    UTMetaData() {}
+
+    public static UTMetaData fromSchema(UntypedResultSet rows)
+    {
+        UTMetaData m = new UTMetaData();
+        for (UntypedResultSet.Row row : rows)
+            m.addType(fromSchema(row));
+        return m;
+    }
+
+    private static UserType fromSchema(UntypedResultSet.Row row)
+    {
+        try
+        {
+            ByteBuffer name = ByteBufferUtil.bytes(row.getString("type_name"));
+            List<String> rawColumns = row.getList("column_names", UTF8Type.instance);
+            List<String> rawTypes = row.getList("column_types", UTF8Type.instance);
+
+            List<ByteBuffer> columns = new ArrayList<>(rawColumns.size());
+            for (String rawColumn : rawColumns)
+                columns.add(ByteBufferUtil.bytes(rawColumn));
+
+            List<AbstractType<?>> types = new ArrayList<>(rawTypes.size());
+            for (String rawType : rawTypes)
+                types.add(TypeParser.parse(rawType));
+
+            return new UserType(name, columns, types);
+        }
+        catch (RequestValidationException e)
+        {
+            // If it has been written in the schema, it should be valid
+            throw new AssertionError();
+        }
+    }
+
+    public static UTMetaData fromSchema(List<Row> rows)
+    {
+        UntypedResultSet result = QueryProcessor.resultify("SELECT * FROM system." + SystemKeyspace.SCHEMA_USER_TYPES_CF, rows);
+        return fromSchema(result);
+    }
+
+    public static RowMutation toSchema(UserType newType, long timestamp)
+    {
+        RowMutation rm = new RowMutation(Keyspace.SYSTEM_KS, newType.name);
+        ColumnFamily cf = rm.addOrGet(SystemKeyspace.SCHEMA_USER_TYPES_CF);
+
+        ColumnNameBuilder builder = CFMetaData.SchemaUserTypesCf.getColumnNameBuilder();
+        UpdateParameters params = new UpdateParameters(CFMetaData.SchemaUserTypesCf, Collections.<ByteBuffer>emptyList(), timestamp, 0, null);
+
+        List<ByteBuffer> columnTypes = new ArrayList<>(newType.types.size());
+        for (AbstractType<?> type : newType.types)
+            columnTypes.add(ByteBufferUtil.bytes(type.toString()));
+
+        try
+        {
+            new Lists.Setter(new ColumnIdentifier("column_names", false), new Lists.Value(newType.columnNames)).execute(newType.name, cf, builder.copy(), params);
+            new Lists.Setter(new ColumnIdentifier("column_types", false), new Lists.Value(columnTypes)).execute(newType.name, cf, builder, params);
+        }
+        catch (RequestValidationException e)
+        {
+            throw new AssertionError();
+        }
+
+        return rm;
+    }
+
+    public static RowMutation dropFromSchema(UserType droppedType, long timestamp)
+    {
+        RowMutation rm = new RowMutation(Keyspace.SYSTEM_KS, droppedType.name);
+        rm.delete(SystemKeyspace.SCHEMA_USER_TYPES_CF, timestamp);
+        return rm;
+    }
+
+    public void addAll(UTMetaData types)
+    {
+        for (UserType type : types.userTypes.values())
+            addType(type);
+    }
+
+    public UserType getType(ColumnIdentifier typeName)
+    {
+        return getType(typeName.bytes);
+    }
+
+    public UserType getType(ByteBuffer typeName)
+    {
+        return userTypes.get(typeName);
+    }
+
+    public Map<ByteBuffer, UserType> getAllTypes()
+    {
+        // Copy to avoid concurrent modification while iterating. Not intended to be called on a critical path anyway
+        return new HashMap<>(userTypes);
+    }
+
+    // This is *not* thread safe. As far as the global instance is concerned, only
+    // Schema.loadType() (which is only called in DefsTables that is synchronized)
+    // should use this.
+    public void addType(UserType type)
+    {
+        UserType old = userTypes.get(type.name);
+        assert old == null || type.isCompatibleWith(old);
+        userTypes.put(type.name, type);
+    }
+
+    // Same remarks as for addType
+    public void removeType(UserType type)
+    {
+        userTypes.remove(type.name);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a552b305/src/java/org/apache/cassandra/cql3/CQL3Type.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/CQL3Type.java b/src/java/org/apache/cassandra/cql3/CQL3Type.java
index 9a6336e..fa6e3a0 100644
--- a/src/java/org/apache/cassandra/cql3/CQL3Type.java
+++ b/src/java/org/apache/cassandra/cql3/CQL3Type.java
@@ -17,6 +17,9 @@
  */
 package org.apache.cassandra.cql3;
 
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.exceptions.ConfigurationException;
@@ -26,6 +29,7 @@ public interface CQL3Type
 {
     public boolean isCollection();
     public boolean isCounter();
+    public boolean isUserType();
     public AbstractType<?> getType();
 
     public enum Native implements CQL3Type
@@ -69,6 +73,11 @@ public interface CQL3Type
             return this == COUNTER;
         }
 
+        public boolean isUserType()
+        {
+            return false;
+        }
+
         @Override
         public String toString()
         {
@@ -105,6 +114,11 @@ public interface CQL3Type
             return false;
         }
 
+        public boolean isUserType()
+        {
+            return false;
+        }
+
         @Override
         public final boolean equals(Object o)
         {
@@ -182,6 +196,11 @@ public interface CQL3Type
             return false;
         }
 
+        public boolean isUserType()
+        {
+            return false;
+        }
+
         @Override
         public final boolean equals(Object o)
         {
@@ -214,4 +233,73 @@ public interface CQL3Type
             throw new AssertionError();
         }
     }
+
+    public static class UserDefined implements CQL3Type
+    {
+        // Keeping this separately from type just to simplify toString()
+        ColumnIdentifier name;
+        UserType type;
+
+        private UserDefined(ColumnIdentifier name, UserType type)
+        {
+            this.name = name;
+            this.type = type;
+        }
+
+        public static UserDefined create(ByteBuffer name, UserType type)
+        {
+            return new UserDefined(new ColumnIdentifier(name, UTF8Type.instance), type);
+        }
+
+        public static UserDefined create(ColumnIdentifier name) throws InvalidRequestException
+        {
+            UserType type = Schema.instance.userTypes.getType(name);
+            if (type == null)
+                throw new InvalidRequestException("Unknown type " + name);
+
+            return new UserDefined(name, type);
+        }
+
+        public boolean isUserType()
+        {
+            return true;
+        }
+
+        public boolean isCollection()
+        {
+            return false;
+        }
+
+        public boolean isCounter()
+        {
+            return false;
+        }
+
+        public AbstractType<?> getType()
+        {
+            return type;
+        }
+
+        @Override
+        public final boolean equals(Object o)
+        {
+            if(!(o instanceof UserDefined))
+                return false;
+
+            UserDefined that = (UserDefined)o;
+            return type.equals(that.type);
+        }
+
+        @Override
+        public final int hashCode()
+        {
+            return type.hashCode();
+        }
+
+        @Override
+        public String toString()
+        {
+            return name.toString();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a552b305/src/java/org/apache/cassandra/cql3/Cql.g
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Cql.g b/src/java/org/apache/cassandra/cql3/Cql.g
index ed950da..5bea701 100644
--- a/src/java/org/apache/cassandra/cql3/Cql.g
+++ b/src/java/org/apache/cassandra/cql3/Cql.g
@@ -30,6 +30,7 @@ options {
     import java.util.Arrays;
     import java.util.Collections;
     import java.util.EnumSet;
+    import java.util.HashSet;
     import java.util.HashMap;
     import java.util.LinkedHashMap;
     import java.util.List;
@@ -53,6 +54,18 @@ options {
     private final List<String> recognitionErrors = new ArrayList<String>();
     private final List<ColumnIdentifier> bindVariables = new ArrayList<ColumnIdentifier>();
 
+    public static final Set<String> reservedTypeNames = new HashSet<String>()
+    {{
+        add("byte");
+        add("smallint");
+        add("complex");
+        add("enum");
+        add("date");
+        add("interval");
+        add("macaddr");
+        add("bitstring");
+    }};
+
     public AbstractMarker.Raw newBindVariables(ColumnIdentifier name)
     {
         AbstractMarker.Raw marker = new AbstractMarker.Raw(bindVariables.size());
@@ -209,6 +222,9 @@ cqlStatement returns [ParsedStatement stmt]
     | st22=listUsersStatement          { $stmt = st22; }
     | st23=createTriggerStatement      { $stmt = st23; }
     | st24=dropTriggerStatement        { $stmt = st24; }
+    | st25=createTypeStatement         { $stmt = st25; }
+    | st26=alterTypeStatement          { $stmt = st26; }
+    | st27=dropTypeStatement           { $stmt = st27; }
     ;
 
 /*
@@ -261,17 +277,19 @@ selector returns [RawSelector s]
     ;
 
 unaliasedSelector returns [Selectable s]
-    : c=cident                                  { $s = c; }
-    | K_WRITETIME '(' c=cident ')'              { $s = new Selectable.WritetimeOrTTL(c, true); }
-    | K_TTL       '(' c=cident ')'              { $s = new Selectable.WritetimeOrTTL(c, false); }
-    | f=functionName args=selectionFunctionArgs { $s = new Selectable.WithFunction(f, args); }
+    @init { Selectable tmp = null; }
+    :  ( c=cident                                  { tmp = c; }
+       | K_WRITETIME '(' c=cident ')'              { tmp = new Selectable.WritetimeOrTTL(c, true); }
+       | K_TTL       '(' c=cident ')'              { tmp = new Selectable.WritetimeOrTTL(c, false); }
+       | f=functionName args=selectionFunctionArgs { tmp = new Selectable.WithFunction(f, args); }
+       ) ( '.' fi=cident { tmp = new Selectable.WithFieldSelection(tmp, fi); } )* { $s = tmp; }
     ;
 
 selectionFunctionArgs returns [List<Selectable> a]
     : '(' ')' { $a = Collections.emptyList(); }
     | '(' s1=unaliasedSelector { List<Selectable> args = new ArrayList<Selectable>(); args.add(s1); }
           ( ',' sn=unaliasedSelector { args.add(sn); } )*
-       ')' { $a = args; }
+      ')' { $a = args; }
     ;
 
 selectCountClause returns [List<RawSelector> expr]
@@ -498,6 +516,26 @@ cfamOrdering[CreateTableStatement.RawStatement expr]
     : k=cident (K_ASC | K_DESC { reversed=true;} ) { $expr.setOrdering(k, reversed); }
     ;
 
+
+/**
+ * CREATE TYPE foo (
+ *    <name1> <type1>,
+ *    <name2> <type2>,
+ *    ....
+ * )
+ */
+createTypeStatement returns [CreateTypeStatement expr]
+    @init { boolean ifNotExists = false; }
+    : K_CREATE K_TYPE (K_IF K_NOT K_EXISTS { ifNotExists = true; } )?
+         tn=non_type_ident { $expr = new CreateTypeStatement(tn, ifNotExists); }
+         '(' typeColumns[expr] ( ',' typeColumns[expr]? )* ')'
+    ;
+
+typeColumns[CreateTypeStatement expr]
+    : k=cident v=comparatorType { $expr.addDefinition(k, v); }
+    ;
+
+
 /**
  * CREATE INDEX [IF NOT EXISTS] [indexName] ON <columnFamily> (<columnName>);
  * CREATE CUSTOM INDEX [IF NOT EXISTS] [indexName] ON <columnFamily> (<columnName>) USING <indexClass>;
@@ -567,6 +605,25 @@ alterTableStatement returns [AlterTableStatement expr]
     ;
 
 /**
+ * ALTER TYPE <name> ALTER <field> TYPE <newtype>;
+ * ALTER TYPE <name> ADD <field> <newtype>;
+ * ALTER TYPE <name> RENAME <field> TO <newfield> AND ...;
+ */
+alterTypeStatement returns [AlterTypeStatement expr]
+    : K_ALTER K_TYPE name=non_type_ident
+          ( K_ALTER f=cident K_TYPE v=comparatorType { $expr = AlterTypeStatement.alter(name, f, v); }
+          | K_ADD   f=cident v=comparatorType        { $expr = AlterTypeStatement.addition(name, f, v); }
+          | K_RENAME K_TO new_name=non_type_ident    { $expr = AlterTypeStatement.typeRename(name, new_name); }
+          | K_RENAME
+               { Map<ColumnIdentifier, ColumnIdentifier> renames = new HashMap<ColumnIdentifier, ColumnIdentifier>(); }
+                 id1=cident K_TO toId1=cident { renames.put(id1, toId1); }
+                 ( K_AND idn=cident K_TO toIdn=cident { renames.put(idn, toIdn); } )*
+               { $expr = AlterTypeStatement.renames(name, renames); }
+          )
+    ;
+
+
+/**
  * DROP KEYSPACE [IF EXISTS] <KSP>;
  */
 dropKeyspaceStatement returns [DropKeyspaceStatement ksp]
@@ -583,6 +640,14 @@ dropTableStatement returns [DropTableStatement stmt]
     ;
 
 /**
+ * DROP TYPE <name>;
+ */
+dropTypeStatement returns [DropTypeStatement stmt]
+    @init { boolean ifExists = false; }
+    : K_DROP K_TYPE (K_IF K_EXISTS { ifExists = true; } )? name=non_type_ident { $stmt = new DropTypeStatement(name, ifExists); }
+    ;
+
+/**
  * DROP INDEX [IF EXISTS] <INDEX_NAME>
  */
 dropIndexStatement returns [DropIndexStatement expr]
@@ -769,9 +834,17 @@ collection_literal returns [Term.Raw value]
     | '{' '}' { $value = new Sets.Literal(Collections.<Term.Raw>emptyList()); }
     ;
 
+usertype_literal returns [UserTypes.Literal ut]
+    @init{ Map<ColumnIdentifier, Term.Raw> m = new HashMap<ColumnIdentifier, Term.Raw>(); }
+    @after{ $ut = new UserTypes.Literal(m); }
+    // We don't allow empty literals because that conflicts with sets/maps and is currently useless since we don't allow empty user types
+    : '{' k1=cident ':' v1=term { m.put(k1, v1); } ( ',' kn=cident ':' vn=term { m.put(kn, vn); } )* '}'
+    ;
+
 value returns [Term.Raw value]
     : c=constant           { $value = c; }
     | l=collection_literal { $value = l; }
+    | u=usertype_literal   { $value = u; }
     | K_NULL               { $value = Constants.NULL_LITERAL; }
     | ':' id=cident        { $value = newBindVariables(id); }
     | QMARK                { $value = newBindVariables(null); }
@@ -879,6 +952,7 @@ relation[List<Relation> clauses]
 comparatorType returns [CQL3Type t]
     : c=native_type     { $t = c; }
     | c=collection_type { $t = c; }
+    | id=non_type_ident  { try { $t = CQL3Type.UserDefined.create(id); } catch (InvalidRequestException e) { addRecognitionError(e.getMessage()); }}
     | s=STRING_LITERAL
       {
         try {
@@ -928,12 +1002,25 @@ username
     | STRING_LITERAL
     ;
 
+// Basically the same as cident, but we need to exclude existing CQL3 types
+// (which for some reason are not reserved otherwise)
+non_type_ident returns [ColumnIdentifier id]
+    : t=IDENT                    { if (reservedTypeNames.contains($t.text)) addRecognitionError("Invalid (reserved) user type name " + $t.text); $id = new ColumnIdentifier($t.text, false); }
+    | t=QUOTED_NAME              { $id = new ColumnIdentifier($t.text, true); }
+    | k=basic_unreserved_keyword { $id = new ColumnIdentifier(k, false); }
+    ;
+
 unreserved_keyword returns [String str]
     : u=unreserved_function_keyword     { $str = u; }
     | k=(K_TTL | K_COUNT | K_WRITETIME) { $str = $k.text; }
     ;
 
 unreserved_function_keyword returns [String str]
+    : u=basic_unreserved_keyword { $str = u; }
+    | t=native_type              { $str = t.toString(); }
+    ;
+
+basic_unreserved_keyword returns [String str]
     : k=( K_KEY
         | K_AS
         | K_CLUSTERING
@@ -958,7 +1045,6 @@ unreserved_function_keyword returns [String str]
         | K_TRIGGER
         | K_DISTINCT
         ) { $str = $k.text; }
-    | t=native_type { $str = t.toString(); }
     ;
 
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a552b305/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index dc2649c..1cb040c 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -250,10 +250,15 @@ public class QueryProcessor
 
     public static UntypedResultSet resultify(String query, Row row)
     {
+        return resultify(query, Collections.singletonList(row));
+    }
+
+    public static UntypedResultSet resultify(String query, List<Row> rows)
+    {
         try
         {
             SelectStatement ss = (SelectStatement) getStatement(query, null).statement;
-            ResultSet cqlRows = ss.process(Collections.singletonList(row));
+            ResultSet cqlRows = ss.process(rows);
             return new UntypedResultSet(cqlRows);
         }
         catch (RequestValidationException e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a552b305/src/java/org/apache/cassandra/cql3/UserTypes.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/UserTypes.java b/src/java/org/apache/cassandra/cql3/UserTypes.java
new file mode 100644
index 0000000..dd546ee
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/UserTypes.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.db.marshal.UserType;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * Static helper methods and classes for user types.
+ */
+public abstract class UserTypes
+{
+    private UserTypes() {}
+
+    /**
+     * Builds the specification of a single field of a user type column. The field is
+     * named after the column, followed by a dot and the field's index in the type.
+     */
+    public static ColumnSpecification fieldSpecOf(ColumnSpecification column, int field)
+    {
+        ColumnIdentifier fieldName = new ColumnIdentifier(column.name + "." + field, true);
+        UserType userType = (UserType)column.type;
+        return new ColumnSpecification(column.ksName, column.cfName, fieldName, userType.types.get(field));
+    }
+
+    /**
+     * A user type literal as it appears in a query: a map from field name to the raw
+     * term provided for that field.
+     */
+    public static class Literal implements Term.Raw
+    {
+        public final Map<ColumnIdentifier, Term.Raw> entries;
+
+        public Literal(Map<ColumnIdentifier, Term.Raw> entries)
+        {
+            this.entries = entries;
+        }
+
+        /**
+         * Prepares each field of the literal against the receiving user type, following the
+         * order in which the type lists its fields.
+         *
+         * @return the bound value if every field is terminal, a {@link DelayedValue} otherwise
+         * @throws InvalidRequestException if the literal cannot be assigned to {@code receiver}
+         */
+        public Term prepare(ColumnSpecification receiver) throws InvalidRequestException
+        {
+            validateAssignableTo(receiver);
+
+            UserType userType = (UserType)receiver.type;
+            List<Term> prepared = new ArrayList<>(entries.size());
+            boolean hasNonTerminal = false;
+            for (int idx = 0; idx < userType.types.size(); idx++)
+            {
+                ColumnIdentifier fieldName = new ColumnIdentifier(userType.columnNames.get(idx), UTF8Type.instance);
+                Term term = entries.get(fieldName).prepare(fieldSpecOf(receiver, idx));
+                hasNonTerminal |= term instanceof Term.NonTerminal;
+                prepared.add(term);
+            }
+
+            DelayedValue delayed = new DelayedValue(userType, prepared);
+            if (hasNonTerminal)
+                return delayed;
+            return delayed.bind(Collections.<ByteBuffer>emptyList());
+        }
+
+        /**
+         * Checks that the receiver is a user type and that, for each field of that type, the
+         * literal has an entry that is assignable to the field.
+         *
+         * @throws InvalidRequestException on the first violation found
+         */
+        private void validateAssignableTo(ColumnSpecification receiver) throws InvalidRequestException
+        {
+            boolean isUserType = receiver.type instanceof UserType;
+            if (!isUserType)
+                throw new InvalidRequestException(String.format("Invalid user type literal for %s of type %s", receiver, receiver.type.asCQL3Type()));
+
+            UserType userType = (UserType)receiver.type;
+            for (int idx = 0; idx < userType.types.size(); idx++)
+            {
+                ColumnIdentifier fieldName = new ColumnIdentifier(userType.columnNames.get(idx), UTF8Type.instance);
+                Term.Raw raw = entries.get(fieldName);
+                if (raw == null)
+                    throw new InvalidRequestException(String.format("Invalid user type literal for %s: missing field %s", receiver, fieldName));
+
+                ColumnSpecification spec = fieldSpecOf(receiver, idx);
+                boolean assignable = raw.isAssignableTo(spec);
+                if (!assignable)
+                    throw new InvalidRequestException(String.format("Invalid user type literal for %s: field %s is not of type %s", receiver, fieldName, spec.type.asCQL3Type()));
+            }
+        }
+
+        /** Boolean flavour of {@link #validateAssignableTo}: a validation failure is reported as false. */
+        public boolean isAssignableTo(ColumnSpecification receiver)
+        {
+            boolean assignable;
+            try
+            {
+                validateAssignableTo(receiver);
+                assignable = true;
+            }
+            catch (InvalidRequestException e)
+            {
+                assignable = false;
+            }
+            return assignable;
+        }
+
+        @Override
+        public String toString()
+        {
+            StringBuilder sb = new StringBuilder("{");
+            String separator = "";
+            for (Map.Entry<ColumnIdentifier, Term.Raw> entry : entries.entrySet())
+            {
+                sb.append(separator).append(entry.getKey()).append(":").append(entry.getValue());
+                separator = ", ";
+            }
+            return sb.append("}").toString();
+        }
+    }
+
+    // Same purpose as Lists.DelayedValue, except that bind markers are handled here
+    public static class DelayedValue extends Term.NonTerminal
+    {
+        private final UserType type;
+        private final List<Term> values;
+
+        public DelayedValue(UserType type, List<Term> values)
+        {
+            this.type = type;
+            this.values = values;
+        }
+
+        /** Returns true as soon as one of the field terms reports a bind marker. */
+        public boolean containsBindMarker()
+        {
+            boolean found = false;
+            Iterator<Term> iter = values.iterator();
+            while (!found && iter.hasNext())
+                found = iter.next().containsBindMarker();
+            return found;
+        }
+
+        /** Forwards the marker collection to the term of each field of the type, in field order. */
+        public void collectMarkerSpecification(VariableSpecifications boundNames)
+        {
+            int idx = 0;
+            while (idx < type.types.size())
+            {
+                values.get(idx).collectMarkerSpecification(boundNames);
+                idx++;
+            }
+        }
+
+        /**
+         * Binds the term of each field and returns the resulting buffers in field order.
+         *
+         * @throws InvalidRequestException if a field binds to null or to a value longer than
+         *         {@code FBUtilities.MAX_UNSIGNED_SHORT} bytes
+         */
+        private ByteBuffer[] bindInternal(List<ByteBuffer> variables) throws InvalidRequestException
+        {
+            ByteBuffer[] buffers = new ByteBuffer[values.size()];
+            for (int idx = 0; idx < type.types.size(); idx++)
+            {
+                ByteBuffer bound = values.get(idx).bindAndGet(variables);
+                if (bound == null)
+                    throw new InvalidRequestException("null is not supported inside user type literals");
+
+                int length = bound.remaining();
+                if (length > FBUtilities.MAX_UNSIGNED_SHORT)
+                {
+                    String fieldName = UTF8Type.instance.getString(type.columnNames.get(idx));
+                    throw new InvalidRequestException(String.format("Value for field %s is too long. User type fields are limited to %d bytes but %d bytes provided",
+                                                                    fieldName,
+                                                                    FBUtilities.MAX_UNSIGNED_SHORT,
+                                                                    length));
+                }
+
+                buffers[idx] = bound;
+            }
+            return buffers;
+        }
+
+        public Constants.Value bind(List<ByteBuffer> variables) throws InvalidRequestException
+        {
+            ByteBuffer serialized = bindAndGet(variables);
+            return new Constants.Value(serialized);
+        }
+
+        @Override
+        public ByteBuffer bindAndGet(List<ByteBuffer> variables) throws InvalidRequestException
+        {
+            ByteBuffer[] fields = bindInternal(variables);
+            return CompositeType.build(fields);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a552b305/src/java/org/apache/cassandra/cql3/statements/AlterTypeStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/AlterTypeStatement.java b/src/java/org/apache/cassandra/cql3/statements/AlterTypeStatement.java
new file mode 100644
index 0000000..fbecefe
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/statements/AlterTypeStatement.java
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.statements;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import org.apache.cassandra.auth.Permission;
+import org.apache.cassandra.config.*;
+import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.exceptions.*;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.MigrationManager;
+import org.apache.cassandra.transport.messages.ResultMessage;
+
+public abstract class AlterTypeStatement extends SchemaAlteringStatement
+{
+    protected final ColumnIdentifier name;
+
+    protected AlterTypeStatement(ColumnIdentifier name)
+    {
+        super();
+        this.name = name;
+    }
+
+    protected abstract UserType makeUpdatedType(UserType toUpdate) throws InvalidRequestException;
+
+    public static AlterTypeStatement addition(ColumnIdentifier name, ColumnIdentifier fieldName, CQL3Type type)
+    {
+        return new AddOrAlter(name, true, fieldName, type);
+    }
+
+    public static AlterTypeStatement alter(ColumnIdentifier name, ColumnIdentifier fieldName, CQL3Type type)
+    {
+        return new AddOrAlter(name, false, fieldName, type);
+    }
+
+    public static AlterTypeStatement renames(ColumnIdentifier name, Map<ColumnIdentifier, ColumnIdentifier> renames)
+    {
+        return new Renames(name, renames);
+    }
+
+    public static AlterTypeStatement typeRename(ColumnIdentifier name, ColumnIdentifier newName)
+    {
+        return new TypeRename(name, newName);
+    }
+
+    public void checkAccess(ClientState state) throws UnauthorizedException, InvalidRequestException
+    {
+        // We may want a slightly different permission?
+        state.hasAllKeyspacesAccess(Permission.ALTER);
+    }
+
+    public void validate(ClientState state) throws RequestValidationException
+    {
+        // Validation is left to announceMigration as it's easier to do it while constructing the updated type.
+        // It doesn't really change anything anyway.
+    }
+
+    public ResultMessage.SchemaChange.Change changeType()
+    {
+        return ResultMessage.SchemaChange.Change.UPDATED;
+    }
+
+    @Override
+    public String keyspace()
+    {
+        // Kind of ugly, but SchemaAlteringStatement uses that for notifying change, and an empty keyspace
+        // there kind of make sense
+        return "";
+    }
+
+    /**
+     * Announces the updated version of the type, then announces an update for every table
+     * and every other user type that references the type being altered.
+     *
+     * @throws InvalidRequestException if no type named {@code name} exists or if
+     *         {@link #makeUpdatedType} rejects the change
+     */
+    public void announceMigration() throws InvalidRequestException, ConfigurationException
+    {
+        UserType current = Schema.instance.userTypes.getType(name);
+        // A missing type is only expected if this statement raced with a drop
+        if (current == null)
+            throw new InvalidRequestException(String.format("No user type named %s exists.", name));
+
+        UserType updated = makeUpdatedType(current);
+
+        // Announcing the new definition covers new tables using the type; the tables and user
+        // types that already reference it are rewritten below.
+        MigrationManager.announceTypeUpdate(updated);
+
+        for (KSMetaData ksDef : Schema.instance.getKeyspaceDefinitions())
+        {
+            for (CFMetaData table : ksDef.cfMetaData().values())
+            {
+                CFMetaData copy = table.clone();
+                boolean touched = false;
+                for (ColumnDefinition column : copy.allColumns())
+                {
+                    if (updateDefinition(copy, column, current.name, updated))
+                        touched = true;
+                }
+                if (touched)
+                    MigrationManager.announceColumnFamilyUpdate(copy, false);
+            }
+        }
+
+        // Other user types potentially using the updated type
+        for (UserType candidate : Schema.instance.userTypes.getAllTypes().values())
+        {
+            boolean isOldVersion = candidate.name.equals(current.name);
+            if (!isOldVersion)
+            {
+                AbstractType<?> rewritten = updateWith(candidate, current.name, updated);
+                if (rewritten != null)
+                    MigrationManager.announceTypeUpdate((UserType)rewritten);
+            }
+            else if (!candidate.name.equals(updated.name))
+            {
+                // Re-announcing the type we have just updated would be harmless but useless, so it
+                // is skipped. If this was a type rename, the old version of the type is dropped.
+                MigrationManager.announceTypeDrop(candidate);
+            }
+        }
+    }
+
+    /** Returns the position of {@code field} among the fields of {@code type}, or -1 if there is no such field. */
+    private static int getIdxOfField(UserType type, ColumnIdentifier field)
+    {
+        int idx = 0;
+        while (idx < type.types.size())
+        {
+            if (field.bytes.equals(type.columnNames.get(idx)))
+                return idx;
+            idx++;
+        }
+        return -1;
+    }
+
+    /**
+     * Rewrites one column definition of {@code cfm} if its type references the user type
+     * named {@code toReplace}.
+     *
+     * @return true if the definition was rewritten, false if its type does not use the user type
+     */
+    private boolean updateDefinition(CFMetaData cfm, ColumnDefinition def, ByteBuffer toReplace, UserType updated)
+    {
+        AbstractType<?> newType = updateWith(def.type, toReplace, updated);
+        if (newType != null)
+        {
+            // The column validator itself has to change ...
+            cfm.addOrReplaceColumnDefinition(def.withNewType(newType));
+
+            // ... and so does the key validator or the comparator when the column is part of it.
+            switch (def.kind)
+            {
+                case PARTITION_KEY:
+                    cfm.keyValidator(updateWith(cfm.getKeyValidator(), toReplace, updated));
+                    break;
+                case CLUSTERING_COLUMN:
+                    cfm.comparator = updateWith(cfm.comparator, toReplace, updated);
+                    break;
+            }
+            return true;
+        }
+        return false;
+    }
+
+    // Returns a version of the provided type in which every occurrence of the user type named
+    // toReplace is replaced by updated, or null if the provided type does not reference it.
+    // Note that this method reaches inside other UserType, CompositeType and CollectionType.
+    private static AbstractType<?> updateWith(AbstractType<?> type, ByteBuffer toReplace, UserType updated)
+    {
+        // UserType has to be tested before CompositeType
+        if (type instanceof UserType)
+        {
+            UserType ut = (UserType)type;
+
+            // The type is directly the one being replaced
+            if (toReplace.equals(ut.name))
+                return updated;
+
+            // Otherwise the replaced type may be nested in one of the fields
+            List<AbstractType<?>> nested = updateTypes(ut.types, toReplace, updated);
+            if (nested == null)
+                return null;
+            return new UserType(ut.name, new ArrayList<>(ut.columnNames), nested);
+        }
+
+        if (type instanceof CompositeType)
+        {
+            CompositeType ct = (CompositeType)type;
+            List<AbstractType<?>> components = updateTypes(ct.types, toReplace, updated);
+            if (components == null)
+                return null;
+            return CompositeType.getInstance(components);
+        }
+
+        if (type instanceof ColumnToCollectionType)
+        {
+            ColumnToCollectionType ctct = (ColumnToCollectionType)type;
+            Map<ByteBuffer, CollectionType> rewritten = null;
+            for (Map.Entry<ByteBuffer, CollectionType> entry : ctct.defined.entrySet())
+            {
+                AbstractType<?> collection = updateWith(entry.getValue(), toReplace, updated);
+                if (collection != null)
+                {
+                    // Copy the map lazily, only once something actually changes
+                    if (rewritten == null)
+                        rewritten = new HashMap<>(ctct.defined);
+
+                    rewritten.put(entry.getKey(), (CollectionType)collection);
+                }
+            }
+            if (rewritten == null)
+                return null;
+            return ColumnToCollectionType.getInstance(rewritten);
+        }
+
+        if (type instanceof CollectionType)
+        {
+            if (type instanceof ListType)
+            {
+                AbstractType<?> elements = updateWith(((ListType)type).elements, toReplace, updated);
+                if (elements == null)
+                    return null;
+                return ListType.getInstance(elements);
+            }
+
+            if (type instanceof SetType)
+            {
+                AbstractType<?> elements = updateWith(((SetType)type).elements, toReplace, updated);
+                if (elements == null)
+                    return null;
+                return SetType.getInstance(elements);
+            }
+
+            assert type instanceof MapType;
+            MapType mt = (MapType)type;
+            AbstractType<?> k = updateWith(mt.keys, toReplace, updated);
+            AbstractType<?> v = updateWith(mt.values, toReplace, updated);
+            if (k == null && v == null)
+                return null;
+            // Keep whichever side did not change
+            return MapType.getInstance(k == null ? mt.keys : k, v == null ? mt.values : v);
+        }
+
+        // Any other type cannot contain a user type
+        return null;
+    }
+
+    // Applies updateWith to each type of the list. Returns a copy of the list with the
+    // changed types replaced, or null if none of the types changed.
+    private static List<AbstractType<?>> updateTypes(List<AbstractType<?>> toUpdate, ByteBuffer toReplace, UserType updated)
+    {
+        List<AbstractType<?>> result = null;
+        for (int idx = 0; idx < toUpdate.size(); idx++)
+        {
+            AbstractType<?> replacement = updateWith(toUpdate.get(idx), toReplace, updated);
+            if (replacement != null)
+            {
+                // Copy the list lazily, only once something actually changes
+                if (result == null)
+                    result = new ArrayList<>(toUpdate);
+
+                result.set(idx, replacement);
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Statement that either appends a new field to a user type or changes the type of
+     * an existing field, depending on {@code isAdd}.
+     */
+    private static class AddOrAlter extends AlterTypeStatement
+    {
+        private final boolean isAdd;
+        private final ColumnIdentifier fieldName;
+        private final CQL3Type type;
+
+        public AddOrAlter(ColumnIdentifier name, boolean isAdd, ColumnIdentifier fieldName, CQL3Type type)
+        {
+            super(name);
+            this.isAdd = isAdd;
+            this.fieldName = fieldName;
+            this.type = type;
+        }
+
+        // Returns a copy of the type with the new field appended after the existing ones
+        private UserType doAdd(UserType toUpdate) throws InvalidRequestException
+        {
+            boolean alreadyDefined = getIdxOfField(toUpdate, fieldName) >= 0;
+            if (alreadyDefined)
+                throw new InvalidRequestException(String.format("Cannot add new field %s to type %s: a field of the same name already exists", fieldName, name));
+
+            List<ByteBuffer> names = new ArrayList<>(toUpdate.columnNames.size() + 1);
+            names.addAll(toUpdate.columnNames);
+            names.add(fieldName.bytes);
+
+            List<AbstractType<?>> types = new ArrayList<>(toUpdate.types.size() + 1);
+            types.addAll(toUpdate.types);
+            types.add(type.getType());
+
+            return new UserType(toUpdate.name, names, types);
+        }
+
+        // Returns a copy of the type in which the field has the new type, provided the new
+        // type is compatible with the previous one
+        private UserType doAlter(UserType toUpdate) throws InvalidRequestException
+        {
+            int position = getIdxOfField(toUpdate, fieldName);
+            if (position < 0)
+                throw new InvalidRequestException(String.format("Unknown field %s in type %s", fieldName, name));
+
+            AbstractType<?> previous = toUpdate.types.get(position);
+            boolean compatible = type.getType().isCompatibleWith(previous);
+            if (!compatible)
+                throw new InvalidRequestException(String.format("Type %s is incompatible with previous type %s of field %s in user type %s", type, previous.asCQL3Type(), fieldName, name));
+
+            List<ByteBuffer> names = new ArrayList<>(toUpdate.columnNames);
+            List<AbstractType<?>> types = new ArrayList<>(toUpdate.types);
+            types.set(position, type.getType());
+
+            return new UserType(toUpdate.name, names, types);
+        }
+
+        protected UserType makeUpdatedType(UserType toUpdate) throws InvalidRequestException
+        {
+            if (isAdd)
+                return doAdd(toUpdate);
+            return doAlter(toUpdate);
+        }
+    }
+
+    /** Statement that renames one or more fields of a user type. */
+    private static class Renames extends AlterTypeStatement
+    {
+        private final Map<ColumnIdentifier, ColumnIdentifier> renames;
+
+        public Renames(ColumnIdentifier name, Map<ColumnIdentifier, ColumnIdentifier> renames)
+        {
+            super(name);
+            this.renames = renames;
+        }
+
+        /**
+         * Returns a copy of the type with the renames applied.
+         *
+         * @throws InvalidRequestException if a field to rename does not exist, or if the renamed
+         *         type fails {@code CreateTypeStatement.checkForDuplicateNames}
+         */
+        protected UserType makeUpdatedType(UserType toUpdate) throws InvalidRequestException
+        {
+            List<ByteBuffer> names = new ArrayList<>(toUpdate.columnNames);
+            List<AbstractType<?>> types = new ArrayList<>(toUpdate.types);
+
+            for (Map.Entry<ColumnIdentifier, ColumnIdentifier> rename : renames.entrySet())
+            {
+                ColumnIdentifier oldName = rename.getKey();
+                // Positions are always looked up in the original type, not in the partially renamed one
+                int position = getIdxOfField(toUpdate, oldName);
+                if (position < 0)
+                    throw new InvalidRequestException(String.format("Unknown field %s in type %s", oldName, name));
+
+                ColumnIdentifier newFieldName = rename.getValue();
+                names.set(position, newFieldName.bytes);
+            }
+
+            UserType renamed = new UserType(toUpdate.name, names, types);
+            CreateTypeStatement.checkForDuplicateNames(renamed);
+            return renamed;
+        }
+
+    }
+
+    /** Statement that renames a user type itself, keeping its fields unchanged. */
+    private static class TypeRename extends AlterTypeStatement
+    {
+        private final ColumnIdentifier newName;
+
+        public TypeRename(ColumnIdentifier name, ColumnIdentifier newName)
+        {
+            super(name);
+            this.newName = newName;
+        }
+
+        /**
+         * Returns a copy of the type carrying the new name.
+         *
+         * @throws InvalidRequestException if a type with the new name already exists
+         */
+        protected UserType makeUpdatedType(UserType toUpdate) throws InvalidRequestException
+        {
+            boolean nameTaken = Schema.instance.userTypes.getType(newName.bytes) != null;
+            if (nameTaken)
+                throw new InvalidRequestException(String.format("Cannot rename user type %s to %s as another type of that name exists", name, newName));
+
+            List<ByteBuffer> names = new ArrayList<>(toUpdate.columnNames);
+            List<AbstractType<?>> types = new ArrayList<>(toUpdate.types);
+            return new UserType(newName.bytes, names, types);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a552b305/src/java/org/apache/cassandra/cql3/statements/CreateTypeStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateTypeStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateTypeStatement.java
new file mode 100644
index 0000000..c355ceb
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateTypeStatement.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.statements;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import org.apache.cassandra.auth.Permission;
+import org.apache.cassandra.config.*;
+import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.db.marshal.UserType;
+import org.apache.cassandra.exceptions.*;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.MigrationManager;
+import org.apache.cassandra.transport.messages.ResultMessage;
+
+/**
+ * Schema altering statement that creates a new user type from a list of named, typed fields.
+ */
+public class CreateTypeStatement extends SchemaAlteringStatement
+{
+    private final ColumnIdentifier name;
+    private final List<ColumnIdentifier> columnNames = new ArrayList<>();
+    private final List<CQL3Type> columnTypes = new ArrayList<>();
+    private final boolean ifNotExists;
+
+    public CreateTypeStatement(ColumnIdentifier name, boolean ifNotExists)
+    {
+        super();
+        this.name = name;
+        this.ifNotExists = ifNotExists;
+    }
+
+    /** Appends a field to the type being created; fields keep the order in which they are added. */
+    public void addDefinition(ColumnIdentifier name, CQL3Type type)
+    {
+        columnNames.add(name);
+        columnTypes.add(type);
+    }
+
+    public void checkAccess(ClientState state) throws UnauthorizedException, InvalidRequestException
+    {
+        // We may want a slightly different permission?
+        state.hasAllKeyspacesAccess(Permission.CREATE);
+    }
+
+    /** Rejects the statement if the type already exists, unless it was declared with ifNotExists. */
+    public void validate(ClientState state) throws RequestValidationException
+    {
+        boolean alreadyExists = Schema.instance.userTypes.getType(name) != null;
+        if (alreadyExists && !ifNotExists)
+            throw new InvalidRequestException(String.format("A user type of name %s already exists.", name));
+    }
+
+    /**
+     * Compares every pair of field names of the type.
+     *
+     * @throws InvalidRequestException for the first pair of fields found to share a name
+     */
+    public static void checkForDuplicateNames(UserType type) throws InvalidRequestException
+    {
+        for (int first = 0; first < type.types.size() - 1; first++)
+        {
+            ByteBuffer candidate = type.columnNames.get(first);
+            for (int second = first + 1; second < type.types.size(); second++)
+            {
+                boolean duplicate = candidate.equals(type.columnNames.get(second));
+                if (duplicate)
+                {
+                    throw new InvalidRequestException(String.format("Duplicate field name %s in type %s",
+                                                                    UTF8Type.instance.getString(candidate),
+                                                                    UTF8Type.instance.getString(type.name)));
+                }
+            }
+        }
+    }
+
+    public ResultMessage.SchemaChange.Change changeType()
+    {
+        return ResultMessage.SchemaChange.Change.CREATED;
+    }
+
+    @Override
+    public String keyspace()
+    {
+        // Kind of ugly, but SchemaAlteringStatement uses this when notifying the change, and an
+        // empty keyspace kind of makes sense there
+        return "";
+    }
+
+    // Builds the UserType from the field definitions collected so far
+    private UserType createType()
+    {
+        List<ByteBuffer> fieldNames = new ArrayList<>(columnNames.size());
+        for (ColumnIdentifier columnName : columnNames)
+            fieldNames.add(columnName.bytes);
+
+        List<AbstractType<?>> fieldTypes = new ArrayList<>(columnTypes.size());
+        for (CQL3Type columnType : columnTypes)
+            fieldTypes.add(columnType.getType());
+
+        return new UserType(name.bytes, fieldNames, fieldTypes);
+    }
+
+    public void announceMigration() throws InvalidRequestException, ConfigurationException
+    {
+        // The type can already exist here when ifNotExists was used
+        boolean alreadyExists = Schema.instance.userTypes.getType(name) != null;
+        if (alreadyExists)
+            return;
+
+        UserType created = createType();
+        checkForDuplicateNames(created);
+        MigrationManager.announceNewType(created);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a552b305/src/java/org/apache/cassandra/cql3/statements/DropTypeStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DropTypeStatement.java b/src/java/org/apache/cassandra/cql3/statements/DropTypeStatement.java
new file mode 100644
index 0000000..b20c681
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/statements/DropTypeStatement.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.statements;
+
+import org.apache.cassandra.auth.Permission;
+import org.apache.cassandra.config.*;
+import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.exceptions.*;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.MigrationManager;
+import org.apache.cassandra.transport.messages.ResultMessage;
+
+public class DropTypeStatement extends SchemaAlteringStatement
+{
+    private final ColumnIdentifier name;
+    private final boolean ifExists;
+
+    public DropTypeStatement(ColumnIdentifier name, boolean ifExists)
+    {
+        super();
+        this.name = name;
+        this.ifExists = ifExists;
+    }
+
+    public void checkAccess(ClientState state) throws UnauthorizedException, InvalidRequestException
+    {
+        // We may want a slightly different permission?
+        state.hasAllKeyspacesAccess(Permission.DROP);
+    }
+
+    /**
+     * Checks that the type exists (unless ifExists was given) and that no other user type
+     * and no table column still references it.
+     *
+     * @throws RequestValidationException if the type does not exist or is still in use
+     */
+    public void validate(ClientState state) throws RequestValidationException
+    {
+        UserType existing = Schema.instance.userTypes.getType(name);
+        if (existing == null)
+        {
+            if (!ifExists)
+                throw new InvalidRequestException(String.format("No user type named %s exists.", name));
+            return;
+        }
+
+        // A type is only dropped once nothing uses it anymore: if a type were dropped and then
+        // recreated under the same name with a different definition while the previous one is
+        // still referenced, things could get messy. Two places are checked: 1) other user types,
+        // which may nest the dropped one, and 2) existing tables referencing the type (possibly
+        // in a nested way).
+        for (UserType other : Schema.instance.userTypes.getAllTypes().values())
+        {
+            boolean isSelf = other.name.equals(name.bytes);
+            if (!isSelf && isUsedBy(other))
+                throw new InvalidRequestException(String.format("Cannot drop user type %s as it is still used by user type %s", name, other.asCQL3Type()));
+        }
+
+        for (KSMetaData ksDef : Schema.instance.getKeyspaceDefinitions())
+        {
+            for (CFMetaData table : ksDef.cfMetaData().values())
+            {
+                for (ColumnDefinition column : table.allColumns())
+                {
+                    if (isUsedBy(column.type))
+                        throw new InvalidRequestException(String.format("Cannot drop user type %s as it is still used by table %s.%s", name, table.ksName, table.cfName));
+                }
+            }
+        }
+    }
+
+    /**
+     * Tells whether the type to drop is referenced by {@code toCheck}, either directly or
+     * nested inside a composite/user type or a collection.
+     *
+     * @param toCheck the type to inspect
+     * @return true if {@code toCheck} is, or contains, the user type named {@code name}
+     */
+    private boolean isUsedBy(AbstractType<?> toCheck) throws RequestValidationException
+    {
+        if (toCheck instanceof CompositeType)
+        {
+            CompositeType ct = (CompositeType)toCheck;
+
+            if ((ct instanceof UserType) && name.bytes.equals(((UserType)ct).name))
+                return true;
+
+            // Also reach into subtypes
+            for (AbstractType<?> subtype : ct.types)
+                if (isUsedBy(subtype))
+                    return true;
+        }
+        else if (toCheck instanceof ColumnToCollectionType)
+        {
+            for (CollectionType collection : ((ColumnToCollectionType)toCheck).defined.values())
+                if (isUsedBy(collection))
+                    return true;
+        }
+        else if (toCheck instanceof CollectionType)
+        {
+            if (toCheck instanceof ListType)
+                return isUsedBy(((ListType)toCheck).elements);
+            else if (toCheck instanceof SetType)
+                return isUsedBy(((SetType)toCheck).elements);
+            else
+                // Both sides of a map can hold the type: check the keys AND the values
+                return isUsedBy(((MapType)toCheck).keys) || isUsedBy(((MapType)toCheck).values);
+        }
+        return false;
+    }
+
+    public ResultMessage.SchemaChange.Change changeType()
+    {
+        return ResultMessage.SchemaChange.Change.DROPPED;
+    }
+
+    @Override
+    public String keyspace()
+    {
+        // Kind of ugly, but SchemaAlteringStatement uses that for notifying change, and an empty keyspace
+        // there kind of make sense
+        return "";
+    }
+
+    /** Announces the drop of the type, or does nothing if the type does not exist. */
+    public void announceMigration() throws InvalidRequestException, ConfigurationException
+    {
+        UserType toDrop = Schema.instance.userTypes.getType(name);
+        // The type can be missing when ifExists was used
+        if (toDrop == null)
+            return;
+
+        MigrationManager.announceTypeDrop(toDrop);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a552b305/src/java/org/apache/cassandra/cql3/statements/Selectable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/Selectable.java b/src/java/org/apache/cassandra/cql3/statements/Selectable.java
index 9f25542..448301c 100644
--- a/src/java/org/apache/cassandra/cql3/statements/Selectable.java
+++ b/src/java/org/apache/cassandra/cql3/statements/Selectable.java
@@ -66,4 +66,22 @@ public interface Selectable
             return sb.append(")").toString();
         }
     }
+
+    /**
+     * The selection of a single field out of another selectable, written {@code selected.field}.
+     */
+    public static class WithFieldSelection implements Selectable
+    {
+        /** The selectable the field is picked from. */
+        public final Selectable selected;
+        /** The name of the picked field. */
+        public final ColumnIdentifier field;
+
+        public WithFieldSelection(Selectable selected, ColumnIdentifier field)
+        {
+            this.selected = selected;
+            this.field = field;
+        }
+
+        /** Renders the selected expression and the field name joined by a dot. */
+        @Override
+        public String toString()
+        {
+            return selected + "." + field;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a552b305/src/java/org/apache/cassandra/cql3/statements/Selection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/Selection.java b/src/java/org/apache/cassandra/cql3/statements/Selection.java
index 7f13d29..a578f3f 100644
--- a/src/java/org/apache/cassandra/cql3/statements/Selection.java
+++ b/src/java/org/apache/cassandra/cql3/statements/Selection.java
@@ -35,6 +35,8 @@ import org.apache.cassandra.db.context.CounterContext;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.Int32Type;
 import org.apache.cassandra.db.marshal.LongType;
+import org.apache.cassandra.db.marshal.UserType;
+import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
@@ -117,6 +119,26 @@ public abstract class Selection
                 metadata.add(makeWritetimeOrTTLSpec(cfm, tot, raw.alias));
             return new WritetimeOrTTLSelector(def.name.toString(), addAndGetIndex(def, defs), tot.isWritetime);
         }
+        else if (raw.selectable instanceof Selectable.WithFieldSelection)
+        {
+            Selectable.WithFieldSelection withField = (Selectable.WithFieldSelection)raw.selectable;
+            Selector selected = makeSelector(cfm, new RawSelector(withField.selected, null), defs, null);
+            AbstractType<?> type = selected.getType();
+            if (!(type instanceof UserType))
+                throw new InvalidRequestException(String.format("Invalid field selection: %s of type %s is not a user type", withField.selected, type.asCQL3Type()));
+
+            UserType ut = (UserType)type;
+            for (int i = 0; i < ut.types.size(); i++)
+            {
+                if (!ut.columnNames.get(i).equals(withField.field.bytes))
+                    continue;
+
+                if (metadata != null)
+                    metadata.add(makeFieldSelectSpec(cfm, withField, ut.types.get(i), raw.alias));
+                return new FieldSelector(ut, i, selected);
+            }
+            throw new InvalidRequestException(String.format("%s of type %s has no field %s", withField.selected, type.asCQL3Type(), withField.field));
+        }
         else
         {
             Selectable.WithFunction withFun = (Selectable.WithFunction)raw.selectable;
@@ -143,6 +165,14 @@ public abstract class Selection
                                        tot.isWritetime ? LongType.instance : Int32Type.instance);
     }
 
+    /**
+     * Builds the column specification describing a field selection in the result set. The column is
+     * named after {@code alias} when one is provided and after the textual form of the selection otherwise.
+     */
+    private static ColumnSpecification makeFieldSelectSpec(CFMetaData cfm, Selectable.WithFieldSelection s, AbstractType<?> type, ColumnIdentifier alias)
+    {
+        ColumnIdentifier columnName = alias;
+        if (columnName == null)
+            columnName = new ColumnIdentifier(s.toString(), true);
+
+        return new ColumnSpecification(cfm.ksName, cfm.cfName, columnName, type);
+    }
+
     private static ColumnSpecification makeFunctionSpec(CFMetaData cfm,
                                                         Selectable.WithFunction fun,
                                                         AbstractType<?> returnType,
@@ -331,12 +361,18 @@ public abstract class Selection
         }
     }
 
-    private interface Selector extends AssignementTestable
+    private static abstract class Selector implements AssignementTestable
     {
-        public ByteBuffer compute(ResultSetBuilder rs) throws InvalidRequestException;
+        public abstract ByteBuffer compute(ResultSetBuilder rs) throws InvalidRequestException;
+        public abstract AbstractType<?> getType();
+
+        public boolean isAssignableTo(ColumnSpecification receiver)
+        {
+            return getType().asCQL3Type().equals(receiver.type.asCQL3Type());
+        }
     }
 
-    private static class SimpleSelector implements Selector
+    private static class SimpleSelector extends Selector
     {
         private final String columnName;
         private final int idx;
@@ -354,9 +390,9 @@ public abstract class Selection
             return rs.current.get(idx);
         }
 
-        public boolean isAssignableTo(ColumnSpecification receiver)
+        public AbstractType<?> getType()
         {
-            return type.asCQL3Type().equals(receiver.type.asCQL3Type());
+            return type;
         }
 
         @Override
@@ -366,7 +402,7 @@ public abstract class Selection
         }
     }
 
-    private static class FunctionSelector implements Selector
+    private static class FunctionSelector extends Selector
     {
         private final Function fun;
         private final List<Selector> argSelectors;
@@ -386,9 +422,9 @@ public abstract class Selection
             return fun.execute(args);
         }
 
-        public boolean isAssignableTo(ColumnSpecification receiver)
+        public AbstractType<?> getType()
         {
-            return fun.returnType().asCQL3Type().equals(receiver.type.asCQL3Type());
+            return fun.returnType();
         }
 
         @Override
@@ -406,7 +442,38 @@ public abstract class Selection
         }
     }
 
-    private static class WritetimeOrTTLSelector implements Selector
+    /**
+     * Selector extracting a single field out of the user type value computed by another selector.
+     */
+    private static class FieldSelector extends Selector
+    {
+        private final UserType type;
+        private final int field;
+        private final Selector selected;
+
+        /**
+         * @param type the user type of the value computed by {@code selected}
+         * @param field the index, within {@code type}, of the field to extract
+         * @param selected the selector computing the serialized user type value
+         */
+        public FieldSelector(UserType type, int field, Selector selected)
+        {
+            this.type = type;
+            this.field = field;
+            this.selected = selected;
+        }
+
+        /**
+         * Computes the underlying value and returns the component at index {@code field}.
+         *
+         * @return the serialized field, or null when the underlying value is null or holds
+         *         fewer components than the index of the requested field
+         */
+        public ByteBuffer compute(ResultSetBuilder rs) throws InvalidRequestException
+        {
+            ByteBuffer value = selected.compute(rs);
+            // Selectors may return null (this one included, which matters for nested field
+            // selections). split() dereferences its argument, so a field of a null value must
+            // be answered here, with null.
+            if (value == null)
+                return null;
+
+            ByteBuffer[] buffers = type.split(value);
+            return field < buffers.length ? buffers[field] : null;
+        }
+
+        /** Returns the declared type of the extracted field. */
+        public AbstractType<?> getType()
+        {
+            return type.types.get(field);
+        }
+
+        @Override
+        public String toString()
+        {
+            return String.format("%s.%s", selected, UTF8Type.instance.getString(type.columnNames.get(field)));
+        }
+    }
+
+    private static class WritetimeOrTTLSelector extends Selector
     {
         private final String columnName;
         private final int idx;
@@ -431,9 +498,9 @@ public abstract class Selection
             return ttl > 0 ? ByteBufferUtil.bytes(ttl) : null;
         }
 
-        public boolean isAssignableTo(ColumnSpecification receiver)
+        public AbstractType<?> getType()
         {
-            return receiver.type.asCQL3Type().equals(isWritetime ? CQL3Type.Native.BIGINT : CQL3Type.Native.INT);
+            return isWritetime ? LongType.instance : Int32Type.instance;
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a552b305/src/java/org/apache/cassandra/db/DefsTables.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DefsTables.java b/src/java/org/apache/cassandra/db/DefsTables.java
index 98905b1..3cd5156 100644
--- a/src/java/org/apache/cassandra/db/DefsTables.java
+++ b/src/java/org/apache/cassandra/db/DefsTables.java
@@ -31,9 +31,11 @@ import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.KSMetaData;
 import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.config.UTMetaData;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.marshal.AsciiType;
+import org.apache.cassandra.db.marshal.UserType;
 import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.service.MigrationManager;
@@ -168,6 +170,7 @@ public class DefsTables
         // current state of the schema
         Map<DecoratedKey, ColumnFamily> oldKeyspaces = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_KEYSPACES_CF);
         Map<DecoratedKey, ColumnFamily> oldColumnFamilies = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_COLUMNFAMILIES_CF);
+        List<Row> oldTypes = SystemKeyspace.serializedSchema(SystemKeyspace.SCHEMA_USER_TYPES_CF);
 
         for (RowMutation mutation : mutations)
             mutation.apply();
@@ -178,6 +181,7 @@ public class DefsTables
         // with new data applied
         Map<DecoratedKey, ColumnFamily> newKeyspaces = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_KEYSPACES_CF);
         Map<DecoratedKey, ColumnFamily> newColumnFamilies = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_COLUMNFAMILIES_CF);
+        List<Row> newTypes = SystemKeyspace.serializedSchema(SystemKeyspace.SCHEMA_USER_TYPES_CF);
 
         Set<String> keyspacesToDrop = mergeKeyspaces(oldKeyspaces, newKeyspaces);
         mergeColumnFamilies(oldColumnFamilies, newColumnFamilies);
@@ -186,6 +190,8 @@ public class DefsTables
         for (String keyspaceToDrop : keyspacesToDrop)
             dropKeyspace(keyspaceToDrop);
 
+        mergeTypes(oldTypes, newTypes);
+
         Schema.instance.updateVersionAndAnnounce();
     }
 
@@ -320,6 +326,52 @@ public class DefsTables
         }
     }
 
+    /**
+     * Brings the loaded user types in line with the schema rows, given those rows as they were before
+     * ({@code old}) and after ({@code updated}) the schema mutations were applied.
+     */
+    private static void mergeTypes(List<Row> old, List<Row> updated)
+    {
+        MapDifference<ByteBuffer, UserType> diff = Maps.difference(UTMetaData.fromSchema(old).getAllTypes(),
+                                                                   UTMetaData.fromSchema(updated).getAllTypes());
+
+        // Types present only after the mutations are new
+        for (UserType added : diff.entriesOnlyOnRight().values())
+            Schema.instance.loadType(added);
+
+        // Types present only before the mutations have been dropped
+        for (UserType removed : diff.entriesOnlyOnLeft().values())
+            Schema.instance.dropType(removed);
+
+        // Modified types: when exactly one version is 'extended' compared to the other, that version
+        // always wins. Otherwise (a field rename for instance) the more recent one, timestamp wise,
+        // is picked.
+        for (MapDifference.ValueDifference<UserType> change : diff.entriesDiffering().values())
+        {
+            UserType before = change.leftValue();
+            UserType after = change.rightValue();
+            boolean beforeCompatible = before.isCompatibleWith(after);
+            boolean afterCompatible = after.isCompatibleWith(before);
+
+            UserType winner;
+            if (beforeCompatible != afterCompatible)
+            {
+                winner = beforeCompatible ? before : after;
+            }
+            else
+            {
+                long leftTimestamp = firstTimestampOf(before.name, old);
+                long rightTimestamp = firstTimestampOf(before.name, updated);
+                winner = leftTimestamp > rightTimestamp ? before : after;
+            }
+            // loadType is a 'load or update', so it is used whichever version wins
+            Schema.instance.loadType(winner);
+        }
+    }
+
+    /**
+     * Returns the timestamp of the first element iterated from the column family of the row whose
+     * key equals {@code key}, or -1 when {@code rows} contains no such row.
+     */
+    private static long firstTimestampOf(ByteBuffer key, List<Row> rows)
+    {
+        for (Row candidate : rows)
+        {
+            if (!candidate.key.key.equals(key))
+                continue;
+
+            return candidate.cf.iterator().next().timestamp();
+        }
+        return -1;
+    }
+
     private static void addKeyspace(KSMetaData ksm)
     {
         assert Schema.instance.getKSMetaData(ksm.name) == null;
@@ -450,10 +502,8 @@ public class DefsTables
 
     private static void flushSchemaCFs()
     {
-        SystemKeyspace.forceBlockingFlush(SystemKeyspace.SCHEMA_KEYSPACES_CF);
-        SystemKeyspace.forceBlockingFlush(SystemKeyspace.SCHEMA_COLUMNFAMILIES_CF);
-        SystemKeyspace.forceBlockingFlush(SystemKeyspace.SCHEMA_COLUMNS_CF);
-        SystemKeyspace.forceBlockingFlush(SystemKeyspace.SCHEMA_TRIGGERS_CF);
+        for (String cf : SystemKeyspace.allSchemaCfs)
+            SystemKeyspace.forceBlockingFlush(cf);
     }
 }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a552b305/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index e69ac51..c79da11 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -79,6 +79,7 @@ public class SystemKeyspace
     public static final String SCHEMA_COLUMNFAMILIES_CF = "schema_columnfamilies";
     public static final String SCHEMA_COLUMNS_CF = "schema_columns";
     public static final String SCHEMA_TRIGGERS_CF = "schema_triggers";
+    public static final String SCHEMA_USER_TYPES_CF = "schema_usertypes";
     public static final String COMPACTION_LOG = "compactions_in_progress";
     public static final String PAXOS_CF = "paxos";
     public static final String SSTABLE_ACTIVITY_CF = "sstable_activity";
@@ -87,6 +88,12 @@ public class SystemKeyspace
     private static final String LOCAL_KEY = "local";
     private static final ByteBuffer ALL_LOCAL_NODE_ID_KEY = ByteBufferUtil.bytes("Local");
 
+    public static final List<String> allSchemaCfs = Arrays.asList(SCHEMA_KEYSPACES_CF,
+                                                                  SCHEMA_COLUMNFAMILIES_CF,
+                                                                  SCHEMA_COLUMNS_CF,
+                                                                  SCHEMA_TRIGGERS_CF,
+                                                                  SCHEMA_USER_TYPES_CF);
+
     public enum BootstrapState
     {
         NEEDS_BOOTSTRAP,
@@ -710,10 +717,8 @@ public class SystemKeyspace
     {
         List<Row> schema = new ArrayList<>();
 
-        schema.addAll(serializedSchema(SCHEMA_KEYSPACES_CF));
-        schema.addAll(serializedSchema(SCHEMA_COLUMNFAMILIES_CF));
-        schema.addAll(serializedSchema(SCHEMA_COLUMNS_CF));
-        schema.addAll(serializedSchema(SCHEMA_TRIGGERS_CF));
+        for (String cf : allSchemaCfs)
+            schema.addAll(serializedSchema(cf));
 
         return schema;
     }
@@ -737,10 +742,8 @@ public class SystemKeyspace
     {
         Map<DecoratedKey, RowMutation> mutationMap = new HashMap<>();
 
-        serializeSchema(mutationMap, SCHEMA_KEYSPACES_CF);
-        serializeSchema(mutationMap, SCHEMA_COLUMNFAMILIES_CF);
-        serializeSchema(mutationMap, SCHEMA_COLUMNS_CF);
-        serializeSchema(mutationMap, SCHEMA_TRIGGERS_CF);
+        for (String cf : allSchemaCfs)
+            serializeSchema(mutationMap, cf);
 
         return mutationMap.values();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a552b305/src/java/org/apache/cassandra/db/marshal/CompositeType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/CompositeType.java b/src/java/org/apache/cassandra/db/marshal/CompositeType.java
index 244ae08..ac303c2 100644
--- a/src/java/org/apache/cassandra/db/marshal/CompositeType.java
+++ b/src/java/org/apache/cassandra/db/marshal/CompositeType.java
@@ -88,7 +88,7 @@ public class CompositeType extends AbstractCompositeType
         return ct;
     }
 
-    private CompositeType(List<AbstractType<?>> types)
+    protected CompositeType(List<AbstractType<?>> types)
     {
         this.types = ImmutableList.copyOf(types);
     }
@@ -133,6 +133,23 @@ public class CompositeType extends AbstractCompositeType
         return build(serialized);
     }
 
+    /**
+     * Splits a serialized composite name into its components.
+     * Overrides the version of AbstractCompositeType because we can do a bit better here.
+     */
+    @Override
+    public ByteBuffer[] split(ByteBuffer name)
+    {
+        // Size the array for a complete name, which is the common case; it is truncated at the
+        // end when fewer components were actually read.
+        ByteBuffer[] components = new ByteBuffer[types.size()];
+        ByteBuffer input = name.duplicate();
+        int count = 0;
+        for (; input.remaining() > 0; count++)
+        {
+            components[count] = getWithShortLength(input);
+            input.get(); // skip end-of-component
+        }
+        return count < components.length ? Arrays.copyOfRange(components, 0, count) : components;
+    }
+
     // Extract component idx from bb. Return null if there is not enough component.
     public static ByteBuffer extractComponent(ByteBuffer bb, int idx)
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a552b305/src/java/org/apache/cassandra/db/marshal/TypeParser.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/TypeParser.java b/src/java/org/apache/cassandra/db/marshal/TypeParser.java
index 1330a40..8ba0c0e 100644
--- a/src/java/org/apache/cassandra/db/marshal/TypeParser.java
+++ b/src/java/org/apache/cassandra/db/marshal/TypeParser.java
@@ -32,6 +32,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
 
 /**
  * Parse a string containing an Type definition.
@@ -258,16 +259,7 @@ public class TypeParser
                 return map;
             }
 
-            String bbHex = readNextIdentifier();
-            ByteBuffer bb = null;
-            try
-            {
-                 bb = ByteBufferUtil.hexToBytes(bbHex);
-            }
-            catch (NumberFormatException e)
-            {
-                throwSyntaxError(e.getMessage());
-            }
+            ByteBuffer bb = fromHex(readNextIdentifier());
 
             skipBlank();
             if (str.charAt(idx) != ':')
@@ -292,6 +284,60 @@ public class TypeParser
         throw new SyntaxException(String.format("Syntax error parsing '%s' at char %d: unexpected end of string", str, idx));
     }
 
+    /**
+     * Decodes a hexadecimal string into bytes. A string that is not valid hexadecimal is reported
+     * through {@code throwSyntaxError}, using the message of the underlying NumberFormatException.
+     */
+    private ByteBuffer fromHex(String hex) throws SyntaxException
+    {
+        ByteBuffer decoded = null;
+        try
+        {
+            decoded = ByteBufferUtil.hexToBytes(hex);
+        }
+        catch (NumberFormatException e)
+        {
+            throwSyntaxError(e.getMessage());
+        }
+        return decoded;
+    }
+
+    /**
+     * Parses the parameters of a user type starting at the current position, i.e. a string of the
+     * form {@code (<hex type name>,<hex field name>:<field type>,...)}.
+     *
+     * @return the type name paired with the (field name, field type) definitions, in the order read
+     * @throws IllegalStateException if the current position is not on an opening parenthesis
+     * @throws SyntaxException if the parameters are malformed or the string ends before the closing parenthesis
+     * @throws ConfigurationException if thrown while parsing a field type
+     */
+    public Pair<ByteBuffer, List<Pair<ByteBuffer, AbstractType>>> getUserTypeParameters() throws SyntaxException, ConfigurationException
+    {
+        if (isEOS() || str.charAt(idx) != '(')
+            throw new IllegalStateException();
+
+        ++idx; // skipping '('
+
+        skipBlankAndComma();
+        ByteBuffer typeName = fromHex(readNextIdentifier());
+        List<Pair<ByteBuffer, AbstractType>> defs = new ArrayList<>();
+
+        while (skipBlankAndComma())
+        {
+            if (str.charAt(idx) == ')')
+            {
+                ++idx;
+                return Pair.create(typeName, defs);
+            }
+
+            ByteBuffer name = fromHex(readNextIdentifier());
+            skipBlank();
+            // The string may end right after the field name: check for end of string first so that
+            // a truncated definition is reported as a syntax error rather than failing in charAt
+            // with a StringIndexOutOfBoundsException.
+            if (isEOS() || str.charAt(idx) != ':')
+                throwSyntaxError("expecting ':' token");
+            ++idx;
+            skipBlank();
+            try
+            {
+                AbstractType type = parse();
+                defs.add(Pair.create(name, type));
+            }
+            catch (SyntaxException e)
+            {
+                // Report where the outer parsing stood, keeping the original error as the cause
+                SyntaxException ex = new SyntaxException(String.format("Exception while parsing '%s' around char %d", str, idx));
+                ex.initCause(e);
+                throw ex;
+            }
+        }
+        throw new SyntaxException(String.format("Syntax error parsing '%s' at char %d: unexpected end of string", str, idx));
+    }
+
     private static AbstractType<?> getAbstractType(String compareWith) throws ConfigurationException
     {
         String className = compareWith.contains(".") ? compareWith : "org.apache.cassandra.db.marshal." + compareWith;
@@ -514,4 +560,19 @@ public class TypeParser
         sb.append(')');
         return sb.toString();
     }
+
+    /**
+     * Builds the parameter string of a user type: the hex-encoded type name followed by one
+     * {@code <hex field name>:<field type>} entry per column, comma separated and wrapped in parentheses.
+     */
+    public static String stringifyUserTypeParameters(ByteBuffer typeName, List<ByteBuffer> columnNames, List<AbstractType<?>> columnTypes)
+    {
+        StringBuilder out = new StringBuilder();
+        out.append('(');
+        out.append(ByteBufferUtil.bytesToHex(typeName));
+
+        int columnCount = columnNames.size();
+        for (int col = 0; col < columnCount; col++)
+        {
+            out.append(',')
+               .append(ByteBufferUtil.bytesToHex(columnNames.get(col)))
+               .append(":")
+               .append(columnTypes.get(col).toString());
+        }
+        return out.append(')').toString();
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a552b305/src/java/org/apache/cassandra/db/marshal/UserType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/UserType.java b/src/java/org/apache/cassandra/db/marshal/UserType.java
new file mode 100644
index 0000000..7b84301
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/marshal/UserType.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.marshal;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.base.Objects;
+
+import org.apache.cassandra.cql3.CQL3Type;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.exceptions.SyntaxException;
+import org.apache.cassandra.utils.Pair;
+
+/**
+ * A user defined type.
+ *
+ * Values are serialized and sorted exactly as for CompositeType; on top of
+ * that, this class keeps some metadata: the name of the type and the names
+ * of its columns.
+ */
+public class UserType extends CompositeType
+{
+    /** The name of the type. */
+    public final ByteBuffer name;
+    /** The names of the columns; the i-th name goes with the i-th type. */
+    public final List<ByteBuffer> columnNames;
+
+    public UserType(ByteBuffer name, List<ByteBuffer> columnNames, List<AbstractType<?>> types)
+    {
+        super(types);
+        this.name = name;
+        this.columnNames = columnNames;
+    }
+
+    /**
+     * Creates a user type from the parameters read by the given parser.
+     */
+    public static UserType getInstance(TypeParser parser) throws ConfigurationException, SyntaxException
+    {
+        Pair<ByteBuffer, List<Pair<ByteBuffer, AbstractType>>> parsed = parser.getUserTypeParameters();
+        List<Pair<ByteBuffer, AbstractType>> definitions = parsed.right;
+        int count = definitions.size();
+
+        List<ByteBuffer> names = new ArrayList<>(count);
+        List<AbstractType<?>> fieldTypes = new ArrayList<>(count);
+        for (Pair<ByteBuffer, AbstractType> definition : definitions)
+        {
+            names.add(definition.left);
+            fieldTypes.add(definition.right);
+        }
+        return new UserType(parsed.left, names, fieldTypes);
+    }
+
+    @Override
+    public final int hashCode()
+    {
+        return Objects.hashCode(name, columnNames, types);
+    }
+
+    /**
+     * Two user types are equal when their name, column names and column types are all equal.
+     */
+    @Override
+    public final boolean equals(Object o)
+    {
+        if (o instanceof UserType)
+        {
+            UserType other = (UserType)o;
+            return name.equals(other.name)
+                && columnNames.equals(other.columnNames)
+                && types.equals(other.types);
+        }
+        return false;
+    }
+
+    @Override
+    public CQL3Type asCQL3Type()
+    {
+        return CQL3Type.UserDefined.create(name, this);
+    }
+
+    /**
+     * Returns the class name followed by the stringified type parameters.
+     */
+    @Override
+    public String toString()
+    {
+        String parameters = TypeParser.stringifyUserTypeParameters(name, columnNames, types);
+        return getClass().getName() + parameters;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a552b305/src/java/org/apache/cassandra/service/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java
index 012cdf2..e4740ae 100644
--- a/src/java/org/apache/cassandra/service/MigrationManager.java
+++ b/src/java/org/apache/cassandra/service/MigrationManager.java
@@ -37,8 +37,10 @@ import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.KSMetaData;
+import org.apache.cassandra.config.UTMetaData;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.marshal.UserType;
 import org.apache.cassandra.exceptions.AlreadyExistsException;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.gms.*;
@@ -231,6 +233,11 @@ public class MigrationManager implements IEndpointStateChangeSubscriber
         announce(cfm.toSchema(FBUtilities.timestampMicros()));
     }
 
+    /**
+     * Announces the schema mutation for {@code newType}, built with the timestamp returned by
+     * {@code FBUtilities.timestampMicros()}.
+     */
+    public static void announceNewType(UserType newType)
+    {
+        long timestamp = FBUtilities.timestampMicros();
+        announce(UTMetaData.toSchema(newType, timestamp));
+    }
+
     public static void announceKeyspaceUpdate(KSMetaData ksm) throws ConfigurationException
     {
         ksm.validate();
@@ -257,6 +264,12 @@ public class MigrationManager implements IEndpointStateChangeSubscriber
         announce(oldCfm.toSchemaUpdate(cfm, FBUtilities.timestampMicros(), fromThrift));
     }
 
+    public static void announceTypeUpdate(UserType updatedType)
+    {
+        // An update is announced exactly like a new type. DefsTables.mergeTypes will make sure we keep the updated version.
+        announceNewType(updatedType);
+    }
+
     public static void announceKeyspaceDrop(String ksName) throws ConfigurationException
     {
         KSMetaData oldKsm = Schema.instance.getKSMetaData(ksName);
@@ -277,6 +290,11 @@ public class MigrationManager implements IEndpointStateChangeSubscriber
         announce(oldCfm.dropFromSchema(FBUtilities.timestampMicros()));
     }
 
+    /**
+     * Announces the schema mutation dropping {@code droppedType}, built with the timestamp returned
+     * by {@code FBUtilities.timestampMicros()}.
+     */
+    public static void announceTypeDrop(UserType droppedType)
+    {
+        long timestamp = FBUtilities.timestampMicros();
+        announce(UTMetaData.dropFromSchema(droppedType, timestamp));
+    }
+
     /**
      * actively announce a new version to active hosts via rpc
      * @param schema The schema mutation to be applied
@@ -344,10 +362,8 @@ public class MigrationManager implements IEndpointStateChangeSubscriber
         logger.debug("Truncating schema tables...");
 
         // truncate schema tables
-        SystemKeyspace.schemaCFS(SystemKeyspace.SCHEMA_KEYSPACES_CF).truncateBlocking();
-        SystemKeyspace.schemaCFS(SystemKeyspace.SCHEMA_COLUMNFAMILIES_CF).truncateBlocking();
-        SystemKeyspace.schemaCFS(SystemKeyspace.SCHEMA_COLUMNS_CF).truncateBlocking();
-        SystemKeyspace.schemaCFS(SystemKeyspace.SCHEMA_TRIGGERS_CF).truncateBlocking();
+        for (String cf : SystemKeyspace.allSchemaCfs)
+            SystemKeyspace.schemaCFS(cf).truncateBlocking();
 
         logger.debug("Clearing local schema keyspace definitions...");