You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2014/01/07 17:47:17 UTC

[1/2] Make user types keyspace scoped

Updated Branches:
  refs/heads/trunk 0a1b277d6 -> d63d07b92


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d63d07b9/src/java/org/apache/cassandra/cql3/statements/AlterTypeStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/AlterTypeStatement.java b/src/java/org/apache/cassandra/cql3/statements/AlterTypeStatement.java
index ea9f0c7..dd88aca 100644
--- a/src/java/org/apache/cassandra/cql3/statements/AlterTypeStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AlterTypeStatement.java
@@ -32,40 +32,49 @@ import org.apache.cassandra.transport.messages.ResultMessage;
 
 public abstract class AlterTypeStatement extends SchemaAlteringStatement
 {
-    protected final ColumnIdentifier name;
+    protected final UTName name;
 
-    protected AlterTypeStatement(ColumnIdentifier name)
+    protected AlterTypeStatement(UTName name)
     {
         super();
         this.name = name;
     }
 
+    @Override
+    public void prepareKeyspace(ClientState state) throws InvalidRequestException
+    {
+        if (!name.hasKeyspace())
+            name.setKeyspace(state.getKeyspace());
+
+        if (name.getKeyspace() == null)
+            throw new InvalidRequestException("You need to be logged in a keyspace or use a fully qualified user type name");
+    }
+
     protected abstract UserType makeUpdatedType(UserType toUpdate) throws InvalidRequestException;
 
-    public static AlterTypeStatement addition(ColumnIdentifier name, ColumnIdentifier fieldName, CQL3Type type)
+    public static AlterTypeStatement addition(UTName name, ColumnIdentifier fieldName, CQL3Type.Raw type)
     {
         return new AddOrAlter(name, true, fieldName, type);
     }
 
-    public static AlterTypeStatement alter(ColumnIdentifier name, ColumnIdentifier fieldName, CQL3Type type)
+    public static AlterTypeStatement alter(UTName name, ColumnIdentifier fieldName, CQL3Type.Raw type)
     {
         return new AddOrAlter(name, false, fieldName, type);
     }
 
-    public static AlterTypeStatement renames(ColumnIdentifier name, Map<ColumnIdentifier, ColumnIdentifier> renames)
+    public static AlterTypeStatement renames(UTName name, Map<ColumnIdentifier, ColumnIdentifier> renames)
     {
         return new Renames(name, renames);
     }
 
-    public static AlterTypeStatement typeRename(ColumnIdentifier name, ColumnIdentifier newName)
+    public static AlterTypeStatement typeRename(UTName name, UTName newName)
     {
         return new TypeRename(name, newName);
     }
 
     public void checkAccess(ClientState state) throws UnauthorizedException, InvalidRequestException
     {
-        // We may want a slightly different permission?
-        state.hasAllKeyspacesAccess(Permission.ALTER);
+        state.hasKeyspaceAccess(keyspace(), Permission.ALTER);
     }
 
     public void validate(ClientState state) throws RequestValidationException
@@ -82,14 +91,16 @@ public abstract class AlterTypeStatement extends SchemaAlteringStatement
     @Override
     public String keyspace()
     {
-        // Kind of ugly, but SchemaAlteringStatement uses that for notifying change, and an empty keyspace
-        // there kind of make sense
-        return "";
+        return name.getKeyspace();
     }
 
     public void announceMigration() throws InvalidRequestException, ConfigurationException
     {
-        UserType toUpdate = Schema.instance.userTypes.getType(name);
+        KSMetaData ksm = Schema.instance.getKSMetaData(name.getKeyspace());
+        if (ksm == null)
+            throw new InvalidRequestException(String.format("Cannot alter type in unknown keyspace %s", name.getKeyspace()));
+
+        UserType toUpdate = ksm.userTypes.getType(name.getUserTypeName());
         // Shouldn't happen, unless we race with a drop
         if (toUpdate == null)
             throw new InvalidRequestException(String.format("No user type named %s exists.", name));
@@ -100,33 +111,33 @@ public abstract class AlterTypeStatement extends SchemaAlteringStatement
         // but we also need to find all existing user types and CF using it and change them.
         MigrationManager.announceTypeUpdate(updated);
 
-        for (KSMetaData ksm : Schema.instance.getKeyspaceDefinitions())
+        for (KSMetaData ksm2 : Schema.instance.getKeyspaceDefinitions())
         {
-            for (CFMetaData cfm : ksm.cfMetaData().values())
+            for (CFMetaData cfm : ksm2.cfMetaData().values())
             {
                 CFMetaData copy = cfm.clone();
                 boolean modified = false;
                 for (ColumnDefinition def : copy.allColumns())
-                    modified |= updateDefinition(copy, def, toUpdate.name, updated);
+                    modified |= updateDefinition(copy, def, toUpdate.keyspace, toUpdate.name, updated);
                 if (modified)
                     MigrationManager.announceColumnFamilyUpdate(copy, false);
             }
-        }
 
-        // Other user types potentially using the updated type
-        for (UserType ut : Schema.instance.userTypes.getAllTypes().values())
-        {
-            // Re-updating the type we've just updated would be harmless but useless so we avoid it.
-            // Besides, we use the occasion to drop the old version of the type if it's a type rename
-            if (ut.name.equals(toUpdate.name))
+            // Other user types potentially using the updated type
+            for (UserType ut : ksm2.userTypes.getAllTypes().values())
             {
-                if (!ut.name.equals(updated.name))
-                    MigrationManager.announceTypeDrop(ut);
-                continue;
+                // Re-updating the type we've just updated would be harmless but useless so we avoid it.
+                // Besides, we use the occasion to drop the old version of the type if it's a type rename
+                if (ut.keyspace.equals(toUpdate.keyspace) && ut.name.equals(toUpdate.name))
+                {
+                    if (!ut.keyspace.equals(updated.keyspace) || !ut.name.equals(updated.name))
+                        MigrationManager.announceTypeDrop(ut);
+                    continue;
+                }
+                AbstractType<?> upd = updateWith(ut, toUpdate.keyspace, toUpdate.name, updated);
+                if (upd != null)
+                    MigrationManager.announceTypeUpdate((UserType)upd);
             }
-            AbstractType<?> upd = updateWith(ut, toUpdate.name, updated);
-            if (upd != null)
-                MigrationManager.announceTypeUpdate((UserType)upd);
         }
     }
 
@@ -138,9 +149,9 @@ public abstract class AlterTypeStatement extends SchemaAlteringStatement
         return -1;
     }
 
-    private boolean updateDefinition(CFMetaData cfm, ColumnDefinition def, ByteBuffer toReplace, UserType updated)
+    private boolean updateDefinition(CFMetaData cfm, ColumnDefinition def, String keyspace, ByteBuffer toReplace, UserType updated)
     {
-        AbstractType<?> t = updateWith(def.type, toReplace, updated);
+        AbstractType<?> t = updateWith(def.type, keyspace, toReplace, updated);
         if (t == null)
             return false;
 
@@ -151,10 +162,10 @@ public abstract class AlterTypeStatement extends SchemaAlteringStatement
         switch (def.kind)
         {
             case PARTITION_KEY:
-                cfm.keyValidator(updateWith(cfm.getKeyValidator(), toReplace, updated));
+                cfm.keyValidator(updateWith(cfm.getKeyValidator(), keyspace, toReplace, updated));
                 break;
             case CLUSTERING_COLUMN:
-                cfm.comparator = CellNames.fromAbstractType(updateWith(cfm.comparator.asAbstractType(), toReplace, updated), cfm.comparator.isDense());
+                cfm.comparator = CellNames.fromAbstractType(updateWith(cfm.comparator.asAbstractType(), keyspace, toReplace, updated), cfm.comparator.isDense());
                 break;
         }
         return true;
@@ -162,24 +173,24 @@ public abstract class AlterTypeStatement extends SchemaAlteringStatement
 
     // Update the provided type were all instance of a given userType is replaced by a new version
     // Note that this methods reaches inside other UserType, CompositeType and CollectionType.
-    private static AbstractType<?> updateWith(AbstractType<?> type, ByteBuffer toReplace, UserType updated)
+    private static AbstractType<?> updateWith(AbstractType<?> type, String keyspace, ByteBuffer toReplace, UserType updated)
     {
         if (type instanceof UserType)
         {
             UserType ut = (UserType)type;
 
             // If it's directly the type we've updated, then just use the new one.
-            if (toReplace.equals(ut.name))
+            if (keyspace.equals(ut.keyspace) && toReplace.equals(ut.name))
                 return updated;
 
             // Otherwise, check for nesting
-            List<AbstractType<?>> updatedTypes = updateTypes(ut.types, toReplace, updated);
-            return updatedTypes == null ? null : new UserType(ut.name, new ArrayList<>(ut.columnNames), updatedTypes);
+            List<AbstractType<?>> updatedTypes = updateTypes(ut.types, keyspace, toReplace, updated);
+            return updatedTypes == null ? null : new UserType(ut.keyspace, ut.name, new ArrayList<>(ut.columnNames), updatedTypes);
         }
         else if (type instanceof CompositeType)
         {
             CompositeType ct = (CompositeType)type;
-            List<AbstractType<?>> updatedTypes = updateTypes(ct.types, toReplace, updated);
+            List<AbstractType<?>> updatedTypes = updateTypes(ct.types, keyspace, toReplace, updated);
             return updatedTypes == null ? null : CompositeType.getInstance(updatedTypes);
         }
         else if (type instanceof ColumnToCollectionType)
@@ -188,7 +199,7 @@ public abstract class AlterTypeStatement extends SchemaAlteringStatement
             Map<ByteBuffer, CollectionType> updatedTypes = null;
             for (Map.Entry<ByteBuffer, CollectionType> entry : ctct.defined.entrySet())
             {
-                AbstractType<?> t = updateWith(entry.getValue(), toReplace, updated);
+                AbstractType<?> t = updateWith(entry.getValue(), keyspace, toReplace, updated);
                 if (t == null)
                     continue;
 
@@ -203,20 +214,20 @@ public abstract class AlterTypeStatement extends SchemaAlteringStatement
         {
             if (type instanceof ListType)
             {
-                AbstractType<?> t = updateWith(((ListType)type).elements, toReplace, updated);
+                AbstractType<?> t = updateWith(((ListType)type).elements, keyspace, toReplace, updated);
                 return t == null ? null : ListType.getInstance(t);
             }
             else if (type instanceof SetType)
             {
-                AbstractType<?> t = updateWith(((SetType)type).elements, toReplace, updated);
+                AbstractType<?> t = updateWith(((SetType)type).elements, keyspace, toReplace, updated);
                 return t == null ? null : SetType.getInstance(t);
             }
             else
             {
                 assert type instanceof MapType;
                 MapType mt = (MapType)type;
-                AbstractType<?> k = updateWith(mt.keys, toReplace, updated);
-                AbstractType<?> v = updateWith(mt.values, toReplace, updated);
+                AbstractType<?> k = updateWith(mt.keys, keyspace, toReplace, updated);
+                AbstractType<?> v = updateWith(mt.values, keyspace, toReplace, updated);
                 if (k == null && v == null)
                     return null;
                 return MapType.getInstance(k == null ? mt.keys : k, v == null ? mt.values : v);
@@ -228,13 +239,13 @@ public abstract class AlterTypeStatement extends SchemaAlteringStatement
         }
     }
 
-    private static List<AbstractType<?>> updateTypes(List<AbstractType<?>> toUpdate, ByteBuffer toReplace, UserType updated)
+    private static List<AbstractType<?>> updateTypes(List<AbstractType<?>> toUpdate, String keyspace, ByteBuffer toReplace, UserType updated)
     {
         // But this can also be nested.
         List<AbstractType<?>> updatedTypes = null;
         for (int i = 0; i < toUpdate.size(); i++)
         {
-            AbstractType<?> t = updateWith(toUpdate.get(i), toReplace, updated);
+            AbstractType<?> t = updateWith(toUpdate.get(i), keyspace, toReplace, updated);
             if (t == null)
                 continue;
 
@@ -250,9 +261,9 @@ public abstract class AlterTypeStatement extends SchemaAlteringStatement
     {
         private final boolean isAdd;
         private final ColumnIdentifier fieldName;
-        private final CQL3Type type;
+        private final CQL3Type.Raw type;
 
-        public AddOrAlter(ColumnIdentifier name, boolean isAdd, ColumnIdentifier fieldName, CQL3Type type)
+        public AddOrAlter(UTName name, boolean isAdd, ColumnIdentifier fieldName, CQL3Type.Raw type)
         {
             super(name);
             this.isAdd = isAdd;
@@ -271,9 +282,9 @@ public abstract class AlterTypeStatement extends SchemaAlteringStatement
 
             List<AbstractType<?>> newTypes = new ArrayList<>(toUpdate.types.size() + 1);
             newTypes.addAll(toUpdate.types);
-            newTypes.add(type.getType());
+            newTypes.add(type.prepare(keyspace()).getType());
 
-            return new UserType(toUpdate.name, newNames, newTypes);
+            return new UserType(toUpdate.keyspace, toUpdate.name, newNames, newTypes);
         }
 
         private UserType doAlter(UserType toUpdate) throws InvalidRequestException
@@ -283,14 +294,14 @@ public abstract class AlterTypeStatement extends SchemaAlteringStatement
                 throw new InvalidRequestException(String.format("Unknown field %s in type %s", fieldName, name));
 
             AbstractType<?> previous = toUpdate.types.get(idx);
-            if (!type.getType().isCompatibleWith(previous))
+            if (!type.prepare(keyspace()).getType().isCompatibleWith(previous))
                 throw new InvalidRequestException(String.format("Type %s is incompatible with previous type %s of field %s in user type %s", type, previous.asCQL3Type(), fieldName, name));
 
             List<ByteBuffer> newNames = new ArrayList<>(toUpdate.columnNames);
             List<AbstractType<?>> newTypes = new ArrayList<>(toUpdate.types);
-            newTypes.set(idx, type.getType());
+            newTypes.set(idx, type.prepare(keyspace()).getType());
 
-            return new UserType(toUpdate.name, newNames, newTypes);
+            return new UserType(toUpdate.keyspace, toUpdate.name, newNames, newTypes);
         }
 
         protected UserType makeUpdatedType(UserType toUpdate) throws InvalidRequestException
@@ -303,7 +314,7 @@ public abstract class AlterTypeStatement extends SchemaAlteringStatement
     {
         private final Map<ColumnIdentifier, ColumnIdentifier> renames;
 
-        public Renames(ColumnIdentifier name, Map<ColumnIdentifier, ColumnIdentifier> renames)
+        public Renames(UTName name, Map<ColumnIdentifier, ColumnIdentifier> renames)
         {
             super(name);
             this.renames = renames;
@@ -324,7 +335,7 @@ public abstract class AlterTypeStatement extends SchemaAlteringStatement
                 newNames.set(idx, to.bytes);
             }
 
-            UserType updated = new UserType(toUpdate.name, newNames, newTypes);
+            UserType updated = new UserType(toUpdate.keyspace, toUpdate.name, newNames, newTypes);
             CreateTypeStatement.checkForDuplicateNames(updated);
             return updated;
         }
@@ -333,9 +344,9 @@ public abstract class AlterTypeStatement extends SchemaAlteringStatement
 
     private static class TypeRename extends AlterTypeStatement
     {
-        private final ColumnIdentifier newName;
+        private final UTName newName;
 
-        public TypeRename(ColumnIdentifier name, ColumnIdentifier newName)
+        public TypeRename(UTName name, UTName newName)
         {
             super(name);
             this.newName = newName;
@@ -343,11 +354,15 @@ public abstract class AlterTypeStatement extends SchemaAlteringStatement
 
         protected UserType makeUpdatedType(UserType toUpdate) throws InvalidRequestException
         {
-            UserType previous = Schema.instance.userTypes.getType(newName.bytes);
+            KSMetaData ksm = Schema.instance.getKSMetaData(newName.getKeyspace());
+            if (ksm == null)
+                throw new InvalidRequestException("Unknown keyspace " + newName.getKeyspace());
+
+            UserType previous = ksm.userTypes.getType(newName.getUserTypeName());
             if (previous != null)
                 throw new InvalidRequestException(String.format("Cannot rename user type %s to %s as another type of that name exists", name, newName));
 
-            return new UserType(newName.bytes, new ArrayList<>(toUpdate.columnNames), new ArrayList<>(toUpdate.types));
+            return new UserType(newName.getKeyspace(), newName.getUserTypeName(), new ArrayList<>(toUpdate.columnNames), new ArrayList<>(toUpdate.types));
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d63d07b9/src/java/org/apache/cassandra/cql3/statements/CFStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CFStatement.java b/src/java/org/apache/cassandra/cql3/statements/CFStatement.java
index 2ccc203..9b2987c 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CFStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CFStatement.java
@@ -37,11 +37,20 @@ public abstract class CFStatement extends ParsedStatement
     {
         if (!cfName.hasKeyspace())
         {
-            // XXX: We explicitely only want to call state.getKeyspace() in this case, don't move it outside the if.
+            // XXX: We explicitly only want to call state.getKeyspace() in this case, as we don't want to throw
+            // if not logged into any keyspace when a keyspace is explicitly set on the statement. So don't move
+            // the call outside the 'if' or replace the method with 'prepareKeyspace(state.getKeyspace())'.
             cfName.setKeyspace(state.getKeyspace(), true);
         }
     }
 
+    // Only for internal calls, use the version with ClientState for user queries
+    public void prepareKeyspace(String keyspace)
+    {
+        if (!cfName.hasKeyspace())
+            cfName.setKeyspace(keyspace, true);
+    }
+
     public String keyspace()
     {
         assert cfName.hasKeyspace() : "The statement hasn't be prepared correctly";

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d63d07b9/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
index 8f934e3..8351e71 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
@@ -150,7 +150,7 @@ public class CreateTableStatement extends SchemaAlteringStatement
 
     public static class RawStatement extends CFStatement
     {
-        private final Map<ColumnIdentifier, CQL3Type> definitions = new HashMap<ColumnIdentifier, CQL3Type>();
+        private final Map<ColumnIdentifier, CQL3Type.Raw> definitions = new HashMap<>();
         public final CFPropDefs properties = new CFPropDefs();
 
         private final List<List<ColumnIdentifier>> keyAliases = new ArrayList<List<ColumnIdentifier>>();
@@ -188,10 +188,10 @@ public class CreateTableStatement extends SchemaAlteringStatement
             CreateTableStatement stmt = new CreateTableStatement(cfName, properties, ifNotExists);
 
             Map<ByteBuffer, CollectionType> definedCollections = null;
-            for (Map.Entry<ColumnIdentifier, CQL3Type> entry : definitions.entrySet())
+            for (Map.Entry<ColumnIdentifier, CQL3Type.Raw> entry : definitions.entrySet())
             {
                 ColumnIdentifier id = entry.getKey();
-                CQL3Type pt = entry.getValue();
+                CQL3Type pt = entry.getValue().prepare(keyspace());
                 if (pt.isCollection())
                 {
                     if (definedCollections == null)
@@ -358,7 +358,7 @@ public class CreateTableStatement extends SchemaAlteringStatement
             return isReversed != null && isReversed ? ReversedType.getInstance(type) : type;
         }
 
-        public void addDefinition(ColumnIdentifier def, CQL3Type type)
+        public void addDefinition(ColumnIdentifier def, CQL3Type.Raw type)
         {
             definedNames.add(def);
             definitions.put(def, type);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d63d07b9/src/java/org/apache/cassandra/cql3/statements/CreateTypeStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateTypeStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateTypeStatement.java
index c355ceb..de7ce56 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateTypeStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateTypeStatement.java
@@ -33,19 +33,29 @@ import org.apache.cassandra.transport.messages.ResultMessage;
 
 public class CreateTypeStatement extends SchemaAlteringStatement
 {
-    private final ColumnIdentifier name;
+    private final UTName name;
     private final List<ColumnIdentifier> columnNames = new ArrayList<>();
-    private final List<CQL3Type> columnTypes = new ArrayList<>();
+    private final List<CQL3Type.Raw> columnTypes = new ArrayList<>();
     private final boolean ifNotExists;
 
-    public CreateTypeStatement(ColumnIdentifier name, boolean ifNotExists)
+    public CreateTypeStatement(UTName name, boolean ifNotExists)
     {
         super();
         this.name = name;
         this.ifNotExists = ifNotExists;
     }
 
-    public void addDefinition(ColumnIdentifier name, CQL3Type type)
+    @Override
+    public void prepareKeyspace(ClientState state) throws InvalidRequestException
+    {
+        if (!name.hasKeyspace())
+            name.setKeyspace(state.getKeyspace());
+
+        if (name.getKeyspace() == null)
+            throw new InvalidRequestException("You need to be logged in a keyspace or use a fully qualified user type name");
+    }
+
+    public void addDefinition(ColumnIdentifier name, CQL3Type.Raw type)
     {
         columnNames.add(name);
         columnTypes.add(type);
@@ -53,13 +63,15 @@ public class CreateTypeStatement extends SchemaAlteringStatement
 
     public void checkAccess(ClientState state) throws UnauthorizedException, InvalidRequestException
     {
-        // We may want a slightly different permission?
-        state.hasAllKeyspacesAccess(Permission.CREATE);
+        state.hasKeyspaceAccess(keyspace(), Permission.CREATE);
     }
 
     public void validate(ClientState state) throws RequestValidationException
     {
-        if (Schema.instance.userTypes.getType(name) != null && !ifNotExists)
+        KSMetaData ksm = Schema.instance.getKSMetaData(name.getKeyspace());
+        if (ksm == null)
+            throw new InvalidRequestException(String.format("Cannot add type in unknown keyspace %s", name.getKeyspace()));
+        if (ksm.userTypes.getType(name.getUserTypeName()) != null && !ifNotExists)
             throw new InvalidRequestException(String.format("A user type of name %s already exists.", name));
     }
 
@@ -80,34 +92,35 @@ public class CreateTypeStatement extends SchemaAlteringStatement
 
     public ResultMessage.SchemaChange.Change changeType()
     {
-        return ResultMessage.SchemaChange.Change.CREATED;
+        return ResultMessage.SchemaChange.Change.UPDATED;
     }
 
     @Override
     public String keyspace()
     {
-        // Kind of ugly, but SchemaAlteringStatement uses that for notifying change, and an empty keyspace
-        // there kind of make sense
-        return "";
+        return name.getKeyspace();
     }
 
-    private UserType createType()
+    private UserType createType() throws InvalidRequestException
     {
         List<ByteBuffer> names = new ArrayList<>(columnNames.size());
         for (ColumnIdentifier name : columnNames)
             names.add(name.bytes);
 
         List<AbstractType<?>> types = new ArrayList<>(columnTypes.size());
-        for (CQL3Type type : columnTypes)
-            types.add(type.getType());
+        for (CQL3Type.Raw type : columnTypes)
+            types.add(type.prepare(keyspace()).getType());
 
-        return new UserType(name.bytes, names, types);
+        return new UserType(name.getKeyspace(), name.getUserTypeName(), names, types);
     }
 
     public void announceMigration() throws InvalidRequestException, ConfigurationException
     {
+        KSMetaData ksm = Schema.instance.getKSMetaData(name.getKeyspace());
+        assert ksm != null; // otherwise validate() should have rejected the statement
+
         // Can happen with ifNotExists
-        if (Schema.instance.userTypes.getType(name) != null)
+        if (ksm.userTypes.getType(name.getUserTypeName()) != null)
             return;
 
         UserType type = createType();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d63d07b9/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
index b465347..d3dfe7c 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
@@ -112,7 +112,7 @@ public class DeleteStatement extends ModificationStatement
                 if (def.kind != ColumnDefinition.Kind.REGULAR && def.kind != ColumnDefinition.Kind.COMPACT_VALUE)
                     throw new InvalidRequestException(String.format("Invalid identifier %s for deletion (should not be a PRIMARY KEY part)", def.name));
 
-                Operation op = deletion.prepare(def);
+                Operation op = deletion.prepare(cfm.ksName, def);
                 op.collectMarkerSpecification(boundNames);
                 stmt.addOperation(op);
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d63d07b9/src/java/org/apache/cassandra/cql3/statements/DropTypeStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DropTypeStatement.java b/src/java/org/apache/cassandra/cql3/statements/DropTypeStatement.java
index b20c681..667ead4 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DropTypeStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DropTypeStatement.java
@@ -28,10 +28,10 @@ import org.apache.cassandra.transport.messages.ResultMessage;
 
 public class DropTypeStatement extends SchemaAlteringStatement
 {
-    private final ColumnIdentifier name;
+    private final UTName name;
     private final boolean ifExists;
 
-    public DropTypeStatement(ColumnIdentifier name, boolean ifExists)
+    public DropTypeStatement(UTName name, boolean ifExists)
     {
         super();
         this.name = name;
@@ -40,13 +40,16 @@ public class DropTypeStatement extends SchemaAlteringStatement
 
     public void checkAccess(ClientState state) throws UnauthorizedException, InvalidRequestException
     {
-        // We may want a slightly different permission?
-        state.hasAllKeyspacesAccess(Permission.DROP);
+        state.hasKeyspaceAccess(keyspace(), Permission.DROP);
     }
 
     public void validate(ClientState state) throws RequestValidationException
     {
-        UserType old = Schema.instance.userTypes.getType(name);
+        KSMetaData ksm = Schema.instance.getKSMetaData(name.getKeyspace());
+        if (ksm == null)
+            throw new InvalidRequestException(String.format("Cannot drop type in unknown keyspace %s", name.getKeyspace()));
+
+        UserType old = ksm.userTypes.getType(name.getUserTypeName());
         if (old == null)
         {
             if (ifExists)
@@ -61,19 +64,22 @@ public class DropTypeStatement extends SchemaAlteringStatement
         // We have two places to check: 1) other user type that can nest the one
         // we drop and 2) existing tables referencing the type (maybe in a nested
         // way).
-        for (UserType ut : Schema.instance.userTypes.getAllTypes().values())
+
+        for (KSMetaData ksm2 : Schema.instance.getKeyspaceDefinitions())
         {
-            if (ut.name.equals(name.bytes))
-                continue;
-            if (isUsedBy(ut))
-                throw new InvalidRequestException(String.format("Cannot drop user type %s as it is still used by user type %s", name, ut.asCQL3Type()));
-        }
+            for (UserType ut : ksm2.userTypes.getAllTypes().values())
+            {
+                if (ut.keyspace.equals(name.getKeyspace()) && ut.name.equals(name.getUserTypeName()))
+                    continue;
+                if (isUsedBy(ut))
+                    throw new InvalidRequestException(String.format("Cannot drop user type %s as it is still used by user type %s", name, ut.asCQL3Type()));
+            }
 
-        for (KSMetaData ksm : Schema.instance.getKeyspaceDefinitions())
-            for (CFMetaData cfm : ksm.cfMetaData().values())
+            for (CFMetaData cfm : ksm2.cfMetaData().values())
                 for (ColumnDefinition def : cfm.allColumns())
                     if (isUsedBy(def.type))
                         throw new InvalidRequestException(String.format("Cannot drop user type %s as it is still used by table %s.%s", name, cfm.ksName, cfm.cfName));
+        }
     }
 
     private boolean isUsedBy(AbstractType<?> toCheck) throws RequestValidationException
@@ -82,8 +88,12 @@ public class DropTypeStatement extends SchemaAlteringStatement
         {
             CompositeType ct = (CompositeType)toCheck;
 
-            if ((ct instanceof UserType) && name.bytes.equals(((UserType)ct).name))
-                return true;
+            if ((ct instanceof UserType))
+            {
+                UserType ut = (UserType)ct;
+                if (name.getKeyspace().equals(ut.keyspace) && name.getUserTypeName().equals(ut.name))
+                    return true;
+            }
 
             // Also reach into subtypes
             for (AbstractType<?> subtype : ct.types)
@@ -110,20 +120,21 @@ public class DropTypeStatement extends SchemaAlteringStatement
 
     public ResultMessage.SchemaChange.Change changeType()
     {
-        return ResultMessage.SchemaChange.Change.DROPPED;
+        return ResultMessage.SchemaChange.Change.UPDATED;
     }
 
     @Override
     public String keyspace()
     {
-        // Kind of ugly, but SchemaAlteringStatement uses that for notifying change, and an empty keyspace
-        // there kind of make sense
-        return "";
+        return name.getKeyspace();
     }
 
     public void announceMigration() throws InvalidRequestException, ConfigurationException
     {
-        UserType toDrop = Schema.instance.userTypes.getType(name);
+        KSMetaData ksm = Schema.instance.getKSMetaData(name.getKeyspace());
+        assert ksm != null;
+
+        UserType toDrop = ksm.userTypes.getType(name.getUserTypeName());
         // Can be null with ifExists
         if (toDrop != null)
             MigrationManager.announceTypeDrop(toDrop);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d63d07b9/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index d164816..148edda 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -174,7 +174,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
 
                     if (rel.operator() == Relation.Type.EQ)
                     {
-                        Term t = rel.getValue().prepare(def);
+                        Term t = rel.getValue().prepare(keyspace(), def);
                         t.collectMarkerSpecification(names);
                         restriction = new Restriction.EQ(t, false);
                     }
@@ -182,7 +182,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
                     {
                         if (rel.getValue() != null)
                         {
-                            Term t = rel.getValue().prepare(def);
+                            Term t = rel.getValue().prepare(keyspace(), def);
                             t.collectMarkerSpecification(names);
                             restriction = Restriction.IN.create(t);
                         }
@@ -191,7 +191,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
                             List<Term> values = new ArrayList<Term>(rel.getInValues().size());
                             for (Term.Raw raw : rel.getInValues())
                             {
-                                Term t = raw.prepare(def);
+                                Term t = raw.prepare(keyspace(), def);
                                 t.collectMarkerSpecification(names);
                                 values.add(t);
                             }
@@ -612,7 +612,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
                         if (def.type instanceof ListType)
                             throw new InvalidRequestException(String.format("List operation (%s) are not allowed in conditional updates", def.name));
 
-                        Operation condition = entry.right.prepare(def);
+                        Operation condition = entry.right.prepare(keyspace(), def);
                         assert !condition.requiresRead();
 
                         condition.collectMarkerSpecification(boundNames);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d63d07b9/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index b3f10c6..4d89988 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -1043,7 +1043,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
             Term prepLimit = null;
             if (limit != null)
             {
-                prepLimit = limit.prepare(limitReceiver());
+                prepLimit = limit.prepare(keyspace(), limitReceiver());
                 prepLimit.collectMarkerSpecification(names);
             }
 
@@ -1424,7 +1424,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
                     {
                         if (restriction != null)
                             throw new InvalidRequestException(String.format("%s cannot be restricted by more than one relation if it includes an Equal", def.name));
-                        Term t = newRel.getValue().prepare(receiver);
+                        Term t = newRel.getValue().prepare(keyspace(), receiver);
                         t.collectMarkerSpecification(boundNames);
                         restriction = new Restriction.EQ(t, newRel.onToken);
                     }
@@ -1437,7 +1437,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
                     {
                         // Means we have a "SELECT ... IN ?"
                         assert newRel.getValue() != null;
-                        Term t = newRel.getValue().prepare(receiver);
+                        Term t = newRel.getValue().prepare(keyspace(), receiver);
                         t.collectMarkerSpecification(boundNames);
                         restriction = Restriction.IN.create(t);
                     }
@@ -1446,7 +1446,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
                         List<Term> inValues = new ArrayList<Term>(newRel.getInValues().size());
                         for (Term.Raw raw : newRel.getInValues())
                         {
-                            Term t = raw.prepare(receiver);
+                            Term t = raw.prepare(keyspace(), receiver);
                             t.collectMarkerSpecification(boundNames);
                             inValues.add(t);
                         }
@@ -1462,7 +1462,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
                             restriction = new Restriction.Slice(newRel.onToken);
                         else if (!restriction.isSlice())
                             throw new InvalidRequestException(String.format("%s cannot be restricted by both an equal and an inequal relation", def.name));
-                        Term t = newRel.getValue().prepare(receiver);
+                        Term t = newRel.getValue().prepare(keyspace(), receiver);
                         t.collectMarkerSpecification(boundNames);
                         ((Restriction.Slice)restriction).setBound(def.name, newRel.operator(), t);
                     }
@@ -1482,7 +1482,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
                             throw new InvalidRequestException(String.format("Collection column %s can only be restricted by CONTAINS or CONTAINS KEY", def.name));
                         boolean isKey = newRel.operator() == Relation.Type.CONTAINS_KEY;
                         receiver = makeCollectionReceiver(receiver, isKey);
-                        Term t = newRel.getValue().prepare(receiver);
+                        Term t = newRel.getValue().prepare(keyspace(), receiver);
                         ((Restriction.Contains)restriction).add(t, isKey);
                     }
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d63d07b9/src/java/org/apache/cassandra/cql3/statements/Selection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/Selection.java b/src/java/org/apache/cassandra/cql3/statements/Selection.java
index f8b3f52..0d9c355 100644
--- a/src/java/org/apache/cassandra/cql3/statements/Selection.java
+++ b/src/java/org/apache/cassandra/cql3/statements/Selection.java
@@ -150,7 +150,7 @@ public abstract class Selection
             if (returnType == null)
                 throw new InvalidRequestException(String.format("Unknown function '%s'", withFun.functionName));
             ColumnSpecification spec = makeFunctionSpec(cfm, withFun, returnType, raw.alias);
-            Function fun = Functions.get(withFun.functionName, args, spec);
+            Function fun = Functions.get(cfm.ksName, withFun.functionName, args, spec);
             if (metadata != null)
                 metadata.add(spec);
             return new FunctionSelector(fun, args);
@@ -352,7 +352,7 @@ public abstract class Selection
         public abstract ByteBuffer compute(ResultSetBuilder rs) throws InvalidRequestException;
         public abstract AbstractType<?> getType();
 
-        public boolean isAssignableTo(ColumnSpecification receiver)
+        public boolean isAssignableTo(String keyspace, ColumnSpecification receiver)
         {
             return getType().asCQL3Type().equals(receiver.type.asCQL3Type());
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d63d07b9/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
index 6cf0856..1102c09 100644
--- a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
@@ -155,13 +155,13 @@ public class UpdateStatement extends ModificationStatement
                 {
                     case PARTITION_KEY:
                     case CLUSTERING_COLUMN:
-                        Term t = value.prepare(def);
+                        Term t = value.prepare(keyspace(), def);
                         t.collectMarkerSpecification(boundNames);
                         stmt.addKeyValue(def.name, t);
                         break;
                     case COMPACT_VALUE:
                     case REGULAR:
-                        Operation operation = new Operation.SetValue(value).prepare(def);
+                        Operation operation = new Operation.SetValue(value).prepare(keyspace(), def);
                         operation.collectMarkerSpecification(boundNames);
                         stmt.addOperation(operation);
                         break;
@@ -207,7 +207,7 @@ public class UpdateStatement extends ModificationStatement
                 if (def == null)
                     throw new InvalidRequestException(String.format("Unknown identifier %s", entry.left));
 
-                Operation operation = entry.right.prepare(def);
+                Operation operation = entry.right.prepare(keyspace(), def);
                 operation.collectMarkerSpecification(boundNames);
 
                 switch (def.kind)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d63d07b9/src/java/org/apache/cassandra/db/CFRowAdder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CFRowAdder.java b/src/java/org/apache/cassandra/db/CFRowAdder.java
index 097f6aa..ec22e9a 100644
--- a/src/java/org/apache/cassandra/db/CFRowAdder.java
+++ b/src/java/org/apache/cassandra/db/CFRowAdder.java
@@ -17,14 +17,18 @@
  */
 package org.apache.cassandra.db;
 
+import java.nio.ByteBuffer;
+
 import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.composites.CellName;
 import org.apache.cassandra.db.composites.Composite;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.CollectionType;
+import org.apache.cassandra.db.marshal.ListType;
 import org.apache.cassandra.db.marshal.MapType;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.UUIDGen;
 
 /**
  * Convenience object to populate a given CQL3 row in a ColumnFamily object.
@@ -57,6 +61,15 @@ public class CFRowAdder
         return add(cf.getComparator().create(prefix, def.name), def, value);
     }
 
+    public CFRowAdder resetCollection(String cql3ColumnName)
+    {
+        ColumnDefinition def = getDefinition(cql3ColumnName);
+        assert def.type.isCollection();
+        Composite name = cf.getComparator().create(prefix, def.name);
+        cf.addAtom(new RangeTombstone(name.start(), name.end(), timestamp - 1, ldt));
+        return this;
+    }
+
     public CFRowAdder addMapEntry(String cql3ColumnName, Object key, Object value)
     {
         ColumnDefinition def = getDefinition(cql3ColumnName);
@@ -66,6 +79,14 @@ public class CFRowAdder
         return add(name, def, value);
     }
 
+    public CFRowAdder addListEntry(String cql3ColumnName, Object value)
+    {
+        ColumnDefinition def = getDefinition(cql3ColumnName);
+        assert def.type instanceof ListType;
+        CellName name = cf.getComparator().create(prefix, def.name, ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes()));
+        return add(name, def, value);
+    }
+
     private ColumnDefinition getDefinition(String name)
     {
         return cf.metadata().getColumnDefinition(new ColumnIdentifier(name, false));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d63d07b9/src/java/org/apache/cassandra/db/DefsTables.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DefsTables.java b/src/java/org/apache/cassandra/db/DefsTables.java
index a9e1c74..1a49192 100644
--- a/src/java/org/apache/cassandra/db/DefsTables.java
+++ b/src/java/org/apache/cassandra/db/DefsTables.java
@@ -128,7 +128,7 @@ public class DefsTables
             if (Schema.invalidSchemaRow(row) || Schema.ignoredSchemaRow(row))
                 continue;
 
-            keyspaces.add(KSMetaData.fromSchema(row, serializedColumnFamilies(row.key)));
+            keyspaces.add(KSMetaData.fromSchema(row, serializedColumnFamilies(row.key), serializedUserTypes(row.key)));
         }
 
         return keyspaces;
@@ -142,6 +142,14 @@ public class DefsTables
                                                                                          System.currentTimeMillis())));
     }
 
+    private static Row serializedUserTypes(DecoratedKey ksNameKey)
+    {
+        ColumnFamilyStore cfsStore = SystemKeyspace.schemaCFS(SystemKeyspace.SCHEMA_USER_TYPES_CF);
+        return new Row(ksNameKey, cfsStore.getColumnFamily(QueryFilter.getIdentityFilter(ksNameKey,
+                                                                                         SystemKeyspace.SCHEMA_USER_TYPES_CF,
+                                                                                         System.currentTimeMillis())));
+    }
+
     /**
      * Merge remote schema in form of mutations with local and mutate ks/cf metadata objects
      * (which also involves fs operations on add/drop ks/cf)
@@ -156,7 +164,7 @@ public class DefsTables
         // current state of the schema
         Map<DecoratedKey, ColumnFamily> oldKeyspaces = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_KEYSPACES_CF);
         Map<DecoratedKey, ColumnFamily> oldColumnFamilies = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_COLUMNFAMILIES_CF);
-        List<Row> oldTypes = SystemKeyspace.serializedSchema(SystemKeyspace.SCHEMA_USER_TYPES_CF);
+        Map<DecoratedKey, ColumnFamily> oldTypes = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_USER_TYPES_CF);
 
         for (Mutation mutation : mutations)
             mutation.apply();
@@ -167,17 +175,16 @@ public class DefsTables
         // with new data applied
         Map<DecoratedKey, ColumnFamily> newKeyspaces = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_KEYSPACES_CF);
         Map<DecoratedKey, ColumnFamily> newColumnFamilies = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_COLUMNFAMILIES_CF);
-        List<Row> newTypes = SystemKeyspace.serializedSchema(SystemKeyspace.SCHEMA_USER_TYPES_CF);
+        Map<DecoratedKey, ColumnFamily> newTypes = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_USER_TYPES_CF);
 
         Set<String> keyspacesToDrop = mergeKeyspaces(oldKeyspaces, newKeyspaces);
         mergeColumnFamilies(oldColumnFamilies, newColumnFamilies);
+        mergeTypes(oldTypes, newTypes);
 
         // it is safe to drop a keyspace only when all nested ColumnFamilies where deleted
         for (String keyspaceToDrop : keyspacesToDrop)
             dropKeyspace(keyspaceToDrop);
 
-        mergeTypes(oldTypes, newTypes);
-
         Schema.instance.updateVersionAndAnnounce();
     }
 
@@ -195,7 +202,7 @@ public class DefsTables
 
             // we don't care about nested ColumnFamilies here because those are going to be processed separately
             if (!(ksAttrs.getColumnCount() == 0))
-                addKeyspace(KSMetaData.fromSchema(new Row(entry.getKey(), entry.getValue()), Collections.<CFMetaData>emptyList()));
+                addKeyspace(KSMetaData.fromSchema(new Row(entry.getKey(), entry.getValue()), Collections.<CFMetaData>emptyList(), new UTMetaData()));
         }
 
         /**
@@ -216,7 +223,7 @@ public class DefsTables
 
             if (prevValue.getColumnCount() == 0)
             {
-                addKeyspace(KSMetaData.fromSchema(new Row(entry.getKey(), newValue), Collections.<CFMetaData>emptyList()));
+                addKeyspace(KSMetaData.fromSchema(new Row(entry.getKey(), newValue), Collections.<CFMetaData>emptyList(), new UTMetaData()));
                 continue;
             }
 
@@ -241,7 +248,7 @@ public class DefsTables
             if (newState.getColumnCount() == 0)
                 keyspacesToDrop.add(AsciiType.instance.getString(key.key));
             else
-                updateKeyspace(KSMetaData.fromSchema(new Row(key, newState), Collections.<CFMetaData>emptyList()));
+                updateKeyspace(KSMetaData.fromSchema(new Row(key, newState), Collections.<CFMetaData>emptyList(), new UTMetaData()));
         }
 
         return keyspacesToDrop;
@@ -312,52 +319,54 @@ public class DefsTables
         }
     }
 
-    private static void mergeTypes(List<Row> old, List<Row> updated)
+    private static void mergeTypes(Map<DecoratedKey, ColumnFamily> old, Map<DecoratedKey, ColumnFamily> updated)
     {
-        MapDifference<ByteBuffer, UserType> diff = Maps.difference(UTMetaData.fromSchema(old).getAllTypes(),
-                                                                   UTMetaData.fromSchema(updated).getAllTypes());
+        MapDifference<DecoratedKey, ColumnFamily> diff = Maps.difference(old, updated);
 
-        // New types
-        for (UserType newType : diff.entriesOnlyOnRight().values())
-            Schema.instance.loadType(newType);
+        // New keyspace with types
+        for (Map.Entry<DecoratedKey, ColumnFamily> entry : diff.entriesOnlyOnRight().entrySet())
+        {
+            ColumnFamily cfTypes = entry.getValue();
+            if (cfTypes.getColumnCount() == 0)
+                continue;
 
-        // Dropped types
-        for (UserType droppedType : diff.entriesOnlyOnLeft().values())
-            Schema.instance.dropType(droppedType);
+            for (UserType ut : UTMetaData.fromSchema(new Row(entry.getKey(), cfTypes)).values())
+                addType(ut);
+        }
 
-        // Now deal with modified types: if one is 'extended' compared to the other, we always prefer that
-        // one. But otherwise (if it's a field rename for instance) we just pick the more recent one
-        // (timestamp wise).
-        for (MapDifference.ValueDifference<UserType> modified : diff.entriesDiffering().values())
+        for (Map.Entry<DecoratedKey, MapDifference.ValueDifference<ColumnFamily>> modifiedEntry : diff.entriesDiffering().entrySet())
         {
-            UserType u1 = modified.leftValue();
-            UserType u2 = modified.rightValue();
-            // Note that that loadType is a 'load or update'
-            if (u1.isCompatibleWith(u2) && !u2.isCompatibleWith(u1))
+            DecoratedKey keyspace = modifiedEntry.getKey();
+            ColumnFamily prevCFTypes = modifiedEntry.getValue().leftValue(); // state before external modification
+            ColumnFamily newCFTypes = modifiedEntry.getValue().rightValue(); // updated state
+
+            if (prevCFTypes.getColumnCount() == 0) // whole keyspace was deleted and now it's re-created
             {
-                Schema.instance.loadType(u1);
+                for (UserType ut : UTMetaData.fromSchema(new Row(keyspace, newCFTypes)).values())
+                    addType(ut);
             }
-            else if (u2.isCompatibleWith(u1) && !u1.isCompatibleWith(u2))
+            else if (newCFTypes.getColumnCount() == 0) // whole keyspace is deleted
             {
-                Schema.instance.loadType(u2);
+                for (UserType ut : UTMetaData.fromSchema(new Row(keyspace, prevCFTypes)).values())
+                    dropType(ut);
             }
-            else
+            else // has modifications in the types, need to perform nested diff to determine what was really changed
             {
-                long leftTimestamp = firstTimestampOf(u1.name, old);
-                long rightTimestamp = firstTimestampOf(u1.name, updated);
-                Schema.instance.loadType(leftTimestamp > rightTimestamp ? u1 : u2);
+                MapDifference<ByteBuffer, UserType> typesDiff = Maps.difference(UTMetaData.fromSchema(new Row(keyspace, prevCFTypes)),
+                                                                                UTMetaData.fromSchema(new Row(keyspace, newCFTypes)));
+
+                for (UserType type : typesDiff.entriesOnlyOnRight().values())
+                    addType(type);
+
+                for (UserType type : typesDiff.entriesOnlyOnLeft().values())
+                    dropType(type);
+
+                for (MapDifference.ValueDifference<UserType> tdiff : typesDiff.entriesDiffering().values())
+                    addType(tdiff.rightValue()); // use the most recent value
             }
         }
     }
 
-    private static long firstTimestampOf(ByteBuffer key, List<Row> rows)
-    {
-        for (Row row : rows)
-            if (row.key.key.equals(key))
-                return row.cf.iterator().next().timestamp();
-        return -1;
-    }
-
     private static void addKeyspace(KSMetaData ksm)
     {
         assert Schema.instance.getKSMetaData(ksm.name) == null;
@@ -393,6 +402,19 @@ public class DefsTables
         }
     }
 
+    private static void addType(UserType ut)
+    {
+        KSMetaData ksm = Schema.instance.getKSMetaData(ut.keyspace);
+        assert ksm != null;
+
+        logger.info("Loading {}", ut);
+
+        ksm.userTypes.addType(ut);
+
+        if (!StorageService.instance.isClientMode())
+            MigrationManager.instance.notifyUpdateKeyspace(ksm);
+    }
+
     private static void updateKeyspace(KSMetaData newState)
     {
         KSMetaData oldKsm = Schema.instance.getKSMetaData(newState.name);
@@ -483,6 +505,17 @@ public class DefsTables
         }
     }
 
+    private static void dropType(UserType ut)
+    {
+        KSMetaData ksm = Schema.instance.getKSMetaData(ut.keyspace);
+        assert ksm != null;
+
+        ksm.userTypes.removeType(ut);
+
+        if (!StorageService.instance.isClientMode())
+            MigrationManager.instance.notifyUpdateKeyspace(ksm);
+    }
+
     private static KSMetaData makeNewKeyspaceDefinition(KSMetaData ksm, CFMetaData toExclude)
     {
         // clone ksm but do not include the new def

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d63d07b9/src/java/org/apache/cassandra/db/marshal/TypeParser.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/TypeParser.java b/src/java/org/apache/cassandra/db/marshal/TypeParser.java
index 8ba0c0e..1b83180 100644
--- a/src/java/org/apache/cassandra/db/marshal/TypeParser.java
+++ b/src/java/org/apache/cassandra/db/marshal/TypeParser.java
@@ -297,7 +297,7 @@ public class TypeParser
         }
     }
 
-    public Pair<ByteBuffer, List<Pair<ByteBuffer, AbstractType>>> getUserTypeParameters() throws SyntaxException, ConfigurationException
+    public Pair<Pair<String, ByteBuffer>, List<Pair<ByteBuffer, AbstractType>>> getUserTypeParameters() throws SyntaxException, ConfigurationException
     {
 
         if (isEOS() || str.charAt(idx) != '(')
@@ -306,6 +306,8 @@ public class TypeParser
         ++idx; // skipping '('
 
         skipBlankAndComma();
+        String keyspace = readNextIdentifier();
+        skipBlankAndComma();
         ByteBuffer typeName = fromHex(readNextIdentifier());
         List<Pair<ByteBuffer, AbstractType>> defs = new ArrayList<>();
 
@@ -314,7 +316,7 @@ public class TypeParser
             if (str.charAt(idx) == ')')
             {
                 ++idx;
-                return Pair.create(typeName, defs);
+                return Pair.create(Pair.create(keyspace, typeName), defs);
             }
 
             ByteBuffer name = fromHex(readNextIdentifier());
@@ -561,10 +563,10 @@ public class TypeParser
         return sb.toString();
     }
 
-    public static String stringifyUserTypeParameters(ByteBuffer typeName, List<ByteBuffer> columnNames, List<AbstractType<?>> columnTypes)
+    public static String stringifyUserTypeParameters(String keysace, ByteBuffer typeName, List<ByteBuffer> columnNames, List<AbstractType<?>> columnTypes)
     {
         StringBuilder sb = new StringBuilder();
-        sb.append('(').append(ByteBufferUtil.bytesToHex(typeName));
+        sb.append('(').append(keysace).append(",").append(ByteBufferUtil.bytesToHex(typeName));
 
         for (int i = 0; i < columnNames.size(); i++)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d63d07b9/src/java/org/apache/cassandra/db/marshal/UserType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/UserType.java b/src/java/org/apache/cassandra/db/marshal/UserType.java
index 7b84301..eb95fb9 100644
--- a/src/java/org/apache/cassandra/db/marshal/UserType.java
+++ b/src/java/org/apache/cassandra/db/marshal/UserType.java
@@ -37,20 +37,23 @@ import org.apache.cassandra.utils.Pair;
  */
 public class UserType extends CompositeType
 {
+    public final String keyspace;
     public final ByteBuffer name;
     public final List<ByteBuffer> columnNames;
 
-    public UserType(ByteBuffer name, List<ByteBuffer> columnNames, List<AbstractType<?>> types)
+    public UserType(String keyspace, ByteBuffer name, List<ByteBuffer> columnNames, List<AbstractType<?>> types)
     {
         super(types);
+        this.keyspace = keyspace;
         this.name = name;
         this.columnNames = columnNames;
     }
 
     public static UserType getInstance(TypeParser parser) throws ConfigurationException, SyntaxException
     {
-        Pair<ByteBuffer, List<Pair<ByteBuffer, AbstractType>>> params = parser.getUserTypeParameters();
-        ByteBuffer name = params.left;
+        Pair<Pair<String, ByteBuffer>, List<Pair<ByteBuffer, AbstractType>>> params = parser.getUserTypeParameters();
+        String keyspace = params.left.left;
+        ByteBuffer name = params.left.right;
         List<ByteBuffer> columnNames = new ArrayList<>(params.right.size());
         List<AbstractType<?>> columnTypes = new ArrayList<>(params.right.size());
         for (Pair<ByteBuffer, AbstractType> p : params.right)
@@ -58,13 +61,13 @@ public class UserType extends CompositeType
             columnNames.add(p.left);
             columnTypes.add(p.right);
         }
-        return new UserType(name, columnNames, columnTypes);
+        return new UserType(keyspace, name, columnNames, columnTypes);
     }
 
     @Override
     public final int hashCode()
     {
-        return Objects.hashCode(name, columnNames, types);
+        return Objects.hashCode(keyspace, name, columnNames, types);
     }
 
     @Override
@@ -74,18 +77,18 @@ public class UserType extends CompositeType
             return false;
 
         UserType that = (UserType)o;
-        return name.equals(that.name) && columnNames.equals(that.columnNames) && types.equals(that.types);
+        return keyspace.equals(that.keyspace) && name.equals(that.name) && columnNames.equals(that.columnNames) && types.equals(that.types);
     }
 
     @Override
     public CQL3Type asCQL3Type()
     {
-        return CQL3Type.UserDefined.create(name, this);
+        return CQL3Type.UserDefined.create(this);
     }
 
     @Override
     public String toString()
     {
-        return getClass().getName() + TypeParser.stringifyUserTypeParameters(name, columnNames, types);
+        return getClass().getName() + TypeParser.stringifyUserTypeParameters(keyspace, name, columnNames, types);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d63d07b9/src/java/org/apache/cassandra/service/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java
index e308613..f0e0012 100644
--- a/src/java/org/apache/cassandra/service/MigrationManager.java
+++ b/src/java/org/apache/cassandra/service/MigrationManager.java
@@ -266,7 +266,6 @@ public class MigrationManager implements IEndpointStateChangeSubscriber
 
     public static void announceTypeUpdate(UserType updatedType)
     {
-        // We don't make a difference with a new type. DefsTable.mergeType will make sure we keep the updated version.
         announceNewType(updatedType);
     }
 


[2/2] git commit: Make user types keyspace scoped

Posted by sl...@apache.org.
Make user types keyspace scoped

patch by slebresne; reviewed by iamaleksey for CASSANDRA-6438


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d63d07b9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d63d07b9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d63d07b9

Branch: refs/heads/trunk
Commit: d63d07b9270d73a289086c69002b5a0023b2d233
Parents: 0a1b277
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Wed Dec 4 10:20:40 2013 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Tue Jan 7 17:47:11 2014 +0100

----------------------------------------------------------------------
 .../org/apache/cassandra/config/CFMetaData.java |   8 +-
 .../cassandra/config/DatabaseDescriptor.java    |   2 -
 .../org/apache/cassandra/config/KSMetaData.java |  35 ++-
 .../org/apache/cassandra/config/Schema.java     |  23 +-
 .../org/apache/cassandra/config/UTMetaData.java |  88 ++++---
 .../apache/cassandra/cql3/AbstractMarker.java   |   6 +-
 .../cassandra/cql3/AssignementTestable.java     |   4 +-
 .../org/apache/cassandra/cql3/Attributes.java   |   4 +-
 .../org/apache/cassandra/cql3/CQL3Type.java     | 256 ++++++++++++-------
 .../org/apache/cassandra/cql3/Constants.java    |  12 +-
 src/java/org/apache/cassandra/cql3/Cql.g        |  28 +-
 src/java/org/apache/cassandra/cql3/Lists.java   |  14 +-
 src/java/org/apache/cassandra/cql3/Maps.java    |  18 +-
 .../org/apache/cassandra/cql3/Operation.java    |  40 +--
 src/java/org/apache/cassandra/cql3/Sets.java    |  14 +-
 src/java/org/apache/cassandra/cql3/Term.java    |   2 +-
 .../org/apache/cassandra/cql3/TypeCast.java     |  27 +-
 src/java/org/apache/cassandra/cql3/UTName.java  |  58 +++++
 .../org/apache/cassandra/cql3/UserTypes.java    |  14 +-
 .../cassandra/cql3/functions/FunctionCall.java  |   8 +-
 .../cassandra/cql3/functions/Functions.java     |  14 +-
 .../cql3/statements/AlterTableStatement.java    |   6 +-
 .../cql3/statements/AlterTypeStatement.java     | 129 +++++-----
 .../cassandra/cql3/statements/CFStatement.java  |  11 +-
 .../cql3/statements/CreateTableStatement.java   |   8 +-
 .../cql3/statements/CreateTypeStatement.java    |  45 ++--
 .../cql3/statements/DeleteStatement.java        |   2 +-
 .../cql3/statements/DropTypeStatement.java      |  51 ++--
 .../cql3/statements/ModificationStatement.java  |   8 +-
 .../cql3/statements/SelectStatement.java        |  12 +-
 .../cassandra/cql3/statements/Selection.java    |   4 +-
 .../cql3/statements/UpdateStatement.java        |   6 +-
 .../org/apache/cassandra/db/CFRowAdder.java     |  21 ++
 .../org/apache/cassandra/db/DefsTables.java     | 113 +++++---
 .../apache/cassandra/db/marshal/TypeParser.java |  10 +-
 .../apache/cassandra/db/marshal/UserType.java   |  19 +-
 .../cassandra/service/MigrationManager.java     |   1 -
 37 files changed, 686 insertions(+), 435 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d63d07b9/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index d8ae26a..78ee300 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -39,6 +39,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.cql3.statements.CFStatement;
 import org.apache.cassandra.cql3.statements.CreateTableStatement;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
@@ -172,10 +173,11 @@ public final class CFMetaData
                                                               + ") WITH COMMENT='triggers metadata table'");
 
     public static final CFMetaData SchemaUserTypesCf = compile("CREATE TABLE " + SystemKeyspace.SCHEMA_USER_TYPES_CF + " ("
+                                                               + "keyspace_name text,"
                                                                + "type_name text,"
                                                                + "column_names list<text>,"
                                                                + "column_types list<text>,"
-                                                               + "PRIMARY KEY (type_name)"
+                                                               + "PRIMARY KEY (keyspace_name, type_name)"
                                                                + ") WITH COMMENT='Defined user types' AND gc_grace_seconds=8640");
 
     public static final CFMetaData HintsCf = compile("CREATE TABLE " + SystemKeyspace.HINTS_CF + " ("
@@ -517,7 +519,9 @@ public final class CFMetaData
     {
         try
         {
-            CreateTableStatement statement = (CreateTableStatement) QueryProcessor.parseStatement(cql).prepare().statement;
+            CFStatement parsed = (CFStatement)QueryProcessor.parseStatement(cql);
+            parsed.prepareKeyspace(keyspace);
+            CreateTableStatement statement = (CreateTableStatement) parsed.prepare().statement;
             CFMetaData cfm = newSystemMetadata(keyspace, statement.columnFamily(), "", statement.comparator);
             statement.applyPropertiesTo(cfm);
             return cfm.rebuild();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d63d07b9/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 6b49d21..cef5e14 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -517,8 +517,6 @@ public class DatabaseDescriptor
     /** load keyspace (keyspace) definitions, but do not initialize the keyspace instances. */
     public static void loadSchemas()
     {
-        Schema.instance.loadUserTypes();
-
         ColumnFamilyStore schemaCFS = SystemKeyspace.schemaCFS(SystemKeyspace.SCHEMA_KEYSPACES_CF);
 
         // if keyspace with definitions is empty try loading the old way

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d63d07b9/src/java/org/apache/cassandra/config/KSMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/KSMetaData.java b/src/java/org/apache/cassandra/config/KSMetaData.java
index c3fe641..05e6248 100644
--- a/src/java/org/apache/cassandra/config/KSMetaData.java
+++ b/src/java/org/apache/cassandra/config/KSMetaData.java
@@ -43,8 +43,20 @@ public final class KSMetaData
     private final Map<String, CFMetaData> cfMetaData;
     public final boolean durableWrites;
 
+    public final UTMetaData userTypes;
+
     KSMetaData(String name, Class<? extends AbstractReplicationStrategy> strategyClass, Map<String, String> strategyOptions, boolean durableWrites, Iterable<CFMetaData> cfDefs)
     {
+        this(name, strategyClass, strategyOptions, durableWrites, cfDefs, new UTMetaData());
+    }
+
+    KSMetaData(String name,
+               Class<? extends AbstractReplicationStrategy> strategyClass,
+               Map<String, String> strategyOptions,
+               boolean durableWrites,
+               Iterable<CFMetaData> cfDefs,
+               UTMetaData userTypes)
+    {
         this.name = name;
         this.strategyClass = strategyClass == null ? NetworkTopologyStrategy.class : strategyClass;
         this.strategyOptions = strategyOptions;
@@ -53,6 +65,7 @@ public final class KSMetaData
             cfmap.put(cfm.cfName, cfm);
         this.cfMetaData = Collections.unmodifiableMap(cfmap);
         this.durableWrites = durableWrites;
+        this.userTypes = userTypes;
     }
 
     // For new user created keyspaces (through CQL)
@@ -67,12 +80,12 @@ public final class KSMetaData
 
     public static KSMetaData newKeyspace(String name, Class<? extends AbstractReplicationStrategy> strategyClass, Map<String, String> options, boolean durablesWrites, Iterable<CFMetaData> cfDefs)
     {
-        return new KSMetaData(name, strategyClass, options, durablesWrites, cfDefs);
+        return new KSMetaData(name, strategyClass, options, durablesWrites, cfDefs, new UTMetaData());
     }
 
     public static KSMetaData cloneWith(KSMetaData ksm, Iterable<CFMetaData> cfDefs)
     {
-        return new KSMetaData(ksm.name, ksm.strategyClass, ksm.strategyOptions, ksm.durableWrites, cfDefs);
+        return new KSMetaData(ksm.name, ksm.strategyClass, ksm.strategyOptions, ksm.durableWrites, cfDefs, ksm.userTypes);
     }
 
     public static KSMetaData systemKeyspace()
@@ -127,7 +140,8 @@ public final class KSMetaData
                 && ObjectUtils.equals(other.strategyClass, strategyClass)
                 && ObjectUtils.equals(other.strategyOptions, strategyOptions)
                 && other.cfMetaData.equals(cfMetaData)
-                && other.durableWrites == durableWrites;
+                && other.durableWrites == durableWrites
+                && ObjectUtils.equals(other.userTypes, userTypes);
     }
 
     public Map<String, CFMetaData> cfMetaData()
@@ -215,7 +229,6 @@ public final class KSMetaData
         return this;
     }
 
-
     public KSMetaData reloadAttributes()
     {
         Row ksDefRow = SystemKeyspace.readSchemaRow(name);
@@ -223,7 +236,7 @@ public final class KSMetaData
         if (ksDefRow.cf == null)
             throw new RuntimeException(String.format("%s not found in the schema definitions keyspaceName (%s).", name, SystemKeyspace.SCHEMA_KEYSPACES_CF));
 
-        return fromSchema(ksDefRow, Collections.<CFMetaData>emptyList());
+        return fromSchema(ksDefRow, Collections.<CFMetaData>emptyList(), userTypes);
     }
 
     public Mutation dropFromSchema(long timestamp)
@@ -234,6 +247,7 @@ public final class KSMetaData
         mutation.delete(SystemKeyspace.SCHEMA_COLUMNFAMILIES_CF, timestamp);
         mutation.delete(SystemKeyspace.SCHEMA_COLUMNS_CF, timestamp);
         mutation.delete(SystemKeyspace.SCHEMA_TRIGGERS_CF, timestamp);
+        mutation.delete(SystemKeyspace.SCHEMA_USER_TYPES_CF, timestamp);
 
         return mutation;
     }
@@ -251,6 +265,7 @@ public final class KSMetaData
         for (CFMetaData cfm : cfMetaData.values())
             cfm.toSchema(mutation, timestamp);
 
+        userTypes.toSchema(mutation, timestamp);
         return mutation;
     }
 
@@ -261,7 +276,7 @@ public final class KSMetaData
      *
      * @return deserialized keyspace without cf_defs
      */
-    public static KSMetaData fromSchema(Row row, Iterable<CFMetaData> cfms)
+    public static KSMetaData fromSchema(Row row, Iterable<CFMetaData> cfms, UTMetaData userTypes)
     {
         UntypedResultSet.Row result = QueryProcessor.resultify("SELECT * FROM system.schema_keyspaces", row).one();
         try
@@ -270,7 +285,8 @@ public final class KSMetaData
                                   AbstractReplicationStrategy.getClass(result.getString("strategy_class")),
                                   fromJsonMap(result.getString("strategy_options")),
                                   result.getBoolean("durable_writes"),
-                                  cfms);
+                                  cfms,
+                                  userTypes);
         }
         catch (ConfigurationException e)
         {
@@ -286,10 +302,11 @@ public final class KSMetaData
      *
      * @return deserialized keyspace with cf_defs
      */
-    public static KSMetaData fromSchema(Row serializedKs, Row serializedCFs)
+    public static KSMetaData fromSchema(Row serializedKs, Row serializedCFs, Row serializedUserTypes)
     {
         Map<String, CFMetaData> cfs = deserializeColumnFamilies(serializedCFs);
-        return fromSchema(serializedKs, cfs.values());
+        UTMetaData userTypes = new UTMetaData(UTMetaData.fromSchema(serializedUserTypes));
+        return fromSchema(serializedKs, cfs.values(), userTypes);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d63d07b9/src/java/org/apache/cassandra/config/Schema.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Schema.java b/src/java/org/apache/cassandra/config/Schema.java
index a38c097..e65b399 100644
--- a/src/java/org/apache/cassandra/config/Schema.java
+++ b/src/java/org/apache/cassandra/config/Schema.java
@@ -61,8 +61,6 @@ public class Schema
     /* metadata map for faster ColumnFamily lookup */
     private final BiMap<Pair<String, String>, UUID> cfIdMap = HashBiMap.create();
 
-    public final UTMetaData userTypes = new UTMetaData();
-
     private volatile UUID version;
 
     // 59adb24e-f3cd-3e02-97f0-5b395827453f
@@ -119,24 +117,6 @@ public class Schema
         return this;
     }
 
-    public Schema loadUserTypes()
-    {
-        userTypes.addAll(UTMetaData.fromSchema(SystemKeyspace.serializedSchema(SystemKeyspace.SCHEMA_USER_TYPES_CF)));
-        return this;
-    }
-
-    public Schema loadType(UserType newType)
-    {
-        userTypes.addType(newType);
-        return this;
-    }
-
-    public Schema dropType(UserType droppedType)
-    {
-        userTypes.removeType(droppedType);
-        return this;
-    }
-
     /**
      * Get keyspace instance by name
      *
@@ -422,8 +402,7 @@ public class Schema
     {
         try
         {
-            return !row.cf.metadata().cfName.equals(SystemKeyspace.SCHEMA_USER_TYPES_CF)
-                && systemKeyspaceNames.contains(ByteBufferUtil.string(row.key.key));
+            return systemKeyspaceNames.contains(ByteBufferUtil.string(row.key.key));
         }
         catch (CharacterCodingException e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d63d07b9/src/java/org/apache/cassandra/config/UTMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/UTMetaData.java b/src/java/org/apache/cassandra/config/UTMetaData.java
index 76f3999..b3061bb 100644
--- a/src/java/org/apache/cassandra/config/UTMetaData.java
+++ b/src/java/org/apache/cassandra/config/UTMetaData.java
@@ -35,27 +35,23 @@ import org.apache.cassandra.utils.ByteBufferUtil;
  */
 public final class UTMetaData
 {
-    private static final ColumnIdentifier COLUMN_NAMES = new ColumnIdentifier("column_names", false);
-    private static final ColumnIdentifier COLUMN_TYPES = new ColumnIdentifier("column_types", false);
+    private final Map<ByteBuffer, UserType> userTypes;
 
-    private final Map<ByteBuffer, UserType> userTypes = new HashMap<>();
-
-    // Only for Schema. You should generally not create instance of this, but rather use
-    // the global reference Schema.instance().userTypes;
-    UTMetaData() {}
+    public UTMetaData()
+    {
+        this(new HashMap<ByteBuffer, UserType>());
+    }
 
-    public static UTMetaData fromSchema(UntypedResultSet rows)
+    UTMetaData(Map<ByteBuffer, UserType> types)
     {
-        UTMetaData m = new UTMetaData();
-        for (UntypedResultSet.Row row : rows)
-            m.addType(fromSchema(row));
-        return m;
+        this.userTypes = types;
     }
 
     private static UserType fromSchema(UntypedResultSet.Row row)
     {
         try
         {
+            String keyspace = row.getString("keyspace_name");
             ByteBuffer name = ByteBufferUtil.bytes(row.getString("type_name"));
             List<String> rawColumns = row.getList("column_names", UTF8Type.instance);
             List<String> rawTypes = row.getList("column_types", UTF8Type.instance);
@@ -68,7 +64,7 @@ public final class UTMetaData
             for (String rawType : rawTypes)
                 types.add(TypeParser.parse(rawType));
 
-            return new UserType(name, columns, types);
+            return new UserType(keyspace, name, columns, types);
         }
         catch (RequestValidationException e)
         {
@@ -77,54 +73,58 @@ public final class UTMetaData
         }
     }
 
-    public static UTMetaData fromSchema(List<Row> rows)
+    public static Map<ByteBuffer, UserType> fromSchema(Row row)
     {
-        UntypedResultSet result = QueryProcessor.resultify("SELECT * FROM system." + SystemKeyspace.SCHEMA_USER_TYPES_CF, rows);
-        return fromSchema(result);
+        UntypedResultSet results = QueryProcessor.resultify("SELECT * FROM system." + SystemKeyspace.SCHEMA_USER_TYPES_CF, row);
+        Map<ByteBuffer, UserType> types = new HashMap<>(results.size());
+        for (UntypedResultSet.Row result : results)
+        {
+            UserType type = fromSchema(result);
+            types.put(type.name, type);
+        }
+        return types;
     }
 
     public static Mutation toSchema(UserType newType, long timestamp)
     {
-        Mutation mutation = new Mutation(Keyspace.SYSTEM_KS, newType.name);
+        return toSchema(new Mutation(Keyspace.SYSTEM_KS, SystemKeyspace.getSchemaKSKey(newType.keyspace)), newType, timestamp);
+    }
+
+    public static Mutation toSchema(Mutation mutation, UserType newType, long timestamp)
+    {
         ColumnFamily cf = mutation.addOrGet(SystemKeyspace.SCHEMA_USER_TYPES_CF);
 
-        CFMetaData cfm = CFMetaData.SchemaUserTypesCf;
-        UpdateParameters params = new UpdateParameters(cfm, Collections.<ByteBuffer>emptyList(), timestamp, 0, null);
-        Composite prefix = cfm.comparator.builder().build();
+        Composite prefix = CFMetaData.SchemaUserTypesCf.comparator.make(newType.name);
+        CFRowAdder adder = new CFRowAdder(cf, prefix, timestamp);
 
-        List<ByteBuffer> columnTypes = new ArrayList<>(newType.types.size());
-        for (AbstractType<?> type : newType.types)
-            columnTypes.add(ByteBufferUtil.bytes(type.toString()));
+        adder.resetCollection("column_names");
+        adder.resetCollection("column_types");
 
-        try
-        {
-            new Lists.Setter(cfm.getColumnDefinition(COLUMN_NAMES), new Lists.Value(newType.columnNames)).execute(newType.name, cf, prefix, params);
-            new Lists.Setter(cfm.getColumnDefinition(COLUMN_TYPES), new Lists.Value(columnTypes)).execute(newType.name, cf, prefix, params);
-        }
-        catch (RequestValidationException e)
-        {
-            throw new AssertionError();
-        }
+        for (ByteBuffer name : newType.columnNames)
+            adder.addListEntry("column_names", name);
+        for (AbstractType<?> type : newType.types)
+            adder.addListEntry("column_types", type.toString());
 
         return mutation;
     }
 
-    public static Mutation dropFromSchema(UserType droppedType, long timestamp)
+    public Mutation toSchema(Mutation mutation, long timestamp)
     {
-        Mutation mutation = new Mutation(Keyspace.SYSTEM_KS, droppedType.name);
-        mutation.delete(SystemKeyspace.SCHEMA_USER_TYPES_CF, timestamp);
+        for (UserType ut : userTypes.values())
+            toSchema(mutation, ut, timestamp);
         return mutation;
     }
 
-    public void addAll(UTMetaData types)
+    public static Mutation dropFromSchema(UserType droppedType, long timestamp)
     {
-        for (UserType type : types.userTypes.values())
-            addType(type);
-    }
+        Mutation mutation = new Mutation(Keyspace.SYSTEM_KS, SystemKeyspace.getSchemaKSKey(droppedType.keyspace));
+        ColumnFamily cf = mutation.addOrGet(SystemKeyspace.SCHEMA_USER_TYPES_CF);
+        int ldt = (int) (System.currentTimeMillis() / 1000);
 
-    public UserType getType(ColumnIdentifier typeName)
-    {
-        return getType(typeName.bytes);
+        Composite prefix = CFMetaData.SchemaUserTypesCf.comparator.make(droppedType.name);
+        cf.addAtom(new RangeTombstone(prefix, prefix.end(), timestamp, ldt));
+
+        return mutation;
     }
 
     public UserType getType(ByteBuffer typeName)
@@ -138,9 +138,7 @@ public final class UTMetaData
         return new HashMap<>(userTypes);
     }
 
-    // This is *not* thread safe. As far as the global instance is concerned, only
-    // Schema.loadType() (which is only called in DefsTables that is synchronized)
-    // should use this.
+    // This is *not* thread safe but is only called in DefsTables that is synchronized.
     public void addType(UserType type)
     {
         UserType old = userTypes.get(type.name);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d63d07b9/src/java/org/apache/cassandra/cql3/AbstractMarker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/AbstractMarker.java b/src/java/org/apache/cassandra/cql3/AbstractMarker.java
index 165cb00..2b9c6c9 100644
--- a/src/java/org/apache/cassandra/cql3/AbstractMarker.java
+++ b/src/java/org/apache/cassandra/cql3/AbstractMarker.java
@@ -58,7 +58,7 @@ public abstract class AbstractMarker extends Term.NonTerminal
             this.bindIndex = bindIndex;
         }
 
-        public AbstractMarker prepare(ColumnSpecification receiver) throws InvalidRequestException
+        public AbstractMarker prepare(String keyspace, ColumnSpecification receiver) throws InvalidRequestException
         {
             if (!(receiver.type instanceof CollectionType))
                 return new Constants.Marker(bindIndex, receiver);
@@ -72,7 +72,7 @@ public abstract class AbstractMarker extends Term.NonTerminal
             throw new AssertionError();
         }
 
-        public boolean isAssignableTo(ColumnSpecification receiver)
+        public boolean isAssignableTo(String keyspace, ColumnSpecification receiver)
         {
             return true;
         }
@@ -99,7 +99,7 @@ public abstract class AbstractMarker extends Term.NonTerminal
         }
 
         @Override
-        public AbstractMarker prepare(ColumnSpecification receiver) throws InvalidRequestException
+        public AbstractMarker prepare(String keyspace, ColumnSpecification receiver) throws InvalidRequestException
         {
             if (receiver.type instanceof CollectionType)
                 throw new InvalidRequestException("Invalid IN relation on collection column");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d63d07b9/src/java/org/apache/cassandra/cql3/AssignementTestable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/AssignementTestable.java b/src/java/org/apache/cassandra/cql3/AssignementTestable.java
index d707809..2253cf7 100644
--- a/src/java/org/apache/cassandra/cql3/AssignementTestable.java
+++ b/src/java/org/apache/cassandra/cql3/AssignementTestable.java
@@ -17,10 +17,12 @@
  */
 package org.apache.cassandra.cql3;
 
+import org.apache.cassandra.exceptions.InvalidRequestException;
+
 public interface AssignementTestable
 {
     /**
      * @return whether this object can be assigned to the provided receiver
      */
-    public boolean isAssignableTo(ColumnSpecification receiver);
+    public boolean isAssignableTo(String keyspace, ColumnSpecification receiver) throws InvalidRequestException;
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d63d07b9/src/java/org/apache/cassandra/cql3/Attributes.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Attributes.java b/src/java/org/apache/cassandra/cql3/Attributes.java
index 97ce31a..7c33e5b 100644
--- a/src/java/org/apache/cassandra/cql3/Attributes.java
+++ b/src/java/org/apache/cassandra/cql3/Attributes.java
@@ -120,8 +120,8 @@ public class Attributes
 
         public Attributes prepare(String ksName, String cfName) throws InvalidRequestException
         {
-            Term ts = timestamp == null ? null : timestamp.prepare(timestampReceiver(ksName, cfName));
-            Term ttl = timeToLive == null ? null : timeToLive.prepare(timeToLiveReceiver(ksName, cfName));
+            Term ts = timestamp == null ? null : timestamp.prepare(ksName, timestampReceiver(ksName, cfName));
+            Term ttl = timeToLive == null ? null : timeToLive.prepare(ksName, timeToLiveReceiver(ksName, cfName));
             return new Attributes(ts, ttl);
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d63d07b9/src/java/org/apache/cassandra/cql3/CQL3Type.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/CQL3Type.java b/src/java/org/apache/cassandra/cql3/CQL3Type.java
index fa6e3a0..2ff6fb5 100644
--- a/src/java/org/apache/cassandra/cql3/CQL3Type.java
+++ b/src/java/org/apache/cassandra/cql3/CQL3Type.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.cql3;
 
 import java.nio.ByteBuffer;
 
+import org.apache.cassandra.config.KSMetaData;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.exceptions.InvalidRequestException;
@@ -28,8 +29,6 @@ import org.apache.cassandra.exceptions.SyntaxException;
 public interface CQL3Type
 {
     public boolean isCollection();
-    public boolean isCounter();
-    public boolean isUserType();
     public AbstractType<?> getType();
 
     public enum Native implements CQL3Type
@@ -68,16 +67,6 @@ public interface CQL3Type
             return type;
         }
 
-        public boolean isCounter()
-        {
-            return this == COUNTER;
-        }
-
-        public boolean isUserType()
-        {
-            return false;
-        }
-
         @Override
         public String toString()
         {
@@ -109,16 +98,6 @@ public interface CQL3Type
             return type;
         }
 
-        public boolean isCounter()
-        {
-            return false;
-        }
-
-        public boolean isUserType()
-        {
-            return false;
-        }
-
         @Override
         public final boolean equals(Object o)
         {
@@ -144,61 +123,21 @@ public interface CQL3Type
 
     public static class Collection implements CQL3Type
     {
-        CollectionType type;
+        private final CollectionType type;
 
         public Collection(CollectionType type)
         {
             this.type = type;
         }
 
-        public static Collection map(CQL3Type t1, CQL3Type t2) throws InvalidRequestException
-        {
-            if (t1.isCollection() || t2.isCollection())
-                throw new InvalidRequestException("map type cannot contain another collection");
-            if (t1.isCounter() || t2.isCounter())
-                throw new InvalidRequestException("counters are not allowed inside a collection");
-
-            return new Collection(MapType.getInstance(t1.getType(), t2.getType()));
-        }
-
-        public static Collection list(CQL3Type t) throws InvalidRequestException
-        {
-            if (t.isCollection())
-                throw new InvalidRequestException("list type cannot contain another collection");
-            if (t.isCounter())
-                throw new InvalidRequestException("counters are not allowed inside a collection");
-
-            return new Collection(ListType.getInstance(t.getType()));
-        }
-
-        public static Collection set(CQL3Type t) throws InvalidRequestException
-        {
-            if (t.isCollection())
-                throw new InvalidRequestException("set type cannot contain another collection");
-            if (t.isCounter())
-                throw new InvalidRequestException("counters are not allowed inside a collection");
-
-            return new Collection(SetType.getInstance(t.getType()));
-        }
-
-        public boolean isCollection()
-        {
-            return true;
-        }
-
         public AbstractType<?> getType()
         {
             return type;
         }
 
-        public boolean isCounter()
-        {
-            return false;
-        }
-
-        public boolean isUserType()
+        public boolean isCollection()
         {
-            return false;
+            return true;
         }
 
         @Override
@@ -237,32 +176,18 @@ public interface CQL3Type
     public static class UserDefined implements CQL3Type
     {
         // Keeping this separatly from type just to simplify toString()
-        ColumnIdentifier name;
-        UserType type;
+        private final String name;
+        private final UserType type;
 
-        private UserDefined(ColumnIdentifier name, UserType type)
+        private UserDefined(String name, UserType type)
         {
             this.name = name;
             this.type = type;
         }
 
-        public static UserDefined create(ByteBuffer name, UserType type)
+        public static UserDefined create(UserType type)
         {
-            return new UserDefined(new ColumnIdentifier(name, UTF8Type.instance), type);
-        }
-
-        public static UserDefined create(ColumnIdentifier name) throws InvalidRequestException
-        {
-            UserType type = Schema.instance.userTypes.getType(name);
-            if (type == null)
-                throw new InvalidRequestException("Unknown type " + name);
-
-            return new UserDefined(name, type);
-        }
-
-        public boolean isUserType()
-        {
-            return true;
+            return new UserDefined(UTF8Type.instance.compose(type.name), type);
         }
 
         public boolean isCollection()
@@ -270,11 +195,6 @@ public interface CQL3Type
             return false;
         }
 
-        public boolean isCounter()
-        {
-            return false;
-        }
-
         public AbstractType<?> getType()
         {
             return type;
@@ -299,7 +219,163 @@ public interface CQL3Type
         @Override
         public String toString()
         {
-            return name.toString();
+            return name;
         }
     }
+
+    // For UserTypes, we need to know the current keyspace to resolve the
+    // actual type used, so Raw is a "not yet prepared" CQL3Type.
+    public abstract class Raw
+    {
+        public boolean isCollection()
+        {
+            return false;
+        }
+
+        public boolean isCounter()
+        {
+            return false;
+        }
+
+        public abstract CQL3Type prepare(String keyspace) throws InvalidRequestException;
+
+        public static Raw from(CQL3Type type)
+        {
+            return new RawType(type);
+        }
+
+        public static Raw userType(UTName name)
+        {
+            return new RawUT(name);
+        }
+
+        public static Raw map(CQL3Type.Raw t1, CQL3Type.Raw t2) throws InvalidRequestException
+        {
+            if (t1.isCollection() || t2.isCollection())
+                throw new InvalidRequestException("map type cannot contain another collection");
+            if (t1.isCounter() || t2.isCounter())
+                throw new InvalidRequestException("counters are not allowed inside a collection");
+
+            return new RawCollection(CollectionType.Kind.MAP, t1, t2);
+        }
+
+        public static Raw list(CQL3Type.Raw t) throws InvalidRequestException
+        {
+            if (t.isCollection())
+                throw new InvalidRequestException("list type cannot contain another collection");
+            if (t.isCounter())
+                throw new InvalidRequestException("counters are not allowed inside a collection");
+
+            return new RawCollection(CollectionType.Kind.LIST, null, t);
+        }
+
+        public static Raw set(CQL3Type.Raw t) throws InvalidRequestException
+        {
+            if (t.isCollection())
+                throw new InvalidRequestException("set type cannot contain another collection");
+            if (t.isCounter())
+                throw new InvalidRequestException("counters are not allowed inside a collection");
+
+            return new RawCollection(CollectionType.Kind.SET, null, t);
+        }
+
+        private static class RawType extends Raw
+        {
+            private CQL3Type type;
+
+            private RawType(CQL3Type type)
+            {
+                this.type = type;
+            }
+
+            public CQL3Type prepare(String keyspace) throws InvalidRequestException
+            {
+                return type;
+            }
+
+            public boolean isCounter()
+            {
+                return type == Native.COUNTER;
+            }
+
+            @Override
+            public String toString()
+            {
+                return type.toString();
+            }
+        }
+
+        private static class RawCollection extends Raw
+        {
+            private final CollectionType.Kind kind;
+            private final CQL3Type.Raw keys;
+            private final CQL3Type.Raw values;
+
+            private RawCollection(CollectionType.Kind kind, CQL3Type.Raw keys, CQL3Type.Raw values)
+            {
+                this.kind = kind;
+                this.keys = keys;
+                this.values = values;
+            }
+
+            public boolean isCollection()
+            {
+                return true;
+            }
+
+            public CQL3Type prepare(String keyspace) throws InvalidRequestException
+            {
+                switch (kind)
+                {
+                    case LIST: return new Collection(ListType.getInstance(values.prepare(keyspace).getType()));
+                    case SET:  return new Collection(SetType.getInstance(values.prepare(keyspace).getType()));
+                    case MAP:  return new Collection(MapType.getInstance(keys.prepare(keyspace).getType(), values.prepare(keyspace).getType()));
+                }
+                throw new AssertionError();
+            }
+
+            @Override
+            public String toString()
+            {
+                switch (kind)
+                {
+                    case LIST: return "list<" + values + ">";
+                    case SET:  return "set<" + values + ">";
+                    case MAP:  return "map<" + keys + ", " + values + ">";
+                }
+                throw new AssertionError();
+            }
+        }
+
+        private static class RawUT extends Raw
+        {
+
+            private final UTName name;
+
+            private RawUT(UTName name)
+            {
+                this.name = name;
+            }
+
+            public CQL3Type prepare(String keyspace) throws InvalidRequestException
+            {
+                name.setKeyspace(keyspace);
+
+                KSMetaData ksm = Schema.instance.getKSMetaData(name.getKeyspace());
+                if (ksm == null)
+                    throw new InvalidRequestException("Unknown keyspace " + name.getKeyspace());
+                UserType type = ksm.userTypes.getType(name.getUserTypeName());
+                if (type == null)
+                    throw new InvalidRequestException("Unknown type " + name);
+
+                return new UserDefined(name.toString(), type);
+            }
+
+            @Override
+            public String toString()
+            {
+                return name.toString();
+            }
+    }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d63d07b9/src/java/org/apache/cassandra/cql3/Constants.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Constants.java b/src/java/org/apache/cassandra/cql3/Constants.java
index 1784752..44e96ef 100644
--- a/src/java/org/apache/cassandra/cql3/Constants.java
+++ b/src/java/org/apache/cassandra/cql3/Constants.java
@@ -60,15 +60,15 @@ public abstract class Constants
             }
         };
 
-        public Term prepare(ColumnSpecification receiver) throws InvalidRequestException
+        public Term prepare(String keyspace, ColumnSpecification receiver) throws InvalidRequestException
         {
-            if (!isAssignableTo(receiver))
+            if (!isAssignableTo(keyspace, receiver))
                 throw new InvalidRequestException("Invalid null value for counter increment/decrement");
 
             return NULL_VALUE;
         }
 
-        public boolean isAssignableTo(ColumnSpecification receiver)
+        public boolean isAssignableTo(String keyspace, ColumnSpecification receiver)
         {
             return !(receiver.type instanceof CounterColumnType);
         }
@@ -122,9 +122,9 @@ public abstract class Constants
             return new Literal(Type.HEX, text);
         }
 
-        public Value prepare(ColumnSpecification receiver) throws InvalidRequestException
+        public Value prepare(String keyspace, ColumnSpecification receiver) throws InvalidRequestException
         {
-            if (!isAssignableTo(receiver))
+            if (!isAssignableTo(keyspace, receiver))
                 throw new InvalidRequestException(String.format("Invalid %s constant (%s) for %s of type %s", type, text, receiver, receiver.type.asCQL3Type()));
 
             return new Value(parsedValue(receiver.type));
@@ -154,7 +154,7 @@ public abstract class Constants
             return text;
         }
 
-        public boolean isAssignableTo(ColumnSpecification receiver)
+        public boolean isAssignableTo(String keyspace, ColumnSpecification receiver)
         {
             CQL3Type receiverType = receiver.type.asCQL3Type();
             if (receiverType.isCollection())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d63d07b9/src/java/org/apache/cassandra/cql3/Cql.g
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Cql.g b/src/java/org/apache/cassandra/cql3/Cql.g
index 15c18db..f9f66df 100644
--- a/src/java/org/apache/cassandra/cql3/Cql.g
+++ b/src/java/org/apache/cassandra/cql3/Cql.g
@@ -533,7 +533,7 @@ cfamOrdering[CreateTableStatement.RawStatement expr]
 createTypeStatement returns [CreateTypeStatement expr]
     @init { boolean ifNotExists = false; }
     : K_CREATE K_TYPE (K_IF K_NOT K_EXISTS { ifNotExists = true; } )?
-         tn=non_type_ident { $expr = new CreateTypeStatement(tn, ifNotExists); }
+         tn=userTypeName { $expr = new CreateTypeStatement(tn, ifNotExists); }
          '(' typeColumns[expr] ( ',' typeColumns[expr]? )* ')'
     ;
 
@@ -617,10 +617,10 @@ alterTableStatement returns [AlterTableStatement expr]
  * ALTER TYPE <name> RENAME <field> TO <newtype> AND ...;
  */
 alterTypeStatement returns [AlterTypeStatement expr]
-    : K_ALTER K_TYPE name=non_type_ident
+    : K_ALTER K_TYPE name=userTypeName
           ( K_ALTER f=cident K_TYPE v=comparatorType { $expr = AlterTypeStatement.alter(name, f, v); }
           | K_ADD   f=cident v=comparatorType        { $expr = AlterTypeStatement.addition(name, f, v); }
-          | K_RENAME K_TO new_name=non_type_ident    { $expr = AlterTypeStatement.typeRename(name, new_name); }
+          | K_RENAME K_TO new_name=userTypeName      { $expr = AlterTypeStatement.typeRename(name, new_name); }
           | K_RENAME
                { Map<ColumnIdentifier, ColumnIdentifier> renames = new HashMap<ColumnIdentifier, ColumnIdentifier>(); }
                  id1=cident K_TO toId1=cident { renames.put(id1, toId1); }
@@ -651,7 +651,7 @@ dropTableStatement returns [DropTableStatement stmt]
  */
 dropTypeStatement returns [DropTypeStatement stmt]
     @init { boolean ifExists = false; }
-    : K_DROP K_TYPE (K_IF K_EXISTS { ifExists = true; } )? name=non_type_ident { $stmt = new DropTypeStatement(name, ifExists); }
+    : K_DROP K_TYPE (K_IF K_EXISTS { ifExists = true; } )? name=userTypeName { $stmt = new DropTypeStatement(name, ifExists); }
     ;
 
 /**
@@ -801,6 +801,10 @@ columnFamilyName returns [CFName name]
     : (cfOrKsName[name, true] '.')? cfOrKsName[name, false]
     ;
 
+userTypeName returns [UTName name] // user type name with an optional "<keyspace>." qualifier
+    : (ks=cident '.')? ut=non_type_ident { $name = new UTName(ks, ut); }
+    ;
+
 cfOrKsName[CFName name, boolean isKs]
     : t=IDENT              { if (isKs) $name.setKeyspace($t.text, false); else $name.setColumnFamily($t.text, false); }
     | t=QUOTED_NAME        { if (isKs) $name.setKeyspace($t.text, true); else $name.setColumnFamily($t.text, true); }
@@ -959,14 +963,14 @@ relation[List<Relation> clauses]
     | '(' relation[$clauses] ')'
     ;
 
-comparatorType returns [CQL3Type t]
-    : c=native_type     { $t = c; }
+comparatorType returns [CQL3Type.Raw t]
+    : n=native_type     { $t = CQL3Type.Raw.from(n); }
     | c=collection_type { $t = c; }
-    | id=non_type_ident  { try { $t = CQL3Type.UserDefined.create(id); } catch (InvalidRequestException e) { addRecognitionError(e.getMessage()); }}
+    | id=userTypeName  { $t = CQL3Type.Raw.userType(id); }
     | s=STRING_LITERAL
       {
         try {
-            $t = new CQL3Type.Custom($s.text);
+            $t = CQL3Type.Raw.from(new CQL3Type.Custom($s.text));
         } catch (SyntaxException e) {
             addRecognitionError("Cannot parse type " + $s.text + ": " + e.getMessage());
         } catch (ConfigurationException e) {
@@ -994,17 +998,17 @@ native_type returns [CQL3Type t]
     | K_TIMEUUID  { $t = CQL3Type.Native.TIMEUUID; }
     ;
 
-collection_type returns [CQL3Type pt]
+collection_type returns [CQL3Type.Raw pt]
     : K_MAP  '<' t1=comparatorType ',' t2=comparatorType '>'
         { try {
             // if we can't parse either t1 or t2, antlr will "recover" and we may have t1 or t2 null.
             if (t1 != null && t2 != null)
-                $pt = CQL3Type.Collection.map(t1, t2);
+                $pt = CQL3Type.Raw.map(t1, t2);
           } catch (InvalidRequestException e) { addRecognitionError(e.getMessage()); } }
     | K_LIST '<' t=comparatorType '>'
-        { try { if (t != null) $pt = CQL3Type.Collection.list(t); } catch (InvalidRequestException e) { addRecognitionError(e.getMessage()); } }
+        { try { if (t != null) $pt = CQL3Type.Raw.list(t); } catch (InvalidRequestException e) { addRecognitionError(e.getMessage()); } }
     | K_SET  '<' t=comparatorType '>'
-        { try { if (t != null) $pt = CQL3Type.Collection.set(t); } catch (InvalidRequestException e) { addRecognitionError(e.getMessage()); } }
+        { try { if (t != null) $pt = CQL3Type.Raw.set(t); } catch (InvalidRequestException e) { addRecognitionError(e.getMessage()); } }
     ;
 
 username

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d63d07b9/src/java/org/apache/cassandra/cql3/Lists.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Lists.java b/src/java/org/apache/cassandra/cql3/Lists.java
index ab21a64..8dbe59c 100644
--- a/src/java/org/apache/cassandra/cql3/Lists.java
+++ b/src/java/org/apache/cassandra/cql3/Lists.java
@@ -63,16 +63,16 @@ public abstract class Lists
             this.elements = elements;
         }
 
-        public Term prepare(ColumnSpecification receiver) throws InvalidRequestException
+        public Term prepare(String keyspace, ColumnSpecification receiver) throws InvalidRequestException
         {
-            validateAssignableTo(receiver);
+            validateAssignableTo(keyspace, receiver);
 
             ColumnSpecification valueSpec = Lists.valueSpecOf(receiver);
             List<Term> values = new ArrayList<Term>(elements.size());
             boolean allTerminal = true;
             for (Term.Raw rt : elements)
             {
-                Term t = rt.prepare(valueSpec);
+                Term t = rt.prepare(keyspace, valueSpec);
 
                 if (t.containsBindMarker())
                     throw new InvalidRequestException(String.format("Invalid list literal for %s: bind variables are not supported inside collection literals", receiver));
@@ -86,7 +86,7 @@ public abstract class Lists
             return allTerminal ? value.bind(Collections.<ByteBuffer>emptyList()) : value;
         }
 
-        private void validateAssignableTo(ColumnSpecification receiver) throws InvalidRequestException
+        private void validateAssignableTo(String keyspace, ColumnSpecification receiver) throws InvalidRequestException
         {
             if (!(receiver.type instanceof ListType))
                 throw new InvalidRequestException(String.format("Invalid list literal for %s of type %s", receiver, receiver.type.asCQL3Type()));
@@ -94,16 +94,16 @@ public abstract class Lists
             ColumnSpecification valueSpec = Lists.valueSpecOf(receiver);
             for (Term.Raw rt : elements)
             {
-                if (!rt.isAssignableTo(valueSpec))
+                if (!rt.isAssignableTo(keyspace, valueSpec))
                     throw new InvalidRequestException(String.format("Invalid list literal for %s: value %s is not of type %s", receiver, rt, valueSpec.type.asCQL3Type()));
             }
         }
 
-        public boolean isAssignableTo(ColumnSpecification receiver)
+        public boolean isAssignableTo(String keyspace, ColumnSpecification receiver)
         {
             try
             {
-                validateAssignableTo(receiver);
+                validateAssignableTo(keyspace, receiver);
                 return true;
             }
             catch (InvalidRequestException e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d63d07b9/src/java/org/apache/cassandra/cql3/Maps.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Maps.java b/src/java/org/apache/cassandra/cql3/Maps.java
index d156845..3f9cc95 100644
--- a/src/java/org/apache/cassandra/cql3/Maps.java
+++ b/src/java/org/apache/cassandra/cql3/Maps.java
@@ -64,9 +64,9 @@ public abstract class Maps
             this.entries = entries;
         }
 
-        public Term prepare(ColumnSpecification receiver) throws InvalidRequestException
+        public Term prepare(String keyspace, ColumnSpecification receiver) throws InvalidRequestException
         {
-            validateAssignableTo(receiver);
+            validateAssignableTo(keyspace, receiver);
 
             ColumnSpecification keySpec = Maps.keySpecOf(receiver);
             ColumnSpecification valueSpec = Maps.valueSpecOf(receiver);
@@ -74,8 +74,8 @@ public abstract class Maps
             boolean allTerminal = true;
             for (Pair<Term.Raw, Term.Raw> entry : entries)
             {
-                Term k = entry.left.prepare(keySpec);
-                Term v = entry.right.prepare(valueSpec);
+                Term k = entry.left.prepare(keyspace, keySpec);
+                Term v = entry.right.prepare(keyspace, valueSpec);
 
                 if (k.containsBindMarker() || v.containsBindMarker())
                     throw new InvalidRequestException(String.format("Invalid map literal for %s: bind variables are not supported inside collection literals", receiver));
@@ -89,7 +89,7 @@ public abstract class Maps
             return allTerminal ? value.bind(Collections.<ByteBuffer>emptyList()) : value;
         }
 
-        private void validateAssignableTo(ColumnSpecification receiver) throws InvalidRequestException
+        private void validateAssignableTo(String keyspace, ColumnSpecification receiver) throws InvalidRequestException
         {
             if (!(receiver.type instanceof MapType))
                 throw new InvalidRequestException(String.format("Invalid map literal for %s of type %s", receiver, receiver.type.asCQL3Type()));
@@ -98,18 +98,18 @@ public abstract class Maps
             ColumnSpecification valueSpec = Maps.valueSpecOf(receiver);
             for (Pair<Term.Raw, Term.Raw> entry : entries)
             {
-                if (!entry.left.isAssignableTo(keySpec))
+                if (!entry.left.isAssignableTo(keyspace, keySpec))
                     throw new InvalidRequestException(String.format("Invalid map literal for %s: key %s is not of type %s", receiver, entry.left, keySpec.type.asCQL3Type()));
-                if (!entry.right.isAssignableTo(valueSpec))
+                if (!entry.right.isAssignableTo(keyspace, valueSpec))
                     throw new InvalidRequestException(String.format("Invalid map literal for %s: value %s is not of type %s", receiver, entry.right, valueSpec.type.asCQL3Type()));
             }
         }
 
-        public boolean isAssignableTo(ColumnSpecification receiver)
+        public boolean isAssignableTo(String keyspace, ColumnSpecification receiver)
         {
             try
             {
-                validateAssignableTo(receiver);
+                validateAssignableTo(keyspace, receiver);
                 return true;
             }
             catch (InvalidRequestException e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d63d07b9/src/java/org/apache/cassandra/cql3/Operation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Operation.java b/src/java/org/apache/cassandra/cql3/Operation.java
index 689cee0..3aeed2e 100644
--- a/src/java/org/apache/cassandra/cql3/Operation.java
+++ b/src/java/org/apache/cassandra/cql3/Operation.java
@@ -111,7 +111,7 @@ public abstract class Operation
          * be a true column.
          * @return the prepared update operation.
          */
-        public Operation prepare(ColumnDefinition receiver) throws InvalidRequestException;
+        public Operation prepare(String keyspace, ColumnDefinition receiver) throws InvalidRequestException;
 
         /**
          * @return whether this operation can be applied alongside the {@code
@@ -145,7 +145,7 @@ public abstract class Operation
          * @param receiver the "column" this operation applies to.
          * @return the prepared delete operation.
          */
-        public Operation prepare(ColumnDefinition receiver) throws InvalidRequestException;
+        public Operation prepare(String keyspace, ColumnDefinition receiver) throws InvalidRequestException;
     }
 
     public static class SetValue implements RawUpdate
@@ -157,9 +157,9 @@ public abstract class Operation
             this.value = value;
         }
 
-        public Operation prepare(ColumnDefinition receiver) throws InvalidRequestException
+        public Operation prepare(String keyspace, ColumnDefinition receiver) throws InvalidRequestException
         {
-            Term v = value.prepare(receiver);
+            Term v = value.prepare(keyspace, receiver);
 
             if (receiver.type instanceof CounterColumnType)
                 throw new InvalidRequestException(String.format("Cannot set the value of counter column %s (counters can only be incremented/decremented, not set)", receiver));
@@ -203,7 +203,7 @@ public abstract class Operation
             this.value = value;
         }
 
-        public Operation prepare(ColumnDefinition receiver) throws InvalidRequestException
+        public Operation prepare(String keyspace, ColumnDefinition receiver) throws InvalidRequestException
         {
             if (!(receiver.type instanceof CollectionType))
                 throw new InvalidRequestException(String.format("Invalid operation (%s) for non collection column %s", toString(receiver), receiver));
@@ -211,14 +211,14 @@ public abstract class Operation
             switch (((CollectionType)receiver.type).kind)
             {
                 case LIST:
-                    Term idx = selector.prepare(Lists.indexSpecOf(receiver));
-                    Term lval = value.prepare(Lists.valueSpecOf(receiver));
+                    Term idx = selector.prepare(keyspace, Lists.indexSpecOf(receiver));
+                    Term lval = value.prepare(keyspace, Lists.valueSpecOf(receiver));
                     return new Lists.SetterByIndex(receiver, idx, lval);
                 case SET:
                     throw new InvalidRequestException(String.format("Invalid operation (%s) for set column %s", toString(receiver), receiver));
                 case MAP:
-                    Term key = selector.prepare(Maps.keySpecOf(receiver));
-                    Term mval = value.prepare(Maps.valueSpecOf(receiver));
+                    Term key = selector.prepare(keyspace, Maps.keySpecOf(receiver));
+                    Term mval = value.prepare(keyspace, Maps.valueSpecOf(receiver));
                     return new Maps.SetterByKey(receiver, key, mval);
             }
             throw new AssertionError();
@@ -246,9 +246,9 @@ public abstract class Operation
             this.value = value;
         }
 
-        public Operation prepare(ColumnDefinition receiver) throws InvalidRequestException
+        public Operation prepare(String keyspace, ColumnDefinition receiver) throws InvalidRequestException
         {
-            Term v = value.prepare(receiver);
+            Term v = value.prepare(keyspace, receiver);
 
             if (!(receiver.type instanceof CollectionType))
             {
@@ -289,9 +289,9 @@ public abstract class Operation
             this.value = value;
         }
 
-        public Operation prepare(ColumnDefinition receiver) throws InvalidRequestException
+        public Operation prepare(String keyspace, ColumnDefinition receiver) throws InvalidRequestException
         {
-            Term v = value.prepare(receiver);
+            Term v = value.prepare(keyspace, receiver);
 
             if (!(receiver.type instanceof CollectionType))
             {
@@ -332,9 +332,9 @@ public abstract class Operation
             this.value = value;
         }
 
-        public Operation prepare(ColumnDefinition receiver) throws InvalidRequestException
+        public Operation prepare(String keyspace, ColumnDefinition receiver) throws InvalidRequestException
         {
-            Term v = value.prepare(receiver);
+            Term v = value.prepare(keyspace, receiver);
 
             if (!(receiver.type instanceof ListType))
                 throw new InvalidRequestException(String.format("Invalid operation (%s) for non list column %s", toString(receiver), receiver));
@@ -367,7 +367,7 @@ public abstract class Operation
             return id;
         }
 
-        public Operation prepare(ColumnDefinition receiver) throws InvalidRequestException
+        public Operation prepare(String keyspace, ColumnDefinition receiver) throws InvalidRequestException
         {
             // No validation, deleting a column is always "well typed"
             return new Constants.Deleter(receiver);
@@ -390,7 +390,7 @@ public abstract class Operation
             return id;
         }
 
-        public Operation prepare(ColumnDefinition receiver) throws InvalidRequestException
+        public Operation prepare(String keyspace, ColumnDefinition receiver) throws InvalidRequestException
         {
             if (!(receiver.type instanceof CollectionType))
                 throw new InvalidRequestException(String.format("Invalid deletion operation for non collection column %s", receiver));
@@ -398,13 +398,13 @@ public abstract class Operation
             switch (((CollectionType)receiver.type).kind)
             {
                 case LIST:
-                    Term idx = element.prepare(Lists.indexSpecOf(receiver));
+                    Term idx = element.prepare(keyspace, Lists.indexSpecOf(receiver));
                     return new Lists.DiscarderByIndex(receiver, idx);
                 case SET:
-                    Term elt = element.prepare(Sets.valueSpecOf(receiver));
+                    Term elt = element.prepare(keyspace, Sets.valueSpecOf(receiver));
                     return new Sets.Discarder(receiver, elt);
                 case MAP:
-                    Term key = element.prepare(Maps.keySpecOf(receiver));
+                    Term key = element.prepare(keyspace, Maps.keySpecOf(receiver));
                     return new Maps.DiscarderByKey(receiver, key);
             }
             throw new AssertionError();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d63d07b9/src/java/org/apache/cassandra/cql3/Sets.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Sets.java b/src/java/org/apache/cassandra/cql3/Sets.java
index 2531b2a..dddea09 100644
--- a/src/java/org/apache/cassandra/cql3/Sets.java
+++ b/src/java/org/apache/cassandra/cql3/Sets.java
@@ -62,9 +62,9 @@ public abstract class Sets
             this.elements = elements;
         }
 
-        public Term prepare(ColumnSpecification receiver) throws InvalidRequestException
+        public Term prepare(String keyspace, ColumnSpecification receiver) throws InvalidRequestException
         {
-            validateAssignableTo(receiver);
+            validateAssignableTo(keyspace, receiver);
 
             // We've parsed empty maps as a set literal to break the ambiguity so
             // handle that case now
@@ -77,7 +77,7 @@ public abstract class Sets
             boolean allTerminal = true;
             for (Term.Raw rt : elements)
             {
-                Term t = rt.prepare(valueSpec);
+                Term t = rt.prepare(keyspace, valueSpec);
 
                 if (t.containsBindMarker())
                     throw new InvalidRequestException(String.format("Invalid set literal for %s: bind variables are not supported inside collection literals", receiver));
@@ -91,7 +91,7 @@ public abstract class Sets
             return allTerminal ? value.bind(Collections.<ByteBuffer>emptyList()) : value;
         }
 
-        private void validateAssignableTo(ColumnSpecification receiver) throws InvalidRequestException
+        private void validateAssignableTo(String keyspace, ColumnSpecification receiver) throws InvalidRequestException
         {
             if (!(receiver.type instanceof SetType))
             {
@@ -106,16 +106,16 @@ public abstract class Sets
             ColumnSpecification valueSpec = Sets.valueSpecOf(receiver);
             for (Term.Raw rt : elements)
             {
-                if (!rt.isAssignableTo(valueSpec))
+                if (!rt.isAssignableTo(keyspace, valueSpec))
                     throw new InvalidRequestException(String.format("Invalid set literal for %s: value %s is not of type %s", receiver, rt, valueSpec.type.asCQL3Type()));
             }
         }
 
-        public boolean isAssignableTo(ColumnSpecification receiver)
+        public boolean isAssignableTo(String keyspace, ColumnSpecification receiver)
         {
             try
             {
-                validateAssignableTo(receiver);
+                validateAssignableTo(keyspace, receiver);
                 return true;
             }
             catch (InvalidRequestException e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d63d07b9/src/java/org/apache/cassandra/cql3/Term.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Term.java b/src/java/org/apache/cassandra/cql3/Term.java
index d69fc33..d539ecf 100644
--- a/src/java/org/apache/cassandra/cql3/Term.java
+++ b/src/java/org/apache/cassandra/cql3/Term.java
@@ -88,7 +88,7 @@ public interface Term
          * case this RawTerm describe a list index or a map key, etc...
          * @return the prepared term.
          */
-        public Term prepare(ColumnSpecification receiver) throws InvalidRequestException;
+        public Term prepare(String keyspace, ColumnSpecification receiver) throws InvalidRequestException;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d63d07b9/src/java/org/apache/cassandra/cql3/TypeCast.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/TypeCast.java b/src/java/org/apache/cassandra/cql3/TypeCast.java
index 66b5300..45e8c5b 100644
--- a/src/java/org/apache/cassandra/cql3/TypeCast.java
+++ b/src/java/org/apache/cassandra/cql3/TypeCast.java
@@ -21,34 +21,41 @@ import org.apache.cassandra.exceptions.InvalidRequestException;
 
 public class TypeCast implements Term.Raw
 {
-    private final CQL3Type type;
+    private final CQL3Type.Raw type;
     private final Term.Raw term;
 
-    public TypeCast(CQL3Type type, Term.Raw term)
+    public TypeCast(CQL3Type.Raw type, Term.Raw term)
     {
         this.type = type;
         this.term = term;
     }
 
-    public Term prepare(ColumnSpecification receiver) throws InvalidRequestException
+    public Term prepare(String keyspace, ColumnSpecification receiver) throws InvalidRequestException
     {
-        if (!term.isAssignableTo(castedSpecOf(receiver)))
+        if (!term.isAssignableTo(keyspace, castedSpecOf(keyspace, receiver)))
             throw new InvalidRequestException(String.format("Cannot cast value %s to type %s", term, type));
 
-        if (!isAssignableTo(receiver))
+        if (!isAssignableTo(keyspace, receiver))
             throw new InvalidRequestException(String.format("Cannot assign value %s to %s of type %s", this, receiver, receiver.type.asCQL3Type()));
 
-        return term.prepare(receiver);
+        return term.prepare(keyspace, receiver);
     }
 
-    private ColumnSpecification castedSpecOf(ColumnSpecification receiver)
+    private ColumnSpecification castedSpecOf(String keyspace, ColumnSpecification receiver) throws InvalidRequestException
     {
-        return new ColumnSpecification(receiver.ksName, receiver.cfName, new ColumnIdentifier(toString(), true), type.getType());
+        return new ColumnSpecification(receiver.ksName, receiver.cfName, new ColumnIdentifier(toString(), true), type.prepare(keyspace).getType());
     }
 
-    public boolean isAssignableTo(ColumnSpecification receiver)
+    public boolean isAssignableTo(String keyspace, ColumnSpecification receiver) throws InvalidRequestException
     {
-        return receiver.type.asCQL3Type().equals(type);
+        // Preparing the cast's target type can fail with InvalidRequestException
+        // (RawUT.prepare, for instance, rejects an unknown keyspace or type).
+        // That is an error in the client's request, and this method declares
+        // the exception, so it is propagated to the caller rather than being
+        // treated as an internal error.
+        CQL3Type castType = type.prepare(keyspace);
+
+        return receiver.type.asCQL3Type().equals(castType);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d63d07b9/src/java/org/apache/cassandra/cql3/UTName.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/UTName.java b/src/java/org/apache/cassandra/cql3/UTName.java
new file mode 100644
index 0000000..a157720
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/UTName.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3;
+
+import java.nio.ByteBuffer;
+
+public class UTName
+{
+    private final ColumnIdentifier utName;
+    private String ksName;
+    /** Builds a name from parsed identifiers; a null {@code ksName} leaves the name unqualified. */
+    public UTName(ColumnIdentifier ksName, ColumnIdentifier utName)
+    {
+        this.utName = utName;
+        this.ksName = (ksName != null) ? ksName.toString() : null;
+    }
+    /** @return whether a (non-null) keyspace is currently attached to this name. */
+    public boolean hasKeyspace()
+    {
+        return !(ksName == null);
+    }
+    /** Attaches {@code keyspace} to this name, replacing any keyspace it already had. */
+    public void setKeyspace(String keyspace)
+    {
+        ksName = keyspace;
+    }
+    /** @return the keyspace attached to this name, or null if there is none. */
+    public String getKeyspace()
+    {
+        return this.ksName;
+    }
+    /** @return the {@code bytes} of the type-name identifier. */
+    public ByteBuffer getUserTypeName()
+    {
+        return this.utName.bytes;
+    }
+    /** Renders {@code keyspace.type} when a keyspace is set, otherwise just the type name. */
+    @Override
+    public String toString()
+    {
+        return hasKeyspace() ? ksName + "." + utName : "" + utName;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d63d07b9/src/java/org/apache/cassandra/cql3/UserTypes.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/UserTypes.java b/src/java/org/apache/cassandra/cql3/UserTypes.java
index dd546ee..2fd1a0f 100644
--- a/src/java/org/apache/cassandra/cql3/UserTypes.java
+++ b/src/java/org/apache/cassandra/cql3/UserTypes.java
@@ -47,9 +47,9 @@ public abstract class UserTypes
             this.entries = entries;
         }
 
-        public Term prepare(ColumnSpecification receiver) throws InvalidRequestException
+        public Term prepare(String keyspace, ColumnSpecification receiver) throws InvalidRequestException
         {
-            validateAssignableTo(receiver);
+            validateAssignableTo(keyspace, receiver);
 
             UserType ut = (UserType)receiver.type;
             boolean allTerminal = true;
@@ -57,7 +57,7 @@ public abstract class UserTypes
             for (int i = 0; i < ut.types.size(); i++)
             {
                 ColumnIdentifier field = new ColumnIdentifier(ut.columnNames.get(i), UTF8Type.instance);
-                Term value = entries.get(field).prepare(fieldSpecOf(receiver, i));
+                Term value = entries.get(field).prepare(keyspace, fieldSpecOf(receiver, i));
 
                 if (value instanceof Term.NonTerminal)
                     allTerminal = false;
@@ -68,7 +68,7 @@ public abstract class UserTypes
             return allTerminal ? value.bind(Collections.<ByteBuffer>emptyList()) : value;
         }
 
-        private void validateAssignableTo(ColumnSpecification receiver) throws InvalidRequestException
+        private void validateAssignableTo(String keyspace, ColumnSpecification receiver) throws InvalidRequestException
         {
             if (!(receiver.type instanceof UserType))
                 throw new InvalidRequestException(String.format("Invalid user type literal for %s of type %s", receiver, receiver.type.asCQL3Type()));
@@ -82,16 +82,16 @@ public abstract class UserTypes
                     throw new InvalidRequestException(String.format("Invalid user type literal for %s: missing field %s", receiver, field));
 
                 ColumnSpecification fieldSpec = fieldSpecOf(receiver, i);
-                if (!value.isAssignableTo(fieldSpec))
+                if (!value.isAssignableTo(keyspace, fieldSpec))
                     throw new InvalidRequestException(String.format("Invalid user type literal for %s: field %s is not of type %s", receiver, field, fieldSpec.type.asCQL3Type()));
             }
         }
 
-        public boolean isAssignableTo(ColumnSpecification receiver)
+        public boolean isAssignableTo(String keyspace, ColumnSpecification receiver)
         {
             try
             {
-                validateAssignableTo(receiver);
+                validateAssignableTo(keyspace, receiver);
                 return true;
             }
             catch (InvalidRequestException e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d63d07b9/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java b/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java
index 8db03e6..083543a 100644
--- a/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java
+++ b/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java
@@ -102,15 +102,15 @@ public class FunctionCall extends Term.NonTerminal
             this.terms = terms;
         }
 
-        public Term prepare(ColumnSpecification receiver) throws InvalidRequestException
+        public Term prepare(String keyspace, ColumnSpecification receiver) throws InvalidRequestException
         {
-            Function fun = Functions.get(functionName, terms, receiver);
+            Function fun = Functions.get(keyspace, functionName, terms, receiver);
 
             List<Term> parameters = new ArrayList<Term>(terms.size());
             boolean allTerminal = true;
             for (int i = 0; i < terms.size(); i++)
             {
-                Term t = terms.get(i).prepare(Functions.makeArgSpec(receiver, fun, i));
+                Term t = terms.get(i).prepare(keyspace, Functions.makeArgSpec(receiver, fun, i));
                 if (t instanceof NonTerminal)
                     allTerminal = false;
                 parameters.add(t);
@@ -135,7 +135,7 @@ public class FunctionCall extends Term.NonTerminal
             return fun.execute(buffers);
         }
 
-        public boolean isAssignableTo(ColumnSpecification receiver)
+        public boolean isAssignableTo(String keyspace, ColumnSpecification receiver)
         {
             AbstractType<?> returnType = Functions.getReturnType(functionName, receiver.ksName, receiver.cfName);
             // Note: if returnType == null, it means the function doesn't exist. We may get this if an undefined function

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d63d07b9/src/java/org/apache/cassandra/cql3/functions/Functions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/Functions.java b/src/java/org/apache/cassandra/cql3/functions/Functions.java
index 97a0e91..27f8b3c 100644
--- a/src/java/org/apache/cassandra/cql3/functions/Functions.java
+++ b/src/java/org/apache/cassandra/cql3/functions/Functions.java
@@ -77,7 +77,7 @@ public abstract class Functions
                 fun.argsType().get(i));
     }
 
-    public static Function get(String name, List<? extends AssignementTestable> providedArgs, ColumnSpecification receiver) throws InvalidRequestException
+    public static Function get(String keyspace, String name, List<? extends AssignementTestable> providedArgs, ColumnSpecification receiver) throws InvalidRequestException
     {
         List<Function.Factory> factories = declared.get(name.toLowerCase());
         if (factories.isEmpty())
@@ -87,7 +87,7 @@ public abstract class Functions
         if (factories.size() == 1)
         {
             Function fun = factories.get(0).create(receiver.ksName, receiver.cfName);
-            validateTypes(fun, providedArgs, receiver);
+            validateTypes(keyspace, fun, providedArgs, receiver);
             return fun;
         }
 
@@ -95,7 +95,7 @@ public abstract class Functions
         for (Function.Factory factory : factories)
         {
             Function toTest = factory.create(receiver.ksName, receiver.cfName);
-            if (!isValidType(toTest, providedArgs, receiver))
+            if (!isValidType(keyspace, toTest, providedArgs, receiver))
                 continue;
 
             if (candidate == null)
@@ -108,7 +108,7 @@ public abstract class Functions
         return candidate;
     }
 
-    private static void validateTypes(Function fun, List<? extends AssignementTestable> providedArgs, ColumnSpecification receiver) throws InvalidRequestException
+    private static void validateTypes(String keyspace, Function fun, List<? extends AssignementTestable> providedArgs, ColumnSpecification receiver) throws InvalidRequestException
     {
         if (!receiver.type.asCQL3Type().equals(fun.returnType().asCQL3Type()))
             throw new InvalidRequestException(String.format("Type error: cannot assign result of function %s (type %s) to %s (type %s)", fun.name(), fun.returnType().asCQL3Type(), receiver, receiver.type.asCQL3Type()));
@@ -126,12 +126,12 @@ public abstract class Functions
                 continue;
 
             ColumnSpecification expected = makeArgSpec(receiver, fun, i);
-            if (!provided.isAssignableTo(expected))
+            if (!provided.isAssignableTo(keyspace, expected))
                 throw new InvalidRequestException(String.format("Type error: %s cannot be passed as argument %d of function %s of type %s", provided, i, fun.name(), expected.type.asCQL3Type()));
         }
     }
 
-    private static boolean isValidType(Function fun, List<? extends AssignementTestable> providedArgs, ColumnSpecification receiver)
+    private static boolean isValidType(String keyspace, Function fun, List<? extends AssignementTestable> providedArgs, ColumnSpecification receiver) throws InvalidRequestException
     {
         if (!receiver.type.asCQL3Type().equals(fun.returnType().asCQL3Type()))
             return false;
@@ -149,7 +149,7 @@ public abstract class Functions
                 continue;
 
             ColumnSpecification expected = makeArgSpec(receiver, fun, i);
-            if (!provided.isAssignableTo(expected))
+            if (!provided.isAssignableTo(keyspace, expected))
                 return false;
         }
         return true;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d63d07b9/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
index eb52a13..6f90a02 100644
--- a/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
@@ -41,12 +41,12 @@ public class AlterTableStatement extends SchemaAlteringStatement
     }
 
     public final Type oType;
-    public final CQL3Type validator;
+    public final CQL3Type.Raw validator;
     public final ColumnIdentifier columnName;
     private final CFPropDefs cfProps;
     private final Map<ColumnIdentifier, ColumnIdentifier> renames;
 
-    public AlterTableStatement(CFName name, Type type, ColumnIdentifier columnName, CQL3Type validator, CFPropDefs cfProps, Map<ColumnIdentifier, ColumnIdentifier> renames)
+    public AlterTableStatement(CFName name, Type type, ColumnIdentifier columnName, CQL3Type.Raw validator, CFPropDefs cfProps, Map<ColumnIdentifier, ColumnIdentifier> renames)
     {
         super(name);
         this.oType = type;
@@ -71,6 +71,8 @@ public class AlterTableStatement extends SchemaAlteringStatement
         CFMetaData meta = validateColumnFamily(keyspace(), columnFamily());
         CFMetaData cfm = meta.clone();
 
+        CQL3Type validator = this.validator.prepare(keyspace());
+
         ColumnDefinition def = columnName == null ? null : cfm.getColumnDefinition(columnName);
         switch (oType)
         {