You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ad...@apache.org on 2022/11/18 10:37:20 UTC

[cassandra] branch trunk updated: Add support for CQL functions on collections, tuples and UDTs

This is an automated email from the ASF dual-hosted git repository.

adelapena pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6da9e33602 Add support for CQL functions on collections, tuples and UDTs
6da9e33602 is described below

commit 6da9e33602fad4b8bf9466dc0e9a73665469a195
Author: Andrés de la Peña <a....@gmail.com>
AuthorDate: Fri Sep 2 13:39:10 2022 +0100

    Add support for CQL functions on collections, tuples and UDTs
    
    patch by Andrés de la Peña; reviewed by Benjamin Lerer for CASSANDRA-17811
---
 CHANGES.txt                                        |   1 +
 .../apache/cassandra/auth/FunctionResource.java    |  17 +-
 .../apache/cassandra/cql3/AssignmentTestable.java  |  23 ++
 src/java/org/apache/cassandra/cql3/Constants.java  |  63 +++-
 src/java/org/apache/cassandra/cql3/Lists.java      |  19 +-
 src/java/org/apache/cassandra/cql3/Maps.java       |  26 +-
 .../org/apache/cassandra/cql3/QueryProcessor.java  |   2 +-
 src/java/org/apache/cassandra/cql3/Sets.java       |  18 +-
 src/java/org/apache/cassandra/cql3/Term.java       |   8 +-
 src/java/org/apache/cassandra/cql3/Tuples.java     |   4 +-
 .../cassandra/cql3/functions/AbstractFunction.java |  36 ++
 .../cassandra/cql3/functions/AggregateFcts.java    | 105 +++---
 .../cql3/functions/BytesConversionFcts.java        |  16 +-
 .../apache/cassandra/cql3/functions/CastFcts.java  |  16 +-
 .../cassandra/cql3/functions/FromJsonFct.java      |  19 +-
 .../cassandra/cql3/functions/FunctionCall.java     |  27 +-
 .../cassandra/cql3/functions/FunctionFactory.java  | 115 ++++++
 .../cql3/functions/FunctionParameter.java          | 109 ++++++
 .../cassandra/cql3/functions/FunctionResolver.java |  56 ++-
 .../cassandra/cql3/functions/NativeFunction.java   |   2 +-
 .../cassandra/cql3/functions/NativeFunctions.java  | 101 +++++
 .../cassandra/cql3/functions/OperationFcts.java    |  10 +-
 .../apache/cassandra/cql3/functions/TimeFcts.java  |  59 ++-
 .../apache/cassandra/cql3/functions/ToJsonFct.java |  14 +-
 .../apache/cassandra/cql3/functions/TokenFct.java  |  39 +-
 .../cassandra/cql3/functions/UDAggregate.java      |  14 +-
 .../cassandra/cql3/functions/UDFunction.java       |  10 +-
 .../{NativeFunction.java => UserFunction.java}     |  23 +-
 .../apache/cassandra/cql3/functions/UuidFcts.java  |   6 +-
 .../cql3/selection/AbstractFunctionSelector.java   |  14 +-
 .../cassandra/cql3/selection/Selectable.java       | 100 +++--
 .../cql3/statements/DescribeStatement.java         |  26 +-
 .../statements/PermissionsManagementStatement.java |  10 +-
 .../cql3/statements/schema/AlterTypeStatement.java |   2 +-
 .../schema/CreateAggregateStatement.java           |  23 +-
 .../statements/schema/CreateFunctionStatement.java |  10 +-
 .../statements/schema/DropAggregateStatement.java  |  18 +-
 .../statements/schema/DropFunctionStatement.java   |  22 +-
 .../cql3/statements/schema/DropTypeStatement.java  |   4 +-
 .../org/apache/cassandra/db/SystemKeyspace.java    |  22 +-
 .../apache/cassandra/db/marshal/AbstractType.java  |   6 +
 .../cassandra/io/sstable/CQLSSTableWriter.java     |  10 +-
 .../apache/cassandra/schema/DistributedSchema.java |   2 +-
 .../apache/cassandra/schema/KeyspaceMetadata.java  |  52 +--
 src/java/org/apache/cassandra/schema/Schema.java   |  18 +-
 .../cassandra/schema/SchemaChangeNotifier.java     |   8 +-
 .../org/apache/cassandra/schema/SchemaEvent.java   |   2 +-
 .../apache/cassandra/schema/SchemaKeyspace.java    |  14 +-
 .../schema/{Functions.java => UserFunctions.java}  | 112 ++----
 test/unit/org/apache/cassandra/SchemaLoader.java   |   2 +-
 .../cassandra/cql3/AssignmentTestableTest.java     |  58 +++
 .../cql3/functions/FunctionFactoryTest.java        | 414 +++++++++++++++++++++
 .../cql3/validation/entities/JsonTest.java         |  30 +-
 .../cql3/validation/entities/UFAuthTest.java       |   4 +-
 .../cql3/validation/entities/UFJavaTest.java       |   8 +-
 .../cassandra/cql3/validation/entities/UFTest.java |  18 +-
 .../validation/entities/WritetimeOrTTLTest.java    |  20 +-
 .../validation/operations/AggregationTest.java     | 112 +++++-
 58 files changed, 1564 insertions(+), 535 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index fd4903ef48..1728a8db8d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.2
+ * Add support for CQL functions on collections, tuples and UDTs (CASSANDRA-17811)
  * Add flag to exclude nodes from local DC when running nodetool rebuild (CASSANDRA-17870)
  * Adding endpoint verification option to client_encryption_options (CASSANDRA-18034)
  * Replace 'wcwidth.py' with pypi module (CASSANDRA-17287)
diff --git a/src/java/org/apache/cassandra/auth/FunctionResource.java b/src/java/org/apache/cassandra/auth/FunctionResource.java
index 470919002f..b3ed3fe58d 100644
--- a/src/java/org/apache/cassandra/auth/FunctionResource.java
+++ b/src/java/org/apache/cassandra/auth/FunctionResource.java
@@ -29,13 +29,14 @@ import com.google.common.base.Splitter;
 import com.google.common.collect.Sets;
 import org.apache.commons.lang3.StringUtils;
 
+import org.apache.cassandra.cql3.functions.UserFunction;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.cql3.CQL3Type;
-import org.apache.cassandra.cql3.functions.Function;
 import org.apache.cassandra.cql3.functions.FunctionName;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.TypeParser;
 import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.schema.SchemaConstants;
 
 /**
  * IResource implementation representing functions.
@@ -135,7 +136,7 @@ public class FunctionResource implements IResource
         return new FunctionResource(keyspace, name, argTypes);
     }
 
-    public static FunctionResource function(Function function)
+    public static FunctionResource function(UserFunction function)
     {
         return new FunctionResource(function.name().keyspace, function.name().name, function.argTypes());
     }
@@ -254,6 +255,7 @@ public class FunctionResource implements IResource
 
     public boolean exists()
     {
+        validate();
         switch (level)
         {
             case ROOT:
@@ -261,13 +263,14 @@ public class FunctionResource implements IResource
             case KEYSPACE:
                 return Schema.instance.getKeyspaces().contains(keyspace);
             case FUNCTION:
-                return Schema.instance.findFunction(getFunctionName(), argTypes).isPresent();
+                return Schema.instance.findUserFunction(getFunctionName(), argTypes).isPresent();
         }
         throw new AssertionError();
     }
 
     public Set<Permission> applicablePermissions()
     {
+        validate();
         switch (level)
         {
             case ROOT:
@@ -275,7 +278,7 @@ public class FunctionResource implements IResource
                 return COLLECTION_LEVEL_PERMISSIONS;
             case FUNCTION:
             {
-                Optional<Function> function = Schema.instance.findFunction(getFunctionName(), argTypes);
+                Optional<UserFunction> function = Schema.instance.findUserFunction(getFunctionName(), argTypes);
                 assert function.isPresent() : "Unable to find function object for resource " + toString();
                 return function.get().isAggregate() ? AGGREGATE_FUNCTION_PERMISSIONS : SCALAR_FUNCTION_PERMISSIONS;
             }
@@ -283,6 +286,12 @@ public class FunctionResource implements IResource
         throw new AssertionError();
     }
 
+    private void validate()
+    {
+        if (SchemaConstants.SYSTEM_KEYSPACE_NAME.equals(keyspace))
+            throw new InvalidRequestException("Altering permissions on builtin functions is not supported");
+    }
+
     public int compareTo(FunctionResource o)
     {
         return this.name.compareTo(o.name);
diff --git a/src/java/org/apache/cassandra/cql3/AssignmentTestable.java b/src/java/org/apache/cassandra/cql3/AssignmentTestable.java
index 41b80eb87b..fdf21b9f5b 100644
--- a/src/java/org/apache/cassandra/cql3/AssignmentTestable.java
+++ b/src/java/org/apache/cassandra/cql3/AssignmentTestable.java
@@ -19,6 +19,10 @@ package org.apache.cassandra.cql3;
 
 import java.util.Collection;
 
+import javax.annotation.Nullable;
+
+import org.apache.cassandra.db.marshal.AbstractType;
+
 public interface AssignmentTestable
 {
     /**
@@ -32,6 +36,25 @@ public interface AssignmentTestable
      */
     public TestResult testAssignment(String keyspace, ColumnSpecification receiver);
 
+    /**
+     * @return A data type that can represent this, or {@code null} if we can't determine that type. The returned type
+     * won't necessarily be the exact type, but one that is compatible with it.
+     */
+    @Nullable
+    public AbstractType<?> getCompatibleTypeIfKnown(String keyspace);
+
+    /**
+     * @return A data type that can represent all the specified types, or {@code null} if there isn't one.
+     */
+    @Nullable
+    public static AbstractType<?> getCompatibleTypeIfKnown(Collection<AbstractType<?>> types)
+    {
+        return types.stream()
+                    .filter(type -> types.stream().allMatch(t -> t.testAssignment(type).isAssignable()))
+                    .findFirst()
+                    .orElse(null);
+    }
+
     public enum TestResult
     {
         EXACT_MATCH, WEAKLY_ASSIGNABLE, NOT_ASSIGNABLE;
diff --git a/src/java/org/apache/cassandra/cql3/Constants.java b/src/java/org/apache/cassandra/cql3/Constants.java
index 327b3f7bef..803f3985fa 100644
--- a/src/java/org/apache/cassandra/cql3/Constants.java
+++ b/src/java/org/apache/cassandra/cql3/Constants.java
@@ -20,17 +20,15 @@ package org.apache.cassandra.cql3;
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.nio.charset.StandardCharsets;
 
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.db.rows.Cell;
 import org.apache.cassandra.db.rows.Row;
-import org.apache.cassandra.schema.ColumnMetadata;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -41,15 +39,14 @@ import org.apache.cassandra.utils.FastByteOperations;
  */
 public abstract class Constants
 {
-    private static final Logger logger = LoggerFactory.getLogger(Constants.class);
-
     public enum Type
     {
         STRING
         {
+            @Override
             public AbstractType<?> getPreferedTypeFor(String text)
             {
-                 if(Charset.forName("US-ASCII").newEncoder().canEncode(text))
+                 if (StandardCharsets.US_ASCII.newEncoder().canEncode(text))
                  {
                      return AsciiType.instance;
                  }
@@ -59,6 +56,7 @@ public abstract class Constants
         },
         INTEGER
         {
+            @Override
             public AbstractType<?> getPreferedTypeFor(String text)
             {
                 // We only try to determine the smallest possible type between int, long and BigInteger
@@ -73,9 +71,19 @@ public abstract class Constants
                 return IntegerType.instance;
             }
         },
-        UUID,
+        UUID
+        {
+            @Override
+            public AbstractType<?> getPreferedTypeFor(String text)
+            {
+                return java.util.UUID.fromString(text).version() == 1
+                       ? TimeUUIDType.instance
+                       : UUIDType.instance;
+            }
+        },
         FLOAT
         {
+            @Override
             public AbstractType<?> getPreferedTypeFor(String text)
             {
                 if ("NaN".equals(text) || "-NaN".equals(text) || "Infinity".equals(text) || "-Infinity".equals(text))
@@ -90,9 +98,30 @@ public abstract class Constants
                 return DecimalType.instance;
             }
         },
-        BOOLEAN,
-        HEX,
-        DURATION;
+        BOOLEAN
+        {
+            @Override
+            public AbstractType<?> getPreferedTypeFor(String text)
+            {
+                return BooleanType.instance;
+            }
+        },
+        HEX
+        {
+            @Override
+            public AbstractType<?> getPreferedTypeFor(String text)
+            {
+                return BytesType.instance; // hex constants (0x...) are blob literals, not tinyint (ByteType)
+            }
+        },
+        DURATION
+        {
+            @Override
+            public AbstractType<?> getPreferedTypeFor(String text)
+            {
+                return DurationType.instance;
+            }
+        };
 
         /**
          * Returns the exact type for the specified text
@@ -362,6 +391,12 @@ public abstract class Constants
             return null;
         }
 
+        @Override
+        public AbstractType<?> getCompatibleTypeIfKnown(String keyspace)
+        {
+            return preferedType;
+        }
+
         public String getRawText()
         {
             return text;
diff --git a/src/java/org/apache/cassandra/cql3/Lists.java b/src/java/org/apache/cassandra/cql3/Lists.java
index bdac046998..7b6234ed27 100644
--- a/src/java/org/apache/cassandra/cql3/Lists.java
+++ b/src/java/org/apache/cassandra/cql3/Lists.java
@@ -26,6 +26,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
@@ -127,13 +128,21 @@ public abstract class Lists
      * @param mapper the mapper used to retrieve the element types from the items
      * @return the exact ListType from the items if it can be known or <code>null</code>
      */
-    public static <T> AbstractType<?> getExactListTypeIfKnown(List<T> items,
-                                                              java.util.function.Function<T, AbstractType<?>> mapper)
+    public static <T> ListType<?> getExactListTypeIfKnown(List<T> items,
+                                                          java.util.function.Function<T, AbstractType<?>> mapper)
     {
         Optional<AbstractType<?>> type = items.stream().map(mapper).filter(Objects::nonNull).findFirst();
         return type.isPresent() ? ListType.getInstance(type.get(), false) : null;
     }
 
+    public static <T> ListType<?> getPreferredCompatibleType(List<T> items,
+                                                             java.util.function.Function<T, AbstractType<?>> mapper)
+    {
+        Set<AbstractType<?>> types = items.stream().map(mapper).filter(Objects::nonNull).collect(Collectors.toSet());
+        AbstractType<?> type = AssignmentTestable.getCompatibleTypeIfKnown(types);
+        return type == null ? null : ListType.getInstance(type, false);
+    }
+
     public static class Literal extends Term.Raw
     {
         private final List<Term.Raw> elements;
@@ -192,6 +201,12 @@ public abstract class Lists
             return getExactListTypeIfKnown(elements, p -> p.getExactTypeIfKnown(keyspace));
         }
 
+        @Override
+        public AbstractType<?> getCompatibleTypeIfKnown(String keyspace)
+        {
+            return Lists.getPreferredCompatibleType(elements, p -> p.getCompatibleTypeIfKnown(keyspace));
+        }
+
         public String getText()
         {
             return listToString(elements, Term.Raw::getText);
diff --git a/src/java/org/apache/cassandra/cql3/Maps.java b/src/java/org/apache/cassandra/cql3/Maps.java
index a2d23a699a..96db270c18 100644
--- a/src/java/org/apache/cassandra/cql3/Maps.java
+++ b/src/java/org/apache/cassandra/cql3/Maps.java
@@ -128,8 +128,8 @@ public abstract class Maps
      * @param mapper the mapper used to retrieve the key and value types from the entries
      * @return the exact MapType from the entries if it can be known or <code>null</code>
      */
-    public static <T> AbstractType<?> getExactMapTypeIfKnown(List<Pair<T, T>> entries,
-                                                             java.util.function.Function<T, AbstractType<?>> mapper)
+    public static <T> MapType<?, ?> getExactMapTypeIfKnown(List<Pair<T, T>> entries,
+                                                           java.util.function.Function<T, AbstractType<?>> mapper)
     {
         AbstractType<?> keyType = null;
         AbstractType<?> valueType = null;
@@ -145,6 +145,22 @@ public abstract class Maps
         return null;
     }
 
+    public static <T> MapType<?, ?> getPreferredCompatibleType(List<Pair<T, T>> entries,
+                                                               java.util.function.Function<T, AbstractType<?>> mapper)
+    {
+        Set<AbstractType<?>> keyTypes = entries.stream().map(Pair::left).map(mapper).filter(Objects::nonNull).collect(Collectors.toSet());
+        AbstractType<?> keyType = AssignmentTestable.getCompatibleTypeIfKnown(keyTypes);
+        if (keyType == null)
+            return null;
+
+        Set<AbstractType<?>> valueTypes = entries.stream().map(Pair::right).map(mapper).filter(Objects::nonNull).collect(Collectors.toSet());
+        AbstractType<?> valueType = AssignmentTestable.getCompatibleTypeIfKnown(valueTypes);
+        if (valueType == null)
+            return null;
+
+        return  MapType.getInstance(keyType, valueType, false);
+    }
+
     public static class Literal extends Term.Raw
     {
         public final List<Pair<Term.Raw, Term.Raw>> entries;
@@ -208,6 +224,12 @@ public abstract class Maps
             return getExactMapTypeIfKnown(entries, p -> p.getExactTypeIfKnown(keyspace));
         }
 
+        @Override
+        public AbstractType<?> getCompatibleTypeIfKnown(String keyspace)
+        {
+            return Maps.getPreferredCompatibleType(entries, p -> p.getCompatibleTypeIfKnown(keyspace));
+        }
+
         public String getText()
         {
             return mapToString(entries, Term.Raw::getText);
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index 188cb8aa0c..3c1bfd96ce 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -1021,7 +1021,7 @@ public class QueryProcessor implements QueryHandler
         {
             // in case there are other overloads, we have to remove all overloads since argument type
             // matching may change (due to type casting)
-            if (Schema.instance.getKeyspaceMetadata(ksName).functions.get(new FunctionName(ksName, functionName)).size() > 1)
+            if (Schema.instance.getKeyspaceMetadata(ksName).userFunctions.get(new FunctionName(ksName, functionName)).size() > 1)
                 removeInvalidPreparedStatementsForFunction(ksName, functionName);
         }
 
diff --git a/src/java/org/apache/cassandra/cql3/Sets.java b/src/java/org/apache/cassandra/cql3/Sets.java
index 104a857832..ee27e2aa6f 100644
--- a/src/java/org/apache/cassandra/cql3/Sets.java
+++ b/src/java/org/apache/cassandra/cql3/Sets.java
@@ -117,13 +117,21 @@ public abstract class Sets
      * @param mapper the mapper used to retrieve the element types from the items
      * @return the exact SetType from the items if it can be known or <code>null</code>
      */
-    public static <T> AbstractType<?> getExactSetTypeIfKnown(List<T> items,
-                                                             java.util.function.Function<T, AbstractType<?>> mapper)
+    public static <T> SetType<?> getExactSetTypeIfKnown(List<T> items,
+                                                        java.util.function.Function<T, AbstractType<?>> mapper)
     {
         Optional<AbstractType<?>> type = items.stream().map(mapper).filter(Objects::nonNull).findFirst();
         return type.isPresent() ? SetType.getInstance(type.get(), false) : null;
     }
 
+    public static <T> SetType<?> getPreferredCompatibleType(List<T> items,
+                                                            java.util.function.Function<T, AbstractType<?>> mapper)
+    {
+        Set<AbstractType<?>> types = items.stream().map(mapper).filter(Objects::nonNull).collect(Collectors.toSet());
+        AbstractType<?> type = AssignmentTestable.getCompatibleTypeIfKnown(types);
+        return type == null ? null : SetType.getInstance(type, false);
+    }
+
     public static class Literal extends Term.Raw
     {
         private final List<Term.Raw> elements;
@@ -194,6 +202,12 @@ public abstract class Sets
             return getExactSetTypeIfKnown(elements, p -> p.getExactTypeIfKnown(keyspace));
         }
 
+        @Override
+        public AbstractType<?> getCompatibleTypeIfKnown(String keyspace)
+        {
+            return Sets.getPreferredCompatibleType(elements, p -> p.getCompatibleTypeIfKnown(keyspace));
+        }
+
         public String getText()
         {
             return setToString(elements, Term.Raw::getText);
diff --git a/src/java/org/apache/cassandra/cql3/Term.java b/src/java/org/apache/cassandra/cql3/Term.java
index f536baa539..aaea1132c2 100644
--- a/src/java/org/apache/cassandra/cql3/Term.java
+++ b/src/java/org/apache/cassandra/cql3/Term.java
@@ -113,12 +113,18 @@ public interface Term
          *
          * @param keyspace the keyspace on which the statement containing this term is on.
          * @return the type of this {@code Term} if inferrable, or {@code null}
-         * otherwise (for instance, the type isn't inferable for a bind marker. Even for
+         * otherwise (for instance, the type isn't inferrable for a bind marker. Even for
          * literals, the exact type is not inferrable since they are valid for many
          * different types and so this will return {@code null} too).
          */
         public abstract AbstractType<?> getExactTypeIfKnown(String keyspace);
 
+        @Override
+        public AbstractType<?> getCompatibleTypeIfKnown(String keyspace)
+        {
+            return getExactTypeIfKnown(keyspace);
+        }
+
         @Override
         public String toString()
         {
diff --git a/src/java/org/apache/cassandra/cql3/Tuples.java b/src/java/org/apache/cassandra/cql3/Tuples.java
index 6e028c274d..34f630e48d 100644
--- a/src/java/org/apache/cassandra/cql3/Tuples.java
+++ b/src/java/org/apache/cassandra/cql3/Tuples.java
@@ -454,8 +454,8 @@ public class Tuples
      * @param mapper the mapper used to retrieve the element types from the  items
      * @return the exact TupleType from the items if it can be known or <code>null</code>
      */
-    public static <T> AbstractType<?> getExactTupleTypeIfKnown(List<T> items,
-                                                               java.util.function.Function<T, AbstractType<?>> mapper)
+    public static <T> TupleType getExactTupleTypeIfKnown(List<T> items,
+                                                         java.util.function.Function<T, AbstractType<?>> mapper)
     {
         List<AbstractType<?>> types = new ArrayList<>(items.size());
         for (T item : items)
diff --git a/src/java/org/apache/cassandra/cql3/functions/AbstractFunction.java b/src/java/org/apache/cassandra/cql3/functions/AbstractFunction.java
index aab20467f4..c3183f6413 100644
--- a/src/java/org/apache/cassandra/cql3/functions/AbstractFunction.java
+++ b/src/java/org/apache/cassandra/cql3/functions/AbstractFunction.java
@@ -117,6 +117,12 @@ public abstract class AbstractFunction implements Function
         return AssignmentTestable.TestResult.NOT_ASSIGNABLE;
     }
 
+    @Override
+    public AbstractType<?> getCompatibleTypeIfKnown(String keyspace)
+    {
+        return returnType();
+    }
+
     @Override
     public String toString()
     {
@@ -165,4 +171,34 @@ public abstract class AbstractFunction implements Function
                                                 .append(')')
                                                 .toString();
     }
+
+    /*
+     * We need to compare the CQL3 representation of the type because comparing
+     * the AbstractType will fail for example if a UDT has been changed.
+     * Reason is that UserType.equals() takes the field names and types into account.
+     * Example CQL sequence that would fail when comparing AbstractType:
+     *    CREATE TYPE foo ...
+     *    CREATE FUNCTION bar ( par foo ) RETURNS foo ...
+     *    ALTER TYPE foo ADD ...
+     * or
+     *    ALTER TYPE foo ALTER ...
+     * or
+     *    ALTER TYPE foo RENAME ...
+     */
+    public boolean typesMatch(List<AbstractType<?>> types)
+    {
+        if (argTypes().size() != types.size())
+            return false;
+
+        for (int i = 0; i < argTypes().size(); i++)
+            if (!typesMatch(argTypes().get(i), types.get(i)))
+                return false;
+
+        return true;
+    }
+
+    private static boolean typesMatch(AbstractType<?> t1, AbstractType<?> t2)
+    {
+        return t1.freeze().asCQL3Type().toString().equals(t2.freeze().asCQL3Type().toString());
+    }
 }
diff --git a/src/java/org/apache/cassandra/cql3/functions/AggregateFcts.java b/src/java/org/apache/cassandra/cql3/functions/AggregateFcts.java
index 5797de4a43..77243c622e 100644
--- a/src/java/org/apache/cassandra/cql3/functions/AggregateFcts.java
+++ b/src/java/org/apache/cassandra/cql3/functions/AggregateFcts.java
@@ -21,13 +21,8 @@ import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.math.RoundingMode;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
 
-import org.apache.cassandra.cql3.CQL3Type;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.transport.ProtocolVersion;
@@ -37,10 +32,8 @@ import org.apache.cassandra.transport.ProtocolVersion;
  */
 public abstract class AggregateFcts
 {
-    public static Collection<AggregateFunction> all()
+    public static void addFunctionsTo(NativeFunctions functions)
     {
-        Collection<AggregateFunction> functions = new ArrayList<>();
-
         functions.add(countRowsFunction);
 
         // sum for primitives
@@ -65,35 +58,37 @@ public abstract class AggregateFcts
         functions.add(avgFunctionForVarint);
         functions.add(avgFunctionForCounter);
 
-        // count, max, and min for all standard types
-        Set<AbstractType<?>> types = new HashSet<>();
-        for (CQL3Type type : CQL3Type.Native.values())
-        {
-            AbstractType<?> udfType = type.getType().udfType();
-            if (!types.add(udfType))
-                continue;
+        // count for all types
+        functions.add(makeCountFunction(BytesType.instance));
 
-            functions.add(AggregateFcts.makeCountFunction(udfType));
-            if (type != CQL3Type.Native.COUNTER)
+        // max for all types
+        functions.add(new FunctionFactory("max", FunctionParameter.anyType(true))
+        {
+            @Override
+            protected NativeFunction doGetOrCreateFunction(List<AbstractType<?>> argTypes, AbstractType<?> receiverType)
             {
-                functions.add(AggregateFcts.makeMaxFunction(udfType));
-                functions.add(AggregateFcts.makeMinFunction(udfType));
+                AbstractType<?> type = argTypes.get(0);
+                return type.isCounter() ? maxFunctionForCounter : makeMaxFunction(type);
             }
-            else
+        });
+
+        // min for all types
+        functions.add(new FunctionFactory("min", FunctionParameter.anyType(true))
+        {
+            @Override
+            protected NativeFunction doGetOrCreateFunction(List<AbstractType<?>> argTypes, AbstractType<?> receiverType)
             {
-                functions.add(AggregateFcts.maxFunctionForCounter);
-                functions.add(AggregateFcts.minFunctionForCounter);
+                AbstractType<?> type = argTypes.get(0);
+                return type.isCounter() ? minFunctionForCounter : makeMinFunction(type);
             }
-        }
-
-        return functions;
+        });
     }
 
     /**
      * The function used to count the number of rows of a result set. This function is called when COUNT(*) or COUNT(1)
      * is specified.
      */
-    public static final AggregateFunction countRowsFunction =
+    public static final NativeAggregateFunction countRowsFunction =
             new NativeAggregateFunction("countRows", LongType.instance)
             {
                 public Aggregate newAggregate()
@@ -129,7 +124,7 @@ public abstract class AggregateFcts
     /**
      * The SUM function for decimal values.
      */
-    public static final AggregateFunction sumFunctionForDecimal =
+    public static final NativeAggregateFunction sumFunctionForDecimal =
             new NativeAggregateFunction("sum", DecimalType.instance, DecimalType.instance)
             {
                 @Override
@@ -166,7 +161,7 @@ public abstract class AggregateFcts
     /**
      * The AVG function for decimal values.
      */
-    public static final AggregateFunction avgFunctionForDecimal =
+    public static final NativeAggregateFunction avgFunctionForDecimal =
             new NativeAggregateFunction("avg", DecimalType.instance, DecimalType.instance)
             {
                 public Aggregate newAggregate()
@@ -209,7 +204,7 @@ public abstract class AggregateFcts
     /**
      * The SUM function for varint values.
      */
-    public static final AggregateFunction sumFunctionForVarint =
+    public static final NativeAggregateFunction sumFunctionForVarint =
             new NativeAggregateFunction("sum", IntegerType.instance, IntegerType.instance)
             {
                 public Aggregate newAggregate()
@@ -245,7 +240,7 @@ public abstract class AggregateFcts
     /**
      * The AVG function for varint values.
      */
-    public static final AggregateFunction avgFunctionForVarint =
+    public static final NativeAggregateFunction avgFunctionForVarint =
             new NativeAggregateFunction("avg", IntegerType.instance, IntegerType.instance)
             {
                 public Aggregate newAggregate()
@@ -288,7 +283,7 @@ public abstract class AggregateFcts
     /**
      * The SUM function for byte values (tinyint).
      */
-    public static final AggregateFunction sumFunctionForByte =
+    public static final NativeAggregateFunction sumFunctionForByte =
             new NativeAggregateFunction("sum", ByteType.instance, ByteType.instance)
             {
                 public Aggregate newAggregate()
@@ -324,7 +319,7 @@ public abstract class AggregateFcts
     /**
      * AVG function for byte values (tinyint).
      */
-    public static final AggregateFunction avgFunctionForByte =
+    public static final NativeAggregateFunction avgFunctionForByte =
             new NativeAggregateFunction("avg", ByteType.instance, ByteType.instance)
             {
                 public Aggregate newAggregate()
@@ -342,7 +337,7 @@ public abstract class AggregateFcts
     /**
      * The SUM function for short values (smallint).
      */
-    public static final AggregateFunction sumFunctionForShort =
+    public static final NativeAggregateFunction sumFunctionForShort =
             new NativeAggregateFunction("sum", ShortType.instance, ShortType.instance)
             {
                 public Aggregate newAggregate()
@@ -378,7 +373,7 @@ public abstract class AggregateFcts
     /**
      * AVG function for for short values (smallint).
      */
-    public static final AggregateFunction avgFunctionForShort =
+    public static final NativeAggregateFunction avgFunctionForShort =
             new NativeAggregateFunction("avg", ShortType.instance, ShortType.instance)
             {
                 public Aggregate newAggregate()
@@ -396,7 +391,7 @@ public abstract class AggregateFcts
     /**
      * The SUM function for int32 values.
      */
-    public static final AggregateFunction sumFunctionForInt32 =
+    public static final NativeAggregateFunction sumFunctionForInt32 =
             new NativeAggregateFunction("sum", Int32Type.instance, Int32Type.instance)
             {
                 public Aggregate newAggregate()
@@ -432,7 +427,7 @@ public abstract class AggregateFcts
     /**
      * AVG function for int32 values.
      */
-    public static final AggregateFunction avgFunctionForInt32 =
+    public static final NativeAggregateFunction avgFunctionForInt32 =
             new NativeAggregateFunction("avg", Int32Type.instance, Int32Type.instance)
             {
                 public Aggregate newAggregate()
@@ -450,7 +445,7 @@ public abstract class AggregateFcts
     /**
      * The SUM function for long values.
      */
-    public static final AggregateFunction sumFunctionForLong =
+    public static final NativeAggregateFunction sumFunctionForLong =
             new NativeAggregateFunction("sum", LongType.instance, LongType.instance)
             {
                 public Aggregate newAggregate()
@@ -462,7 +457,7 @@ public abstract class AggregateFcts
     /**
      * AVG function for long values.
      */
-    public static final AggregateFunction avgFunctionForLong =
+    public static final NativeAggregateFunction avgFunctionForLong =
             new NativeAggregateFunction("avg", LongType.instance, LongType.instance)
             {
                 public Aggregate newAggregate()
@@ -480,7 +475,7 @@ public abstract class AggregateFcts
     /**
      * The SUM function for float values.
      */
-    public static final AggregateFunction sumFunctionForFloat =
+    public static final NativeAggregateFunction sumFunctionForFloat =
             new NativeAggregateFunction("sum", FloatType.instance, FloatType.instance)
             {
                 public Aggregate newAggregate()
@@ -498,7 +493,7 @@ public abstract class AggregateFcts
     /**
      * AVG function for float values.
      */
-    public static final AggregateFunction avgFunctionForFloat =
+    public static final NativeAggregateFunction avgFunctionForFloat =
             new NativeAggregateFunction("avg", FloatType.instance, FloatType.instance)
             {
                 public Aggregate newAggregate()
@@ -516,7 +511,7 @@ public abstract class AggregateFcts
     /**
      * The SUM function for double values.
      */
-    public static final AggregateFunction sumFunctionForDouble =
+    public static final NativeAggregateFunction sumFunctionForDouble =
             new NativeAggregateFunction("sum", DoubleType.instance, DoubleType.instance)
             {
                 public Aggregate newAggregate()
@@ -541,9 +536,9 @@ public abstract class AggregateFcts
         private double compensation;
         private double simpleSum;
 
-        private final AbstractType numberType;
+        private final AbstractType<?> numberType;
 
-        public FloatSumAggregate(AbstractType numberType)
+        public FloatSumAggregate(AbstractType<?> numberType)
         {
             this.numberType = numberType;
         }
@@ -599,9 +594,9 @@ public abstract class AggregateFcts
         private BigDecimal bigSum = null;
         private boolean overflow = false;
 
-        private final AbstractType numberType;
+        private final AbstractType<?> numberType;
 
-        public FloatAvgAggregate(AbstractType numberType)
+        public FloatAvgAggregate(AbstractType<?> numberType)
         {
             this.numberType = numberType;
         }
@@ -676,7 +671,7 @@ public abstract class AggregateFcts
     /**
      * AVG function for double values.
      */
-    public static final AggregateFunction avgFunctionForDouble =
+    public static final NativeAggregateFunction avgFunctionForDouble =
             new NativeAggregateFunction("avg", DoubleType.instance, DoubleType.instance)
             {
                 public Aggregate newAggregate()
@@ -694,7 +689,7 @@ public abstract class AggregateFcts
     /**
      * The SUM function for counter column values.
      */
-    public static final AggregateFunction sumFunctionForCounter =
+    public static final NativeAggregateFunction sumFunctionForCounter =
     new NativeAggregateFunction("sum", CounterColumnType.instance, CounterColumnType.instance)
     {
         public Aggregate newAggregate()
@@ -706,7 +701,7 @@ public abstract class AggregateFcts
     /**
      * AVG function for counter column values.
      */
-    public static final AggregateFunction avgFunctionForCounter =
+    public static final NativeAggregateFunction avgFunctionForCounter =
     new NativeAggregateFunction("avg", CounterColumnType.instance, CounterColumnType.instance)
     {
         public Aggregate newAggregate()
@@ -724,7 +719,7 @@ public abstract class AggregateFcts
     /**
      * The MIN function for counter column values.
      */
-    public static final AggregateFunction minFunctionForCounter =
+    public static final NativeAggregateFunction minFunctionForCounter =
     new NativeAggregateFunction("min", CounterColumnType.instance, CounterColumnType.instance)
     {
         public Aggregate newAggregate()
@@ -762,7 +757,7 @@ public abstract class AggregateFcts
     /**
      * MAX function for counter column values.
      */
-    public static final AggregateFunction maxFunctionForCounter =
+    public static final NativeAggregateFunction maxFunctionForCounter =
     new NativeAggregateFunction("max", CounterColumnType.instance, CounterColumnType.instance)
     {
         public Aggregate newAggregate()
@@ -803,7 +798,7 @@ public abstract class AggregateFcts
      * @param inputType the function input and output type
      * @return a MAX function for the specified type.
      */
-    public static AggregateFunction makeMaxFunction(final AbstractType<?> inputType)
+    public static NativeAggregateFunction makeMaxFunction(final AbstractType<?> inputType)
     {
         return new NativeAggregateFunction("max", inputType, inputType)
         {
@@ -844,7 +839,7 @@ public abstract class AggregateFcts
      * @param inputType the function input and output type
      * @return a MIN function for the specified type.
      */
-    public static AggregateFunction makeMinFunction(final AbstractType<?> inputType)
+    public static NativeAggregateFunction makeMinFunction(final AbstractType<?> inputType)
     {
         return new NativeAggregateFunction("min", inputType, inputType)
         {
@@ -885,7 +880,7 @@ public abstract class AggregateFcts
      * @param inputType the function input type
      * @return a COUNT function for the specified type.
      */
-    public static AggregateFunction makeCountFunction(AbstractType<?> inputType)
+    public static NativeAggregateFunction makeCountFunction(AbstractType<?> inputType)
     {
         return new NativeAggregateFunction("count", LongType.instance, inputType)
         {
@@ -957,9 +952,9 @@ public abstract class AggregateFcts
         private BigInteger bigSum = null;
         private boolean overflow = false;
 
-        private final AbstractType numberType;
+        private final AbstractType<?> numberType;
 
-        public AvgAggregate(AbstractType type)
+        public AvgAggregate(AbstractType<?> type)
         {
             this.numberType = type;
         }
diff --git a/src/java/org/apache/cassandra/cql3/functions/BytesConversionFcts.java b/src/java/org/apache/cassandra/cql3/functions/BytesConversionFcts.java
index 7e9708a9c0..993025590f 100644
--- a/src/java/org/apache/cassandra/cql3/functions/BytesConversionFcts.java
+++ b/src/java/org/apache/cassandra/cql3/functions/BytesConversionFcts.java
@@ -18,8 +18,6 @@
 package org.apache.cassandra.cql3.functions;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -35,10 +33,8 @@ import org.apache.cassandra.serializers.MarshalException;
 
 public abstract class BytesConversionFcts
 {
-    public static Collection<Function> all()
+    public static void addFunctionsTo(NativeFunctions functions)
     {
-        Collection<Function> functions = new ArrayList<>();
-
         // because text and varchar ends up being synonymous, our automatic makeToBlobFunction doesn't work
         // for varchar, so we special case it below. We also skip blob for obvious reasons.
         Set<AbstractType<?>> types = new HashSet<>();
@@ -56,13 +52,11 @@ public abstract class BytesConversionFcts
 
         functions.add(VarcharAsBlobFct);
         functions.add(BlobAsVarcharFct);
-
-        return functions;
     }
 
     // Most of the XAsBlob and blobAsX functions are basically no-op since everything is
     // bytes internally. They only "trick" the type system.
-    public static Function makeToBlobFunction(AbstractType<?> fromType)
+    public static NativeFunction makeToBlobFunction(AbstractType<?> fromType)
     {
         String name = fromType.asCQL3Type() + "asblob";
         return new NativeScalarFunction(name, BytesType.instance, fromType)
@@ -74,7 +68,7 @@ public abstract class BytesConversionFcts
         };
     }
 
-    public static Function makeFromBlobFunction(final AbstractType<?> toType)
+    public static NativeFunction makeFromBlobFunction(final AbstractType<?> toType)
     {
         final String name = "blobas" + toType.asCQL3Type();
         return new NativeScalarFunction(name, toType, BytesType.instance)
@@ -97,7 +91,7 @@ public abstract class BytesConversionFcts
         };
     }
 
-    public static final Function VarcharAsBlobFct = new NativeScalarFunction("varcharasblob", BytesType.instance, UTF8Type.instance)
+    public static final NativeFunction VarcharAsBlobFct = new NativeScalarFunction("varcharasblob", BytesType.instance, UTF8Type.instance)
     {
         public ByteBuffer execute(ProtocolVersion protocolVersion, List<ByteBuffer> parameters)
         {
@@ -105,7 +99,7 @@ public abstract class BytesConversionFcts
         }
     };
 
-    public static final Function BlobAsVarcharFct = new NativeScalarFunction("blobasvarchar", UTF8Type.instance, BytesType.instance)
+    public static final NativeFunction BlobAsVarcharFct = new NativeScalarFunction("blobasvarchar", UTF8Type.instance, BytesType.instance)
     {
         public ByteBuffer execute(ProtocolVersion protocolVersion, List<ByteBuffer> parameters)
         {
diff --git a/src/java/org/apache/cassandra/cql3/functions/CastFcts.java b/src/java/org/apache/cassandra/cql3/functions/CastFcts.java
index 81986c804e..4ee89b1443 100644
--- a/src/java/org/apache/cassandra/cql3/functions/CastFcts.java
+++ b/src/java/org/apache/cassandra/cql3/functions/CastFcts.java
@@ -20,8 +20,6 @@ package org.apache.cassandra.cql3.functions;
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
 
 import org.apache.cassandra.cql3.CQL3Type;
@@ -58,10 +56,8 @@ public final class CastFcts
 {
     private static final String FUNCTION_NAME_PREFIX = "castAs";
 
-    public static Collection<Function> all()
+    public static void addFunctionsTo(NativeFunctions functions)
     {
-        List<Function> functions = new ArrayList<>();
-
         @SuppressWarnings("unchecked")
         final AbstractType<? extends Number>[] numericTypes = new AbstractType[] {ByteType.instance,
                                                                                   ShortType.instance,
@@ -110,8 +106,6 @@ public final class CastFcts
 
         functions.add(CastAsTextFunction.create(UUIDType.instance, AsciiType.instance));
         functions.add(CastAsTextFunction.create(UUIDType.instance, UTF8Type.instance));
-
-        return functions;
     }
 
     /**
@@ -161,7 +155,7 @@ public final class CastFcts
      * @param outputType the output type
      * @param converter the function use to convert the input type into the output type
      */
-    private static <I, O> void addFunctionIfNeeded(List<Function> functions,
+    private static <I, O> void addFunctionIfNeeded(NativeFunctions functions,
                                                    AbstractType<I> inputType,
                                                    AbstractType<O> outputType,
                                                    java.util.function.Function<I, O> converter)
@@ -171,9 +165,9 @@ public final class CastFcts
     }
 
     @SuppressWarnings("unchecked")
-    private static <O, I> Function wrapJavaFunction(AbstractType<I> inputType,
-                                                    AbstractType<O> outputType,
-                                                    java.util.function.Function<I, O> converter)
+    private static <O, I> NativeFunction wrapJavaFunction(AbstractType<I> inputType,
+                                                          AbstractType<O> outputType,
+                                                          java.util.function.Function<I, O> converter)
     {
         return inputType.equals(CounterColumnType.instance)
                 ? JavaCounterFunctionWrapper.create(outputType, (java.util.function.Function<Long, O>) converter)
diff --git a/src/java/org/apache/cassandra/cql3/functions/FromJsonFct.java b/src/java/org/apache/cassandra/cql3/functions/FromJsonFct.java
index 01a6e20860..e50abe4efd 100644
--- a/src/java/org/apache/cassandra/cql3/functions/FromJsonFct.java
+++ b/src/java/org/apache/cassandra/cql3/functions/FromJsonFct.java
@@ -27,6 +27,7 @@ import org.apache.cassandra.cql3.Json;
 import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.exceptions.FunctionExecutionException;
+import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.transport.ProtocolVersion;
 
@@ -69,11 +70,27 @@ public class FromJsonFct extends NativeScalarFunction
         }
         catch (IOException exc)
         {
-            throw FunctionExecutionException.create(NAME, Collections.singletonList("text"), String.format("Could not decode JSON string '%s': %s", jsonArg, exc.toString()));
+            throw FunctionExecutionException.create(NAME, Collections.singletonList("text"),
+                                                    String.format("Could not decode JSON string '%s': %s", jsonArg, exc));
         }
         catch (MarshalException exc)
         {
             throw FunctionExecutionException.create(this, exc);
         }
     }
+
+    public static void addFunctionsTo(NativeFunctions functions)
+    {
+        functions.add(new FunctionFactory(NAME.name, FunctionParameter.fixed(UTF8Type.instance))
+        {
+            @Override
+            protected NativeFunction doGetOrCreateFunction(List<AbstractType<?>> argTypes, AbstractType<?> receiverType)
+            {
+                if (receiverType == null)
+                    throw new InvalidRequestException("fromJson() cannot be used in the selection clause of a SELECT statement");
+
+                return FromJsonFct.getInstance(receiverType);
+            }
+        });
+    }
 }
diff --git a/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java b/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java
index 4fb1ba3af7..5dc5fc8c57 100644
--- a/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java
+++ b/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java
@@ -127,7 +127,7 @@ public class FunctionCall extends Term.NonTerminal
 
     public static class Raw extends Term.Raw
     {
-        private FunctionName name;
+        private final FunctionName name;
         private final List<Term.Raw> terms;
 
         public Raw(FunctionName name, List<Term.Raw> terms)
@@ -202,10 +202,11 @@ public class FunctionCall extends Term.NonTerminal
             {
                 Function fun = FunctionResolver.get(keyspace, name, terms, receiver.ksName, receiver.cfName, receiver.type);
 
-                // Because fromJson() can return whatever type the receiver is, we'll always get EXACT_MATCH.  To
-                // handle potentially ambiguous function calls with fromJson() as an argument, always return
-                // WEAKLY_ASSIGNABLE to force the user to typecast if necessary
-                if (fun != null && fun.name().equals(FromJsonFct.NAME))
+                // Because the return type of functions built by factories is not fixed but depends on the types of
+                // their arguments, we'll always get EXACT_MATCH.  To handle potentially ambiguous function calls with
+                // dynamically built functions as an argument, always return WEAKLY_ASSIGNABLE to force the user to
+                // typecast if necessary
+                if (fun != null && NativeFunctions.instance.hasFactory(fun.name()))
                     return TestResult.WEAKLY_ASSIGNABLE;
 
                 if (fun != null && receiver.type.udfType().equals(fun.returnType()))
@@ -221,14 +222,18 @@ public class FunctionCall extends Term.NonTerminal
             }
         }
 
+        @Override
         public AbstractType<?> getExactTypeIfKnown(String keyspace)
         {
-            // We could implement this, but the method is only used in selection clause, where FunctionCall is not used 
-            // we use a Selectable.WithFunction instead). And if that method is later used in other places, better to
-            // let that future patch make sure this can be implemented properly (note in particular we don't have access
-            // to the receiver type, which FunctionResolver.get() takes) rather than provide an implementation that may
-            // not work in all cases.
-            throw new UnsupportedOperationException();
+            try
+            {
+                Function fun = FunctionResolver.get(keyspace, name, terms, null, null, null);
+                return fun == null ? null : fun.returnType();
+            }
+            catch (InvalidRequestException e)
+            {
+                return null;
+            }
         }
 
         public String getText()
diff --git a/src/java/org/apache/cassandra/cql3/functions/FunctionFactory.java b/src/java/org/apache/cassandra/cql3/functions/FunctionFactory.java
new file mode 100644
index 0000000000..5cdcc80342
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/functions/FunctionFactory.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3.functions;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.cassandra.cql3.AssignmentTestable;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.schema.SchemaConstants;
+
+/**
+ * Class for dynamically building different overloads of a CQL {@link Function} according to specific function calls.
+ * <p>
+ * For example, the factory for the {@code max} function will return a {@code (text) -> text} function if it's called
+ * with a {@code text} argument, as in {@code max('abc')}. However, it will return a {@code (list<int>) -> list<int>}
+ * function if it's called with {@code max([1,2,3])}, etc.
+ * <p>
+ * This is meant to be used to create functions that require too many overloads to have them pre-created in memory. Note
+ * that in the case of functions accepting collections, tuples or UDTs the number of overloads is potentially infinite.
+ */
+public abstract class FunctionFactory
+{
+    /** The name of the built functions. */
+    protected final FunctionName name;
+
+    /** The accepted parameters. */
+    protected final List<FunctionParameter> parameters;
+
+    /**
+     * @param name the name of the built functions
+     * @param parameters the accepted parameters
+     */
+    public FunctionFactory(String name, FunctionParameter... parameters)
+    {
+        this.name = FunctionName.nativeFunction(name);
+        this.parameters = Arrays.asList(parameters);
+    }
+
+    public FunctionName name()
+    {
+        return name;
+    }
+
+    /**
+     * Returns a function with a signature compatible with the specified function call.
+     *
+     * @param args the arguments in the function call for which the function is going to be built
+     * @param receiverType the expected return type of the function call for which the function is going to be built
+     * @param receiverKs the name of the receiver keyspace
+     * @param receiverCf the name of the receiver table
+     * @return a function with a signature compatible with the specified function call
+     */
+    public NativeFunction getOrCreateFunction(List<? extends AssignmentTestable> args,
+                                              AbstractType<?> receiverType,
+                                              String receiverKs,
+                                              String receiverCf)
+    {
+        // validate the number of arguments
+        if (args.size() != parameters.size())
+            throw new InvalidRequestException("Invalid number of arguments for function " + this);
+
+        // try to infer the types of the arguments
+        List<AbstractType<?>> types = new ArrayList<>(args.size());
+        for (int i = 0; i < args.size(); i++)
+        {
+            AssignmentTestable arg = args.get(i);
+            FunctionParameter parameter = parameters.get(i);
+            AbstractType<?> type = parameter.inferType(SchemaConstants.SYSTEM_KEYSPACE_NAME, arg, receiverType);
+            if (type == null)
+                throw new InvalidRequestException(String.format("Cannot infer type of argument %s in call to " +
+                                                                "function %s: use type casts to disambiguate",
+                                                                arg, this));
+            parameter.validateType(name, arg, type);
+            type = type.udfType();
+            types.add(type);
+        }
+
+        return doGetOrCreateFunction(types, receiverType);
+    }
+
+    /**
+     * Returns a function compatible with the specified signature.
+     *
+     * @param argTypes the types of the function arguments
+     * @param receiverType the expected return type of the function
+     * @return a function compatible with the specified signature
+     */
+    protected abstract NativeFunction doGetOrCreateFunction(List<AbstractType<?>> argTypes, AbstractType<?> receiverType);
+
+    @Override
+    public String toString()
+    {
+        return String.format("%s(%s)", name, parameters.stream().map(Object::toString).collect(Collectors.joining(", ")));
+    }
+}
diff --git a/src/java/org/apache/cassandra/cql3/functions/FunctionParameter.java b/src/java/org/apache/cassandra/cql3/functions/FunctionParameter.java
new file mode 100644
index 0000000000..0d5a0c4a12
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/functions/FunctionParameter.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3.functions;
+
+import javax.annotation.Nullable;
+
+import org.apache.cassandra.cql3.AssignmentTestable;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+
+import static java.lang.String.format;
+import static org.apache.cassandra.cql3.AssignmentTestable.TestResult.NOT_ASSIGNABLE;
+
+/**
+ * Generic, loose definition of a function parameter, able to infer the specific data type of the parameter in the
+ * function specifically built by a {@link FunctionFactory} for a particular function call.
+ */
+public interface FunctionParameter
+{
+    /**
+     * Tries to infer the data type of the parameter for an argument in a call to the function.
+     *
+     * @param keyspace the current keyspace
+     * @param arg a parameter value in a specific function call
+     * @param receiverType the type of the object that will receive the result of the function call
+     * @return the inferred data type of the parameter, or {@code null} if it isn't possible to infer it
+     */
+    @Nullable
+    AbstractType<?> inferType(String keyspace, AssignmentTestable arg, @Nullable AbstractType<?> receiverType);
+
+    void validateType(FunctionName name, AssignmentTestable arg, AbstractType<?> argType);
+
+    /**
+     * @param type the accepted data type
+     * @return a function parameter definition that accepts values of a specific data type
+     */
+    public static FunctionParameter fixed(AbstractType<?> type)
+    {
+        return new FunctionParameter()
+        {
+            @Override
+            public AbstractType<?> inferType(String keyspace, AssignmentTestable arg, AbstractType<?> receiverType)
+            {
+                AbstractType<?> inferred = arg.getCompatibleTypeIfKnown(keyspace);
+                return inferred != null ? inferred : type;
+            }
+
+            @Override
+            public void validateType(FunctionName name, AssignmentTestable arg, AbstractType<?> argType)
+            {
+                if (argType.testAssignment(type) == NOT_ASSIGNABLE)
+                    throw new InvalidRequestException(format("Function %s requires an argument of type %s, " +
+                                                             "but found argument %s of type %s",
+                                                             name, type, arg, argType.asCQL3Type()));
+            }
+
+            @Override
+            public String toString()
+            {
+                return type.toString();
+            }
+        };
+    }
+
+    /**
+     * @param inferFromReceiver whether the parameter should try to use the function receiver to infer its data type
+     * @return a function parameter definition that accepts columns of any data type
+     */
+    public static FunctionParameter anyType(boolean inferFromReceiver)
+    {
+        return new FunctionParameter()
+        {
+            @Override
+            public AbstractType<?> inferType(String keyspace, AssignmentTestable arg, AbstractType<?> receiverType)
+            {
+                AbstractType<?> type = arg.getCompatibleTypeIfKnown(keyspace);
+                return type == null && inferFromReceiver ? receiverType : type;
+            }
+
+            @Override
+            public void validateType(FunctionName name, AssignmentTestable arg, AbstractType<?> argType)
+            {
+                // nothing to do here, all types are accepted
+            }
+
+            @Override
+            public String toString()
+            {
+                return "any";
+            }
+        };
+    }
+}
diff --git a/src/java/org/apache/cassandra/cql3/functions/FunctionResolver.java b/src/java/org/apache/cassandra/cql3/functions/FunctionResolver.java
index 7717bdb945..5109fe45a8 100644
--- a/src/java/org/apache/cassandra/cql3/functions/FunctionResolver.java
+++ b/src/java/org/apache/cassandra/cql3/functions/FunctionResolver.java
@@ -20,6 +20,9 @@ package org.apache.cassandra.cql3.functions;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.stream.Collectors;
+
+import javax.annotation.Nullable;
 
 import org.apache.cassandra.cql3.AbstractMarker;
 import org.apache.cassandra.cql3.AssignmentTestable;
@@ -38,11 +41,6 @@ public final class FunctionResolver
     {
     }
 
-    // We special case the token function because that's the only function whose argument types actually
-    // depend on the table on which the function is called. Because it's the sole exception, it's easier
-    // to handle it as a special case.
-    private static final FunctionName TOKEN_FUNCTION_NAME = FunctionName.nativeFunction("token");
-
     public static ColumnSpecification makeArgSpec(String receiverKs, String receiverCf, Function fun, int i)
     {
         return new ColumnSpecification(receiverKs,
@@ -59,8 +57,8 @@ public final class FunctionResolver
      * @param receiverCf the receiver's table
      * @param receiverType if the receiver type is known (during inserts, for example), this should be the type of
      *                     the receiver
-     * @throws InvalidRequestException
      */
+    @Nullable
     public static Function get(String keyspace,
                                FunctionName name,
                                List<? extends AssignmentTestable> providedArgs,
@@ -69,7 +67,7 @@ public final class FunctionResolver
                                AbstractType<?> receiverType)
     throws InvalidRequestException
     {
-        Collection<Function> candidates = collectCandidates(keyspace, name, receiverKs, receiverCf, receiverType);
+        Collection<Function> candidates = collectCandidates(keyspace, name, receiverKs, receiverCf, providedArgs, receiverType);
 
         if (candidates.isEmpty())
             return null;
@@ -89,39 +87,31 @@ public final class FunctionResolver
                                                           FunctionName name,
                                                           String receiverKs,
                                                           String receiverCf,
+                                                          List<? extends AssignmentTestable> providedArgs,
                                                           AbstractType<?> receiverType)
     {
         Collection<Function> candidates = new ArrayList<>();
 
-        if (name.equalsNativeFunction(TOKEN_FUNCTION_NAME))
-            candidates.add(new TokenFct(Schema.instance.getTableMetadata(receiverKs, receiverCf)));
-
-        // The toJson() function can accept any type of argument, so instances of it are not pre-declared.  Instead,
-        // we create new instances as needed while handling selectors (which is the only place that toJson() is supported,
-        // due to needing to know the argument types in advance).
-        if (name.equalsNativeFunction(ToJsonFct.NAME))
-            throw new InvalidRequestException("toJson() may only be used within the selection clause of SELECT statements");
-
-        // Similarly, we can only use fromJson when we know the receiver type (such as inserts)
-        if (name.equalsNativeFunction(FromJsonFct.NAME))
-        {
-            if (receiverType == null)
-                throw new InvalidRequestException("fromJson() cannot be used in the selection clause of a SELECT statement");
-            candidates.add(FromJsonFct.getInstance(receiverType));
-        }
-
-        if (!name.hasKeyspace())
+        if (name.hasKeyspace())
         {
-            // function name not fully qualified
-            // add 'SYSTEM' (native) candidates
-            candidates.addAll(Schema.instance.getFunctions(name.asNativeFunction()));
-            // add 'current keyspace' candidates
-            candidates.addAll(Schema.instance.getFunctions(new FunctionName(keyspace, name.name)));
+            // function name is fully qualified (keyspace + name)
+            candidates.addAll(Schema.instance.getUserFunctions(name));
+            candidates.addAll(NativeFunctions.instance.getFunctions(name));
+            candidates.addAll(NativeFunctions.instance.getFactories(name).stream()
+                                            .map(f -> f.getOrCreateFunction(providedArgs, receiverType, receiverKs, receiverCf))
+                                            .collect(Collectors.toList()));
         }
         else
         {
-            // function name is fully qualified (keyspace + name)
-            candidates.addAll(Schema.instance.getFunctions(name));
+            // function name is not fully qualified
+            // add 'current keyspace' candidates
+            candidates.addAll(Schema.instance.getUserFunctions(new FunctionName(keyspace, name.name)));
+            // add 'SYSTEM' (native) candidates
+            FunctionName nativeName = name.asNativeFunction();
+            candidates.addAll(NativeFunctions.instance.getFunctions(nativeName));
+            candidates.addAll(NativeFunctions.instance.getFactories(nativeName).stream()
+                                            .map(f -> f.getOrCreateFunction(providedArgs, receiverType, receiverKs, receiverCf))
+                                            .collect(Collectors.toList()));
         }
 
         return candidates;
@@ -253,7 +243,7 @@ public final class FunctionResolver
         if (providedArgs.size() != fun.argTypes().size())
             return AssignmentTestable.TestResult.NOT_ASSIGNABLE;
 
-        // It's an exact match if all are exact match, but is not assignable as soon as any is non assignable.
+        // It's an exact match if all are exact match, but is not assignable as soon as any is not assignable.
         AssignmentTestable.TestResult res = AssignmentTestable.TestResult.EXACT_MATCH;
         for (int i = 0; i < providedArgs.size(); i++)
         {
diff --git a/src/java/org/apache/cassandra/cql3/functions/NativeFunction.java b/src/java/org/apache/cassandra/cql3/functions/NativeFunction.java
index cafeca1af6..c13d5e8013 100644
--- a/src/java/org/apache/cassandra/cql3/functions/NativeFunction.java
+++ b/src/java/org/apache/cassandra/cql3/functions/NativeFunction.java
@@ -32,7 +32,7 @@ public abstract class NativeFunction extends AbstractFunction
     }
 
     @Override
-    public boolean isNative()
+    public final boolean isNative()
     {
         return true;
     }
diff --git a/src/java/org/apache/cassandra/cql3/functions/NativeFunctions.java b/src/java/org/apache/cassandra/cql3/functions/NativeFunctions.java
new file mode 100644
index 0000000000..f89bba870d
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/functions/NativeFunctions.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3.functions;
+
+import java.util.Collection;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+
+/**
+ * A container of native functions. It stores both pre-built function overloads ({@link NativeFunction}) and
+ * dynamic generators of functions ({@link FunctionFactory}).
+ */
+public class NativeFunctions
+{
+    public static NativeFunctions instance = new NativeFunctions()
+    {
+        {
+            TokenFct.addFunctionsTo(this);
+            CastFcts.addFunctionsTo(this);
+            UuidFcts.addFunctionsTo(this);
+            TimeFcts.addFunctionsTo(this);
+            ToJsonFct.addFunctionsTo(this);
+            FromJsonFct.addFunctionsTo(this);
+            OperationFcts.addFunctionsTo(this);
+            AggregateFcts.addFunctionsTo(this);
+            BytesConversionFcts.addFunctionsTo(this);
+        }
+    };
+
+    /** Pre-built function overloads. */
+    private final Multimap<FunctionName, NativeFunction> functions = HashMultimap.create();
+
+    /** Dynamic function factories. */
+    private final Multimap<FunctionName, FunctionFactory> factories = HashMultimap.create();
+
+    public void add(NativeFunction function)
+    {
+        functions.put(function.name(), function);
+    }
+
+    public void addAll(NativeFunction... functions)
+    {
+        for (NativeFunction function : functions)
+            add(function);
+    }
+
+    public void add(FunctionFactory factory)
+    {
+        factories.put(factory.name(), factory);
+    }
+
+    /**
+     * Returns all the registered pre-built function overloads with the specified name.
+     *
+     * @param name a function name
+     * @return the pre-built functions with the specified name
+     */
+    public Collection<NativeFunction> getFunctions(FunctionName name)
+    {
+        return functions.get(name);
+    }
+
+    /**
+     * Returns all the registered function factories with the specified name.
+     *
+     * @param name a function name
+     * @return the function factories with the specified name
+     */
+    public Collection<FunctionFactory> getFactories(FunctionName name)
+    {
+        return factories.get(name);
+    }
+
+    /**
+     * Returns whether there is a function factory with the specified name.
+     *
+     * @param name a function name
+     * @return {@code true} if there is a factory with the specified name, {@code false} otherwise
+     */
+    public boolean hasFactory(FunctionName name)
+    {
+        return factories.containsKey(name);
+    }
+}
diff --git a/src/java/org/apache/cassandra/cql3/functions/OperationFcts.java b/src/java/org/apache/cassandra/cql3/functions/OperationFcts.java
index b00ced7cfc..9e0fef1443 100644
--- a/src/java/org/apache/cassandra/cql3/functions/OperationFcts.java
+++ b/src/java/org/apache/cassandra/cql3/functions/OperationFcts.java
@@ -18,8 +18,6 @@
 package org.apache.cassandra.cql3.functions;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
 
 import org.apache.cassandra.schema.SchemaConstants;
@@ -219,10 +217,8 @@ public final class OperationFcts
      */
     public static final String NEGATION_FUNCTION_NAME = "_negate";
 
-    public static Collection<Function> all()
+    public static void addFunctionsTo(NativeFunctions functions)
     {
-        List<Function> functions = new ArrayList<>();
-
         final NumberType<?>[] numericTypes = new NumberType[] { ByteType.instance,
                                                                 ShortType.instance,
                                                                 Int32Type.instance,
@@ -251,11 +247,9 @@ public final class OperationFcts
         }
 
         addStringConcatenations(functions);
-
-        return functions;
     }
 
-    private static void addStringConcatenations(List<Function> functions)
+    private static void addStringConcatenations(NativeFunctions functions)
     {
         functions.add(new StringOperationFunction(UTF8Type.instance, UTF8Type.instance, OPERATION.ADDITION, UTF8Type.instance));
         functions.add(new StringOperationFunction(AsciiType.instance, AsciiType.instance, OPERATION.ADDITION, AsciiType.instance));
diff --git a/src/java/org/apache/cassandra/cql3/functions/TimeFcts.java b/src/java/org/apache/cassandra/cql3/functions/TimeFcts.java
index 27592102b7..9a6957db4b 100644
--- a/src/java/org/apache/cassandra/cql3/functions/TimeFcts.java
+++ b/src/java/org/apache/cassandra/cql3/functions/TimeFcts.java
@@ -18,10 +18,8 @@
 package org.apache.cassandra.cql3.functions;
 
 import java.nio.ByteBuffer;
-import java.util.Collection;
 import java.util.List;
 
-import com.google.common.collect.ImmutableList;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,34 +38,34 @@ public abstract class TimeFcts
 {
     public static Logger logger = LoggerFactory.getLogger(TimeFcts.class);
 
-    public static Collection<Function> all()
+    public static void addFunctionsTo(NativeFunctions functions)
     {
-        return ImmutableList.of(now("now", TimeUUIDType.instance),
-                                now("currenttimeuuid", TimeUUIDType.instance),
-                                now("currenttimestamp", TimestampType.instance),
-                                now("currentdate", SimpleDateType.instance),
-                                now("currenttime", TimeType.instance),
-                                minTimeuuidFct,
-                                maxTimeuuidFct,
-                                dateOfFct,
-                                unixTimestampOfFct,
-                                toDate(TimeUUIDType.instance),
-                                toTimestamp(TimeUUIDType.instance),
-                                toUnixTimestamp(TimeUUIDType.instance),
-                                toUnixTimestamp(TimestampType.instance),
-                                toDate(TimestampType.instance),
-                                toUnixTimestamp(SimpleDateType.instance),
-                                toTimestamp(SimpleDateType.instance),
-                                FloorTimestampFunction.newInstance(),
-                                FloorTimestampFunction.newInstanceWithStartTimeArgument(),
-                                FloorTimeUuidFunction.newInstance(),
-                                FloorTimeUuidFunction.newInstanceWithStartTimeArgument(),
-                                FloorDateFunction.newInstance(),
-                                FloorDateFunction.newInstanceWithStartTimeArgument(),
-                                floorTime);
+        functions.addAll(now("now", TimeUUIDType.instance),
+                         now("currenttimeuuid", TimeUUIDType.instance),
+                         now("currenttimestamp", TimestampType.instance),
+                         now("currentdate", SimpleDateType.instance),
+                         now("currenttime", TimeType.instance),
+                         minTimeuuidFct,
+                         maxTimeuuidFct,
+                         dateOfFct,
+                         unixTimestampOfFct,
+                         toDate(TimeUUIDType.instance),
+                         toTimestamp(TimeUUIDType.instance),
+                         toUnixTimestamp(TimeUUIDType.instance),
+                         toUnixTimestamp(TimestampType.instance),
+                         toDate(TimestampType.instance),
+                         toUnixTimestamp(SimpleDateType.instance),
+                         toTimestamp(SimpleDateType.instance),
+                         FloorTimestampFunction.newInstance(),
+                         FloorTimestampFunction.newInstanceWithStartTimeArgument(),
+                         FloorTimeUuidFunction.newInstance(),
+                         FloorTimeUuidFunction.newInstanceWithStartTimeArgument(),
+                         FloorDateFunction.newInstance(),
+                         FloorDateFunction.newInstanceWithStartTimeArgument(),
+                         floorTime);
     }
 
-    public static final Function now(final String name, final TemporalType<?> type)
+    public static final NativeFunction now(final String name, final TemporalType<?> type)
     {
         return new NativeScalarFunction(name, type)
         {
@@ -85,7 +83,7 @@ public abstract class TimeFcts
         };
     };
 
-    public static final Function minTimeuuidFct = new NativeScalarFunction("mintimeuuid", TimeUUIDType.instance, TimestampType.instance)
+    public static final NativeFunction minTimeuuidFct = new NativeScalarFunction("mintimeuuid", TimeUUIDType.instance, TimestampType.instance)
     {
         public ByteBuffer execute(ProtocolVersion protocolVersion, List<ByteBuffer> parameters)
         {
@@ -97,7 +95,7 @@ public abstract class TimeFcts
         }
     };
 
-    public static final Function maxTimeuuidFct = new NativeScalarFunction("maxtimeuuid", TimeUUIDType.instance, TimestampType.instance)
+    public static final NativeFunction maxTimeuuidFct = new NativeScalarFunction("maxtimeuuid", TimeUUIDType.instance, TimestampType.instance)
     {
         public ByteBuffer execute(ProtocolVersion protocolVersion, List<ByteBuffer> parameters)
         {
@@ -252,7 +250,7 @@ public abstract class TimeFcts
          {
              super("floor", returnType, argsType);
              // The function can accept either 2 parameters (time and duration) or 3 parameters (time, duration and startTime)r
-             assert argsType.length == 2 || argsType.length == 3; 
+             assert argsType.length == 2 || argsType.length == 3;
          }
 
          @Override
@@ -509,4 +507,3 @@ public abstract class TimeFcts
          }
      };
  }
-
diff --git a/src/java/org/apache/cassandra/cql3/functions/ToJsonFct.java b/src/java/org/apache/cassandra/cql3/functions/ToJsonFct.java
index d0f2b0b8f7..e2149e8f19 100644
--- a/src/java/org/apache/cassandra/cql3/functions/ToJsonFct.java
+++ b/src/java/org/apache/cassandra/cql3/functions/ToJsonFct.java
@@ -30,8 +30,6 @@ import java.util.concurrent.ConcurrentHashMap;
 
 public class ToJsonFct extends NativeScalarFunction
 {
-    public static final FunctionName NAME = FunctionName.nativeFunction("tojson");
-
     private static final Map<AbstractType<?>, ToJsonFct> instances = new ConcurrentHashMap<>();
 
     public static ToJsonFct getInstance(List<AbstractType<?>> argTypes) throws InvalidRequestException
@@ -63,4 +61,16 @@ public class ToJsonFct extends NativeScalarFunction
 
         return ByteBufferUtil.bytes(argTypes.get(0).toJSONString(parameter, protocolVersion));
     }
+
+    public static void addFunctionsTo(NativeFunctions functions)
+    {
+        functions.add(new FunctionFactory("tojson", FunctionParameter.anyType(false))
+        {
+            @Override
+            protected NativeFunction doGetOrCreateFunction(List<AbstractType<?>> argTypes, AbstractType<?> receiverType)
+            {
+                return ToJsonFct.getInstance(argTypes);
+            }
+        });
+    }
 }
diff --git a/src/java/org/apache/cassandra/cql3/functions/TokenFct.java b/src/java/org/apache/cassandra/cql3/functions/TokenFct.java
index e93084f78e..7fe93225fa 100644
--- a/src/java/org/apache/cassandra/cql3/functions/TokenFct.java
+++ b/src/java/org/apache/cassandra/cql3/functions/TokenFct.java
@@ -20,7 +20,9 @@ package org.apache.cassandra.cql3.functions;
 import java.nio.ByteBuffer;
 import java.util.List;
 
+import org.apache.cassandra.cql3.AssignmentTestable;
 import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.db.CBuilder;
 import org.apache.cassandra.db.marshal.AbstractType;
@@ -37,9 +39,9 @@ public class TokenFct extends NativeScalarFunction
         this.metadata = metadata;
     }
 
-    private static AbstractType[] getKeyTypes(TableMetadata metadata)
+    private static AbstractType<?>[] getKeyTypes(TableMetadata metadata)
     {
-        AbstractType[] types = new AbstractType[metadata.partitionKeyColumns().size()];
+        AbstractType<?>[] types = new AbstractType[metadata.partitionKeyColumns().size()];
         int i = 0;
         for (ColumnMetadata def : metadata.partitionKeyColumns())
             types[i++] = def.type;
@@ -58,4 +60,37 @@ public class TokenFct extends NativeScalarFunction
         }
         return metadata.partitioner.getTokenFactory().toByteArray(metadata.partitioner.getToken(builder.build().serializeAsPartitionKey()));
     }
+
+    public static void addFunctionsTo(NativeFunctions functions)
+    {
+        functions.add(new FunctionFactory("token")
+        {
+            @Override
+            public NativeFunction getOrCreateFunction(List<? extends AssignmentTestable> args,
+                                                      AbstractType<?> receiverType,
+                                                      String receiverKs,
+                                                      String receiverCf)
+            {
+                if (receiverKs == null)
+                    throw new InvalidRequestException("No receiver keyspace has been specified for function " + name);
+
+                if (receiverCf == null)
+                    throw new InvalidRequestException("No receiver table has been specified for function " + name);
+
+                TableMetadata metadata = Schema.instance.getTableMetadata(receiverKs, receiverCf);
+                if (metadata == null)
+                    throw new InvalidRequestException(String.format("The receiver table %s.%s specified by call to " +
+                                                                    "function %s hasn't been found",
+                                                                    receiverKs, receiverCf, name));
+
+                return new TokenFct(metadata);
+            }
+
+            @Override
+            protected NativeFunction doGetOrCreateFunction(List<AbstractType<?>> argTypes, AbstractType<?> receiverType)
+            {
+                throw new AssertionError("Should be unreachable");
+            }
+        });
+    }
 }
diff --git a/src/java/org/apache/cassandra/cql3/functions/UDAggregate.java b/src/java/org/apache/cassandra/cql3/functions/UDAggregate.java
index ad1609ae89..08483b3c64 100644
--- a/src/java/org/apache/cassandra/cql3/functions/UDAggregate.java
+++ b/src/java/org/apache/cassandra/cql3/functions/UDAggregate.java
@@ -26,14 +26,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.cql3.CqlBuilder;
-import org.apache.cassandra.cql3.SchemaElement;
 import org.apache.cassandra.cql3.functions.types.TypeCodec;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.UserType;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.schema.Difference;
-import org.apache.cassandra.schema.Functions;
+import org.apache.cassandra.schema.UserFunctions;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.transport.ProtocolVersion;
 
@@ -44,7 +43,7 @@ import static org.apache.cassandra.utils.Clock.Global.nanoTime;
 /**
  * Base class for user-defined-aggregates.
  */
-public class UDAggregate extends AbstractFunction implements AggregateFunction, SchemaElement
+public class UDAggregate extends UserFunction implements AggregateFunction
 {
     protected static final Logger logger = LoggerFactory.getLogger(UDAggregate.class);
 
@@ -95,7 +94,7 @@ public class UDAggregate extends AbstractFunction implements AggregateFunction,
     private static UDFunction findFunction(FunctionName udaName, Collection<UDFunction> functions, FunctionName name, List<AbstractType<?>> arguments)
     {
         return functions.stream()
-                        .filter(f -> f.name().equals(name) && Functions.typesMatch(f.argTypes(), arguments))
+                        .filter(f -> f.name().equals(name) && f.typesMatch(arguments))
                         .findFirst()
                         .orElseThrow(() -> new ConfigurationException(String.format("Unable to find function %s referenced by UDA %s", name, udaName)));
     }
@@ -150,11 +149,6 @@ public class UDAggregate extends AbstractFunction implements AggregateFunction,
         return true;
     }
 
-    public boolean isNative()
-    {
-        return false;
-    }
-
     public ScalarFunction stateFunction()
     {
         return stateFunction;
@@ -329,7 +323,7 @@ public class UDAggregate extends AbstractFunction implements AggregateFunction,
     @Override
     public int hashCode()
     {
-        return Objects.hashCode(name, Functions.typeHashCode(argTypes), Functions.typeHashCode(returnType), stateFunction, finalFunction, stateType, initcond);
+        return Objects.hashCode(name, UserFunctions.typeHashCode(argTypes), UserFunctions.typeHashCode(returnType), stateFunction, finalFunction, stateType, initcond);
     }
 
     @Override
diff --git a/src/java/org/apache/cassandra/cql3/functions/UDFunction.java b/src/java/org/apache/cassandra/cql3/functions/UDFunction.java
index 5ef065b032..52cd1fc187 100644
--- a/src/java/org/apache/cassandra/cql3/functions/UDFunction.java
+++ b/src/java/org/apache/cassandra/cql3/functions/UDFunction.java
@@ -46,7 +46,6 @@ import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.cql3.CqlBuilder;
-import org.apache.cassandra.cql3.SchemaElement;
 import org.apache.cassandra.cql3.functions.types.DataType;
 import org.apache.cassandra.cql3.functions.types.TypeCodec;
 import org.apache.cassandra.db.marshal.AbstractType;
@@ -67,7 +66,7 @@ import static org.apache.cassandra.utils.Clock.Global.nanoTime;
 /**
  * Base class for User Defined Functions.
  */
-public abstract class UDFunction extends AbstractFunction implements ScalarFunction, SchemaElement
+public abstract class UDFunction extends UserFunction implements ScalarFunction
 {
     protected static final Logger logger = LoggerFactory.getLogger(UDFunction.class);
 
@@ -617,11 +616,6 @@ public abstract class UDFunction extends AbstractFunction implements ScalarFunct
         return false;
     }
 
-    public boolean isNative()
-    {
-        return false;
-    }
-
     public boolean isCalledOnNullInput()
     {
         return calledOnNullInput;
@@ -760,7 +754,7 @@ public abstract class UDFunction extends AbstractFunction implements ScalarFunct
     @Override
     public int hashCode()
     {
-        return Objects.hashCode(name, Functions.typeHashCode(argTypes), Functions.typeHashCode(returnType), returnType, language, body);
+        return Objects.hashCode(name, UserFunctions.typeHashCode(argTypes), UserFunctions.typeHashCode(returnType), returnType, language, body);
     }
 
     private static class UDFClassLoader extends ClassLoader
diff --git a/src/java/org/apache/cassandra/cql3/functions/NativeFunction.java b/src/java/org/apache/cassandra/cql3/functions/UserFunction.java
similarity index 64%
copy from src/java/org/apache/cassandra/cql3/functions/NativeFunction.java
copy to src/java/org/apache/cassandra/cql3/functions/UserFunction.java
index cafeca1af6..7fa10a6516 100644
--- a/src/java/org/apache/cassandra/cql3/functions/NativeFunction.java
+++ b/src/java/org/apache/cassandra/cql3/functions/UserFunction.java
@@ -15,32 +15,27 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.cassandra.cql3.functions;
 
-import java.util.Arrays;
+import java.util.List;
 
+import org.apache.cassandra.cql3.SchemaElement;
 import org.apache.cassandra.db.marshal.AbstractType;
 
 /**
- * Base class for our native/hardcoded functions.
+ * A non-native, user-defined function, like UDFs and UDAs.
  */
-public abstract class NativeFunction extends AbstractFunction
+public abstract class UserFunction extends AbstractFunction implements SchemaElement
 {
-    protected NativeFunction(String name, AbstractType<?> returnType, AbstractType<?>... argTypes)
-    {
-        super(FunctionName.nativeFunction(name), Arrays.asList(argTypes), returnType);
-    }
-
-    @Override
-    public boolean isNative()
+    public UserFunction(FunctionName name, List<AbstractType<?>> argTypes, AbstractType<?> returnType)
     {
-        return true;
+        super(name, argTypes, returnType);
     }
 
     @Override
-    public boolean isPure()
+    public final boolean isNative()
     {
-        // Most of our functions are pure, the other ones should override this
-        return true;
+        return false;
     }
 }
diff --git a/src/java/org/apache/cassandra/cql3/functions/UuidFcts.java b/src/java/org/apache/cassandra/cql3/functions/UuidFcts.java
index 3d82eceefb..0083883623 100644
--- a/src/java/org/apache/cassandra/cql3/functions/UuidFcts.java
+++ b/src/java/org/apache/cassandra/cql3/functions/UuidFcts.java
@@ -26,12 +26,12 @@ import org.apache.cassandra.transport.ProtocolVersion;
 
 public abstract class UuidFcts
 {
-    public static Collection<Function> all()
+    public static void addFunctionsTo(NativeFunctions functions)
     {
-        return Collections.singleton(uuidFct);
+        functions.add(uuidFct);
     }
 
-    public static final Function uuidFct = new NativeScalarFunction("uuid", UUIDType.instance)
+    public static final NativeFunction uuidFct = new NativeScalarFunction("uuid", UUIDType.instance)
     {
         public ByteBuffer execute(ProtocolVersion protocolVersion, List<ByteBuffer> parameters)
         {
diff --git a/src/java/org/apache/cassandra/cql3/selection/AbstractFunctionSelector.java b/src/java/org/apache/cassandra/cql3/selection/AbstractFunctionSelector.java
index bf2564edd2..2d97ffd5c8 100644
--- a/src/java/org/apache/cassandra/cql3/selection/AbstractFunctionSelector.java
+++ b/src/java/org/apache/cassandra/cql3/selection/AbstractFunctionSelector.java
@@ -22,14 +22,14 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import java.util.Optional;
 
 import com.google.common.base.Objects;
 import com.google.common.collect.Iterables;
 
 import org.apache.commons.lang3.text.StrBuilder;
+
+import org.apache.cassandra.cql3.functions.FunctionResolver;
 import org.apache.cassandra.schema.ColumnMetadata;
-import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -64,17 +64,15 @@ abstract class AbstractFunctionSelector<T extends Function> extends Selector
                 argTypes.add(readType(metadata, in));
             }
 
-            Optional<Function> optional = Schema.instance.findFunction(name, argTypes);
+            Function function = FunctionResolver.get(metadata.keyspace, name, argTypes, metadata.keyspace, metadata.name, null);
 
-            if (!optional.isPresent())
+            if (function == null)
                 throw new IOException(String.format("Unknown serialized function %s(%s)",
                                                     name,
                                                     argTypes.stream()
                                                             .map(p -> p.asCQL3Type().toString())
                                                             .collect(joining(", "))));
 
-            Function function = optional.get();
-
             boolean isPartial = in.readBoolean();
             if (isPartial)
             {
@@ -102,8 +100,8 @@ abstract class AbstractFunctionSelector<T extends Function> extends Selector
         }
 
         protected abstract Selector newFunctionSelector(Function function, List<Selector> argSelectors);
-    };
-    
+    }
+
     protected final T fun;
 
     /**
diff --git a/src/java/org/apache/cassandra/cql3/selection/Selectable.java b/src/java/org/apache/cassandra/cql3/selection/Selectable.java
index 88e70f8ce3..0adf017fda 100644
--- a/src/java/org/apache/cassandra/cql3/selection/Selectable.java
+++ b/src/java/org/apache/cassandra/cql3/selection/Selectable.java
@@ -22,8 +22,6 @@ import java.util.*;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
-import org.apache.commons.lang3.text.StrBuilder;
-
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.cql3.functions.*;
 import org.apache.cassandra.cql3.selection.Selector.Factory;
@@ -92,6 +90,12 @@ public interface Selectable extends AssignmentTestable
         return type == null ? TestResult.NOT_ASSIGNABLE : type.testAssignment(keyspace, receiver);
     }
 
+    @Override
+    public default AbstractType<?> getCompatibleTypeIfKnown(String keyspace)
+    {
+        return getExactTypeIfKnown(keyspace);
+    }
+
     default int addAndGetIndex(ColumnMetadata def, List<ColumnMetadata> l)
     {
         int idx = l.indexOf(def);
@@ -192,6 +196,12 @@ public interface Selectable extends AssignmentTestable
             return rawTerm.getExactTypeIfKnown(keyspace);
         }
 
+        @Override
+        public AbstractType<?> getCompatibleTypeIfKnown(String keyspace)
+        {
+            return rawTerm.getCompatibleTypeIfKnown(keyspace);
+        }
+
         @Override
         public boolean selectColumns(Predicate<ColumnMetadata> predicate)
         {
@@ -388,17 +398,11 @@ public interface Selectable extends AssignmentTestable
                     preparedArgs.add(arg.prepare(table));
 
                 FunctionName name = functionName;
-                // We need to circumvent the normal function lookup process for toJson() because instances of the function
-                // are not pre-declared (because it can accept any type of argument). We also have to wait until we have the
-                // selector factories of the argument so we can access their final type.
-                if (functionName.equalsNativeFunction(ToJsonFct.NAME))
-                {
-                    return new WithToJSonFunction(preparedArgs);
-                }
-                // Also, COUNT(x) is equivalent to COUNT(*) for any non-null term x (since count(x) don't care about it's argument outside of check for nullness) and
-                // for backward compatibilty we want to support COUNT(1), but we actually have COUNT(x) method for every existing (simple) input types so currently COUNT(1)
-                // will throw as ambiguous (since 1 works for any type). So we have have to special case COUNT.
-                else if (functionName.equalsNativeFunction(FunctionName.nativeFunction("count"))
+                // COUNT(x) is equivalent to COUNT(*) for any non-null term x (since COUNT(x) doesn't care about its
+                // argument beyond checking for nullness), and for backward compatibility we want to support COUNT(1).
+                // However, there is a COUNT(x) overload for every existing (simple) input type, so COUNT(1) would
+                // otherwise throw as ambiguous (since 1 works for any type). So we have to special-case COUNT.
+                if (functionName.equalsNativeFunction(FunctionName.nativeFunction("count"))
                         && preparedArgs.size() == 1
                         && (preparedArgs.get(0) instanceof WithTerm)
                         && (((WithTerm)preparedArgs.get(0)).rawTerm instanceof Constants.Literal))
@@ -421,44 +425,6 @@ public interface Selectable extends AssignmentTestable
         }
     }
 
-    public static class WithToJSonFunction implements Selectable
-    {
-        public final List<Selectable> args;
-
-        private WithToJSonFunction(List<Selectable> args)
-        {
-            this.args = args;
-        }
-
-        @Override
-        public String toString()
-        {
-            return new StrBuilder().append(ToJsonFct.NAME)
-                                   .append("(")
-                                   .appendWithSeparators(args, ", ")
-                                   .append(")")
-                                   .toString();
-        }
-
-        public Selector.Factory newSelectorFactory(TableMetadata table, AbstractType<?> expectedType, List<ColumnMetadata> defs, VariableSpecifications boundNames)
-        {
-            SelectorFactories factories = SelectorFactories.createFactoriesAndCollectColumnDefinitions(args, null, table, defs, boundNames);
-            Function fun = ToJsonFct.getInstance(factories.getReturnTypes());
-            return AbstractFunctionSelector.newFactory(fun, factories);
-        }
-
-        public AbstractType<?> getExactTypeIfKnown(String keyspace)
-        {
-            return UTF8Type.instance;
-        }
-
-        @Override
-        public boolean selectColumns(Predicate<ColumnMetadata> predicate)
-        {
-            return Selectable.selectColumns(args, predicate);
-        }
-    }
-
     public static class WithCast implements Selectable
     {
         private final CQL3Type type;
@@ -712,6 +678,17 @@ public interface Selectable extends AssignmentTestable
             return Tuples.getExactTupleTypeIfKnown(selectables, p -> p.getExactTypeIfKnown(keyspace));
         }
 
+        @Override
+        public AbstractType<?> getCompatibleTypeIfKnown(String keyspace)
+        {
+            // If there is only one element, we cannot know whether it is an element between parentheses or a tuple
+            // with only one element. Consequently, we need to force the user to specify the type.
+            if (selectables.size() == 1)
+                return null;
+
+            return Tuples.getExactTupleTypeIfKnown(selectables, p -> p.getCompatibleTypeIfKnown(keyspace));
+        }
+
         @Override
         public boolean selectColumns(Predicate<ColumnMetadata> predicate)
         {
@@ -797,6 +774,12 @@ public interface Selectable extends AssignmentTestable
             return Lists.getExactListTypeIfKnown(selectables, p -> p.getExactTypeIfKnown(keyspace));
         }
 
+        @Override
+        public AbstractType<?> getCompatibleTypeIfKnown(String keyspace)
+        {
+            return Lists.getPreferredCompatibleType(selectables, p -> p.getCompatibleTypeIfKnown(keyspace));
+        }
+
         @Override
         public boolean selectColumns(Predicate<ColumnMetadata> predicate)
         {
@@ -890,6 +873,12 @@ public interface Selectable extends AssignmentTestable
             return Sets.getExactSetTypeIfKnown(selectables, p -> p.getExactTypeIfKnown(keyspace));
         }
 
+        @Override
+        public AbstractType<?> getCompatibleTypeIfKnown(String keyspace)
+        {
+            return Sets.getPreferredCompatibleType(selectables, p -> p.getCompatibleTypeIfKnown(keyspace));
+        }
+
         @Override
         public boolean selectColumns(Predicate<ColumnMetadata> predicate)
         {
@@ -1022,7 +1011,14 @@ public interface Selectable extends AssignmentTestable
         @Override
         public AbstractType<?> getExactTypeIfKnown(String keyspace)
         {
-            // Lets force the user to specify the type.
+            // Let's force the user to specify the type.
+            return null;
+        }
+
+        @Override
+        public AbstractType<?> getCompatibleTypeIfKnown(String keyspace)
+        {
+            // Let's force the user to specify the type.
             return null;
         }
 
diff --git a/src/java/org/apache/cassandra/cql3/statements/DescribeStatement.java b/src/java/org/apache/cassandra/cql3/statements/DescribeStatement.java
index b1f576e41f..96a02475a4 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DescribeStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DescribeStatement.java
@@ -67,17 +67,17 @@ public abstract class DescribeStatement<T> extends CQLStatement.Raw implements C
     private static final String CF = "describe";
 
     /**
-     * The columns returned by the describe queries that only list elements names (e.g. DESCRIBE KEYSPACES, DESCRIBE TABLES...) 
+     * The columns returned by the describe queries that only list element names (e.g. DESCRIBE KEYSPACES, DESCRIBE TABLES...)
      */
-    private static final List<ColumnSpecification> LIST_METADATA = 
+    private static final List<ColumnSpecification> LIST_METADATA =
             ImmutableList.of(new ColumnSpecification(KS, CF, new ColumnIdentifier("keyspace_name", true), UTF8Type.instance),
                              new ColumnSpecification(KS, CF, new ColumnIdentifier("type", true), UTF8Type.instance),
                              new ColumnSpecification(KS, CF, new ColumnIdentifier("name", true), UTF8Type.instance));
 
     /**
-     * The columns returned by the describe queries that returns the CREATE STATEMENT for the different elements (e.g. DESCRIBE KEYSPACE, DESCRIBE TABLE ...) 
+     * The columns returned by the describe queries that return the CREATE STATEMENT for the different elements (e.g. DESCRIBE KEYSPACE, DESCRIBE TABLE ...)
      */
-    private static final List<ColumnSpecification> ELEMENT_METADATA = 
+    private static final List<ColumnSpecification> ELEMENT_METADATA =
             ImmutableList.<ColumnSpecification>builder().addAll(LIST_METADATA)
                                                         .add(new ColumnSpecification(KS, CF, new ColumnIdentifier("create_statement", true), UTF8Type.instance))
                                                         .build();
@@ -308,7 +308,7 @@ public abstract class DescribeStatement<T> extends CQLStatement.Raw implements C
      */
     public static DescribeStatement<SchemaElement> functions()
     {
-        return new Listing(ks -> ks.functions.udfs());
+        return new Listing(ks -> ks.userFunctions.udfs());
     }
 
     /**
@@ -316,7 +316,7 @@ public abstract class DescribeStatement<T> extends CQLStatement.Raw implements C
      */
     public static DescribeStatement<SchemaElement> aggregates()
     {
-        return new Listing(ks -> ks.functions.udas());
+        return new Listing(ks -> ks.userFunctions.udas());
     }
 
     /**
@@ -387,7 +387,7 @@ public abstract class DescribeStatement<T> extends CQLStatement.Raw implements C
     public static class Element extends DescribeStatement<SchemaElement>
     {
         /**
-         * The keyspace name 
+         * The keyspace name
          */
         private final String keyspace;
 
@@ -445,8 +445,8 @@ public abstract class DescribeStatement<T> extends CQLStatement.Raw implements C
         if (!onlyKeyspace)
         {
             s = Stream.concat(s, ks.types.sortedStream());
-            s = Stream.concat(s, ks.functions.udfs().sorted(SchemaElement.NAME_COMPARATOR));
-            s = Stream.concat(s, ks.functions.udas().sorted(SchemaElement.NAME_COMPARATOR));
+            s = Stream.concat(s, ks.userFunctions.udfs().sorted(SchemaElement.NAME_COMPARATOR));
+            s = Stream.concat(s, ks.userFunctions.udas().sorted(SchemaElement.NAME_COMPARATOR));
             s = Stream.concat(s, ks.tables.stream().sorted(SchemaElement.NAME_COMPARATOR)
                                                    .flatMap(tm -> getTableElements(ks, tm)));
         }
@@ -534,7 +534,7 @@ public abstract class DescribeStatement<T> extends CQLStatement.Raw implements C
     {
         return new Element(keyspace, name, (ks, n) -> {
 
-            return checkNotEmpty(ks.functions.getUdfs(new FunctionName(ks.name, n)),
+            return checkNotEmpty(ks.userFunctions.getUdfs(new FunctionName(ks.name, n)),
                                  "User defined function '%s' not found in '%s'", n, ks.name).stream()
                                                                                              .sorted(SchemaElement.NAME_COMPARATOR);
         });
@@ -547,7 +547,7 @@ public abstract class DescribeStatement<T> extends CQLStatement.Raw implements C
     {
         return new Element(keyspace, name, (ks, n) -> {
 
-            return checkNotEmpty(ks.functions.getUdas(new FunctionName(ks.name, n)),
+            return checkNotEmpty(ks.userFunctions.getUdas(new FunctionName(ks.name, n)),
                                  "User defined aggregate '%s' not found in '%s'", n, ks.name).stream()
                                                                                               .sorted(SchemaElement.NAME_COMPARATOR);
         });
@@ -681,7 +681,7 @@ public abstract class DescribeStatement<T> extends CQLStatement.Raw implements C
                 list.add(trimIfPresent(DatabaseDescriptor.getPartitionerName(), "org.apache.cassandra.dht."));
                 list.add(trimIfPresent(DatabaseDescriptor.getEndpointSnitch().getClass().getName(),
                                             "org.apache.cassandra.locator."));
- 
+
                 String useKs = state.getRawKeyspace();
                 if (mustReturnsRangeOwnerships(useKs))
                 {
@@ -721,7 +721,7 @@ public abstract class DescribeStatement<T> extends CQLStatement.Raw implements C
             @Override
             protected List<ByteBuffer> toRow(List<Object> elements, boolean withInternals)
             {
-                ImmutableList.Builder<ByteBuffer> builder = ImmutableList.builder(); 
+                ImmutableList.Builder<ByteBuffer> builder = ImmutableList.builder();
 
                 builder.add(UTF8Type.instance.decompose((String) elements.get(CLUSTER_NAME_INDEX)),
                             UTF8Type.instance.decompose((String) elements.get(PARTITIONER_NAME_INDEX)),
diff --git a/src/java/org/apache/cassandra/cql3/statements/PermissionsManagementStatement.java b/src/java/org/apache/cassandra/cql3/statements/PermissionsManagementStatement.java
index aa7e85ba73..e809a27a45 100644
--- a/src/java/org/apache/cassandra/cql3/statements/PermissionsManagementStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/PermissionsManagementStatement.java
@@ -21,7 +21,6 @@ import java.util.Set;
 
 import org.apache.cassandra.auth.*;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.cql3.RoleName;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.exceptions.RequestValidationException;
@@ -55,13 +54,6 @@ public abstract class PermissionsManagementStatement extends AuthorizationStatem
         // called both here and in authorize(), as in some cases we do not call the latter.
         resource = maybeCorrectResource(resource, state);
 
-        // altering permissions on builtin functions is not supported
-        if (resource instanceof FunctionResource
-            && SchemaConstants.SYSTEM_KEYSPACE_NAME.equals(((FunctionResource)resource).getKeyspace()))
-        {
-            throw new InvalidRequestException("Altering permissions on builtin functions is not supported");
-        }
-
         if (!resource.exists())
             throw new InvalidRequestException(String.format("Resource %s doesn't exist", resource));
     }
@@ -78,7 +70,7 @@ public abstract class PermissionsManagementStatement extends AuthorizationStatem
         for (Permission p : permissions)
             state.ensurePermission(p, resource);
     }
-    
+
     @Override
     public String toString()
     {
diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/AlterTypeStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/AlterTypeStatement.java
index 9c3be11b8e..89cb990ae2 100644
--- a/src/java/org/apache/cassandra/cql3/statements/schema/AlterTypeStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/schema/AlterTypeStatement.java
@@ -179,7 +179,7 @@ public abstract class AlterTypeStatement extends AlterSchemaStatement
         UserType apply(KeyspaceMetadata keyspace, UserType userType)
         {
             List<String> dependentAggregates =
-                keyspace.functions
+                keyspace.userFunctions
                         .udas()
                         .filter(uda -> null != uda.initialCondition() && uda.stateType().referencesUserType(userType.name))
                         .map(uda -> uda.name().toString())
diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/CreateAggregateStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/CreateAggregateStatement.java
index 48eb63aacd..9b051755d3 100644
--- a/src/java/org/apache/cassandra/cql3/statements/schema/CreateAggregateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/schema/CreateAggregateStatement.java
@@ -31,9 +31,14 @@ import org.apache.cassandra.auth.FunctionResource;
 import org.apache.cassandra.auth.IResource;
 import org.apache.cassandra.auth.Permission;
 import org.apache.cassandra.cql3.*;
-import org.apache.cassandra.cql3.functions.*;
+import org.apache.cassandra.cql3.functions.FunctionName;
+import org.apache.cassandra.cql3.functions.ScalarFunction;
+import org.apache.cassandra.cql3.functions.UDAggregate;
+import org.apache.cassandra.cql3.functions.UDFunction;
+import org.apache.cassandra.cql3.functions.UDHelper;
+import org.apache.cassandra.cql3.functions.UserFunction;
 import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.schema.Functions.FunctionsDiff;
+import org.apache.cassandra.schema.UserFunctions.FunctionsDiff;
 import org.apache.cassandra.schema.KeyspaceMetadata;
 import org.apache.cassandra.schema.Keyspaces;
 import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
@@ -114,8 +119,8 @@ public final class CreateAggregateStatement extends AlterSchemaStatement
         AbstractType<?> stateType = rawStateType.prepare(keyspaceName, keyspace.types).getType().udfType();
         List<AbstractType<?>> stateFunctionArguments = Lists.newArrayList(concat(singleton(stateType), argumentTypes));
 
-        Function stateFunction =
-            keyspace.functions
+        UserFunction stateFunction =
+            keyspace.userFunctions
                     .find(stateFunctionName, stateFunctionArguments)
                     .orElseThrow(() -> ire("State function %s doesn't exist", stateFunctionString()));
 
@@ -132,12 +137,12 @@ public final class CreateAggregateStatement extends AlterSchemaStatement
          * Resolve the final function and return type
          */
 
-        Function finalFunction = null;
+        UserFunction finalFunction = null;
         AbstractType<?> returnType = stateFunction.returnType();
 
         if (null != finalFunctionName)
         {
-            finalFunction = keyspace.functions.find(finalFunctionName, singletonList(stateType)).orElse(null);
+            finalFunction = keyspace.userFunctions.find(finalFunctionName, singletonList(stateType)).orElse(null);
             if (null == finalFunction)
                 throw ire("Final function %s doesn't exist", finalFunctionString());
 
@@ -196,7 +201,7 @@ public final class CreateAggregateStatement extends AlterSchemaStatement
                             (ScalarFunction) finalFunction,
                             initialValue);
 
-        Function existingAggregate = keyspace.functions.find(aggregate.name(), argumentTypes).orElse(null);
+        UserFunction existingAggregate = keyspace.userFunctions.find(aggregate.name(), argumentTypes).orElse(null);
         if (null != existingAggregate)
         {
             if (!existingAggregate.isAggregate())
@@ -217,7 +222,7 @@ public final class CreateAggregateStatement extends AlterSchemaStatement
             }
         }
 
-        return schema.withAddedOrUpdated(keyspace.withSwapped(keyspace.functions.withAddedOrUpdated(aggregate)));
+        return schema.withAddedOrUpdated(keyspace.withSwapped(keyspace.userFunctions.withAddedOrUpdated(aggregate)));
     }
 
     SchemaChange schemaChangeEvent(KeyspacesDiff diff)
@@ -239,7 +244,7 @@ public final class CreateAggregateStatement extends AlterSchemaStatement
     {
         FunctionName name = new FunctionName(keyspaceName, aggregateName);
 
-        if (Schema.instance.findFunction(name, Lists.transform(rawArgumentTypes, t -> t.prepare(keyspaceName).getType())).isPresent() && orReplace)
+        if (Schema.instance.findUserFunction(name, Lists.transform(rawArgumentTypes, t -> t.prepare(keyspaceName).getType())).isPresent() && orReplace)
             client.ensurePermission(Permission.ALTER, FunctionResource.functionFromCql(keyspaceName, aggregateName, rawArgumentTypes));
         else
             client.ensurePermission(Permission.CREATE, FunctionResource.keyspace(keyspaceName));
diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/CreateFunctionStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/CreateFunctionStatement.java
index bf297aa986..99f6092503 100644
--- a/src/java/org/apache/cassandra/cql3/statements/schema/CreateFunctionStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/schema/CreateFunctionStatement.java
@@ -30,11 +30,11 @@ import org.apache.cassandra.auth.*;
 import org.apache.cassandra.cql3.CQL3Type;
 import org.apache.cassandra.cql3.CQLStatement;
 import org.apache.cassandra.cql3.ColumnIdentifier;
-import org.apache.cassandra.cql3.functions.Function;
 import org.apache.cassandra.cql3.functions.FunctionName;
 import org.apache.cassandra.cql3.functions.UDFunction;
+import org.apache.cassandra.cql3.functions.UserFunction;
 import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.schema.Functions.FunctionsDiff;
+import org.apache.cassandra.schema.UserFunctions.FunctionsDiff;
 import org.apache.cassandra.schema.KeyspaceMetadata;
 import org.apache.cassandra.schema.Keyspaces;
 import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
@@ -119,7 +119,7 @@ public final class CreateFunctionStatement extends AlterSchemaStatement
                               language,
                               body);
 
-        Function existingFunction = keyspace.functions.find(function.name(), argumentTypes).orElse(null);
+        UserFunction existingFunction = keyspace.userFunctions.find(function.name(), argumentTypes).orElse(null);
         if (null != existingFunction)
         {
             if (existingFunction.isAggregate())
@@ -149,7 +149,7 @@ public final class CreateFunctionStatement extends AlterSchemaStatement
             // TODO: update dependent aggregates
         }
 
-        return schema.withAddedOrUpdated(keyspace.withSwapped(keyspace.functions.withAddedOrUpdated(function)));
+        return schema.withAddedOrUpdated(keyspace.withSwapped(keyspace.userFunctions.withAddedOrUpdated(function)));
     }
 
     SchemaChange schemaChangeEvent(KeyspacesDiff diff)
@@ -171,7 +171,7 @@ public final class CreateFunctionStatement extends AlterSchemaStatement
     {
         FunctionName name = new FunctionName(keyspaceName, functionName);
 
-        if (Schema.instance.findFunction(name, Lists.transform(rawArgumentTypes, t -> t.prepare(keyspaceName).getType().udfType())).isPresent() && orReplace)
+        if (Schema.instance.findUserFunction(name, Lists.transform(rawArgumentTypes, t -> t.prepare(keyspaceName).getType().udfType())).isPresent() && orReplace)
             client.ensurePermission(Permission.ALTER, FunctionResource.functionFromCql(keyspaceName, functionName, rawArgumentTypes));
         else
             client.ensurePermission(Permission.CREATE, FunctionResource.keyspace(keyspaceName));
diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/DropAggregateStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/DropAggregateStatement.java
index 0cb1cbeb36..186891e2f1 100644
--- a/src/java/org/apache/cassandra/cql3/statements/schema/DropAggregateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/schema/DropAggregateStatement.java
@@ -28,9 +28,9 @@ import org.apache.cassandra.auth.FunctionResource;
 import org.apache.cassandra.auth.Permission;
 import org.apache.cassandra.cql3.CQL3Type;
 import org.apache.cassandra.cql3.CQLStatement;
-import org.apache.cassandra.cql3.functions.Function;
 import org.apache.cassandra.cql3.functions.FunctionName;
 import org.apache.cassandra.cql3.functions.UDAggregate;
+import org.apache.cassandra.cql3.functions.UserFunction;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.schema.*;
 import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
@@ -80,7 +80,7 @@ public final class DropAggregateStatement extends AlterSchemaStatement
             throw ire("Aggregate '%s' doesn't exist", name);
         }
 
-        Collection<Function> aggregates = keyspace.functions.get(new FunctionName(keyspaceName, aggregateName));
+        Collection<UserFunction> aggregates = keyspace.userFunctions.get(new FunctionName(keyspaceName, aggregateName));
         if (aggregates.size() > 1 && !argumentsSpeficied)
         {
             throw ire("'DROP AGGREGATE %s' matches multiple function definitions; " +
@@ -97,11 +97,11 @@ public final class DropAggregateStatement extends AlterSchemaStatement
 
         List<AbstractType<?>> argumentTypes = prepareArgumentTypes(keyspace.types);
 
-        Predicate<Function> filter = Functions.Filter.UDA;
+        Predicate<UserFunction> filter = UserFunctions.Filter.UDA;
         if (argumentsSpeficied)
-            filter = filter.and(f -> Functions.typesMatch(f.argTypes(), argumentTypes));
+            filter = filter.and(f -> f.typesMatch(argumentTypes));
 
-        Function aggregate = aggregates.stream().filter(filter).findAny().orElse(null);
+        UserFunction aggregate = aggregates.stream().filter(filter).findAny().orElse(null);
         if (null == aggregate)
         {
             if (ifExists)
@@ -110,12 +110,12 @@ public final class DropAggregateStatement extends AlterSchemaStatement
             throw ire("Aggregate '%s' doesn't exist", name);
         }
 
-        return schema.withAddedOrUpdated(keyspace.withSwapped(keyspace.functions.without(aggregate)));
+        return schema.withAddedOrUpdated(keyspace.withSwapped(keyspace.userFunctions.without(aggregate)));
     }
 
     SchemaChange schemaChangeEvent(KeyspacesDiff diff)
     {
-        Functions dropped = diff.altered.get(0).udas.dropped;
+        UserFunctions dropped = diff.altered.get(0).udas.dropped;
         assert dropped.size() == 1;
         return SchemaChange.forAggregate(Change.DROPPED, (UDAggregate) dropped.iterator().next());
     }
@@ -126,9 +126,9 @@ public final class DropAggregateStatement extends AlterSchemaStatement
         if (null == keyspace)
             return;
 
-        Stream<Function> functions = keyspace.functions.get(new FunctionName(keyspaceName, aggregateName)).stream();
+        Stream<UserFunction> functions = keyspace.userFunctions.get(new FunctionName(keyspaceName, aggregateName)).stream();
         if (argumentsSpeficied)
-            functions = functions.filter(f -> Functions.typesMatch(f.argTypes(), prepareArgumentTypes(keyspace.types)));
+            functions = functions.filter(f -> f.typesMatch(prepareArgumentTypes(keyspace.types)));
 
         functions.forEach(f -> client.ensurePermission(Permission.DROP, FunctionResource.function(f)));
     }
diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/DropFunctionStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/DropFunctionStatement.java
index d9d637de87..dd6afbd1cd 100644
--- a/src/java/org/apache/cassandra/cql3/statements/schema/DropFunctionStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/schema/DropFunctionStatement.java
@@ -28,7 +28,9 @@ import org.apache.cassandra.auth.FunctionResource;
 import org.apache.cassandra.auth.Permission;
 import org.apache.cassandra.cql3.CQL3Type;
 import org.apache.cassandra.cql3.CQLStatement;
-import org.apache.cassandra.cql3.functions.*;
+import org.apache.cassandra.cql3.functions.FunctionName;
+import org.apache.cassandra.cql3.functions.UDFunction;
+import org.apache.cassandra.cql3.functions.UserFunction;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.schema.*;
 import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
@@ -79,7 +81,7 @@ public final class DropFunctionStatement extends AlterSchemaStatement
             throw ire("Function '%s' doesn't exist", name);
         }
 
-        Collection<Function> functions = keyspace.functions.get(new FunctionName(keyspaceName, functionName));
+        Collection<UserFunction> functions = keyspace.userFunctions.get(new FunctionName(keyspaceName, functionName));
         if (functions.size() > 1 && !argumentsSpeficied)
         {
             throw ire("'DROP FUNCTION %s' matches multiple function definitions; " +
@@ -96,11 +98,11 @@ public final class DropFunctionStatement extends AlterSchemaStatement
 
         List<AbstractType<?>> argumentTypes = prepareArgumentTypes(keyspace.types);
 
-        Predicate<Function> filter = Functions.Filter.UDF;
+        Predicate<UserFunction> filter = UserFunctions.Filter.UDF;
         if (argumentsSpeficied)
-            filter = filter.and(f -> Functions.typesMatch(f.argTypes(), argumentTypes));
+            filter = filter.and(f -> f.typesMatch(argumentTypes));
 
-        Function function = functions.stream().filter(filter).findAny().orElse(null);
+        UserFunction function = functions.stream().filter(filter).findAny().orElse(null);
         if (null == function)
         {
             if (ifExists)
@@ -110,7 +112,7 @@ public final class DropFunctionStatement extends AlterSchemaStatement
         }
 
         String dependentAggregates =
-            keyspace.functions
+            keyspace.userFunctions
                     .aggregatesUsingFunction(function)
                     .map(a -> a.name().toString())
                     .collect(joining(", "));
@@ -118,12 +120,12 @@ public final class DropFunctionStatement extends AlterSchemaStatement
         if (!dependentAggregates.isEmpty())
             throw ire("Function '%s' is still referenced by aggregates %s", name, dependentAggregates);
 
-        return schema.withAddedOrUpdated(keyspace.withSwapped(keyspace.functions.without(function)));
+        return schema.withAddedOrUpdated(keyspace.withSwapped(keyspace.userFunctions.without(function)));
     }
 
     SchemaChange schemaChangeEvent(KeyspacesDiff diff)
     {
-        Functions dropped = diff.altered.get(0).udfs.dropped;
+        UserFunctions dropped = diff.altered.get(0).udfs.dropped;
         assert dropped.size() == 1;
         return SchemaChange.forFunction(Change.DROPPED, (UDFunction) dropped.iterator().next());
     }
@@ -134,9 +136,9 @@ public final class DropFunctionStatement extends AlterSchemaStatement
         if (null == keyspace)
             return;
 
-        Stream<Function> functions = keyspace.functions.get(new FunctionName(keyspaceName, functionName)).stream();
+        Stream<UserFunction> functions = keyspace.userFunctions.get(new FunctionName(keyspaceName, functionName)).stream();
         if (argumentsSpeficied)
-            functions = functions.filter(f -> Functions.typesMatch(f.argTypes(), prepareArgumentTypes(keyspace.types)));
+            functions = functions.filter(f -> f.typesMatch(prepareArgumentTypes(keyspace.types)));
 
         functions.forEach(f -> client.ensurePermission(Permission.DROP, FunctionResource.function(f)));
     }
diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/DropTypeStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/DropTypeStatement.java
index d188bdb792..97830c882a 100644
--- a/src/java/org/apache/cassandra/cql3/statements/schema/DropTypeStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/schema/DropTypeStatement.java
@@ -24,7 +24,7 @@ import org.apache.cassandra.audit.AuditLogEntryType;
 import org.apache.cassandra.auth.Permission;
 import org.apache.cassandra.cql3.CQLStatement;
 import org.apache.cassandra.cql3.UTName;
-import org.apache.cassandra.cql3.functions.Function;
+import org.apache.cassandra.cql3.functions.UserFunction;
 import org.apache.cassandra.db.marshal.UserType;
 import org.apache.cassandra.schema.KeyspaceMetadata;
 import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
@@ -82,7 +82,7 @@ public final class DropTypeStatement extends AlterSchemaStatement
          * 2) other user type that can nest the one we drop and
          * 3) existing tables referencing the type (maybe in a nested way).
          */
-        Iterable<Function> functions = keyspace.functions.referencingUserType(name);
+        Iterable<UserFunction> functions = keyspace.userFunctions.referencingUserType(name);
         if (!isEmpty(functions))
         {
             throw ire("Cannot drop user type '%s.%s' as it is still used by functions %s",
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index 5280b3f1da..bca5a18d08 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -57,12 +57,6 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.UntypedResultSet;
-import org.apache.cassandra.cql3.functions.AggregateFcts;
-import org.apache.cassandra.cql3.functions.BytesConversionFcts;
-import org.apache.cassandra.cql3.functions.CastFcts;
-import org.apache.cassandra.cql3.functions.OperationFcts;
-import org.apache.cassandra.cql3.functions.TimeFcts;
-import org.apache.cassandra.cql3.functions.UuidFcts;
 import org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
 import org.apache.cassandra.db.commitlog.CommitLogPosition;
 import org.apache.cassandra.db.compaction.CompactionHistoryTabularData;
@@ -93,7 +87,7 @@ import org.apache.cassandra.metrics.RestorableMeter;
 import org.apache.cassandra.metrics.TopPartitionTracker;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.schema.CompactionParams;
-import org.apache.cassandra.schema.Functions;
+import org.apache.cassandra.schema.UserFunctions;
 import org.apache.cassandra.schema.KeyspaceMetadata;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.schema.Schema;
@@ -517,7 +511,7 @@ public final class SystemKeyspace
 
     public static KeyspaceMetadata metadata()
     {
-        return KeyspaceMetadata.create(SchemaConstants.SYSTEM_KEYSPACE_NAME, KeyspaceParams.local(), tables(), Views.none(), Types.none(), functions());
+        return KeyspaceMetadata.create(SchemaConstants.SYSTEM_KEYSPACE_NAME, KeyspaceParams.local(), tables(), Views.none(), Types.none(), UserFunctions.none());
     }
 
     private static Tables tables()
@@ -547,18 +541,6 @@ public final class SystemKeyspace
                          TopPartitions);
     }
 
-    private static Functions functions()
-    {
-        return Functions.builder()
-                        .add(UuidFcts.all())
-                        .add(TimeFcts.all())
-                        .add(BytesConversionFcts.all())
-                        .add(AggregateFcts.all())
-                        .add(CastFcts.all())
-                        .add(OperationFcts.all())
-                        .build();
-    }
-
     private static volatile Map<TableId, Pair<CommitLogPosition, Long>> truncationRecords;
 
     public enum BootstrapState
diff --git a/src/java/org/apache/cassandra/db/marshal/AbstractType.java b/src/java/org/apache/cassandra/db/marshal/AbstractType.java
index 8f54cb65a6..b421f2eabb 100644
--- a/src/java/org/apache/cassandra/db/marshal/AbstractType.java
+++ b/src/java/org/apache/cassandra/db/marshal/AbstractType.java
@@ -709,4 +709,10 @@ public abstract class AbstractType<T> implements Comparator<ByteBuffer>, Assignm
     {
         return testAssignment(receiver.type);
     }
+
+    @Override
+    public AbstractType<?> getCompatibleTypeIfKnown(String keyspace)
+    {
+        return this;
+    }
 }
diff --git a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
index 6d5456124f..604a4ee361 100644
--- a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
@@ -563,11 +563,11 @@ public class CQLSSTableWriter implements Closeable
                 String keyspaceName = schemaStatement.keyspace();
 
                 Schema.instance.transform(SchemaTransformations.addKeyspace(KeyspaceMetadata.create(keyspaceName,
-                                                                                                           KeyspaceParams.simple(1),
-                                                                                                           Tables.none(),
-                                                                                                           Views.none(),
-                                                                                                           Types.none(),
-                                                                                                           Functions.none()), true));
+                                                                                                    KeyspaceParams.simple(1),
+                                                                                                    Tables.none(),
+                                                                                                    Views.none(),
+                                                                                                    Types.none(),
+                                                                                                    UserFunctions.none()), true));
 
                 KeyspaceMetadata ksm = Schema.instance.getKeyspaceMetadata(keyspaceName);
 
diff --git a/src/java/org/apache/cassandra/schema/DistributedSchema.java b/src/java/org/apache/cassandra/schema/DistributedSchema.java
index 4fed9bb1e8..d2f30aa727 100644
--- a/src/java/org/apache/cassandra/schema/DistributedSchema.java
+++ b/src/java/org/apache/cassandra/schema/DistributedSchema.java
@@ -91,7 +91,7 @@ public class DistributedSchema
             ksm.tables.forEach(tm -> Preconditions.checkArgument(tm.keyspace.equals(ksm.name), "Table %s metadata points to keyspace %s while defined in keyspace %s", tm.name, tm.keyspace, ksm.name));
             ksm.views.forEach(vm -> Preconditions.checkArgument(vm.keyspace().equals(ksm.name), "View %s metadata points to keyspace %s while defined in keyspace %s", vm.name(), vm.keyspace(), ksm.name));
             ksm.types.forEach(ut -> Preconditions.checkArgument(ut.keyspace.equals(ksm.name), "Type %s points to keyspace %s while defined in keyspace %s", ut.name, ut.keyspace, ksm.name));
-            ksm.functions.forEach(f -> Preconditions.checkArgument(f.name().keyspace.equals(ksm.name), "Function %s points to keyspace %s while defined in keyspace %s", f.name().name, f.name().keyspace, ksm.name));
+            ksm.userFunctions.forEach(f -> Preconditions.checkArgument(f.name().keyspace.equals(ksm.name), "Function %s points to keyspace %s while defined in keyspace %s", f.name().name, f.name().keyspace, ksm.name));
         });
     }
 }
diff --git a/src/java/org/apache/cassandra/schema/KeyspaceMetadata.java b/src/java/org/apache/cassandra/schema/KeyspaceMetadata.java
index 6d85391d30..4adf464100 100644
--- a/src/java/org/apache/cassandra/schema/KeyspaceMetadata.java
+++ b/src/java/org/apache/cassandra/schema/KeyspaceMetadata.java
@@ -35,7 +35,7 @@ import org.apache.cassandra.cql3.functions.UDFunction;
 import org.apache.cassandra.db.marshal.UserType;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
-import org.apache.cassandra.schema.Functions.FunctionsDiff;
+import org.apache.cassandra.schema.UserFunctions.FunctionsDiff;
 import org.apache.cassandra.schema.Tables.TablesDiff;
 import org.apache.cassandra.schema.Types.TypesDiff;
 import org.apache.cassandra.schema.Views.ViewsDiff;
@@ -61,9 +61,9 @@ public final class KeyspaceMetadata implements SchemaElement
     public final Tables tables;
     public final Views views;
     public final Types types;
-    public final Functions functions;
+    public final UserFunctions userFunctions;
 
-    private KeyspaceMetadata(String name, Kind kind, KeyspaceParams params, Tables tables, Views views, Types types, Functions functions)
+    private KeyspaceMetadata(String name, Kind kind, KeyspaceParams params, Tables tables, Views views, Types types, UserFunctions functions)
     {
         this.name = name;
         this.kind = kind;
@@ -71,57 +71,57 @@ public final class KeyspaceMetadata implements SchemaElement
         this.tables = tables;
         this.views = views;
         this.types = types;
-        this.functions = functions;
+        this.userFunctions = functions;
     }
 
     public static KeyspaceMetadata create(String name, KeyspaceParams params)
     {
-        return new KeyspaceMetadata(name, Kind.REGULAR, params, Tables.none(), Views.none(), Types.none(), Functions.none());
+        return new KeyspaceMetadata(name, Kind.REGULAR, params, Tables.none(), Views.none(), Types.none(), UserFunctions.none());
     }
 
     public static KeyspaceMetadata create(String name, KeyspaceParams params, Tables tables)
     {
-        return new KeyspaceMetadata(name, Kind.REGULAR, params, tables, Views.none(), Types.none(), Functions.none());
+        return new KeyspaceMetadata(name, Kind.REGULAR, params, tables, Views.none(), Types.none(), UserFunctions.none());
     }
 
-    public static KeyspaceMetadata create(String name, KeyspaceParams params, Tables tables, Views views, Types types, Functions functions)
+    public static KeyspaceMetadata create(String name, KeyspaceParams params, Tables tables, Views views, Types types, UserFunctions functions)
     {
         return new KeyspaceMetadata(name, Kind.REGULAR, params, tables, views, types, functions);
     }
 
     public static KeyspaceMetadata virtual(String name, Tables tables)
     {
-        return new KeyspaceMetadata(name, Kind.VIRTUAL, KeyspaceParams.local(), tables, Views.none(), Types.none(), Functions.none());
+        return new KeyspaceMetadata(name, Kind.VIRTUAL, KeyspaceParams.local(), tables, Views.none(), Types.none(), UserFunctions.none());
     }
 
     public KeyspaceMetadata withSwapped(KeyspaceParams params)
     {
-        return new KeyspaceMetadata(name, kind, params, tables, views, types, functions);
+        return new KeyspaceMetadata(name, kind, params, tables, views, types, userFunctions);
     }
 
     public KeyspaceMetadata withSwapped(Tables regular)
     {
-        return new KeyspaceMetadata(name, kind, params, regular, views, types, functions);
+        return new KeyspaceMetadata(name, kind, params, regular, views, types, userFunctions);
     }
 
     public KeyspaceMetadata withSwapped(Views views)
     {
-        return new KeyspaceMetadata(name, kind, params, tables, views, types, functions);
+        return new KeyspaceMetadata(name, kind, params, tables, views, types, userFunctions);
     }
 
     public KeyspaceMetadata withSwapped(Types types)
     {
-        return new KeyspaceMetadata(name, kind, params, tables, views, types, functions);
+        return new KeyspaceMetadata(name, kind, params, tables, views, types, userFunctions);
     }
 
-    public KeyspaceMetadata withSwapped(Functions functions)
+    public KeyspaceMetadata withSwapped(UserFunctions functions)
     {
         return new KeyspaceMetadata(name, kind, params, tables, views, types, functions);
     }
 
     public KeyspaceMetadata empty()
     {
-        return new KeyspaceMetadata(this.name, this.kind, this.params, Tables.none(), Views.none(), Types.none(), Functions.none());
+        return new KeyspaceMetadata(this.name, this.kind, this.params, Tables.none(), Views.none(), Types.none(), UserFunctions.none());
     }
 
     public boolean isVirtual()
@@ -141,7 +141,7 @@ public final class KeyspaceMetadata implements SchemaElement
                                     tables.withUpdatedUserType(udt),
                                     views.withUpdatedUserTypes(udt),
                                     types.withUpdatedUserType(udt),
-                                    functions.withUpdatedUserType(udt));
+                                    userFunctions.withUpdatedUserType(udt));
     }
 
     public Iterable<TableMetadata> tablesAndViews()
@@ -200,7 +200,7 @@ public final class KeyspaceMetadata implements SchemaElement
     @Override
     public int hashCode()
     {
-        return Objects.hashCode(name, kind, params, tables, views, functions, types);
+        return Objects.hashCode(name, kind, params, tables, views, userFunctions, types);
     }
 
     @Override
@@ -215,12 +215,12 @@ public final class KeyspaceMetadata implements SchemaElement
         KeyspaceMetadata other = (KeyspaceMetadata) o;
 
         return name.equals(other.name)
-            && kind == other.kind
-            && params.equals(other.params)
-            && tables.equals(other.tables)
-            && views.equals(other.views)
-            && functions.equals(other.functions)
-            && types.equals(other.types);
+               && kind == other.kind
+               && params.equals(other.params)
+               && tables.equals(other.tables)
+               && views.equals(other.views)
+               && userFunctions.equals(other.userFunctions)
+               && types.equals(other.types);
     }
 
     @Override
@@ -232,7 +232,7 @@ public final class KeyspaceMetadata implements SchemaElement
                           .add("params", params)
                           .add("tables", tables)
                           .add("views", views)
-                          .add("functions", functions)
+                          .add("functions", userFunctions)
                           .add("types", types)
                           .toString();
     }
@@ -385,10 +385,10 @@ public final class KeyspaceMetadata implements SchemaElement
 
             @SuppressWarnings("unchecked") FunctionsDiff<UDFunction>  udfs = FunctionsDiff.NONE;
             @SuppressWarnings("unchecked") FunctionsDiff<UDAggregate> udas = FunctionsDiff.NONE;
-            if (before.functions != after.functions)
+            if (before.userFunctions != after.userFunctions)
             {
-                udfs = Functions.udfsDiff(before.functions, after.functions);
-                udas = Functions.udasDiff(before.functions, after.functions);
+                udfs = UserFunctions.udfsDiff(before.userFunctions, after.userFunctions);
+                udas = UserFunctions.udasDiff(before.userFunctions, after.userFunctions);
             }
 
             if (before.params.equals(after.params) && tables.isEmpty() && views.isEmpty() && types.isEmpty() && udfs.isEmpty() && udas.isEmpty())
diff --git a/src/java/org/apache/cassandra/schema/Schema.java b/src/java/org/apache/cassandra/schema/Schema.java
index dee24c18cf..52c18f496d 100644
--- a/src/java/org/apache/cassandra/schema/Schema.java
+++ b/src/java/org/apache/cassandra/schema/Schema.java
@@ -461,13 +461,13 @@ public class Schema implements SchemaProvider
     /* Function helpers */
 
     /**
-     * Get all function overloads with the specified name
+     * Get all user-defined function overloads with the specified name.
      *
      * @param name fully qualified function name
      * @return an empty list if the keyspace or the function name are not found;
-     *         a non-empty collection of {@link Function} otherwise
+     *         a non-empty collection of {@link UserFunction} otherwise
      */
-    public Collection<Function> getFunctions(FunctionName name)
+    public Collection<UserFunction> getUserFunctions(FunctionName name)
     {
         if (!name.hasKeyspace())
             throw new IllegalArgumentException(String.format("Function name must be fully qualified: got %s", name));
@@ -475,26 +475,24 @@ public class Schema implements SchemaProvider
         KeyspaceMetadata ksm = getKeyspaceMetadata(name.keyspace);
         return ksm == null
                ? Collections.emptyList()
-               : ksm.functions.get(name);
+               : ksm.userFunctions.get(name);
     }
 
     /**
-     * Find the function with the specified name
+     * Find the user-defined function with the specified name and argument types.
      *
      * @param name     fully qualified function name
      * @param argTypes function argument types
      * @return an empty {@link Optional} if the keyspace or the function name are not found;
      *         a non-empty optional of {@link Function} otherwise
      */
-    public Optional<Function> findFunction(FunctionName name, List<AbstractType<?>> argTypes)
+    public Optional<UserFunction> findUserFunction(FunctionName name, List<AbstractType<?>> argTypes)
     {
         if (!name.hasKeyspace())
             throw new IllegalArgumentException(String.format("Function name must be fully quallified: got %s", name));
 
-        KeyspaceMetadata ksm = getKeyspaceMetadata(name.keyspace);
-        return ksm == null
-               ? Optional.empty()
-               : ksm.functions.find(name, argTypes);
+        return Optional.ofNullable(getKeyspaceMetadata(name.keyspace))
+                       .flatMap(ksm -> ksm.userFunctions.find(name, argTypes));
     }
 
     /* Version control */
diff --git a/src/java/org/apache/cassandra/schema/SchemaChangeNotifier.java b/src/java/org/apache/cassandra/schema/SchemaChangeNotifier.java
index c4537e1999..449e99bed1 100644
--- a/src/java/org/apache/cassandra/schema/SchemaChangeNotifier.java
+++ b/src/java/org/apache/cassandra/schema/SchemaChangeNotifier.java
@@ -52,8 +52,8 @@ public class SchemaChangeNotifier
         keyspace.types.forEach(this::notifyCreateType);
         keyspace.tables.forEach(this::notifyCreateTable);
         keyspace.views.forEach(this::notifyCreateView);
-        keyspace.functions.udfs().forEach(this::notifyCreateFunction);
-        keyspace.functions.udas().forEach(this::notifyCreateAggregate);
+        keyspace.userFunctions.udfs().forEach(this::notifyCreateFunction);
+        keyspace.userFunctions.udas().forEach(this::notifyCreateAggregate);
     }
 
     public void notifyKeyspaceAltered(KeyspaceMetadata.KeyspaceDiff delta, boolean dropData)
@@ -84,8 +84,8 @@ public class SchemaChangeNotifier
 
     public void notifyKeyspaceDropped(KeyspaceMetadata keyspace, boolean dropData)
     {
-        keyspace.functions.udas().forEach(this::notifyDropAggregate);
-        keyspace.functions.udfs().forEach(this::notifyDropFunction);
+        keyspace.userFunctions.udas().forEach(this::notifyDropAggregate);
+        keyspace.userFunctions.udfs().forEach(this::notifyDropFunction);
         keyspace.views.forEach(view -> notifyDropView(view, dropData));
         keyspace.tables.forEach(metadata -> notifyDropTable(metadata, dropData));
         keyspace.types.forEach(this::notifyDropType);
diff --git a/src/java/org/apache/cassandra/schema/SchemaEvent.java b/src/java/org/apache/cassandra/schema/SchemaEvent.java
index 5703fe29b5..8cdd0c8c4b 100644
--- a/src/java/org/apache/cassandra/schema/SchemaEvent.java
+++ b/src/java/org/apache/cassandra/schema/SchemaEvent.java
@@ -172,7 +172,7 @@ public final class SchemaEvent extends DiagnosticEvent
         if (ksm.params != null) ret.put("params", ksm.params.toString());
         if (ksm.tables != null) ret.put("tables", ksm.tables.toString());
         if (ksm.views != null) ret.put("views", ksm.views.toString());
-        if (ksm.functions != null) ret.put("functions", ksm.functions.toString());
+        if (ksm.userFunctions != null) ret.put("functions", ksm.userFunctions.toString());
         if (ksm.types != null) ret.put("types", ksm.types.toString());
         return ret;
     }
diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
index 0604fc1ffd..23544cc0b9 100644
--- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
+++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
@@ -63,7 +63,7 @@ import static org.apache.cassandra.utils.Simulate.With.GLOBAL_CLOCK;
 
 /**
  * system_schema.* tables and methods for manipulating them.
- * 
+ *
  * Please notice this class is _not_ thread safe and all methods which reads or updates the data in schema keyspace
  * should be accessed only from the implementation of {@link SchemaUpdateHandler} in synchronized blocks.
  */
@@ -471,8 +471,8 @@ public final class SchemaKeyspace
         keyspace.tables.forEach(table -> addTableToSchemaMutation(table, true, builder));
         keyspace.views.forEach(view -> addViewToSchemaMutation(view, true, builder));
         keyspace.types.forEach(type -> addTypeToSchemaMutation(type, builder));
-        keyspace.functions.udfs().forEach(udf -> addFunctionToSchemaMutation(udf, builder));
-        keyspace.functions.udas().forEach(uda -> addAggregateToSchemaMutation(uda, builder));
+        keyspace.userFunctions.udfs().forEach(udf -> addFunctionToSchemaMutation(udf, builder));
+        keyspace.userFunctions.udas().forEach(uda -> addAggregateToSchemaMutation(uda, builder));
 
         return builder;
     }
@@ -871,7 +871,7 @@ public final class SchemaKeyspace
         Types types = fetchTypes(keyspaceName);
         Tables tables = fetchTables(keyspaceName, types);
         Views views = fetchViews(keyspaceName, types);
-        Functions functions = fetchFunctions(keyspaceName, types);
+        UserFunctions functions = fetchFunctions(keyspaceName, types);
         return KeyspaceMetadata.create(keyspaceName, params, tables, views, types, functions);
     }
 
@@ -1139,12 +1139,12 @@ public final class SchemaKeyspace
         return new ViewMetadata(baseTableId, baseTableName, includeAll, whereClause, metadata);
     }
 
-    private static Functions fetchFunctions(String keyspaceName, Types types)
+    private static UserFunctions fetchFunctions(String keyspaceName, Types types)
     {
         Collection<UDFunction> udfs = fetchUDFs(keyspaceName, types);
         Collection<UDAggregate> udas = fetchUDAs(keyspaceName, udfs, types);
 
-        return org.apache.cassandra.schema.Functions.builder().add(udfs).add(udas).build();
+        return UserFunctions.builder().add(udfs).add(udas).build();
     }
 
     private static Collection<UDFunction> fetchUDFs(String keyspaceName, Types types)
@@ -1181,7 +1181,7 @@ public final class SchemaKeyspace
          * TODO: find a way to get rid of Schema.instance dependency; evaluate if the opimisation below makes a difference
          * in the first place. Remove if it isn't.
          */
-        org.apache.cassandra.cql3.functions.Function existing = Schema.instance.findFunction(name, argTypes).orElse(null);
+        UserFunction existing = Schema.instance.findUserFunction(name, argTypes).orElse(null);
         if (existing instanceof UDFunction)
         {
             // This check prevents duplicate compilation of effectively the same UDF.
diff --git a/src/java/org/apache/cassandra/schema/Functions.java b/src/java/org/apache/cassandra/schema/UserFunctions.java
similarity index 67%
rename from src/java/org/apache/cassandra/schema/Functions.java
rename to src/java/org/apache/cassandra/schema/UserFunctions.java
index c5de3b85ea..b40c704d0b 100644
--- a/src/java/org/apache/cassandra/schema/Functions.java
+++ b/src/java/org/apache/cassandra/schema/UserFunctions.java
@@ -34,16 +34,15 @@ import static java.util.stream.Collectors.toList;
 import static com.google.common.collect.Iterables.any;
 
 /**
- * An immutable container for a keyspace's UDAs and UDFs (and, in case of {@link org.apache.cassandra.db.SystemKeyspace},
- * native functions and aggregates).
+ * An immutable container for a keyspace's UDAs and UDFs.
  */
-public final class Functions implements Iterable<Function>
+public final class UserFunctions implements Iterable<UserFunction>
 {
-    public enum Filter implements Predicate<Function>
+    public enum Filter implements Predicate<UserFunction>
     {
         ALL, UDF, UDA;
 
-        public boolean test(Function function)
+        public boolean test(UserFunction function)
         {
             switch (this)
             {
@@ -54,9 +53,9 @@ public final class Functions implements Iterable<Function>
         }
     }
 
-    private final ImmutableMultimap<FunctionName, Function> functions;
+    private final ImmutableMultimap<FunctionName, UserFunction> functions;
 
-    private Functions(Builder builder)
+    private UserFunctions(Builder builder)
     {
         functions = builder.functions.build();
     }
@@ -66,22 +65,17 @@ public final class Functions implements Iterable<Function>
         return new Builder();
     }
 
-    public static Functions none()
+    public static UserFunctions none()
     {
         return builder().build();
     }
 
-    public static Functions of(Function... funs)
-    {
-        return builder().add(funs).build();
-    }
-
-    public Iterator<Function> iterator()
+    public Iterator<UserFunction> iterator()
     {
         return functions.values().iterator();
     }
 
-    public Stream<Function> stream()
+    public Stream<UserFunction> stream()
     {
         return functions.values().stream();
     }
@@ -107,12 +101,12 @@ public final class Functions implements Iterable<Function>
         return stream().filter(Filter.UDA).map(f -> (UDAggregate) f);
     }
 
-    public Iterable<Function> referencingUserType(ByteBuffer name)
+    public Iterable<UserFunction> referencingUserType(ByteBuffer name)
     {
         return Iterables.filter(this, f -> f.referencesUserType(name));
     }
 
-    public Functions withUpdatedUserType(UserType udt)
+    public UserFunctions withUpdatedUserType(UserType udt)
     {
         if (!any(this, f -> f.referencesUserType(udt.name)))
             return this;
@@ -138,7 +132,7 @@ public final class Functions implements Iterable<Function>
      * @param name fully qualified function name
      * @return an empty list if the function name is not found; a non-empty collection of {@link Function} otherwise
      */
-    public Collection<Function> get(FunctionName name)
+    public Collection<UserFunction> get(FunctionName name)
     {
         return functions.get(name);
     }
@@ -173,7 +167,7 @@ public final class Functions implements Iterable<Function>
                         .collect(Collectors.toList());
     }
 
-    public Optional<Function> find(FunctionName name, List<AbstractType<?>> argTypes)
+    public Optional<UserFunction> find(FunctionName name, List<AbstractType<?>> argTypes)
     {
         return find(name, argTypes, Filter.ALL);
     }
@@ -185,10 +179,10 @@ public final class Functions implements Iterable<Function>
      * @param argTypes function argument types
      * @return an empty {@link Optional} if the function name is not found; a non-empty optional of {@link Function} otherwise
      */
-    public Optional<Function> find(FunctionName name, List<AbstractType<?>> argTypes, Filter filter)
+    public Optional<UserFunction> find(FunctionName name, List<AbstractType<?>> argTypes, Filter filter)
     {
         return get(name).stream()
-                        .filter(filter.and(fun -> typesMatch(fun.argTypes(), argTypes)))
+                        .filter(filter.and(fun -> fun.typesMatch(argTypes)))
                         .findAny();
     }
 
@@ -197,36 +191,6 @@ public final class Functions implements Iterable<Function>
         return functions.isEmpty();
     }
 
-    /*
-     * We need to compare the CQL3 representation of the type because comparing
-     * the AbstractType will fail for example if a UDT has been changed.
-     * Reason is that UserType.equals() takes the field names and types into account.
-     * Example CQL sequence that would fail when comparing AbstractType:
-     *    CREATE TYPE foo ...
-     *    CREATE FUNCTION bar ( par foo ) RETURNS foo ...
-     *    ALTER TYPE foo ADD ...
-     * or
-     *    ALTER TYPE foo ALTER ...
-     * or
-     *    ALTER TYPE foo RENAME ...
-     */
-    private static boolean typesMatch(AbstractType<?> t1, AbstractType<?> t2)
-    {
-        return t1.freeze().asCQL3Type().toString().equals(t2.freeze().asCQL3Type().toString());
-    }
-
-    public static boolean typesMatch(List<AbstractType<?>> t1, List<AbstractType<?>> t2)
-    {
-        if (t1.size() != t2.size())
-            return false;
-
-        for (int i = 0; i < t1.size(); i++)
-            if (!typesMatch(t1.get(i), t2.get(i)))
-                return false;
-
-        return true;
-    }
-
     public static int typeHashCode(AbstractType<?> t)
     {
         return t.asCQL3Type().toString().hashCode();
@@ -240,7 +204,7 @@ public final class Functions implements Iterable<Function>
         return h;
     }
 
-    public Functions filter(Predicate<Function> predicate)
+    public UserFunctions filter(Predicate<UserFunction> predicate)
     {
         Builder builder = builder();
         stream().filter(predicate).forEach(builder::add);
@@ -250,7 +214,7 @@ public final class Functions implements Iterable<Function>
     /**
      * Create a Functions instance with the provided function added
      */
-    public Functions with(Function fun)
+    public UserFunctions with(UserFunction fun)
     {
         if (find(fun.name(), fun.argTypes()).isPresent())
             throw new IllegalStateException(String.format("Function %s already exists", fun.name()));
@@ -261,7 +225,7 @@ public final class Functions implements Iterable<Function>
     /**
      * Creates a Functions instance with the function with the provided name and argument types removed
      */
-    public Functions without(FunctionName name, List<AbstractType<?>> argTypes)
+    public UserFunctions without(FunctionName name, List<AbstractType<?>> argTypes)
     {
         Function fun =
             find(name, argTypes).orElseThrow(() -> new IllegalStateException(String.format("Function %s doesn't exists", name)));
@@ -269,14 +233,14 @@ public final class Functions implements Iterable<Function>
         return without(fun);
     }
 
-    public Functions without(Function function)
+    public UserFunctions without(Function function)
     {
         return builder().add(Iterables.filter(this, f -> f != function)).build();
     }
 
-    public Functions withAddedOrUpdated(Function function)
+    public UserFunctions withAddedOrUpdated(UserFunction function)
     {
-        return builder().add(Iterables.filter(this, f -> !(f.name().equals(function.name()) && Functions.typesMatch(f.argTypes(), function.argTypes()))))
+        return builder().add(Iterables.filter(this, f -> !(f.name().equals(function.name()) && f.typesMatch(function.argTypes()))))
                         .add(function)
                         .build();
     }
@@ -284,7 +248,7 @@ public final class Functions implements Iterable<Function>
     @Override
     public boolean equals(Object o)
     {
-        return this == o || (o instanceof Functions && functions.equals(((Functions) o).functions));
+        return this == o || (o instanceof UserFunctions && functions.equals(((UserFunctions) o).functions));
     }
 
     @Override
@@ -301,7 +265,7 @@ public final class Functions implements Iterable<Function>
 
     public static final class Builder
     {
-        final ImmutableMultimap.Builder<FunctionName, Function> functions = new ImmutableMultimap.Builder<>();
+        final ImmutableMultimap.Builder<FunctionName, UserFunction> functions = new ImmutableMultimap.Builder<>();
 
         private Builder()
         {
@@ -309,25 +273,25 @@ public final class Functions implements Iterable<Function>
             functions.orderValuesBy(Comparator.comparingInt(Object::hashCode));
         }
 
-        public Functions build()
+        public UserFunctions build()
         {
-            return new Functions(this);
+            return new UserFunctions(this);
         }
 
-        public Builder add(Function fun)
+        public Builder add(UserFunction fun)
         {
             functions.put(fun.name(), fun);
             return this;
         }
 
-        public Builder add(Function... funs)
+        public Builder add(UserFunction... funs)
         {
-            for (Function fun : funs)
+            for (UserFunction fun : funs)
                 add(fun);
             return this;
         }
 
-        public Builder add(Iterable<? extends Function> funs)
+        public Builder add(Iterable<? extends UserFunction> funs)
         {
             funs.forEach(this::add);
             return this;
@@ -335,35 +299,35 @@ public final class Functions implements Iterable<Function>
     }
 
     @SuppressWarnings("unchecked")
-    static FunctionsDiff<UDFunction> udfsDiff(Functions before, Functions after)
+    static FunctionsDiff<UDFunction> udfsDiff(UserFunctions before, UserFunctions after)
     {
         return (FunctionsDiff<UDFunction>) FunctionsDiff.diff(before, after, Filter.UDF);
     }
 
     @SuppressWarnings("unchecked")
-    static FunctionsDiff<UDAggregate> udasDiff(Functions before, Functions after)
+    static FunctionsDiff<UDAggregate> udasDiff(UserFunctions before, UserFunctions after)
     {
         return (FunctionsDiff<UDAggregate>) FunctionsDiff.diff(before, after, Filter.UDA);
     }
 
-    public static final class FunctionsDiff<T extends Function> extends Diff<Functions, T>
+    public static final class FunctionsDiff<T extends Function> extends Diff<UserFunctions, T>
     {
-        static final FunctionsDiff NONE = new FunctionsDiff<>(Functions.none(), Functions.none(), ImmutableList.of());
+        static final FunctionsDiff NONE = new FunctionsDiff<>(UserFunctions.none(), UserFunctions.none(), ImmutableList.of());
 
-        private FunctionsDiff(Functions created, Functions dropped, ImmutableCollection<Altered<T>> altered)
+        private FunctionsDiff(UserFunctions created, UserFunctions dropped, ImmutableCollection<Altered<T>> altered)
         {
             super(created, dropped, altered);
         }
 
-        private static FunctionsDiff diff(Functions before, Functions after, Filter filter)
+        private static FunctionsDiff diff(UserFunctions before, UserFunctions after, Filter filter)
         {
             if (before == after)
                 return NONE;
 
-            Functions created = after.filter(filter.and(k -> !before.find(k.name(), k.argTypes(), filter).isPresent()));
-            Functions dropped = before.filter(filter.and(k -> !after.find(k.name(), k.argTypes(), filter).isPresent()));
+            UserFunctions created = after.filter(filter.and(k -> !before.find(k.name(), k.argTypes(), filter).isPresent()));
+            UserFunctions dropped = before.filter(filter.and(k -> !after.find(k.name(), k.argTypes(), filter).isPresent()));
 
-            ImmutableList.Builder<Altered<Function>> altered = ImmutableList.builder();
+            ImmutableList.Builder<Altered<UserFunction>> altered = ImmutableList.builder();
             before.stream().filter(filter).forEach(functionBefore ->
             {
                 after.find(functionBefore.name(), functionBefore.argTypes(), filter).ifPresent(functionAfter ->
diff --git a/test/unit/org/apache/cassandra/SchemaLoader.java b/test/unit/org/apache/cassandra/SchemaLoader.java
index e2ef487ddf..d1dfa96af8 100644
--- a/test/unit/org/apache/cassandra/SchemaLoader.java
+++ b/test/unit/org/apache/cassandra/SchemaLoader.java
@@ -274,7 +274,7 @@ public class SchemaLoader
 
     public static void createKeyspace(String name, KeyspaceParams params, Tables tables, Types types)
     {
-        SchemaTestUtil.announceNewKeyspace(KeyspaceMetadata.create(name, params, tables, Views.none(), types, Functions.none()));
+        SchemaTestUtil.announceNewKeyspace(KeyspaceMetadata.create(name, params, tables, Views.none(), types, UserFunctions.none()));
     }
 
     public static void setupAuth(IRoleManager roleManager, IAuthenticator authenticator, IAuthorizer authorizer, INetworkAuthorizer networkAuthorizer)
diff --git a/test/unit/org/apache/cassandra/cql3/AssignmentTestableTest.java b/test/unit/org/apache/cassandra/cql3/AssignmentTestableTest.java
new file mode 100644
index 0000000000..46c00f1fa9
--- /dev/null
+++ b/test/unit/org/apache/cassandra/cql3/AssignmentTestableTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3;
+
+import java.util.Arrays;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.AsciiType;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.marshal.LongType;
+import org.apache.cassandra.db.marshal.UTF8Type;
+
+public class AssignmentTestableTest
+{
+    @Test
+    public void testGetPreferredCompatibleType()
+    {
+        testGetPreferredCompatibleType(null);
+
+        testGetPreferredCompatibleType(AsciiType.instance, AsciiType.instance);
+        testGetPreferredCompatibleType(UTF8Type.instance, UTF8Type.instance);
+        testGetPreferredCompatibleType(AsciiType.instance, AsciiType.instance, AsciiType.instance);
+        testGetPreferredCompatibleType(UTF8Type.instance, UTF8Type.instance, UTF8Type.instance);
+        testGetPreferredCompatibleType(UTF8Type.instance, AsciiType.instance, UTF8Type.instance);
+        testGetPreferredCompatibleType(UTF8Type.instance, UTF8Type.instance, AsciiType.instance);
+
+        testGetPreferredCompatibleType(Int32Type.instance, Int32Type.instance);
+        testGetPreferredCompatibleType(LongType.instance, LongType.instance);
+        testGetPreferredCompatibleType(Int32Type.instance, Int32Type.instance, Int32Type.instance);
+        testGetPreferredCompatibleType(LongType.instance, LongType.instance, LongType.instance);
+        testGetPreferredCompatibleType(null, Int32Type.instance, LongType.instance); // not assignable
+        testGetPreferredCompatibleType(null, LongType.instance, Int32Type.instance); // not assignable
+    }
+
+    public void testGetPreferredCompatibleType(AbstractType<?> type, AbstractType<?>... types)
+    {
+        Assert.assertEquals(type, AssignmentTestable.getCompatibleTypeIfKnown(Arrays.asList(types)));
+    }
+}
diff --git a/test/unit/org/apache/cassandra/cql3/functions/FunctionFactoryTest.java b/test/unit/org/apache/cassandra/cql3/functions/FunctionFactoryTest.java
new file mode 100644
index 0000000000..3107d16e75
--- /dev/null
+++ b/test/unit/org/apache/cassandra/cql3/functions/FunctionFactoryTest.java
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3.functions;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.Date;
+import java.util.List;
+import java.util.UUID;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.cql3.Duration;
+import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.transport.ProtocolVersion;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.TimeUUID;
+
+public class FunctionFactoryTest extends CQLTester
+{
+    /**
+     * A function that just returns its only argument without any changes.
+     * Calls to this function will try to infer the type of its argument, if missing, from the function's receiver.
+     */
+    private static final FunctionFactory IDENTITY = new FunctionFactory("identity", FunctionParameter.anyType(true))
+    {
+        @Override
+        protected NativeFunction doGetOrCreateFunction(List<AbstractType<?>> argTypes, AbstractType<?> receiverType)
+        {
+            return new NativeScalarFunction(name.name, argTypes.get(0), argTypes.get(0))
+            {
+                @Override
+                public ByteBuffer execute(ProtocolVersion protocol, List<ByteBuffer> parameters)
+                {
+                    return parameters.get(0);
+                }
+            };
+        }
+    };
+
+    /**
+     * A function that returns the string representation of its only argument.
+     * Calls to this function won't try to infer the type of its argument, if missing, from the function's receiver.
+     */
+    private static final FunctionFactory TO_STRING = new FunctionFactory("tostring", FunctionParameter.anyType(false))
+    {
+        @Override
+        protected NativeFunction doGetOrCreateFunction(List<AbstractType<?>> argTypes, AbstractType<?> receiverType)
+        {
+            return new NativeScalarFunction(name.name, UTF8Type.instance, argTypes.get(0))
+            {
+                @Override
+                public ByteBuffer execute(ProtocolVersion protocol, List<ByteBuffer> parameters)
+                {
+                    ByteBuffer value = parameters.get(0);
+                    if (value == null)
+                        return null;
+
+                    return UTF8Type.instance.decompose(argTypes.get(0).compose(value).toString());
+                }
+            };
+        }
+    };
+
+    private static final UUID uuid = UUID.fromString("62c3e96f-55cd-493b-8c8e-5a18883a1698");
+    private static final TimeUUID timeUUID = TimeUUID.fromString("00346642-2d2f-11ed-a261-0242ac120002");
+    private static final BigInteger bigint = new BigInteger("12345678901234567890");
+    private static final BigDecimal bigdecimal = new BigDecimal("1234567890.1234567890");
+    private static final Date date = new Date();
+    private static final Duration duration = Duration.newInstance(1, 2, 3);
+    private static final InetAddress inet = new InetSocketAddress(0).getAddress();
+    private static final ByteBuffer blob = ByteBufferUtil.hexToBytes("ABCDEF");
+
+    @BeforeClass
+    public static void beforeClass()
+    {
+        NativeFunctions.instance.add(IDENTITY);
+        NativeFunctions.instance.add(TO_STRING);
+    }
+
+    @Test
+    public void testInvalidNumberOfArguments() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k int PRIMARY KEY)");
+        String msg = "Invalid number of arguments for function system.identity(any)";
+        assertInvalidMessage(msg, "SELECT identity() FROM %s");
+        assertInvalidMessage(msg, "SELECT identity(1, 2) FROM %s");
+        assertInvalidMessage(msg, "SELECT identity('1', '2', '3') FROM %s");
+    }
+
+    @Test
+    public void testUnknownFunction() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k int PRIMARY KEY)");
+        assertInvalidMessage("Unknown function 'unknown'", "SELECT unknown() FROM %s");
+    }
+
+    @Test
+    public void testSimpleTypes() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k int PRIMARY KEY, " +
+                    "tinyint tinyint, " +
+                    "smallint smallint, " +
+                    "int int, " +
+                    "bigint bigint, " +
+                    "float float, " +
+                    "double double, " +
+                    "varint varint, " +
+                    "decimal decimal, " +
+                    "text text, " +
+                    "ascii ascii, " +
+                    "boolean boolean, " +
+                    "date date, " +
+                    "timestamp timestamp, " +
+                    "duration duration, " +
+                    "uuid uuid, " +
+                    "timeuuid timeuuid," +
+                    "inet inet," +
+                    "blob blob)");
+
+        // Test with empty table
+        String select = "SELECT " +
+                        "identity(tinyint),  identity(smallint), identity(int), " +
+                        "identity(bigint), identity(float), identity(double), " +
+                        "identity(varint), identity(decimal), identity(text), " +
+                        "identity(ascii), identity(boolean), identity(date), " +
+                        "identity(timestamp), identity(duration), identity(uuid), " +
+                        "identity(timeuuid), identity(inet), identity(blob) " +
+                        "FROM %s";
+        UntypedResultSet rs = execute(select);
+        assertColumnNames(rs,
+                          "system.identity(tinyint)", "system.identity(smallint)", "system.identity(int)",
+                          "system.identity(bigint)", "system.identity(float)", "system.identity(double)",
+                          "system.identity(varint)", "system.identity(decimal)", "system.identity(text)",
+                          "system.identity(ascii)", "system.identity(boolean)", "system.identity(date)",
+                          "system.identity(timestamp)", "system.identity(duration)", "system.identity(uuid)",
+                          "system.identity(timeuuid)", "system.identity(inet)", "system.identity(blob)");
+        assertEmpty(rs);
+
+        // Test with not-empty table
+        Object[] row = row((byte) 1, (short) 1, 123, 123L, 1.23f, 1.23d, bigint, bigdecimal,
+                           "ábc", "abc", true, 1, date, duration, uuid, timeUUID, inet, blob);
+        execute("INSERT INTO %s (k, tinyint, smallint, int, bigint, float, double, varint, decimal, " +
+                "text, ascii, boolean, date, timestamp, duration, uuid, timeuuid, inet, blob) " +
+                "VALUES (1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", row);
+        assertRows(execute(select), row);
+
+        // Test with bind markers
+        execute("INSERT INTO %s (k, tinyint, smallint, int, bigint, float, double, varint, decimal, " +
+                "text, ascii, boolean, date, timestamp, duration, uuid, timeuuid, inet, blob) " +
+                "VALUES (1, " +
+                "identity(?), identity(?), identity(?), identity(?), identity(?), identity(?), " +
+                "identity(?), identity(?), identity(?), identity(?), identity(?), identity(?), " +
+                "identity(?), identity(?), identity(?), identity(?), identity(?), identity(?))", row);
+        assertRows(execute(select), row);
+
+        // Test literals
+        testLiteral("(tinyint) 1", (byte) 1);
+        testLiteral("(smallint) 1", (short) 1);
+        testLiteral(123);
+        testLiteral(1234567890123L);
+        testLiteral(1.23);
+        testLiteral(1234567.1234567D);
+        testLiteral(bigint);
+        testLiteral(bigdecimal);
+        testLiteral("'abc'", "abc");
+        testLiteral("'ábc'", "ábc");
+        testLiteral(true);
+        testLiteral(false);
+        testLiteral("(timestamp) '1970-01-01 00:00:00.000+0000'", new Date(0));
+        testLiteral("(time) '00:00:00.000000'", 0L);
+        testLiteral(duration);
+        testLiteral(uuid);
+        testLiteral(timeUUID);
+        testLiteral("(inet) '0.0.0.0'", inet);
+        testLiteral("0x" + ByteBufferUtil.bytesToHex(blob), blob);
+    }
+
+    @Test
+    public void testSets() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k int PRIMARY KEY, s set<int>, fs frozen<set<int>>)");
+
+        // Test with empty table
+        String select = "SELECT identity(s), identity(fs) FROM %s";
+        UntypedResultSet rs = execute(select);
+        assertColumnNames(rs, "system.identity(s)", "system.identity(fs)");
+        assertEmpty(rs);
+
+        // Test with not-empty table
+        execute("INSERT INTO %s (k, s, fs) VALUES (1, {1, 2}, {1, 2})");
+        execute("INSERT INTO %s (k, s, fs) VALUES (2, {1, 2, 3}, {1, 2, 3})");
+        execute("INSERT INTO %s (k, s, fs) VALUES (3, {2, 1}, {2, 1})");
+        assertRows(execute(select),
+                   row(set(1, 2), set(1, 2)),
+                   row(set(1, 2, 3), set(1, 2, 3)),
+                   row(set(2, 1), set(2, 1)));
+
+        // Test with bind markers
+        Object[] row = row(set(1, 2, 3), set(4, 5, 6));
+        execute("INSERT INTO %s (k, s, fs) VALUES (4, identity(?), identity(?))", row);
+        assertRows(execute("SELECT s, fs FROM %s WHERE k = 4"), row);
+
+        // Test literals
+        testLiteralFails("[]");
+        testLiteral("{1, 1234567890}", set(1, 1234567890));
+        testLiteral(String.format("{1, %s}", bigint), set(BigInteger.ONE, bigint));
+        testLiteral("{'abc'}", set("abc"));
+        testLiteral("{'ábc'}", set("ábc"));
+        testLiteral("{'abc', 'ábc'}", set("abc", "ábc"));
+        testLiteral("{'ábc', 'abc'}", set("ábc", "abc"));
+        testLiteral("{true}", set(true));
+        testLiteral("{false}", set(false));
+        testLiteral(String.format("{%s}", uuid), set(uuid));
+        testLiteral(String.format("{%s}", timeUUID), set(timeUUID));
+        testLiteral(String.format("{%s, %s}", uuid, timeUUID), set(uuid, timeUUID.asUUID()));
+    }
+
+    @Test
+    public void testLists() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k int PRIMARY KEY, l list<int>, fl frozen<list<int>>)");
+
+        // Test with empty table
+        String select = "SELECT identity(l), identity(fl) FROM %s";
+        UntypedResultSet rs = execute(select);
+        assertColumnNames(rs, "system.identity(l)", "system.identity(fl)");
+        assertEmpty(rs);
+
+        // Test with not-empty table
+        execute("INSERT INTO %s (k, l, fl) VALUES (1, [1, 2], [1, 2])");
+        execute("INSERT INTO %s (k, l, fl) VALUES (2, [1, 2, 3], [1, 2, 3])");
+        execute("INSERT INTO %s (k, l, fl) VALUES (3, [2, 1], [2, 1])");
+        assertRows(execute(select),
+                   row(list(1, 2), list(1, 2)),
+                   row(list(1, 2, 3), list(1, 2, 3)),
+                   row(list(2, 1), list(2, 1)));
+
+        // Test with bind markers
+        Object[] row = row(list(1, 2, 3), list(4, 5, 6));
+        execute("INSERT INTO %s (k, l, fl) VALUES (4, identity(?), identity(?))", row);
+        assertRows(execute("SELECT l, fl FROM %s WHERE k = 4"), row);
+
+        // Test literals
+        testLiteralFails("[]");
+        testLiteral("[1, 1234567890]", list(1, 1234567890));
+        testLiteral(String.format("[1, %s]", bigint), list(BigInteger.ONE, bigint));
+        testLiteral("['abc']", list("abc"));
+        testLiteral("['ábc']", list("ábc"));
+        testLiteral("['abc', 'ábc']", list("abc", "ábc"));
+        testLiteral("['ábc', 'abc']", list("ábc", "abc"));
+        testLiteral("[true]", list(true));
+        testLiteral("[false]", list(false));
+        testLiteral(String.format("[%s]", uuid), list(uuid));
+        testLiteral(String.format("[%s]", timeUUID), list(timeUUID));
+        testLiteral(String.format("[%s, %s]", uuid, timeUUID), list(uuid, timeUUID.asUUID()));
+    }
+
+    @Test
+    public void testMaps() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k int PRIMARY KEY, m map<int, int>, fm frozen<map<int, int>>)");
+
+        // Test with empty table
+        String select = "SELECT identity(m), identity(fm) FROM %s";
+        UntypedResultSet rs = execute(select);
+        assertColumnNames(rs, "system.identity(m)", "system.identity(fm)");
+        assertEmpty(rs);
+
+        // Test with not-empty table
+        execute("INSERT INTO %s (k, m, fm) VALUES (1, {1:10, 2:20}, {1:10, 2:20})");
+        execute("INSERT INTO %s (k, m, fm) VALUES (2, {1:10, 2:20, 3:30}, {1:10, 2:20, 3:30})");
+        execute("INSERT INTO %s (k, m, fm) VALUES (3, {2:20, 1:10}, {2:20, 1:10})");
+        assertRows(execute(select),
+                   row(map(1, 10, 2, 20), map(1, 10, 2, 20)),
+                   row(map(1, 10, 2, 20, 3, 30), map(1, 10, 2, 20, 3, 30)),
+                   row(map(1, 10, 2, 20), map(1, 10, 2, 20)));
+
+        // Test with bind markers
+        Object[] row = row(map(1, 10, 2, 20), map(3, 30, 4, 40));
+        execute("INSERT INTO %s (k, m, fm) VALUES (4, identity(?), identity(?))", row);
+        assertRows(execute("SELECT m, fm FROM %s WHERE k = 4"), row);
+
+        // Test literals
+        testLiteralFails("{}");
+        testLiteralFails("{1: 10, 2: 20}");
+        testLiteral("(map<int, int>) {1: 10, 2: 20}", map(1, 10, 2, 20));
+        testLiteral("(map<int, text>) {1: 'abc', 2: 'ábc'}", map(1, "abc", 2, "ábc"));
+        testLiteral("(map<text, int>) {'abc': 1, 'ábc': 2}", map("abc", 1, "ábc", 2));
+    }
+
+    @Test
+    public void testTuples() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k int PRIMARY KEY, t tuple<int, text, boolean>)");
+
+        // Test with empty table
+        String select = "SELECT identity(t) FROM %s";
+        UntypedResultSet rs = execute(select);
+        assertColumnNames(rs, "system.identity(t)");
+        assertEmpty(rs);
+
+        // Test with not-empty table
+        execute("INSERT INTO %s (k, t) VALUES (1, (1, 'a', false))");
+        execute("INSERT INTO %s (k, t) VALUES (2, (2, 'b', true))");
+        execute("INSERT INTO %s (k, t) VALUES (3, (3, null, true))");
+        assertRows(execute(select),
+                   row(tuple(1, "a", false)),
+                   row(tuple(2, "b", true)),
+                   row(tuple(3, null, true)));
+
+        // Test with bind markers
+        Object[] row = row(tuple(4, "d", false));
+        execute("INSERT INTO %s (k, t) VALUES (4, identity(?))", row);
+        assertRows(execute("SELECT t FROM %s WHERE k = 4"), row);
+
+        // Test literals
+        testLiteralFails("(1)");
+        testLiteral("(tuple<int>) (1)", tuple(1));
+        testLiteral("(1, 'a')", tuple(1, "a"));
+        testLiteral("(1, 'a', false)", tuple(1, "a", false));
+    }
+
+    @Test
+    public void testUDTs() throws Throwable
+    {
+        String udt = createType("CREATE TYPE %s (x int)");
+        createTable("CREATE TABLE %s (k int PRIMARY KEY, u frozen<" + udt + ">, fu frozen<" + udt + ">)");
+
+        // Test with empty table
+        String select = "SELECT identity(u), identity(fu) FROM %s";
+        UntypedResultSet rs = execute(select);
+        assertColumnNames(rs, "system.identity(u)", "system.identity(fu)");
+        assertEmpty(rs);
+
+        // Test with not-empty table
+        execute("INSERT INTO %s (k, u, fu) VALUES (1, {x: 2}, null)");
+        execute("INSERT INTO %s (k, u, fu) VALUES (2, {x: 4}, {x: 6})");
+        execute("INSERT INTO %s (k, u, fu) VALUES (4, null, {x: 8})");
+        assertRows(execute(select),
+                   row(userType("x", 2), null),
+                   row(userType("x", 4), userType("x", 6)),
+                   row(null, userType("x", 8)));
+
+        // Test with bind markers
+        Object[] row = row(userType("x", 4), userType("x", 5));
+        execute("INSERT INTO %s (k, u, fu) VALUES (4, identity(?), identity(?))", row);
+        assertRows(execute("SELECT u, fu FROM %s WHERE k = 4"), row);
+
+        // Test literals
+        testLiteralFails("{}");
+        testLiteralFails("{x: 10}");
+        testLiteral('(' + udt + "){x: 10}", tuple(10));
+    }
+
+    @Test
+    public void testNestedCalls() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k int PRIMARY KEY, v int, t text)");
+
+        // Test function that infers parameter type from receiver type
+        execute("INSERT INTO %s (k, v) VALUES (1, identity(identity(2)))");
+        assertRows(execute("SELECT v FROM %s WHERE k = 1"), row(2));
+        execute("INSERT INTO %s (k, v) VALUES (1, identity(identity((int) ?)))", 3);
+        assertRows(execute("SELECT v FROM %s WHERE k = 1"), row(3));
+        assertRows(execute("SELECT identity(identity(v)) FROM %s WHERE k = 1"), row(3));
+
+        // Test function that does not infer parameter type from receiver type
+        execute("INSERT INTO %s (k, t) VALUES (1, tostring(tostring(4)))");
+        assertRows(execute("SELECT t FROM %s WHERE k = 1"), row("4"));
+        execute("INSERT INTO %s (k, t) VALUES (1, tostring(tostring((int) ?)))", 5);
+        assertRows(execute("SELECT tostring(tostring(t)) FROM %s WHERE k = 1"), row("5"));
+    }
+
+    private void testLiteral(Object literal) throws Throwable
+    {
+        testLiteral(literal, literal);
+    }
+
+    private void testLiteral(Object functionArgs, Object expectedResult) throws Throwable
+    {
+        assertRows(execute(String.format("SELECT %s(%s) FROM %%s LIMIT 1", IDENTITY.name(), functionArgs)),
+                   row(expectedResult));
+    }
+
+    private void testLiteralFails(Object functionArgs) throws Throwable
+    {
+        assertInvalidMessage("Cannot infer type of argument " + functionArgs,
+                             String.format("SELECT %s(%s) FROM %%s", IDENTITY.name(), functionArgs));
+    }
+}
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/JsonTest.java b/test/unit/org/apache/cassandra/cql3/validation/entities/JsonTest.java
index 810ee5aa83..3f5408fed4 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/JsonTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/JsonTest.java
@@ -46,7 +46,7 @@ import static org.junit.Assert.fail;
 
 public class JsonTest extends CQLTester
 {
-    // This method will be ran instead of the CQLTester#setUpClass
+    // This method will be run instead of the CQLTester#setUpClass
     @BeforeClass
     public static void setUpClass()
     {
@@ -247,7 +247,7 @@ public class JsonTest extends CQLTester
                 "durationval duration)");
 
         // fromJson() can only be used when the receiver type is known
-        assertInvalidMessage("fromJson() cannot be used in the selection clause", "SELECT fromJson(asciival) FROM %s", 0, 0);
+        assertInvalidMessage("fromJson() cannot be used in the selection clause", "SELECT fromJson(textval) FROM %s", 0, 0);
 
         String func1 = createFunction(KEYSPACE, "int", "CREATE FUNCTION %s (a int) CALLED ON NULL INPUT RETURNS text LANGUAGE java AS $$ return a.toString(); $$");
         createFunctionOverload(func1, "int", "CREATE FUNCTION %s (a text) CALLED ON NULL INPUT RETURNS text LANGUAGE java AS $$ return new String(a); $$");
@@ -745,13 +745,25 @@ public class JsonTest extends CQLTester
                 "udtval frozen<" + typeName + ">," +
                 "durationval duration)");
 
-        // toJson() can only be used in selections
-        assertInvalidMessage("toJson() may only be used within the selection clause",
-                "INSERT INTO %s (k, asciival) VALUES (?, toJson(?))", 0, 0);
-        assertInvalidMessage("toJson() may only be used within the selection clause",
-                "UPDATE %s SET asciival = toJson(?) WHERE k = ?", 0, 0);
-        assertInvalidMessage("toJson() may only be used within the selection clause",
-                "DELETE FROM %s WHERE k = fromJson(toJson(?))", 0);
+        // toJson() can be used outside of the selection clause (with literals)
+        execute("INSERT INTO %s (k, textval) VALUES (?, toJson(1234))", 0);
+        assertRows(execute("SELECT textval FROM %s WHERE k = ?", 0), row("1234"));
+        assertRows(execute("SELECT textval FROM %s WHERE textval = toJson(1234) ALLOW FILTERING"), row("1234"));
+        execute("UPDATE %s SET textval = toJson(-1234) WHERE k = ?", 0);
+        assertRows(execute("SELECT textval FROM %s WHERE k = ?", 0), row("-1234"));
+        assertRows(execute("SELECT textval FROM %s WHERE textval = toJson(-1234) ALLOW FILTERING"), row("-1234"));
+        execute("DELETE FROM %s WHERE k = fromJson(toJson(0))");
+        assertEmpty(execute("SELECT textval FROM %s WHERE k = ?", 0));
+
+        // toJson() can be used outside of the selection clause (with markers)
+        execute("INSERT INTO %s (k, textval) VALUES (?, toJson((int) ?))", 0, 123123);
+        assertRows(execute("SELECT textval FROM %s WHERE k = ?", 0), row("123123"));
+        assertRows(execute("SELECT textval FROM %s WHERE textval = toJson((int) ?) ALLOW FILTERING", 123123), row("123123"));
+        execute("UPDATE %s SET textval = toJson((int) ?) WHERE k = ?", -123123, 0);
+        assertRows(execute("SELECT textval FROM %s WHERE k = ?", 0), row("-123123"));
+        assertRows(execute("SELECT textval FROM %s WHERE textval = toJson((int) ?) ALLOW FILTERING", -123123), row("-123123"));
+        execute("DELETE FROM %s WHERE k = fromJson(toJson((int) ?))", 0);
+        assertEmpty(execute("SELECT textval FROM %s WHERE k = ?", 0));
 
         // ================ ascii ================
         execute("INSERT INTO %s (k, asciival) VALUES (?, ?)", 0, "ascii text");
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/UFAuthTest.java b/test/unit/org/apache/cassandra/cql3/validation/entities/UFAuthTest.java
index 4a2d71fc89..f65f2f6b19 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/UFAuthTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/UFAuthTest.java
@@ -28,9 +28,9 @@ import org.junit.Test;
 
 import org.apache.cassandra.auth.*;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.functions.UserFunction;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.cql3.*;
-import org.apache.cassandra.cql3.functions.Function;
 import org.apache.cassandra.cql3.functions.FunctionName;
 import org.apache.cassandra.cql3.statements.BatchStatement;
 import org.apache.cassandra.cql3.statements.ModificationStatement;
@@ -635,7 +635,7 @@ public class UFAuthTest extends CQLTester
         // It is here to avoid having to duplicate the functionality of CqlParser
         // for transforming cql types into AbstractTypes
         FunctionName fn = parseFunctionName(functionName);
-        Collection<Function> functions = Schema.instance.getFunctions(fn);
+        Collection<UserFunction> functions = Schema.instance.getUserFunctions(fn);
         assertEquals(String.format("Expected a single function definition for %s, but found %s",
                                    functionName,
                                    functions.size()),
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/UFJavaTest.java b/test/unit/org/apache/cassandra/cql3/validation/entities/UFJavaTest.java
index 47f1cbfede..ad59b4cc60 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/UFJavaTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/UFJavaTest.java
@@ -657,7 +657,7 @@ public class UFJavaTest extends CQLTester
                               "AS $$return " +
                               "     udt.getString(\"txt\");$$;",
                               fName1replace, type));
-        Assert.assertEquals(1, Schema.instance.getFunctions(parseFunctionName(fName1replace)).size());
+        Assert.assertEquals(1, Schema.instance.getUserFunctions(parseFunctionName(fName1replace)).size());
         execute(String.format("CREATE OR REPLACE FUNCTION %s( udt %s ) " +
                               "CALLED ON NULL INPUT " +
                               "RETURNS int " +
@@ -665,7 +665,7 @@ public class UFJavaTest extends CQLTester
                               "AS $$return " +
                               "     Integer.valueOf(udt.getInt(\"i\"));$$;",
                               fName2replace, type));
-        Assert.assertEquals(1, Schema.instance.getFunctions(parseFunctionName(fName2replace)).size());
+        Assert.assertEquals(1, Schema.instance.getUserFunctions(parseFunctionName(fName2replace)).size());
         execute(String.format("CREATE OR REPLACE FUNCTION %s( udt %s ) " +
                               "CALLED ON NULL INPUT " +
                               "RETURNS double " +
@@ -673,7 +673,7 @@ public class UFJavaTest extends CQLTester
                               "AS $$return " +
                               "     Double.valueOf(udt.getDouble(\"added\"));$$;",
                               fName3replace, type));
-        Assert.assertEquals(1, Schema.instance.getFunctions(parseFunctionName(fName3replace)).size());
+        Assert.assertEquals(1, Schema.instance.getUserFunctions(parseFunctionName(fName3replace)).size());
         execute(String.format("CREATE OR REPLACE FUNCTION %s( udt %s ) " +
                               "RETURNS NULL ON NULL INPUT " +
                               "RETURNS %s " +
@@ -681,7 +681,7 @@ public class UFJavaTest extends CQLTester
                               "AS $$return " +
                               "     udt;$$;",
                               fName4replace, type, type));
-        Assert.assertEquals(1, Schema.instance.getFunctions(parseFunctionName(fName4replace)).size());
+        Assert.assertEquals(1, Schema.instance.getUserFunctions(parseFunctionName(fName4replace)).size());
 
         assertRows(execute("SELECT " + fName1replace + "(udt) FROM %s WHERE key = 2"),
                    row("two"));
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/UFTest.java b/test/unit/org/apache/cassandra/cql3/validation/entities/UFTest.java
index 6999bf6528..8cbf0d7fee 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/UFTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/UFTest.java
@@ -152,7 +152,7 @@ public class UFTest extends CQLTester
 
         FunctionName fSinName = parseFunctionName(fSin);
 
-        Assert.assertEquals(1, Schema.instance.getFunctions(parseFunctionName(fSin)).size());
+        Assert.assertEquals(1, Schema.instance.getUserFunctions(parseFunctionName(fSin)).size());
 
         assertRows(execute("SELECT function_name, language FROM system_schema.functions WHERE keyspace_name=?", KEYSPACE_PER_TEST),
                    row(fSinName.name, "java"));
@@ -161,7 +161,7 @@ public class UFTest extends CQLTester
 
         assertRows(execute("SELECT function_name, language FROM system_schema.functions WHERE keyspace_name=?", KEYSPACE_PER_TEST));
 
-        Assert.assertEquals(0, Schema.instance.getFunctions(fSinName).size());
+        Assert.assertEquals(0, Schema.instance.getUserFunctions(fSinName).size());
     }
 
     @Test
@@ -178,7 +178,7 @@ public class UFTest extends CQLTester
 
         FunctionName fSinName = parseFunctionName(fSin);
 
-        Assert.assertEquals(1, Schema.instance.getFunctions(parseFunctionName(fSin)).size());
+        Assert.assertEquals(1, Schema.instance.getUserFunctions(parseFunctionName(fSin)).size());
 
         // create a pairs of Select and Inserts. One statement in each pair uses the function so when we
         // drop it those statements should be removed from the cache in QueryProcessor. The other statements
@@ -216,7 +216,7 @@ public class UFTest extends CQLTester
                 "LANGUAGE java " +
                 "AS 'return Double.valueOf(Math.sin(input));'");
 
-        Assert.assertEquals(1, Schema.instance.getFunctions(fSinName).size());
+        Assert.assertEquals(1, Schema.instance.getUserFunctions(fSinName).size());
 
         preparedSelect1= QueryProcessor.instance.prepare(
                                          String.format("SELECT key, %s(d) FROM %s.%s", fSin, KEYSPACE, currentTable()),
@@ -331,7 +331,7 @@ public class UFTest extends CQLTester
                                         "RETURNS double " +
                                         "LANGUAGE javascript " +
                                         "AS 'input'");
-        Assert.assertEquals(1, Schema.instance.getFunctions(parseFunctionName(function)).size());
+        Assert.assertEquals(1, Schema.instance.getUserFunctions(parseFunctionName(function)).size());
 
         List<ResultMessage.Prepared> prepared = new ArrayList<>();
         // prepare statements which use the function to provide a DelayedValue
@@ -765,7 +765,7 @@ public class UFTest extends CQLTester
 
         FunctionName fNameName = parseFunctionName(fName);
 
-        Assert.assertEquals(1, Schema.instance.getFunctions(fNameName).size());
+        Assert.assertEquals(1, Schema.instance.getUserFunctions(fNameName).size());
 
         ResultMessage.Prepared prepared = QueryProcessor.instance.prepare(String.format("SELECT key, %s(udt) FROM %s.%s", fName, KEYSPACE, currentTable()),
                                                                  ClientState.forInternalCalls());
@@ -782,7 +782,7 @@ public class UFTest extends CQLTester
         Assert.assertNull(QueryProcessor.instance.getPrepared(prepared.statementId));
 
         // function stays
-        Assert.assertEquals(1, Schema.instance.getFunctions(fNameName).size());
+        Assert.assertEquals(1, Schema.instance.getUserFunctions(fNameName).size());
     }
 
     @Test
@@ -851,7 +851,7 @@ public class UFTest extends CQLTester
                                       "AS 'throw new RuntimeException();';");
 
         KeyspaceMetadata ksm = Schema.instance.getKeyspaceMetadata(KEYSPACE_PER_TEST);
-        UDFunction f = (UDFunction) ksm.functions.get(parseFunctionName(fName)).iterator().next();
+        UDFunction f = (UDFunction) ksm.userFunctions.get(parseFunctionName(fName)).iterator().next();
 
         UDFunction broken = UDFunction.createBrokenFunction(f.name(),
                                                             f.argNames(),
@@ -861,7 +861,7 @@ public class UFTest extends CQLTester
                                                             "java",
                                                             f.body(),
                                                             new InvalidRequestException("foo bar is broken"));
-        SchemaTestUtil.addOrUpdateKeyspace(ksm.withSwapped(ksm.functions.without(f.name(), f.argTypes()).with(broken)), false);
+        SchemaTestUtil.addOrUpdateKeyspace(ksm.withSwapped(ksm.userFunctions.without(f.name(), f.argTypes()).with(broken)), false);
 
         assertInvalidThrowMessage("foo bar is broken", InvalidRequestException.class,
                                   "SELECT key, " + fName + "(dval) FROM %s");
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/WritetimeOrTTLTest.java b/test/unit/org/apache/cassandra/cql3/validation/entities/WritetimeOrTTLTest.java
index 4f956b1f8d..571196d444 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/WritetimeOrTTLTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/WritetimeOrTTLTest.java
@@ -27,8 +27,6 @@ import org.junit.Test;
 import org.apache.cassandra.cql3.CQLTester;
 import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.db.marshal.Int32Type;
-import org.apache.cassandra.db.marshal.ListType;
-import org.apache.cassandra.db.marshal.LongType;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 
 import static java.lang.String.format;
@@ -1071,10 +1069,8 @@ public class WritetimeOrTTLTest extends CQLTester
         assertRows("SELECT max(maxwritetime(v)) FROM %s", row(10L));
 
         // Frozen collection
-        // Note that currently the tested system functions (min and max) return collections in their serialized format,
-        // this is something that we might want to improve in the future (CASSANDRA-17811).
-        assertRows("SELECT min(fs) FROM %s", row(ListType.getInstance(Int32Type.instance, false).decompose(Arrays.asList(1, 2, 3))));
-        assertRows("SELECT max(fs) FROM %s", row(ListType.getInstance(Int32Type.instance, false).decompose(Arrays.asList(10, 20, 30))));
+        assertRows("SELECT min(fs) FROM %s", row(set(1, 2, 3)));
+        assertRows("SELECT max(fs) FROM %s", row(set(10, 20, 30)));
         assertRows("SELECT writetime(fs) FROM %s", row(10L), row(1L));
         assertRows("SELECT min(writetime(fs)) FROM %s", row(1L));
         assertRows("SELECT max(writetime(fs)) FROM %s", row(10L));
@@ -1082,13 +1078,11 @@ public class WritetimeOrTTLTest extends CQLTester
         assertRows("SELECT max(maxwritetime(fs)) FROM %s", row(10L));
 
         // Multi-cell collection
-        // Note that currently the tested system functions (min and max) return collections in their serialized format,
-        // this is something that we might want to improve in the future (CASSANDRA-17811).
-        assertRows("SELECT min(s) FROM %s", row(ListType.getInstance(Int32Type.instance, false).decompose(Arrays.asList(1, 2, 3))));
-        assertRows("SELECT max(s) FROM %s", row(ListType.getInstance(Int32Type.instance, false).decompose(Arrays.asList(10, 20, 30))));
-        assertRows("SELECT writetime(s) FROM %s", row(Arrays.asList(10L, 20L, 20L)), row(Arrays.asList(1L, 2L, 2L)));
-        assertRows("SELECT min(writetime(s)) FROM %s", row(ListType.getInstance(LongType.instance, false).decompose(Arrays.asList(1L, 2L, 2L))));
-        assertRows("SELECT max(writetime(s)) FROM %s", row(ListType.getInstance(LongType.instance, false).decompose(Arrays.asList(10L, 20L, 20L))));
+        assertRows("SELECT min(s) FROM %s", row(set(1, 2, 3)));
+        assertRows("SELECT max(s) FROM %s", row(set(10, 20, 30)));
+        assertRows("SELECT writetime(s) FROM %s", row(list(10L, 20L, 20L)), row(list(1L, 2L, 2L)));
+        assertRows("SELECT min(writetime(s)) FROM %s", row(list(1L, 2L, 2L)));
+        assertRows("SELECT max(writetime(s)) FROM %s", row(list(10L, 20L, 20L)));
         assertRows("SELECT min(maxwritetime(s)) FROM %s", row(2L));
         assertRows("SELECT max(maxwritetime(s)) FROM %s", row(20L));
     }
diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/AggregationTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/AggregationTest.java
index c5ee079c55..ca0c5200a0 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/operations/AggregationTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/operations/AggregationTest.java
@@ -239,6 +239,114 @@ public class AggregationTest extends CQLTester
                    row(2L, 4L, 2L, 3L, 6L));
     }
 
+    @Test
+    public void testAggregateWithSets() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k int PRIMARY KEY, s set<int>, fs frozen<set<int>>)");
+        String query = "SELECT count(s), count(fs), min(s), min(fs), max(s), max(fs) FROM %s";
+
+        // Without any rows both counts are expected to be zero and every min/max null
+        UntypedResultSet emptyResult = execute(query);
+        assertColumnNames(emptyResult,
+                          "system.count(s)", "system.count(fs)",
+                          "system.min(s)", "system.min(fs)",
+                          "system.max(s)", "system.max(fs)");
+        assertRows(emptyResult, row(0L, 0L, null, null, null, null));
+
+        // Rows 1 and 3 carry the same elements written in a different order, row 2 has one element more
+        execute("INSERT INTO %s (k, s, fs) VALUES (1, {1, 2}, {1, 2})");
+        execute("INSERT INTO %s (k, s, fs) VALUES (2, {1, 2, 3}, {1, 2, 3})");
+        execute("INSERT INTO %s (k, s, fs) VALUES (3, {2, 1}, {2, 1})");
+        assertRows(execute(query), row(3L, 3L, set(1, 2), set(1, 2), set(1, 2, 3), set(1, 2, 3)));
+    }
+
+    @Test
+    public void testAggregateWithLists() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k int PRIMARY KEY, l list<int>, fl frozen<list<int>>)");
+        String query = "SELECT count(l), count(fl), min(l), min(fl), max(l), max(fl) FROM %s";
+
+        // Without any rows both counts are expected to be zero and every min/max null
+        UntypedResultSet emptyResult = execute(query);
+        assertColumnNames(emptyResult,
+                          "system.count(l)", "system.count(fl)",
+                          "system.min(l)", "system.min(fl)",
+                          "system.max(l)", "system.max(fl)");
+        assertRows(emptyResult, row(0L, 0L, null, null, null, null));
+
+        // Rows 1 and 3 hold the same elements in opposite order: [1, 2] is expected
+        // as the minimum and [2, 1] as the maximum of the three inserted lists
+        execute("INSERT INTO %s (k, l, fl) VALUES (1, [1, 2], [1, 2])");
+        execute("INSERT INTO %s (k, l, fl) VALUES (2, [1, 2, 3], [1, 2, 3])");
+        execute("INSERT INTO %s (k, l, fl) VALUES (3, [2, 1], [2, 1])");
+        assertRows(execute(query), row(3L, 3L, list(1, 2), list(1, 2), list(2, 1), list(2, 1)));
+    }
+
+    @Test
+    public void testAggregateWithMaps() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k int PRIMARY KEY, m map<int, int>, fm frozen<map<int, int>>)");
+        String query = "SELECT count(m), count(fm), min(m), min(fm), max(m), max(fm) FROM %s";
+
+        // Without any rows both counts are expected to be zero and every min/max null
+        UntypedResultSet emptyResult = execute(query);
+        assertColumnNames(emptyResult,
+                          "system.count(m)", "system.count(fm)",
+                          "system.min(m)", "system.min(fm)",
+                          "system.max(m)", "system.max(fm)");
+        assertRows(emptyResult, row(0L, 0L, null, null, null, null));
+
+        // Rows 1 and 3 carry the same entries written in a different order,
+        // row 2 has one entry more and is expected as the maximum
+        execute("INSERT INTO %s (k, m, fm) VALUES (1, {1:10, 2:20}, {1:10, 2:20})");
+        execute("INSERT INTO %s (k, m, fm) VALUES (2, {1:10, 2:20, 3:30}, {1:10, 2:20, 3:30})");
+        execute("INSERT INTO %s (k, m, fm) VALUES (3, {2:20, 1:10}, {2:20, 1:10})");
+        assertRows(execute(query),
+                   row(3L, 3L, map(1, 10, 2, 20), map(1, 10, 2, 20),
+                       map(1, 10, 2, 20, 3, 30), map(1, 10, 2, 20, 3, 30)));
+    }
+
+    @Test
+    public void testAggregateWithTuples() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k int PRIMARY KEY, t tuple<int, text, boolean>)");
+        String query = "SELECT count(t), min(t), max(t) FROM %s";
+
+        // Without any rows the count is expected to be zero and min/max null
+        UntypedResultSet emptyResult = execute(query);
+        assertColumnNames(emptyResult, "system.count(t)", "system.min(t)", "system.max(t)");
+        assertRows(emptyResult, row(0L, null, null));
+
+        // The third tuple has a null text component and is still expected as the maximum
+        execute("INSERT INTO %s (k, t) VALUES (1, (1, 'a', false))");
+        execute("INSERT INTO %s (k, t) VALUES (2, (2, 'b', true))");
+        execute("INSERT INTO %s (k, t) VALUES (3, (3, null, true))");
+        assertRows(execute(query), row(3L, tuple(1, "a", false), tuple(3, null, true)));
+    }
+
+    @Test
+    public void testAggregateWithUDTs() throws Throwable
+    {
+        String udt = createType("CREATE TYPE %s (x int)");
+        // u is a non-frozen UDT and fu its frozen counterpart, so both representations get aggregated
+        createTable("CREATE TABLE %s (k int PRIMARY KEY, u " + udt + ", fu frozen<" + udt + ">)");
+
+        // Without any rows both counts are expected to be zero and every min/max null
+        String select = "SELECT count(u), count(fu), min(u), min(fu), max(u), max(fu) FROM %s";
+        UntypedResultSet rs = execute(select);
+        assertColumnNames(rs,
+                          "system.count(u)", "system.count(fu)",
+                          "system.min(u)", "system.min(fu)",
+                          "system.max(u)", "system.max(fu)");
+        assertRows(rs, row(0L, 0L, null, null, null, null));
+
+        // Each column is null in exactly one of the three rows, so both counts are expected to be 2
+        execute("INSERT INTO %s (k, u, fu) VALUES (1, {x: 2}, null)");
+        execute("INSERT INTO %s (k, u, fu) VALUES (2, {x: 4}, {x: 6})");
+        execute("INSERT INTO %s (k, u, fu) VALUES (3, null, {x: 8})");
+        assertRows(execute(select), row(2L, 2L, userType("x", 2), userType("x", 6), userType("x", 4), userType("x", 8)));
+    }
+
     @Test
     public void testAggregateWithUdtFields() throws Throwable
     {
@@ -258,8 +366,8 @@ public class AggregationTest extends CQLTester
         assertRows(execute("SELECT count(b.x), max(b.x) as max, b.x, c.x as first FROM %s"),
                    row(3L, 8, 2, null));
 
-        assertInvalidMessage("Invalid field selection: system.max(b) of type blob is not a user type",
-                             "SELECT max(b).x as max FROM %s");
+        assertRows(execute("SELECT count(b), min(b).x, max(b).x, count(c), min(c).x, max(c).x FROM %s"),
+                   row(3L, 2, 8, 2L, 6, 12));
     }
 
     @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org