You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2014/01/07 17:47:18 UTC

[2/2] git commit: Make user types keyspace scoped

Make user types keyspace scoped

patch by slebresne; reviewed by iamaleksey for CASSANDRA-6438


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d63d07b9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d63d07b9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d63d07b9

Branch: refs/heads/trunk
Commit: d63d07b9270d73a289086c69002b5a0023b2d233
Parents: 0a1b277
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Wed Dec 4 10:20:40 2013 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Tue Jan 7 17:47:11 2014 +0100

----------------------------------------------------------------------
 .../org/apache/cassandra/config/CFMetaData.java |   8 +-
 .../cassandra/config/DatabaseDescriptor.java    |   2 -
 .../org/apache/cassandra/config/KSMetaData.java |  35 ++-
 .../org/apache/cassandra/config/Schema.java     |  23 +-
 .../org/apache/cassandra/config/UTMetaData.java |  88 ++++---
 .../apache/cassandra/cql3/AbstractMarker.java   |   6 +-
 .../cassandra/cql3/AssignementTestable.java     |   4 +-
 .../org/apache/cassandra/cql3/Attributes.java   |   4 +-
 .../org/apache/cassandra/cql3/CQL3Type.java     | 256 ++++++++++++-------
 .../org/apache/cassandra/cql3/Constants.java    |  12 +-
 src/java/org/apache/cassandra/cql3/Cql.g        |  28 +-
 src/java/org/apache/cassandra/cql3/Lists.java   |  14 +-
 src/java/org/apache/cassandra/cql3/Maps.java    |  18 +-
 .../org/apache/cassandra/cql3/Operation.java    |  40 +--
 src/java/org/apache/cassandra/cql3/Sets.java    |  14 +-
 src/java/org/apache/cassandra/cql3/Term.java    |   2 +-
 .../org/apache/cassandra/cql3/TypeCast.java     |  27 +-
 src/java/org/apache/cassandra/cql3/UTName.java  |  58 +++++
 .../org/apache/cassandra/cql3/UserTypes.java    |  14 +-
 .../cassandra/cql3/functions/FunctionCall.java  |   8 +-
 .../cassandra/cql3/functions/Functions.java     |  14 +-
 .../cql3/statements/AlterTableStatement.java    |   6 +-
 .../cql3/statements/AlterTypeStatement.java     | 129 +++++-----
 .../cassandra/cql3/statements/CFStatement.java  |  11 +-
 .../cql3/statements/CreateTableStatement.java   |   8 +-
 .../cql3/statements/CreateTypeStatement.java    |  45 ++--
 .../cql3/statements/DeleteStatement.java        |   2 +-
 .../cql3/statements/DropTypeStatement.java      |  51 ++--
 .../cql3/statements/ModificationStatement.java  |   8 +-
 .../cql3/statements/SelectStatement.java        |  12 +-
 .../cassandra/cql3/statements/Selection.java    |   4 +-
 .../cql3/statements/UpdateStatement.java        |   6 +-
 .../org/apache/cassandra/db/CFRowAdder.java     |  21 ++
 .../org/apache/cassandra/db/DefsTables.java     | 113 +++++---
 .../apache/cassandra/db/marshal/TypeParser.java |  10 +-
 .../apache/cassandra/db/marshal/UserType.java   |  19 +-
 .../cassandra/service/MigrationManager.java     |   1 -
 37 files changed, 686 insertions(+), 435 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d63d07b9/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index d8ae26a..78ee300 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -39,6 +39,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.cql3.statements.CFStatement;
 import org.apache.cassandra.cql3.statements.CreateTableStatement;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
@@ -172,10 +173,11 @@ public final class CFMetaData
                                                               + ") WITH COMMENT='triggers metadata table'");
 
     public static final CFMetaData SchemaUserTypesCf = compile("CREATE TABLE " + SystemKeyspace.SCHEMA_USER_TYPES_CF + " ("
+                                                               + "keyspace_name text,"
                                                                + "type_name text,"
                                                                + "column_names list<text>,"
                                                                + "column_types list<text>,"
-                                                               + "PRIMARY KEY (type_name)"
+                                                               + "PRIMARY KEY (keyspace_name, type_name)"
                                                                + ") WITH COMMENT='Defined user types' AND gc_grace_seconds=8640");
 
     public static final CFMetaData HintsCf = compile("CREATE TABLE " + SystemKeyspace.HINTS_CF + " ("
@@ -517,7 +519,9 @@ public final class CFMetaData
     {
         try
         {
-            CreateTableStatement statement = (CreateTableStatement) QueryProcessor.parseStatement(cql).prepare().statement;
+            CFStatement parsed = (CFStatement)QueryProcessor.parseStatement(cql);
+            parsed.prepareKeyspace(keyspace);
+            CreateTableStatement statement = (CreateTableStatement) parsed.prepare().statement;
             CFMetaData cfm = newSystemMetadata(keyspace, statement.columnFamily(), "", statement.comparator);
             statement.applyPropertiesTo(cfm);
             return cfm.rebuild();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d63d07b9/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 6b49d21..cef5e14 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -517,8 +517,6 @@ public class DatabaseDescriptor
     /** load keyspace (keyspace) definitions, but do not initialize the keyspace instances. */
     public static void loadSchemas()
     {
-        Schema.instance.loadUserTypes();
-
         ColumnFamilyStore schemaCFS = SystemKeyspace.schemaCFS(SystemKeyspace.SCHEMA_KEYSPACES_CF);
 
         // if keyspace with definitions is empty try loading the old way

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d63d07b9/src/java/org/apache/cassandra/config/KSMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/KSMetaData.java b/src/java/org/apache/cassandra/config/KSMetaData.java
index c3fe641..05e6248 100644
--- a/src/java/org/apache/cassandra/config/KSMetaData.java
+++ b/src/java/org/apache/cassandra/config/KSMetaData.java
@@ -43,8 +43,20 @@ public final class KSMetaData
     private final Map<String, CFMetaData> cfMetaData;
     public final boolean durableWrites;
 
+    public final UTMetaData userTypes;
+
     KSMetaData(String name, Class<? extends AbstractReplicationStrategy> strategyClass, Map<String, String> strategyOptions, boolean durableWrites, Iterable<CFMetaData> cfDefs)
     {
+        this(name, strategyClass, strategyOptions, durableWrites, cfDefs, new UTMetaData());
+    }
+
+    KSMetaData(String name,
+               Class<? extends AbstractReplicationStrategy> strategyClass,
+               Map<String, String> strategyOptions,
+               boolean durableWrites,
+               Iterable<CFMetaData> cfDefs,
+               UTMetaData userTypes)
+    {
         this.name = name;
         this.strategyClass = strategyClass == null ? NetworkTopologyStrategy.class : strategyClass;
         this.strategyOptions = strategyOptions;
@@ -53,6 +65,7 @@ public final class KSMetaData
             cfmap.put(cfm.cfName, cfm);
         this.cfMetaData = Collections.unmodifiableMap(cfmap);
         this.durableWrites = durableWrites;
+        this.userTypes = userTypes;
     }
 
     // For new user created keyspaces (through CQL)
@@ -67,12 +80,12 @@ public final class KSMetaData
 
     public static KSMetaData newKeyspace(String name, Class<? extends AbstractReplicationStrategy> strategyClass, Map<String, String> options, boolean durablesWrites, Iterable<CFMetaData> cfDefs)
     {
-        return new KSMetaData(name, strategyClass, options, durablesWrites, cfDefs);
+        return new KSMetaData(name, strategyClass, options, durablesWrites, cfDefs, new UTMetaData());
     }
 
     public static KSMetaData cloneWith(KSMetaData ksm, Iterable<CFMetaData> cfDefs)
     {
-        return new KSMetaData(ksm.name, ksm.strategyClass, ksm.strategyOptions, ksm.durableWrites, cfDefs);
+        return new KSMetaData(ksm.name, ksm.strategyClass, ksm.strategyOptions, ksm.durableWrites, cfDefs, ksm.userTypes);
     }
 
     public static KSMetaData systemKeyspace()
@@ -127,7 +140,8 @@ public final class KSMetaData
                 && ObjectUtils.equals(other.strategyClass, strategyClass)
                 && ObjectUtils.equals(other.strategyOptions, strategyOptions)
                 && other.cfMetaData.equals(cfMetaData)
-                && other.durableWrites == durableWrites;
+                && other.durableWrites == durableWrites
+                && ObjectUtils.equals(other.userTypes, userTypes);
     }
 
     public Map<String, CFMetaData> cfMetaData()
@@ -215,7 +229,6 @@ public final class KSMetaData
         return this;
     }
 
-
     public KSMetaData reloadAttributes()
     {
         Row ksDefRow = SystemKeyspace.readSchemaRow(name);
@@ -223,7 +236,7 @@ public final class KSMetaData
         if (ksDefRow.cf == null)
             throw new RuntimeException(String.format("%s not found in the schema definitions keyspaceName (%s).", name, SystemKeyspace.SCHEMA_KEYSPACES_CF));
 
-        return fromSchema(ksDefRow, Collections.<CFMetaData>emptyList());
+        return fromSchema(ksDefRow, Collections.<CFMetaData>emptyList(), userTypes);
     }
 
     public Mutation dropFromSchema(long timestamp)
@@ -234,6 +247,7 @@ public final class KSMetaData
         mutation.delete(SystemKeyspace.SCHEMA_COLUMNFAMILIES_CF, timestamp);
         mutation.delete(SystemKeyspace.SCHEMA_COLUMNS_CF, timestamp);
         mutation.delete(SystemKeyspace.SCHEMA_TRIGGERS_CF, timestamp);
+        mutation.delete(SystemKeyspace.SCHEMA_USER_TYPES_CF, timestamp);
 
         return mutation;
     }
@@ -251,6 +265,7 @@ public final class KSMetaData
         for (CFMetaData cfm : cfMetaData.values())
             cfm.toSchema(mutation, timestamp);
 
+        userTypes.toSchema(mutation, timestamp);
         return mutation;
     }
 
@@ -261,7 +276,7 @@ public final class KSMetaData
      *
      * @return deserialized keyspace without cf_defs
      */
-    public static KSMetaData fromSchema(Row row, Iterable<CFMetaData> cfms)
+    public static KSMetaData fromSchema(Row row, Iterable<CFMetaData> cfms, UTMetaData userTypes)
     {
         UntypedResultSet.Row result = QueryProcessor.resultify("SELECT * FROM system.schema_keyspaces", row).one();
         try
@@ -270,7 +285,8 @@ public final class KSMetaData
                                   AbstractReplicationStrategy.getClass(result.getString("strategy_class")),
                                   fromJsonMap(result.getString("strategy_options")),
                                   result.getBoolean("durable_writes"),
-                                  cfms);
+                                  cfms,
+                                  userTypes);
         }
         catch (ConfigurationException e)
         {
@@ -286,10 +302,11 @@ public final class KSMetaData
      *
      * @return deserialized keyspace with cf_defs
      */
-    public static KSMetaData fromSchema(Row serializedKs, Row serializedCFs)
+    public static KSMetaData fromSchema(Row serializedKs, Row serializedCFs, Row serializedUserTypes)
     {
         Map<String, CFMetaData> cfs = deserializeColumnFamilies(serializedCFs);
-        return fromSchema(serializedKs, cfs.values());
+        UTMetaData userTypes = new UTMetaData(UTMetaData.fromSchema(serializedUserTypes));
+        return fromSchema(serializedKs, cfs.values(), userTypes);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d63d07b9/src/java/org/apache/cassandra/config/Schema.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Schema.java b/src/java/org/apache/cassandra/config/Schema.java
index a38c097..e65b399 100644
--- a/src/java/org/apache/cassandra/config/Schema.java
+++ b/src/java/org/apache/cassandra/config/Schema.java
@@ -61,8 +61,6 @@ public class Schema
     /* metadata map for faster ColumnFamily lookup */
     private final BiMap<Pair<String, String>, UUID> cfIdMap = HashBiMap.create();
 
-    public final UTMetaData userTypes = new UTMetaData();
-
     private volatile UUID version;
 
     // 59adb24e-f3cd-3e02-97f0-5b395827453f
@@ -119,24 +117,6 @@ public class Schema
         return this;
     }
 
-    public Schema loadUserTypes()
-    {
-        userTypes.addAll(UTMetaData.fromSchema(SystemKeyspace.serializedSchema(SystemKeyspace.SCHEMA_USER_TYPES_CF)));
-        return this;
-    }
-
-    public Schema loadType(UserType newType)
-    {
-        userTypes.addType(newType);
-        return this;
-    }
-
-    public Schema dropType(UserType droppedType)
-    {
-        userTypes.removeType(droppedType);
-        return this;
-    }
-
     /**
      * Get keyspace instance by name
      *
@@ -422,8 +402,7 @@ public class Schema
     {
         try
         {
-            return !row.cf.metadata().cfName.equals(SystemKeyspace.SCHEMA_USER_TYPES_CF)
-                && systemKeyspaceNames.contains(ByteBufferUtil.string(row.key.key));
+            return systemKeyspaceNames.contains(ByteBufferUtil.string(row.key.key));
         }
         catch (CharacterCodingException e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d63d07b9/src/java/org/apache/cassandra/config/UTMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/UTMetaData.java b/src/java/org/apache/cassandra/config/UTMetaData.java
index 76f3999..b3061bb 100644
--- a/src/java/org/apache/cassandra/config/UTMetaData.java
+++ b/src/java/org/apache/cassandra/config/UTMetaData.java
@@ -35,27 +35,23 @@ import org.apache.cassandra.utils.ByteBufferUtil;
  */
 public final class UTMetaData
 {
-    private static final ColumnIdentifier COLUMN_NAMES = new ColumnIdentifier("column_names", false);
-    private static final ColumnIdentifier COLUMN_TYPES = new ColumnIdentifier("column_types", false);
+    private final Map<ByteBuffer, UserType> userTypes;
 
-    private final Map<ByteBuffer, UserType> userTypes = new HashMap<>();
-
-    // Only for Schema. You should generally not create instance of this, but rather use
-    // the global reference Schema.instance().userTypes;
-    UTMetaData() {}
+    public UTMetaData()
+    {
+        this(new HashMap<ByteBuffer, UserType>());
+    }
 
-    public static UTMetaData fromSchema(UntypedResultSet rows)
+    UTMetaData(Map<ByteBuffer, UserType> types)
     {
-        UTMetaData m = new UTMetaData();
-        for (UntypedResultSet.Row row : rows)
-            m.addType(fromSchema(row));
-        return m;
+        this.userTypes = types;
     }
 
     private static UserType fromSchema(UntypedResultSet.Row row)
     {
         try
         {
+            String keyspace = row.getString("keyspace_name");
             ByteBuffer name = ByteBufferUtil.bytes(row.getString("type_name"));
             List<String> rawColumns = row.getList("column_names", UTF8Type.instance);
             List<String> rawTypes = row.getList("column_types", UTF8Type.instance);
@@ -68,7 +64,7 @@ public final class UTMetaData
             for (String rawType : rawTypes)
                 types.add(TypeParser.parse(rawType));
 
-            return new UserType(name, columns, types);
+            return new UserType(keyspace, name, columns, types);
         }
         catch (RequestValidationException e)
         {
@@ -77,54 +73,58 @@ public final class UTMetaData
         }
     }
 
-    public static UTMetaData fromSchema(List<Row> rows)
+    public static Map<ByteBuffer, UserType> fromSchema(Row row)
     {
-        UntypedResultSet result = QueryProcessor.resultify("SELECT * FROM system." + SystemKeyspace.SCHEMA_USER_TYPES_CF, rows);
-        return fromSchema(result);
+        UntypedResultSet results = QueryProcessor.resultify("SELECT * FROM system." + SystemKeyspace.SCHEMA_USER_TYPES_CF, row);
+        Map<ByteBuffer, UserType> types = new HashMap<>(results.size());
+        for (UntypedResultSet.Row result : results)
+        {
+            UserType type = fromSchema(result);
+            types.put(type.name, type);
+        }
+        return types;
     }
 
     public static Mutation toSchema(UserType newType, long timestamp)
     {
-        Mutation mutation = new Mutation(Keyspace.SYSTEM_KS, newType.name);
+        return toSchema(new Mutation(Keyspace.SYSTEM_KS, SystemKeyspace.getSchemaKSKey(newType.keyspace)), newType, timestamp);
+    }
+
+    public static Mutation toSchema(Mutation mutation, UserType newType, long timestamp)
+    {
         ColumnFamily cf = mutation.addOrGet(SystemKeyspace.SCHEMA_USER_TYPES_CF);
 
-        CFMetaData cfm = CFMetaData.SchemaUserTypesCf;
-        UpdateParameters params = new UpdateParameters(cfm, Collections.<ByteBuffer>emptyList(), timestamp, 0, null);
-        Composite prefix = cfm.comparator.builder().build();
+        Composite prefix = CFMetaData.SchemaUserTypesCf.comparator.make(newType.name);
+        CFRowAdder adder = new CFRowAdder(cf, prefix, timestamp);
 
-        List<ByteBuffer> columnTypes = new ArrayList<>(newType.types.size());
-        for (AbstractType<?> type : newType.types)
-            columnTypes.add(ByteBufferUtil.bytes(type.toString()));
+        adder.resetCollection("column_names");
+        adder.resetCollection("column_types");
 
-        try
-        {
-            new Lists.Setter(cfm.getColumnDefinition(COLUMN_NAMES), new Lists.Value(newType.columnNames)).execute(newType.name, cf, prefix, params);
-            new Lists.Setter(cfm.getColumnDefinition(COLUMN_TYPES), new Lists.Value(columnTypes)).execute(newType.name, cf, prefix, params);
-        }
-        catch (RequestValidationException e)
-        {
-            throw new AssertionError();
-        }
+        for (ByteBuffer name : newType.columnNames)
+            adder.addListEntry("column_names", name);
+        for (AbstractType<?> type : newType.types)
+            adder.addListEntry("column_types", type.toString());
 
         return mutation;
     }
 
-    public static Mutation dropFromSchema(UserType droppedType, long timestamp)
+    public Mutation toSchema(Mutation mutation, long timestamp)
     {
-        Mutation mutation = new Mutation(Keyspace.SYSTEM_KS, droppedType.name);
-        mutation.delete(SystemKeyspace.SCHEMA_USER_TYPES_CF, timestamp);
+        for (UserType ut : userTypes.values())
+            toSchema(mutation, ut, timestamp);
         return mutation;
     }
 
-    public void addAll(UTMetaData types)
+    public static Mutation dropFromSchema(UserType droppedType, long timestamp)
     {
-        for (UserType type : types.userTypes.values())
-            addType(type);
-    }
+        Mutation mutation = new Mutation(Keyspace.SYSTEM_KS, SystemKeyspace.getSchemaKSKey(droppedType.keyspace));
+        ColumnFamily cf = mutation.addOrGet(SystemKeyspace.SCHEMA_USER_TYPES_CF);
+        int ldt = (int) (System.currentTimeMillis() / 1000);
 
-    public UserType getType(ColumnIdentifier typeName)
-    {
-        return getType(typeName.bytes);
+        Composite prefix = CFMetaData.SchemaUserTypesCf.comparator.make(droppedType.name);
+        cf.addAtom(new RangeTombstone(prefix, prefix.end(), timestamp, ldt));
+
+        return mutation;
     }
 
     public UserType getType(ByteBuffer typeName)
@@ -138,9 +138,7 @@ public final class UTMetaData
         return new HashMap<>(userTypes);
     }
 
-    // This is *not* thread safe. As far as the global instance is concerned, only
-    // Schema.loadType() (which is only called in DefsTables that is synchronized)
-    // should use this.
+    // This is *not* thread safe but is only called in DefsTables that is synchronized.
     public void addType(UserType type)
     {
         UserType old = userTypes.get(type.name);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d63d07b9/src/java/org/apache/cassandra/cql3/AbstractMarker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/AbstractMarker.java b/src/java/org/apache/cassandra/cql3/AbstractMarker.java
index 165cb00..2b9c6c9 100644
--- a/src/java/org/apache/cassandra/cql3/AbstractMarker.java
+++ b/src/java/org/apache/cassandra/cql3/AbstractMarker.java
@@ -58,7 +58,7 @@ public abstract class AbstractMarker extends Term.NonTerminal
             this.bindIndex = bindIndex;
         }
 
-        public AbstractMarker prepare(ColumnSpecification receiver) throws InvalidRequestException
+        public AbstractMarker prepare(String keyspace, ColumnSpecification receiver) throws InvalidRequestException
         {
             if (!(receiver.type instanceof CollectionType))
                 return new Constants.Marker(bindIndex, receiver);
@@ -72,7 +72,7 @@ public abstract class AbstractMarker extends Term.NonTerminal
             throw new AssertionError();
         }
 
-        public boolean isAssignableTo(ColumnSpecification receiver)
+        public boolean isAssignableTo(String keyspace, ColumnSpecification receiver)
         {
             return true;
         }
@@ -99,7 +99,7 @@ public abstract class AbstractMarker extends Term.NonTerminal
         }
 
         @Override
-        public AbstractMarker prepare(ColumnSpecification receiver) throws InvalidRequestException
+        public AbstractMarker prepare(String keyspace, ColumnSpecification receiver) throws InvalidRequestException
         {
             if (receiver.type instanceof CollectionType)
                 throw new InvalidRequestException("Invalid IN relation on collection column");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d63d07b9/src/java/org/apache/cassandra/cql3/AssignementTestable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/AssignementTestable.java b/src/java/org/apache/cassandra/cql3/AssignementTestable.java
index d707809..2253cf7 100644
--- a/src/java/org/apache/cassandra/cql3/AssignementTestable.java
+++ b/src/java/org/apache/cassandra/cql3/AssignementTestable.java
@@ -17,10 +17,12 @@
  */
 package org.apache.cassandra.cql3;
 
+import org.apache.cassandra.exceptions.InvalidRequestException;
+
 public interface AssignementTestable
 {
     /**
      * @return whether this object can be assigned to the provided receiver
      */
-    public boolean isAssignableTo(ColumnSpecification receiver);
+    public boolean isAssignableTo(String keyspace, ColumnSpecification receiver) throws InvalidRequestException;
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d63d07b9/src/java/org/apache/cassandra/cql3/Attributes.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Attributes.java b/src/java/org/apache/cassandra/cql3/Attributes.java
index 97ce31a..7c33e5b 100644
--- a/src/java/org/apache/cassandra/cql3/Attributes.java
+++ b/src/java/org/apache/cassandra/cql3/Attributes.java
@@ -120,8 +120,8 @@ public class Attributes
 
         public Attributes prepare(String ksName, String cfName) throws InvalidRequestException
         {
-            Term ts = timestamp == null ? null : timestamp.prepare(timestampReceiver(ksName, cfName));
-            Term ttl = timeToLive == null ? null : timeToLive.prepare(timeToLiveReceiver(ksName, cfName));
+            Term ts = timestamp == null ? null : timestamp.prepare(ksName, timestampReceiver(ksName, cfName));
+            Term ttl = timeToLive == null ? null : timeToLive.prepare(ksName, timeToLiveReceiver(ksName, cfName));
             return new Attributes(ts, ttl);
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d63d07b9/src/java/org/apache/cassandra/cql3/CQL3Type.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/CQL3Type.java b/src/java/org/apache/cassandra/cql3/CQL3Type.java
index fa6e3a0..2ff6fb5 100644
--- a/src/java/org/apache/cassandra/cql3/CQL3Type.java
+++ b/src/java/org/apache/cassandra/cql3/CQL3Type.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.cql3;
 
 import java.nio.ByteBuffer;
 
+import org.apache.cassandra.config.KSMetaData;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.exceptions.InvalidRequestException;
@@ -28,8 +29,6 @@ import org.apache.cassandra.exceptions.SyntaxException;
 public interface CQL3Type
 {
     public boolean isCollection();
-    public boolean isCounter();
-    public boolean isUserType();
     public AbstractType<?> getType();
 
     public enum Native implements CQL3Type
@@ -68,16 +67,6 @@ public interface CQL3Type
             return type;
         }
 
-        public boolean isCounter()
-        {
-            return this == COUNTER;
-        }
-
-        public boolean isUserType()
-        {
-            return false;
-        }
-
         @Override
         public String toString()
         {
@@ -109,16 +98,6 @@ public interface CQL3Type
             return type;
         }
 
-        public boolean isCounter()
-        {
-            return false;
-        }
-
-        public boolean isUserType()
-        {
-            return false;
-        }
-
         @Override
         public final boolean equals(Object o)
         {
@@ -144,61 +123,21 @@ public interface CQL3Type
 
     public static class Collection implements CQL3Type
     {
-        CollectionType type;
+        private final CollectionType type;
 
         public Collection(CollectionType type)
         {
             this.type = type;
         }
 
-        public static Collection map(CQL3Type t1, CQL3Type t2) throws InvalidRequestException
-        {
-            if (t1.isCollection() || t2.isCollection())
-                throw new InvalidRequestException("map type cannot contain another collection");
-            if (t1.isCounter() || t2.isCounter())
-                throw new InvalidRequestException("counters are not allowed inside a collection");
-
-            return new Collection(MapType.getInstance(t1.getType(), t2.getType()));
-        }
-
-        public static Collection list(CQL3Type t) throws InvalidRequestException
-        {
-            if (t.isCollection())
-                throw new InvalidRequestException("list type cannot contain another collection");
-            if (t.isCounter())
-                throw new InvalidRequestException("counters are not allowed inside a collection");
-
-            return new Collection(ListType.getInstance(t.getType()));
-        }
-
-        public static Collection set(CQL3Type t) throws InvalidRequestException
-        {
-            if (t.isCollection())
-                throw new InvalidRequestException("set type cannot contain another collection");
-            if (t.isCounter())
-                throw new InvalidRequestException("counters are not allowed inside a collection");
-
-            return new Collection(SetType.getInstance(t.getType()));
-        }
-
-        public boolean isCollection()
-        {
-            return true;
-        }
-
         public AbstractType<?> getType()
         {
             return type;
         }
 
-        public boolean isCounter()
-        {
-            return false;
-        }
-
-        public boolean isUserType()
+        public boolean isCollection()
         {
-            return false;
+            return true;
         }
 
         @Override
@@ -237,32 +176,18 @@ public interface CQL3Type
     public static class UserDefined implements CQL3Type
     {
         // Keeping this separatly from type just to simplify toString()
-        ColumnIdentifier name;
-        UserType type;
+        private final String name;
+        private final UserType type;
 
-        private UserDefined(ColumnIdentifier name, UserType type)
+        private UserDefined(String name, UserType type)
         {
             this.name = name;
             this.type = type;
         }
 
-        public static UserDefined create(ByteBuffer name, UserType type)
+        public static UserDefined create(UserType type)
         {
-            return new UserDefined(new ColumnIdentifier(name, UTF8Type.instance), type);
-        }
-
-        public static UserDefined create(ColumnIdentifier name) throws InvalidRequestException
-        {
-            UserType type = Schema.instance.userTypes.getType(name);
-            if (type == null)
-                throw new InvalidRequestException("Unknown type " + name);
-
-            return new UserDefined(name, type);
-        }
-
-        public boolean isUserType()
-        {
-            return true;
+            return new UserDefined(UTF8Type.instance.compose(type.name), type);
         }
 
         public boolean isCollection()
@@ -270,11 +195,6 @@ public interface CQL3Type
             return false;
         }
 
-        public boolean isCounter()
-        {
-            return false;
-        }
-
         public AbstractType<?> getType()
         {
             return type;
@@ -299,7 +219,163 @@ public interface CQL3Type
         @Override
         public String toString()
         {
-            return name.toString();
+            return name;
         }
     }
+
+    // For UserTypes, we need to know the current keyspace to resolve the
+    // actual type used, so Raw is a "not yet prepared" CQL3Type.
+    public abstract class Raw
+    {
+        public boolean isCollection()
+        {
+            return false;
+        }
+
+        public boolean isCounter()
+        {
+            return false;
+        }
+
+        public abstract CQL3Type prepare(String keyspace) throws InvalidRequestException;
+
+        public static Raw from(CQL3Type type)
+        {
+            return new RawType(type);
+        }
+
+        public static Raw userType(UTName name)
+        {
+            return new RawUT(name);
+        }
+
+        public static Raw map(CQL3Type.Raw t1, CQL3Type.Raw t2) throws InvalidRequestException
+        {
+            if (t1.isCollection() || t2.isCollection())
+                throw new InvalidRequestException("map type cannot contain another collection");
+            if (t1.isCounter() || t2.isCounter())
+                throw new InvalidRequestException("counters are not allowed inside a collection");
+
+            return new RawCollection(CollectionType.Kind.MAP, t1, t2);
+        }
+
+        public static Raw list(CQL3Type.Raw t) throws InvalidRequestException
+        {
+            if (t.isCollection())
+                throw new InvalidRequestException("list type cannot contain another collection");
+            if (t.isCounter())
+                throw new InvalidRequestException("counters are not allowed inside a collection");
+
+            return new RawCollection(CollectionType.Kind.LIST, null, t);
+        }
+
+        public static Raw set(CQL3Type.Raw t) throws InvalidRequestException
+        {
+            if (t.isCollection())
+                throw new InvalidRequestException("set type cannot contain another collection");
+            if (t.isCounter())
+                throw new InvalidRequestException("counters are not allowed inside a collection");
+
+            return new RawCollection(CollectionType.Kind.SET, null, t);
+        }
+
+        private static class RawType extends Raw
+        {
+            private CQL3Type type;
+
+            private RawType(CQL3Type type)
+            {
+                this.type = type;
+            }
+
+            public CQL3Type prepare(String keyspace) throws InvalidRequestException
+            {
+                return type;
+            }
+
+            public boolean isCounter()
+            {
+                return type == Native.COUNTER;
+            }
+
+            @Override
+            public String toString()
+            {
+                return type.toString();
+            }
+        }
+
+        private static class RawCollection extends Raw
+        {
+            private final CollectionType.Kind kind;
+            private final CQL3Type.Raw keys;
+            private final CQL3Type.Raw values;
+
+            private RawCollection(CollectionType.Kind kind, CQL3Type.Raw keys, CQL3Type.Raw values)
+            {
+                this.kind = kind;
+                this.keys = keys;
+                this.values = values;
+            }
+
+            public boolean isCollection()
+            {
+                return true;
+            }
+
+            public CQL3Type prepare(String keyspace) throws InvalidRequestException
+            {
+                switch (kind)
+                {
+                    case LIST: return new Collection(ListType.getInstance(values.prepare(keyspace).getType()));
+                    case SET:  return new Collection(SetType.getInstance(values.prepare(keyspace).getType()));
+                    case MAP:  return new Collection(MapType.getInstance(keys.prepare(keyspace).getType(), values.prepare(keyspace).getType()));
+                }
+                throw new AssertionError();
+            }
+
+            @Override
+            public String toString()
+            {
+                switch (kind)
+                {
+                    case LIST: return "list<" + values + ">";
+                    case SET:  return "set<" + values + ">";
+                    case MAP:  return "map<" + keys + ", " + values + ">";
+                }
+                throw new AssertionError();
+            }
+        }
+
+        private static class RawUT extends Raw
+        {
+
+            private final UTName name;
+
+            private RawUT(UTName name)
+            {
+                this.name = name;
+            }
+
+            public CQL3Type prepare(String keyspace) throws InvalidRequestException
+            {
+                name.setKeyspace(keyspace);
+
+                KSMetaData ksm = Schema.instance.getKSMetaData(name.getKeyspace());
+                if (ksm == null)
+                    throw new InvalidRequestException("Unknown keyspace " + name.getKeyspace());
+                UserType type = ksm.userTypes.getType(name.getUserTypeName());
+                if (type == null)
+                    throw new InvalidRequestException("Unknown type " + name);
+
+                return new UserDefined(name.toString(), type);
+            }
+
+            @Override
+            public String toString()
+            {
+                return name.toString();
+            }
+    }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d63d07b9/src/java/org/apache/cassandra/cql3/Constants.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Constants.java b/src/java/org/apache/cassandra/cql3/Constants.java
index 1784752..44e96ef 100644
--- a/src/java/org/apache/cassandra/cql3/Constants.java
+++ b/src/java/org/apache/cassandra/cql3/Constants.java
@@ -60,15 +60,15 @@ public abstract class Constants
             }
         };
 
-        public Term prepare(ColumnSpecification receiver) throws InvalidRequestException
+        public Term prepare(String keyspace, ColumnSpecification receiver) throws InvalidRequestException
         {
-            if (!isAssignableTo(receiver))
+            if (!isAssignableTo(keyspace, receiver))
                 throw new InvalidRequestException("Invalid null value for counter increment/decrement");
 
             return NULL_VALUE;
         }
 
-        public boolean isAssignableTo(ColumnSpecification receiver)
+        public boolean isAssignableTo(String keyspace, ColumnSpecification receiver)
         {
             return !(receiver.type instanceof CounterColumnType);
         }
@@ -122,9 +122,9 @@ public abstract class Constants
             return new Literal(Type.HEX, text);
         }
 
-        public Value prepare(ColumnSpecification receiver) throws InvalidRequestException
+        public Value prepare(String keyspace, ColumnSpecification receiver) throws InvalidRequestException
         {
-            if (!isAssignableTo(receiver))
+            if (!isAssignableTo(keyspace, receiver))
                 throw new InvalidRequestException(String.format("Invalid %s constant (%s) for %s of type %s", type, text, receiver, receiver.type.asCQL3Type()));
 
             return new Value(parsedValue(receiver.type));
@@ -154,7 +154,7 @@ public abstract class Constants
             return text;
         }
 
-        public boolean isAssignableTo(ColumnSpecification receiver)
+        public boolean isAssignableTo(String keyspace, ColumnSpecification receiver)
         {
             CQL3Type receiverType = receiver.type.asCQL3Type();
             if (receiverType.isCollection())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d63d07b9/src/java/org/apache/cassandra/cql3/Cql.g
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Cql.g b/src/java/org/apache/cassandra/cql3/Cql.g
index 15c18db..f9f66df 100644
--- a/src/java/org/apache/cassandra/cql3/Cql.g
+++ b/src/java/org/apache/cassandra/cql3/Cql.g
@@ -533,7 +533,7 @@ cfamOrdering[CreateTableStatement.RawStatement expr]
 createTypeStatement returns [CreateTypeStatement expr]
     @init { boolean ifNotExists = false; }
     : K_CREATE K_TYPE (K_IF K_NOT K_EXISTS { ifNotExists = true; } )?
-         tn=non_type_ident { $expr = new CreateTypeStatement(tn, ifNotExists); }
+         tn=userTypeName { $expr = new CreateTypeStatement(tn, ifNotExists); }
          '(' typeColumns[expr] ( ',' typeColumns[expr]? )* ')'
     ;
 
@@ -617,10 +617,10 @@ alterTableStatement returns [AlterTableStatement expr]
  * ALTER TYPE <name> RENAME <field> TO <newtype> AND ...;
  */
 alterTypeStatement returns [AlterTypeStatement expr]
-    : K_ALTER K_TYPE name=non_type_ident
+    : K_ALTER K_TYPE name=userTypeName
           ( K_ALTER f=cident K_TYPE v=comparatorType { $expr = AlterTypeStatement.alter(name, f, v); }
           | K_ADD   f=cident v=comparatorType        { $expr = AlterTypeStatement.addition(name, f, v); }
-          | K_RENAME K_TO new_name=non_type_ident    { $expr = AlterTypeStatement.typeRename(name, new_name); }
+          | K_RENAME K_TO new_name=userTypeName      { $expr = AlterTypeStatement.typeRename(name, new_name); }
           | K_RENAME
                { Map<ColumnIdentifier, ColumnIdentifier> renames = new HashMap<ColumnIdentifier, ColumnIdentifier>(); }
                  id1=cident K_TO toId1=cident { renames.put(id1, toId1); }
@@ -651,7 +651,7 @@ dropTableStatement returns [DropTableStatement stmt]
  */
 dropTypeStatement returns [DropTypeStatement stmt]
     @init { boolean ifExists = false; }
-    : K_DROP K_TYPE (K_IF K_EXISTS { ifExists = true; } )? name=non_type_ident { $stmt = new DropTypeStatement(name, ifExists); }
+    : K_DROP K_TYPE (K_IF K_EXISTS { ifExists = true; } )? name=userTypeName { $stmt = new DropTypeStatement(name, ifExists); }
     ;
 
 /**
@@ -801,6 +801,10 @@ columnFamilyName returns [CFName name]
     : (cfOrKsName[name, true] '.')? cfOrKsName[name, false]
     ;
 
+userTypeName returns [UTName name]
+    : (ks=cident '.')? ut=non_type_ident { return new UTName(ks, ut); }
+    ;
+
 cfOrKsName[CFName name, boolean isKs]
     : t=IDENT              { if (isKs) $name.setKeyspace($t.text, false); else $name.setColumnFamily($t.text, false); }
     | t=QUOTED_NAME        { if (isKs) $name.setKeyspace($t.text, true); else $name.setColumnFamily($t.text, true); }
@@ -959,14 +963,14 @@ relation[List<Relation> clauses]
     | '(' relation[$clauses] ')'
     ;
 
-comparatorType returns [CQL3Type t]
-    : c=native_type     { $t = c; }
+comparatorType returns [CQL3Type.Raw t]
+    : n=native_type     { $t = CQL3Type.Raw.from(n); }
     | c=collection_type { $t = c; }
-    | id=non_type_ident  { try { $t = CQL3Type.UserDefined.create(id); } catch (InvalidRequestException e) { addRecognitionError(e.getMessage()); }}
+    | id=userTypeName  { $t = CQL3Type.Raw.userType(id); }
     | s=STRING_LITERAL
       {
         try {
-            $t = new CQL3Type.Custom($s.text);
+            $t = CQL3Type.Raw.from(new CQL3Type.Custom($s.text));
         } catch (SyntaxException e) {
             addRecognitionError("Cannot parse type " + $s.text + ": " + e.getMessage());
         } catch (ConfigurationException e) {
@@ -994,17 +998,17 @@ native_type returns [CQL3Type t]
     | K_TIMEUUID  { $t = CQL3Type.Native.TIMEUUID; }
     ;
 
-collection_type returns [CQL3Type pt]
+collection_type returns [CQL3Type.Raw pt]
     : K_MAP  '<' t1=comparatorType ',' t2=comparatorType '>'
         { try {
             // if we can't parse either t1 or t2, antlr will "recover" and we may have t1 or t2 null.
             if (t1 != null && t2 != null)
-                $pt = CQL3Type.Collection.map(t1, t2);
+                $pt = CQL3Type.Raw.map(t1, t2);
           } catch (InvalidRequestException e) { addRecognitionError(e.getMessage()); } }
     | K_LIST '<' t=comparatorType '>'
-        { try { if (t != null) $pt = CQL3Type.Collection.list(t); } catch (InvalidRequestException e) { addRecognitionError(e.getMessage()); } }
+        { try { if (t != null) $pt = CQL3Type.Raw.list(t); } catch (InvalidRequestException e) { addRecognitionError(e.getMessage()); } }
     | K_SET  '<' t=comparatorType '>'
-        { try { if (t != null) $pt = CQL3Type.Collection.set(t); } catch (InvalidRequestException e) { addRecognitionError(e.getMessage()); } }
+        { try { if (t != null) $pt = CQL3Type.Raw.set(t); } catch (InvalidRequestException e) { addRecognitionError(e.getMessage()); } }
     ;
 
 username

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d63d07b9/src/java/org/apache/cassandra/cql3/Lists.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Lists.java b/src/java/org/apache/cassandra/cql3/Lists.java
index ab21a64..8dbe59c 100644
--- a/src/java/org/apache/cassandra/cql3/Lists.java
+++ b/src/java/org/apache/cassandra/cql3/Lists.java
@@ -63,16 +63,16 @@ public abstract class Lists
             this.elements = elements;
         }
 
-        public Term prepare(ColumnSpecification receiver) throws InvalidRequestException
+        public Term prepare(String keyspace, ColumnSpecification receiver) throws InvalidRequestException
         {
-            validateAssignableTo(receiver);
+            validateAssignableTo(keyspace, receiver);
 
             ColumnSpecification valueSpec = Lists.valueSpecOf(receiver);
             List<Term> values = new ArrayList<Term>(elements.size());
             boolean allTerminal = true;
             for (Term.Raw rt : elements)
             {
-                Term t = rt.prepare(valueSpec);
+                Term t = rt.prepare(keyspace, valueSpec);
 
                 if (t.containsBindMarker())
                     throw new InvalidRequestException(String.format("Invalid list literal for %s: bind variables are not supported inside collection literals", receiver));
@@ -86,7 +86,7 @@ public abstract class Lists
             return allTerminal ? value.bind(Collections.<ByteBuffer>emptyList()) : value;
         }
 
-        private void validateAssignableTo(ColumnSpecification receiver) throws InvalidRequestException
+        private void validateAssignableTo(String keyspace, ColumnSpecification receiver) throws InvalidRequestException
         {
             if (!(receiver.type instanceof ListType))
                 throw new InvalidRequestException(String.format("Invalid list literal for %s of type %s", receiver, receiver.type.asCQL3Type()));
@@ -94,16 +94,16 @@ public abstract class Lists
             ColumnSpecification valueSpec = Lists.valueSpecOf(receiver);
             for (Term.Raw rt : elements)
             {
-                if (!rt.isAssignableTo(valueSpec))
+                if (!rt.isAssignableTo(keyspace, valueSpec))
                     throw new InvalidRequestException(String.format("Invalid list literal for %s: value %s is not of type %s", receiver, rt, valueSpec.type.asCQL3Type()));
             }
         }
 
-        public boolean isAssignableTo(ColumnSpecification receiver)
+        public boolean isAssignableTo(String keyspace, ColumnSpecification receiver)
         {
             try
             {
-                validateAssignableTo(receiver);
+                validateAssignableTo(keyspace, receiver);
                 return true;
             }
             catch (InvalidRequestException e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d63d07b9/src/java/org/apache/cassandra/cql3/Maps.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Maps.java b/src/java/org/apache/cassandra/cql3/Maps.java
index d156845..3f9cc95 100644
--- a/src/java/org/apache/cassandra/cql3/Maps.java
+++ b/src/java/org/apache/cassandra/cql3/Maps.java
@@ -64,9 +64,9 @@ public abstract class Maps
             this.entries = entries;
         }
 
-        public Term prepare(ColumnSpecification receiver) throws InvalidRequestException
+        public Term prepare(String keyspace, ColumnSpecification receiver) throws InvalidRequestException
         {
-            validateAssignableTo(receiver);
+            validateAssignableTo(keyspace, receiver);
 
             ColumnSpecification keySpec = Maps.keySpecOf(receiver);
             ColumnSpecification valueSpec = Maps.valueSpecOf(receiver);
@@ -74,8 +74,8 @@ public abstract class Maps
             boolean allTerminal = true;
             for (Pair<Term.Raw, Term.Raw> entry : entries)
             {
-                Term k = entry.left.prepare(keySpec);
-                Term v = entry.right.prepare(valueSpec);
+                Term k = entry.left.prepare(keyspace, keySpec);
+                Term v = entry.right.prepare(keyspace, valueSpec);
 
                 if (k.containsBindMarker() || v.containsBindMarker())
                     throw new InvalidRequestException(String.format("Invalid map literal for %s: bind variables are not supported inside collection literals", receiver));
@@ -89,7 +89,7 @@ public abstract class Maps
             return allTerminal ? value.bind(Collections.<ByteBuffer>emptyList()) : value;
         }
 
-        private void validateAssignableTo(ColumnSpecification receiver) throws InvalidRequestException
+        private void validateAssignableTo(String keyspace, ColumnSpecification receiver) throws InvalidRequestException
         {
             if (!(receiver.type instanceof MapType))
                 throw new InvalidRequestException(String.format("Invalid map literal for %s of type %s", receiver, receiver.type.asCQL3Type()));
@@ -98,18 +98,18 @@ public abstract class Maps
             ColumnSpecification valueSpec = Maps.valueSpecOf(receiver);
             for (Pair<Term.Raw, Term.Raw> entry : entries)
             {
-                if (!entry.left.isAssignableTo(keySpec))
+                if (!entry.left.isAssignableTo(keyspace, keySpec))
                     throw new InvalidRequestException(String.format("Invalid map literal for %s: key %s is not of type %s", receiver, entry.left, keySpec.type.asCQL3Type()));
-                if (!entry.right.isAssignableTo(valueSpec))
+                if (!entry.right.isAssignableTo(keyspace, valueSpec))
                     throw new InvalidRequestException(String.format("Invalid map literal for %s: value %s is not of type %s", receiver, entry.right, valueSpec.type.asCQL3Type()));
             }
         }
 
-        public boolean isAssignableTo(ColumnSpecification receiver)
+        public boolean isAssignableTo(String keyspace, ColumnSpecification receiver)
         {
             try
             {
-                validateAssignableTo(receiver);
+                validateAssignableTo(keyspace, receiver);
                 return true;
             }
             catch (InvalidRequestException e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d63d07b9/src/java/org/apache/cassandra/cql3/Operation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Operation.java b/src/java/org/apache/cassandra/cql3/Operation.java
index 689cee0..3aeed2e 100644
--- a/src/java/org/apache/cassandra/cql3/Operation.java
+++ b/src/java/org/apache/cassandra/cql3/Operation.java
@@ -111,7 +111,7 @@ public abstract class Operation
          * be a true column.
          * @return the prepared update operation.
          */
-        public Operation prepare(ColumnDefinition receiver) throws InvalidRequestException;
+        public Operation prepare(String keyspace, ColumnDefinition receiver) throws InvalidRequestException;
 
         /**
          * @return whether this operation can be applied alongside the {@code
@@ -145,7 +145,7 @@ public abstract class Operation
          * @param receiver the "column" this operation applies to.
          * @return the prepared delete operation.
          */
-        public Operation prepare(ColumnDefinition receiver) throws InvalidRequestException;
+        public Operation prepare(String keyspace, ColumnDefinition receiver) throws InvalidRequestException;
     }
 
     public static class SetValue implements RawUpdate
@@ -157,9 +157,9 @@ public abstract class Operation
             this.value = value;
         }
 
-        public Operation prepare(ColumnDefinition receiver) throws InvalidRequestException
+        public Operation prepare(String keyspace, ColumnDefinition receiver) throws InvalidRequestException
         {
-            Term v = value.prepare(receiver);
+            Term v = value.prepare(keyspace, receiver);
 
             if (receiver.type instanceof CounterColumnType)
                 throw new InvalidRequestException(String.format("Cannot set the value of counter column %s (counters can only be incremented/decremented, not set)", receiver));
@@ -203,7 +203,7 @@ public abstract class Operation
             this.value = value;
         }
 
-        public Operation prepare(ColumnDefinition receiver) throws InvalidRequestException
+        public Operation prepare(String keyspace, ColumnDefinition receiver) throws InvalidRequestException
         {
             if (!(receiver.type instanceof CollectionType))
                 throw new InvalidRequestException(String.format("Invalid operation (%s) for non collection column %s", toString(receiver), receiver));
@@ -211,14 +211,14 @@ public abstract class Operation
             switch (((CollectionType)receiver.type).kind)
             {
                 case LIST:
-                    Term idx = selector.prepare(Lists.indexSpecOf(receiver));
-                    Term lval = value.prepare(Lists.valueSpecOf(receiver));
+                    Term idx = selector.prepare(keyspace, Lists.indexSpecOf(receiver));
+                    Term lval = value.prepare(keyspace, Lists.valueSpecOf(receiver));
                     return new Lists.SetterByIndex(receiver, idx, lval);
                 case SET:
                     throw new InvalidRequestException(String.format("Invalid operation (%s) for set column %s", toString(receiver), receiver));
                 case MAP:
-                    Term key = selector.prepare(Maps.keySpecOf(receiver));
-                    Term mval = value.prepare(Maps.valueSpecOf(receiver));
+                    Term key = selector.prepare(keyspace, Maps.keySpecOf(receiver));
+                    Term mval = value.prepare(keyspace, Maps.valueSpecOf(receiver));
                     return new Maps.SetterByKey(receiver, key, mval);
             }
             throw new AssertionError();
@@ -246,9 +246,9 @@ public abstract class Operation
             this.value = value;
         }
 
-        public Operation prepare(ColumnDefinition receiver) throws InvalidRequestException
+        public Operation prepare(String keyspace, ColumnDefinition receiver) throws InvalidRequestException
         {
-            Term v = value.prepare(receiver);
+            Term v = value.prepare(keyspace, receiver);
 
             if (!(receiver.type instanceof CollectionType))
             {
@@ -289,9 +289,9 @@ public abstract class Operation
             this.value = value;
         }
 
-        public Operation prepare(ColumnDefinition receiver) throws InvalidRequestException
+        public Operation prepare(String keyspace, ColumnDefinition receiver) throws InvalidRequestException
         {
-            Term v = value.prepare(receiver);
+            Term v = value.prepare(keyspace, receiver);
 
             if (!(receiver.type instanceof CollectionType))
             {
@@ -332,9 +332,9 @@ public abstract class Operation
             this.value = value;
         }
 
-        public Operation prepare(ColumnDefinition receiver) throws InvalidRequestException
+        public Operation prepare(String keyspace, ColumnDefinition receiver) throws InvalidRequestException
         {
-            Term v = value.prepare(receiver);
+            Term v = value.prepare(keyspace, receiver);
 
             if (!(receiver.type instanceof ListType))
                 throw new InvalidRequestException(String.format("Invalid operation (%s) for non list column %s", toString(receiver), receiver));
@@ -367,7 +367,7 @@ public abstract class Operation
             return id;
         }
 
-        public Operation prepare(ColumnDefinition receiver) throws InvalidRequestException
+        public Operation prepare(String keyspace, ColumnDefinition receiver) throws InvalidRequestException
         {
             // No validation, deleting a column is always "well typed"
             return new Constants.Deleter(receiver);
@@ -390,7 +390,7 @@ public abstract class Operation
             return id;
         }
 
-        public Operation prepare(ColumnDefinition receiver) throws InvalidRequestException
+        public Operation prepare(String keyspace, ColumnDefinition receiver) throws InvalidRequestException
         {
             if (!(receiver.type instanceof CollectionType))
                 throw new InvalidRequestException(String.format("Invalid deletion operation for non collection column %s", receiver));
@@ -398,13 +398,13 @@ public abstract class Operation
             switch (((CollectionType)receiver.type).kind)
             {
                 case LIST:
-                    Term idx = element.prepare(Lists.indexSpecOf(receiver));
+                    Term idx = element.prepare(keyspace, Lists.indexSpecOf(receiver));
                     return new Lists.DiscarderByIndex(receiver, idx);
                 case SET:
-                    Term elt = element.prepare(Sets.valueSpecOf(receiver));
+                    Term elt = element.prepare(keyspace, Sets.valueSpecOf(receiver));
                     return new Sets.Discarder(receiver, elt);
                 case MAP:
-                    Term key = element.prepare(Maps.keySpecOf(receiver));
+                    Term key = element.prepare(keyspace, Maps.keySpecOf(receiver));
                     return new Maps.DiscarderByKey(receiver, key);
             }
             throw new AssertionError();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d63d07b9/src/java/org/apache/cassandra/cql3/Sets.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Sets.java b/src/java/org/apache/cassandra/cql3/Sets.java
index 2531b2a..dddea09 100644
--- a/src/java/org/apache/cassandra/cql3/Sets.java
+++ b/src/java/org/apache/cassandra/cql3/Sets.java
@@ -62,9 +62,9 @@ public abstract class Sets
             this.elements = elements;
         }
 
-        public Term prepare(ColumnSpecification receiver) throws InvalidRequestException
+        public Term prepare(String keyspace, ColumnSpecification receiver) throws InvalidRequestException
         {
-            validateAssignableTo(receiver);
+            validateAssignableTo(keyspace, receiver);
 
             // We've parsed empty maps as a set literal to break the ambiguity so
             // handle that case now
@@ -77,7 +77,7 @@ public abstract class Sets
             boolean allTerminal = true;
             for (Term.Raw rt : elements)
             {
-                Term t = rt.prepare(valueSpec);
+                Term t = rt.prepare(keyspace, valueSpec);
 
                 if (t.containsBindMarker())
                     throw new InvalidRequestException(String.format("Invalid set literal for %s: bind variables are not supported inside collection literals", receiver));
@@ -91,7 +91,7 @@ public abstract class Sets
             return allTerminal ? value.bind(Collections.<ByteBuffer>emptyList()) : value;
         }
 
-        private void validateAssignableTo(ColumnSpecification receiver) throws InvalidRequestException
+        private void validateAssignableTo(String keyspace, ColumnSpecification receiver) throws InvalidRequestException
         {
             if (!(receiver.type instanceof SetType))
             {
@@ -106,16 +106,16 @@ public abstract class Sets
             ColumnSpecification valueSpec = Sets.valueSpecOf(receiver);
             for (Term.Raw rt : elements)
             {
-                if (!rt.isAssignableTo(valueSpec))
+                if (!rt.isAssignableTo(keyspace, valueSpec))
                     throw new InvalidRequestException(String.format("Invalid set literal for %s: value %s is not of type %s", receiver, rt, valueSpec.type.asCQL3Type()));
             }
         }
 
-        public boolean isAssignableTo(ColumnSpecification receiver)
+        public boolean isAssignableTo(String keyspace, ColumnSpecification receiver)
         {
             try
             {
-                validateAssignableTo(receiver);
+                validateAssignableTo(keyspace, receiver);
                 return true;
             }
             catch (InvalidRequestException e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d63d07b9/src/java/org/apache/cassandra/cql3/Term.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Term.java b/src/java/org/apache/cassandra/cql3/Term.java
index d69fc33..d539ecf 100644
--- a/src/java/org/apache/cassandra/cql3/Term.java
+++ b/src/java/org/apache/cassandra/cql3/Term.java
@@ -88,7 +88,7 @@ public interface Term
          * case this RawTerm describe a list index or a map key, etc...
          * @return the prepared term.
          */
-        public Term prepare(ColumnSpecification receiver) throws InvalidRequestException;
+        public Term prepare(String keyspace, ColumnSpecification receiver) throws InvalidRequestException;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d63d07b9/src/java/org/apache/cassandra/cql3/TypeCast.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/TypeCast.java b/src/java/org/apache/cassandra/cql3/TypeCast.java
index 66b5300..45e8c5b 100644
--- a/src/java/org/apache/cassandra/cql3/TypeCast.java
+++ b/src/java/org/apache/cassandra/cql3/TypeCast.java
@@ -21,34 +21,41 @@ import org.apache.cassandra.exceptions.InvalidRequestException;
 
 public class TypeCast implements Term.Raw
 {
-    private final CQL3Type type;
+    private final CQL3Type.Raw type;
     private final Term.Raw term;
 
-    public TypeCast(CQL3Type type, Term.Raw term)
+    public TypeCast(CQL3Type.Raw type, Term.Raw term)
     {
         this.type = type;
         this.term = term;
     }
 
-    public Term prepare(ColumnSpecification receiver) throws InvalidRequestException
+    public Term prepare(String keyspace, ColumnSpecification receiver) throws InvalidRequestException
     {
-        if (!term.isAssignableTo(castedSpecOf(receiver)))
+        if (!term.isAssignableTo(keyspace, castedSpecOf(keyspace, receiver)))
             throw new InvalidRequestException(String.format("Cannot cast value %s to type %s", term, type));
 
-        if (!isAssignableTo(receiver))
+        if (!isAssignableTo(keyspace, receiver))
             throw new InvalidRequestException(String.format("Cannot assign value %s to %s of type %s", this, receiver, receiver.type.asCQL3Type()));
 
-        return term.prepare(receiver);
+        return term.prepare(keyspace, receiver);
     }
 
-    private ColumnSpecification castedSpecOf(ColumnSpecification receiver)
+    private ColumnSpecification castedSpecOf(String keyspace, ColumnSpecification receiver) throws InvalidRequestException
     {
-        return new ColumnSpecification(receiver.ksName, receiver.cfName, new ColumnIdentifier(toString(), true), type.getType());
+        return new ColumnSpecification(receiver.ksName, receiver.cfName, new ColumnIdentifier(toString(), true), type.prepare(keyspace).getType());
     }
 
-    public boolean isAssignableTo(ColumnSpecification receiver)
+    public boolean isAssignableTo(String keyspace, ColumnSpecification receiver) throws InvalidRequestException
     {
-        return receiver.type.asCQL3Type().equals(type);
+        try
+        {
+            return receiver.type.asCQL3Type().equals(type.prepare(keyspace));
+        }
+        catch (InvalidRequestException e)
+        {
+            throw new AssertionError();
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d63d07b9/src/java/org/apache/cassandra/cql3/UTName.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/UTName.java b/src/java/org/apache/cassandra/cql3/UTName.java
new file mode 100644
index 0000000..a157720
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/UTName.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3;
+
+import java.nio.ByteBuffer;
+
+public class UTName
+{
+    private String ksName;
+    private final ColumnIdentifier utName;
+
+    public UTName(ColumnIdentifier ksName, ColumnIdentifier utName)
+    {
+        this.ksName = ksName == null ? null : ksName.toString();
+        this.utName = utName;
+    }
+
+    public boolean hasKeyspace()
+    {
+        return ksName != null;
+    }
+
+    public void setKeyspace(String keyspace)
+    {
+        this.ksName = keyspace;
+    }
+
+    public String getKeyspace()
+    {
+        return ksName;
+    }
+
+    public ByteBuffer getUserTypeName()
+    {
+        return utName.bytes;
+    }
+
+    @Override
+    public String toString()
+    {
+        return (hasKeyspace() ? (ksName + ".") : "") + utName;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d63d07b9/src/java/org/apache/cassandra/cql3/UserTypes.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/UserTypes.java b/src/java/org/apache/cassandra/cql3/UserTypes.java
index dd546ee..2fd1a0f 100644
--- a/src/java/org/apache/cassandra/cql3/UserTypes.java
+++ b/src/java/org/apache/cassandra/cql3/UserTypes.java
@@ -47,9 +47,9 @@ public abstract class UserTypes
             this.entries = entries;
         }
 
-        public Term prepare(ColumnSpecification receiver) throws InvalidRequestException
+        public Term prepare(String keyspace, ColumnSpecification receiver) throws InvalidRequestException
         {
-            validateAssignableTo(receiver);
+            validateAssignableTo(keyspace, receiver);
 
             UserType ut = (UserType)receiver.type;
             boolean allTerminal = true;
@@ -57,7 +57,7 @@ public abstract class UserTypes
             for (int i = 0; i < ut.types.size(); i++)
             {
                 ColumnIdentifier field = new ColumnIdentifier(ut.columnNames.get(i), UTF8Type.instance);
-                Term value = entries.get(field).prepare(fieldSpecOf(receiver, i));
+                Term value = entries.get(field).prepare(keyspace, fieldSpecOf(receiver, i));
 
                 if (value instanceof Term.NonTerminal)
                     allTerminal = false;
@@ -68,7 +68,7 @@ public abstract class UserTypes
             return allTerminal ? value.bind(Collections.<ByteBuffer>emptyList()) : value;
         }
 
-        private void validateAssignableTo(ColumnSpecification receiver) throws InvalidRequestException
+        private void validateAssignableTo(String keyspace, ColumnSpecification receiver) throws InvalidRequestException
         {
             if (!(receiver.type instanceof UserType))
                 throw new InvalidRequestException(String.format("Invalid user type literal for %s of type %s", receiver, receiver.type.asCQL3Type()));
@@ -82,16 +82,16 @@ public abstract class UserTypes
                     throw new InvalidRequestException(String.format("Invalid user type literal for %s: missing field %s", receiver, field));
 
                 ColumnSpecification fieldSpec = fieldSpecOf(receiver, i);
-                if (!value.isAssignableTo(fieldSpec))
+                if (!value.isAssignableTo(keyspace, fieldSpec))
                     throw new InvalidRequestException(String.format("Invalid user type literal for %s: field %s is not of type %s", receiver, field, fieldSpec.type.asCQL3Type()));
             }
         }
 
-        public boolean isAssignableTo(ColumnSpecification receiver)
+        public boolean isAssignableTo(String keyspace, ColumnSpecification receiver)
         {
             try
             {
-                validateAssignableTo(receiver);
+                validateAssignableTo(keyspace, receiver);
                 return true;
             }
             catch (InvalidRequestException e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d63d07b9/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java b/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java
index 8db03e6..083543a 100644
--- a/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java
+++ b/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java
@@ -102,15 +102,15 @@ public class FunctionCall extends Term.NonTerminal
             this.terms = terms;
         }
 
-        public Term prepare(ColumnSpecification receiver) throws InvalidRequestException
+        public Term prepare(String keyspace, ColumnSpecification receiver) throws InvalidRequestException
         {
-            Function fun = Functions.get(functionName, terms, receiver);
+            Function fun = Functions.get(keyspace, functionName, terms, receiver);
 
             List<Term> parameters = new ArrayList<Term>(terms.size());
             boolean allTerminal = true;
             for (int i = 0; i < terms.size(); i++)
             {
-                Term t = terms.get(i).prepare(Functions.makeArgSpec(receiver, fun, i));
+                Term t = terms.get(i).prepare(keyspace, Functions.makeArgSpec(receiver, fun, i));
                 if (t instanceof NonTerminal)
                     allTerminal = false;
                 parameters.add(t);
@@ -135,7 +135,7 @@ public class FunctionCall extends Term.NonTerminal
             return fun.execute(buffers);
         }
 
-        public boolean isAssignableTo(ColumnSpecification receiver)
+        public boolean isAssignableTo(String keyspace, ColumnSpecification receiver)
         {
             AbstractType<?> returnType = Functions.getReturnType(functionName, receiver.ksName, receiver.cfName);
             // Note: if returnType == null, it means the function doesn't exist. We may get this if an undefined function

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d63d07b9/src/java/org/apache/cassandra/cql3/functions/Functions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/Functions.java b/src/java/org/apache/cassandra/cql3/functions/Functions.java
index 97a0e91..27f8b3c 100644
--- a/src/java/org/apache/cassandra/cql3/functions/Functions.java
+++ b/src/java/org/apache/cassandra/cql3/functions/Functions.java
@@ -77,7 +77,7 @@ public abstract class Functions
                 fun.argsType().get(i));
     }
 
-    public static Function get(String name, List<? extends AssignementTestable> providedArgs, ColumnSpecification receiver) throws InvalidRequestException
+    public static Function get(String keyspace, String name, List<? extends AssignementTestable> providedArgs, ColumnSpecification receiver) throws InvalidRequestException
     {
         List<Function.Factory> factories = declared.get(name.toLowerCase());
         if (factories.isEmpty())
@@ -87,7 +87,7 @@ public abstract class Functions
         if (factories.size() == 1)
         {
             Function fun = factories.get(0).create(receiver.ksName, receiver.cfName);
-            validateTypes(fun, providedArgs, receiver);
+            validateTypes(keyspace, fun, providedArgs, receiver);
             return fun;
         }
 
@@ -95,7 +95,7 @@ public abstract class Functions
         for (Function.Factory factory : factories)
         {
             Function toTest = factory.create(receiver.ksName, receiver.cfName);
-            if (!isValidType(toTest, providedArgs, receiver))
+            if (!isValidType(keyspace, toTest, providedArgs, receiver))
                 continue;
 
             if (candidate == null)
@@ -108,7 +108,7 @@ public abstract class Functions
         return candidate;
     }
 
-    private static void validateTypes(Function fun, List<? extends AssignementTestable> providedArgs, ColumnSpecification receiver) throws InvalidRequestException
+    private static void validateTypes(String keyspace, Function fun, List<? extends AssignementTestable> providedArgs, ColumnSpecification receiver) throws InvalidRequestException
     {
         if (!receiver.type.asCQL3Type().equals(fun.returnType().asCQL3Type()))
             throw new InvalidRequestException(String.format("Type error: cannot assign result of function %s (type %s) to %s (type %s)", fun.name(), fun.returnType().asCQL3Type(), receiver, receiver.type.asCQL3Type()));
@@ -126,12 +126,12 @@ public abstract class Functions
                 continue;
 
             ColumnSpecification expected = makeArgSpec(receiver, fun, i);
-            if (!provided.isAssignableTo(expected))
+            if (!provided.isAssignableTo(keyspace, expected))
                 throw new InvalidRequestException(String.format("Type error: %s cannot be passed as argument %d of function %s of type %s", provided, i, fun.name(), expected.type.asCQL3Type()));
         }
     }
 
-    private static boolean isValidType(Function fun, List<? extends AssignementTestable> providedArgs, ColumnSpecification receiver)
+    private static boolean isValidType(String keyspace, Function fun, List<? extends AssignementTestable> providedArgs, ColumnSpecification receiver) throws InvalidRequestException
     {
         if (!receiver.type.asCQL3Type().equals(fun.returnType().asCQL3Type()))
             return false;
@@ -149,7 +149,7 @@ public abstract class Functions
                 continue;
 
             ColumnSpecification expected = makeArgSpec(receiver, fun, i);
-            if (!provided.isAssignableTo(expected))
+            if (!provided.isAssignableTo(keyspace, expected))
                 return false;
         }
         return true;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d63d07b9/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
index eb52a13..6f90a02 100644
--- a/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
@@ -41,12 +41,12 @@ public class AlterTableStatement extends SchemaAlteringStatement
     }
 
     public final Type oType;
-    public final CQL3Type validator;
+    public final CQL3Type.Raw validator;
     public final ColumnIdentifier columnName;
     private final CFPropDefs cfProps;
     private final Map<ColumnIdentifier, ColumnIdentifier> renames;
 
-    public AlterTableStatement(CFName name, Type type, ColumnIdentifier columnName, CQL3Type validator, CFPropDefs cfProps, Map<ColumnIdentifier, ColumnIdentifier> renames)
+    public AlterTableStatement(CFName name, Type type, ColumnIdentifier columnName, CQL3Type.Raw validator, CFPropDefs cfProps, Map<ColumnIdentifier, ColumnIdentifier> renames)
     {
         super(name);
         this.oType = type;
@@ -71,6 +71,8 @@ public class AlterTableStatement extends SchemaAlteringStatement
         CFMetaData meta = validateColumnFamily(keyspace(), columnFamily());
         CFMetaData cfm = meta.clone();
 
+        CQL3Type validator = this.validator.prepare(keyspace());
+
         ColumnDefinition def = columnName == null ? null : cfm.getColumnDefinition(columnName);
         switch (oType)
         {