Posted to commits@cassandra.apache.org by ad...@apache.org on 2022/11/25 14:11:01 UTC

[cassandra] branch trunk updated: Add CQL scalar functions for collection aggregation

This is an automated email from the ASF dual-hosted git repository.

adelapena pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b7c7972a51 Add CQL scalar functions for collection aggregation
b7c7972a51 is described below

commit b7c7972a51ab6be6e5f410d2b12c770f5b7ebc98
Author: Andrés de la Peña <a....@gmail.com>
AuthorDate: Wed Nov 16 12:23:43 2022 +0000

    Add CQL scalar functions for collection aggregation
    
    patch by Andrés de la Peña; reviewed by Benjamin Lerer and Berenguer Blasi for CASSANDRA-18060
---
 CHANGES.txt                                        |   1 +
 NEWS.txt                                           |   3 +
 .../cassandra/examples/CQL/avg_with_cast.cql       |   1 +
 .../cassandra/examples/CQL/sum_with_cast.cql       |   1 +
 doc/modules/cassandra/pages/cql/functions.adoc     |  51 +-
 .../cassandra/cql3/functions/AggregateFcts.java    |  39 ++
 .../cassandra/cql3/functions/CollectionFcts.java   | 370 +++++++++++++
 .../cql3/functions/FunctionParameter.java          | 129 ++++-
 .../cassandra/cql3/functions/NativeFunctions.java  |   1 +
 .../cassandra/db/marshal/CollectionType.java       |   8 +
 .../org/apache/cassandra/db/marshal/ListType.java  |   7 +
 .../org/apache/cassandra/db/marshal/MapType.java   |   7 +
 .../org/apache/cassandra/db/marshal/SetType.java   |   7 +
 .../serializers/CollectionSerializer.java          |  23 +
 .../cassandra/serializers/MapSerializer.java       |   7 +
 .../cql3/functions/CollectionFctsTest.java         | 595 +++++++++++++++++++++
 16 files changed, 1246 insertions(+), 4 deletions(-)
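
The NEWS.txt and functions.adoc changes below warn that `collection_sum` and `collection_avg` return the same type as the collection elements, so sums can overflow and integer averages lose their decimal part. The following is a minimal sketch in plain Java of that kind of behaviour. It is not Cassandra code: the class and method names are hypothetical, and the wrap-around on overflow is a property of this sketch, not something the commit states about the CQL functions.

```java
public class CollectionSemanticsSketch
{
    // Hypothetical helper: sums tinyint-like values and returns the same narrow type.
    public static byte sum(byte[] values)
    {
        byte sum = 0;
        for (byte v : values)
            sum += v; // compound assignment narrows back to byte, so large totals wrap around
        return sum;
    }

    // Hypothetical helper: averages int values and returns an int, truncating any decimal part.
    public static int avg(int[] values)
    {
        if (values.length == 0)
            return 0; // mirrors the documented "average of an empty collection returns zero"

        long sum = 0; // accumulate in a wider type so only the final result is narrowed
        for (int v : values)
            sum += v;
        return (int) (sum / values.length);
    }

    public static void main(String[] args)
    {
        System.out.println(sum(new byte[]{ 100, 100 })); // prints -56: 200 does not fit in a byte
        System.out.println(avg(new int[]{ 1, 2 }));      // prints 1, not 1.5
        System.out.println(avg(new int[]{}));            // prints 0
    }
}
```

The documentation in this commit suggests casting to a wider or fractional type (for example `CAST (players AS VARINT)` or `CAST (players AS FLOAT)`) when these limits matter.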

diff --git a/CHANGES.txt b/CHANGES.txt
index 166b5b4c80..69c57eeb9a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.2
+ * Add CQL scalar functions for collection aggregation (CASSANDRA-18060)
  * Make cassandra.replayList property for CommitLogReplayer possible to react on keyspaces only (CASSANDRA-18044)
  * Add Mathematical functions (CASSANDRA-17221)
  * Make incremental backup configurable per table (CASSANDRA-15402)
diff --git a/NEWS.txt b/NEWS.txt
index 2238c43f36..f8eefc986b 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -107,6 +107,9 @@ New features
       is set to true and table property is set to false, incremental backups for that specific table will not be done.
       When 'incremental_backups' in casandra.yaml is set to false, the newly added table property does not have any effect.
       Both properties have to be set to true (cassandra.yaml and table property) in order to make incremental backups.
+    - Added new CQL native scalar functions for collections. The new functions are mostly analogous to the existing
+      aggregation functions, but they operate on the elements of collection columns. The new functions are `map_keys`,
+      `map_values`, `collection_count`, `collection_min`, `collection_max`, `collection_sum` and `collection_avg`.
 
 Upgrading
 ---------
diff --git a/doc/modules/cassandra/examples/CQL/avg_with_cast.cql b/doc/modules/cassandra/examples/CQL/avg_with_cast.cql
new file mode 100644
index 0000000000..95839b4afb
--- /dev/null
+++ b/doc/modules/cassandra/examples/CQL/avg_with_cast.cql
@@ -0,0 +1 @@
+SELECT AVG (CAST (players AS FLOAT)) FROM plays;
diff --git a/doc/modules/cassandra/examples/CQL/sum_with_cast.cql b/doc/modules/cassandra/examples/CQL/sum_with_cast.cql
new file mode 100644
index 0000000000..3261df1cfd
--- /dev/null
+++ b/doc/modules/cassandra/examples/CQL/sum_with_cast.cql
@@ -0,0 +1 @@
+SELECT SUM (CAST (players AS VARINT)) FROM plays;
diff --git a/doc/modules/cassandra/pages/cql/functions.adoc b/doc/modules/cassandra/pages/cql/functions.adoc
index 39fc9860a4..e01ef329c9 100644
--- a/doc/modules/cassandra/pages/cql/functions.adoc
+++ b/doc/modules/cassandra/pages/cql/functions.adoc
@@ -238,7 +238,7 @@ For every xref:cql/types.adoc#native-types[type] supported by CQL, the function
 Conversely, the function `blobAsType` takes a 64-bit `blob` argument and converts it to a `bigint` value. 
 For example, `bigintAsBlob(3)` returns `0x0000000000000003` and `blobAsBigint(0x0000000000000003)` returns `3`.
 
-==== Math Functions
+===== Math Functions
 
 Cql provides the following math functions: `abs`, `exp`, `log`, `log10`, and `round`.
 The return type for these functions is always the same as the input type.
@@ -258,6 +258,29 @@ The return type for these functions is always the same as the input type.
 |`round` | Rounds the input to the nearest whole number using rounding mode `HALF_UP`.
 |===
 
+===== Collection functions
+
+A number of functions are provided to operate on collection columns.
+
+[cols=",,",options="header",]
+|===
+|Function name |Input type |Description
+
+| `map_keys` | `map` | Gets the keys of the `map` argument, returned as a `set`.
+
+| `map_values` | `map` | Gets the values of the `map` argument, returned as a `list`.
+
+| `collection_count` | `map`, `set` or `list` | Gets the number of elements in the collection argument.
+
+| `collection_min` | `set` or `list` | Gets the minimum element in the collection argument.
+
+| `collection_max` | `set` or `list` | Gets the maximum element in the collection argument.
+
+| `collection_sum` | numeric `set` or `list` | Computes the sum of the elements in the collection argument. The returned value is of the same type as the input collection elements, so there is a risk of overflowing the data type if the sum of the values exceeds the maximum value that the type can represent.
+
+| `collection_avg` | numeric `set` or `list` | Computes the average of the elements in the collection argument. The average of an empty collection returns zero. The returned value is of the same type as the input collection elements, which might include rounding and truncations. For example `collection_avg([1, 2])` returns `1` instead of `1.5`.
+|===
+
 [[user-defined-scalar-functions]]
 ==== User-defined functions
 
@@ -427,7 +450,9 @@ include::example$CQL/min_max.cql[]
 
 ===== Sum
 
-The `sum` function sums up all the values returned by a query for a given column. 
+The `sum` function sums up all the values returned by a query for a given column.
+
+The returned value is of the same type as the input values, so there is a risk of overflowing if the sum of the values exceeds the maximum value that the type can represent.
 
 For example:
 
@@ -436,9 +461,18 @@ For example:
 include::example$CQL/sum.cql[]
 ----
 
+The returned value is of the same type as the input values, so there is a risk of overflowing the type if the sum of the
+values exceeds the maximum value that the type can represent. You can use type casting to cast the input values as a
+type large enough to contain the sum. For example:
+
+[source,cql]
+----
+include::example$CQL/sum_with_cast.cql[]
+----
+
 ===== Avg
 
-The `avg` function computes the average of all the values returned by a query for a given column. 
+The `avg` function computes the average of all the values returned by a query for a given column.
 
 For example:
 
@@ -447,6 +481,17 @@ For example:
 include::example$CQL/avg.cql[]
 ----
 
+The average of an empty value set returns zero.
+
+The returned value is of the same type as the input values, which might include rounding and truncations.
+For example `collection_avg([1, 2])` returns `1` instead of `1.5`.
+You can use type casting to cast to a type with the desired decimal precision. For example:
+
+[source,cql]
+----
+include::example$CQL/avg_with_cast.cql[]
+----
+
 [[user-defined-aggregates-functions]]
 ==== User-Defined Aggregates (UDAs)
 
diff --git a/src/java/org/apache/cassandra/cql3/functions/AggregateFcts.java b/src/java/org/apache/cassandra/cql3/functions/AggregateFcts.java
index 77243c622e..b3432996e6 100644
--- a/src/java/org/apache/cassandra/cql3/functions/AggregateFcts.java
+++ b/src/java/org/apache/cassandra/cql3/functions/AggregateFcts.java
@@ -160,6 +160,8 @@ public abstract class AggregateFcts
 
     /**
      * The AVG function for decimal values.
+     * <p>
+     * The average of an empty value set returns zero.
      */
     public static final NativeAggregateFunction avgFunctionForDecimal =
             new NativeAggregateFunction("avg", DecimalType.instance, DecimalType.instance)
@@ -239,6 +241,9 @@ public abstract class AggregateFcts
 
     /**
      * The AVG function for varint values.
+     * <p>
+     * The average of an empty value set returns zero. The returned value is of the same type as the input values, 
+     * so the returned average won't have a decimal part.
      */
     public static final NativeAggregateFunction avgFunctionForVarint =
             new NativeAggregateFunction("avg", IntegerType.instance, IntegerType.instance)
@@ -282,6 +287,9 @@ public abstract class AggregateFcts
 
     /**
      * The SUM function for byte values (tinyint).
+     * <p>
+     * The returned value is of the same type as the input values, so there is a risk of overflow if the sum of the
+     * values exceeds the maximum value that the type can represent.
      */
     public static final NativeAggregateFunction sumFunctionForByte =
             new NativeAggregateFunction("sum", ByteType.instance, ByteType.instance)
@@ -318,6 +326,9 @@ public abstract class AggregateFcts
 
     /**
      * AVG function for byte values (tinyint).
+     * <p>
+     * The average of an empty value set returns zero. The returned value is of the same type as the input values, 
+     * so the returned average won't have a decimal part.
      */
     public static final NativeAggregateFunction avgFunctionForByte =
             new NativeAggregateFunction("avg", ByteType.instance, ByteType.instance)
@@ -336,6 +347,9 @@ public abstract class AggregateFcts
 
     /**
      * The SUM function for short values (smallint).
+     * <p>
+     * The returned value is of the same type as the input values, so there is a risk of overflow if the sum of the
+     * values exceeds the maximum value that the type can represent.
      */
     public static final NativeAggregateFunction sumFunctionForShort =
             new NativeAggregateFunction("sum", ShortType.instance, ShortType.instance)
@@ -372,6 +386,9 @@ public abstract class AggregateFcts
 
     /**
      * AVG function for for short values (smallint).
+     * <p>
+     * The average of an empty value set returns zero. The returned value is of the same type as the input values, 
+     * so the returned average won't have a decimal part.
      */
     public static final NativeAggregateFunction avgFunctionForShort =
             new NativeAggregateFunction("avg", ShortType.instance, ShortType.instance)
@@ -390,6 +407,9 @@ public abstract class AggregateFcts
 
     /**
      * The SUM function for int32 values.
+     * <p>
+     * The returned value is of the same type as the input values, so there is a risk of overflow if the sum of the
+     * values exceeds the maximum value that the type can represent.
      */
     public static final NativeAggregateFunction sumFunctionForInt32 =
             new NativeAggregateFunction("sum", Int32Type.instance, Int32Type.instance)
@@ -426,6 +446,9 @@ public abstract class AggregateFcts
 
     /**
      * AVG function for int32 values.
+     * <p>
+     * The average of an empty value set returns zero. The returned value is of the same type as the input values, 
+     * so the returned average won't have a decimal part.
      */
     public static final NativeAggregateFunction avgFunctionForInt32 =
             new NativeAggregateFunction("avg", Int32Type.instance, Int32Type.instance)
@@ -444,6 +467,9 @@ public abstract class AggregateFcts
 
     /**
      * The SUM function for long values.
+     * <p>
+     * The returned value is of the same type as the input values, so there is a risk of overflow if the sum of the
+     * values exceeds the maximum value that the type can represent.
      */
     public static final NativeAggregateFunction sumFunctionForLong =
             new NativeAggregateFunction("sum", LongType.instance, LongType.instance)
@@ -456,6 +482,9 @@ public abstract class AggregateFcts
 
     /**
      * AVG function for long values.
+     * <p>
+     * The average of an empty value set returns zero. The returned value is of the same type as the input values, 
+     * so the returned average won't have a decimal part.
      */
     public static final NativeAggregateFunction avgFunctionForLong =
             new NativeAggregateFunction("avg", LongType.instance, LongType.instance)
@@ -474,6 +503,9 @@ public abstract class AggregateFcts
 
     /**
      * The SUM function for float values.
+     * <p>
+     * The returned value is of the same type as the input values, so there is a risk of overflow if the sum of the
+     * values exceeds the maximum value that the type can represent.
      */
     public static final NativeAggregateFunction sumFunctionForFloat =
             new NativeAggregateFunction("sum", FloatType.instance, FloatType.instance)
@@ -492,6 +524,8 @@ public abstract class AggregateFcts
 
     /**
      * AVG function for float values.
+     * <p>
+     * The average of an empty value set returns zero.
      */
     public static final NativeAggregateFunction avgFunctionForFloat =
             new NativeAggregateFunction("avg", FloatType.instance, FloatType.instance)
@@ -510,6 +544,9 @@ public abstract class AggregateFcts
 
     /**
      * The SUM function for double values.
+     * <p>
+     * The returned value is of the same type as the input values, so there is a risk of overflow if the sum of the
+     * values exceeds the maximum value that the type can represent.
      */
     public static final NativeAggregateFunction sumFunctionForDouble =
             new NativeAggregateFunction("sum", DoubleType.instance, DoubleType.instance)
@@ -670,6 +707,8 @@ public abstract class AggregateFcts
 
     /**
      * AVG function for double values.
+     * <p>
+     * The average of an empty value set returns zero.
      */
     public static final NativeAggregateFunction avgFunctionForDouble =
             new NativeAggregateFunction("avg", DoubleType.instance, DoubleType.instance)
diff --git a/src/java/org/apache/cassandra/cql3/functions/CollectionFcts.java b/src/java/org/apache/cassandra/cql3/functions/CollectionFcts.java
new file mode 100644
index 0000000000..54a95647ce
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/functions/CollectionFcts.java
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3.functions;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.ImmutableList;
+
+import org.apache.cassandra.cql3.CQL3Type;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CollectionType;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.marshal.ListType;
+import org.apache.cassandra.db.marshal.MapType;
+import org.apache.cassandra.db.marshal.SetType;
+import org.apache.cassandra.transport.ProtocolVersion;
+
+/**
+ * Native CQL functions for collections (sets, lists and maps).
+ * <p>
+ * All the functions provided here are {@link NativeScalarFunction}, and they are meant to be applied to single
+ * collection values to perform some kind of aggregation with the elements of the collection argument. When possible,
+ * the implementation of these aggregation functions is based on the across-rows aggregation functions available on
+ * {@link AggregateFcts}, so both across-rows and within-collection aggregations have the same behaviour.
+ */
+public class CollectionFcts
+{
+    public static void addFunctionsTo(NativeFunctions functions)
+    {
+        functions.add(new FunctionFactory("map_keys", FunctionParameter.anyMap())
+        {
+            @Override
+            protected NativeFunction doGetOrCreateFunction(List<AbstractType<?>> argTypes, AbstractType<?> receiverType)
+            {
+                return makeMapKeysFunction(name.name, (MapType<?, ?>) argTypes.get(0));
+            }
+        });
+
+        functions.add(new FunctionFactory("map_values", FunctionParameter.anyMap())
+        {
+            @Override
+            protected NativeFunction doGetOrCreateFunction(List<AbstractType<?>> argTypes, AbstractType<?> receiverType)
+            {
+                return makeMapValuesFunction(name.name, (MapType<?, ?>) argTypes.get(0));
+            }
+        });
+
+        functions.add(new FunctionFactory("collection_count", FunctionParameter.anyCollection())
+        {
+            @Override
+            protected NativeFunction doGetOrCreateFunction(List<AbstractType<?>> argTypes, AbstractType<?> receiverType)
+            {
+                return makeCollectionCountFunction(name.name, (CollectionType<?>) argTypes.get(0));
+            }
+        });
+
+        functions.add(new FunctionFactory("collection_min", FunctionParameter.setOrList())
+        {
+            @Override
+            protected NativeFunction doGetOrCreateFunction(List<AbstractType<?>> argTypes, AbstractType<?> receiverType)
+            {
+                return makeCollectionMinFunction(name.name, (CollectionType<?>) argTypes.get(0));
+            }
+        });
+
+        functions.add(new FunctionFactory("collection_max", FunctionParameter.setOrList())
+        {
+            @Override
+            protected NativeFunction doGetOrCreateFunction(List<AbstractType<?>> argTypes, AbstractType<?> receiverType)
+            {
+                return makeCollectionMaxFunction(name.name, (CollectionType<?>) argTypes.get(0));
+            }
+        });
+
+        functions.add(new FunctionFactory("collection_sum", FunctionParameter.numericSetOrList())
+        {
+            @Override
+            protected NativeFunction doGetOrCreateFunction(List<AbstractType<?>> argTypes, AbstractType<?> receiverType)
+            {
+                return makeCollectionSumFunction(name.name, (CollectionType<?>) argTypes.get(0));
+            }
+        });
+
+        functions.add(new FunctionFactory("collection_avg", FunctionParameter.numericSetOrList())
+        {
+            @Override
+            protected NativeFunction doGetOrCreateFunction(List<AbstractType<?>> argTypes, AbstractType<?> receiverType)
+            {
+                return makeCollectionAvgFunction(name.name, (CollectionType<?>) argTypes.get(0));
+            }
+        });
+    }
+
+    /**
+     * Returns a native scalar function for getting the keys of a map column, as a set.
+     *
+     * @param name      the name of the function
+     * @param inputType the type of the map argument accepted by the returned function
+     * @param <K>       the class of the map argument keys
+     * @param <V>       the class of the map argument values
+     * @return a function returning a serialized set containing the keys of the map passed as argument
+     */
+    private static <K, V> NativeScalarFunction makeMapKeysFunction(String name, MapType<K, V> inputType)
+    {
+        SetType<K> outputType = SetType.getInstance(inputType.getKeysType(), false);
+
+        return new NativeScalarFunction(name, outputType, inputType)
+        {
+            @Override
+            public ByteBuffer execute(ProtocolVersion protocolVersion, List<ByteBuffer> parameters)
+            {
+                ByteBuffer value = parameters.get(0);
+                if (value == null)
+                    return null;
+
+                Map<K, V> map = inputType.compose(value);
+                Set<K> keys = map.keySet();
+                return outputType.decompose(keys);
+            }
+        };
+    }
+
+    /**
+     * Returns a native scalar function for getting the values of a map column, as a list.
+     *
+     * @param name      the name of the function
+     * @param inputType the type of the map argument accepted by the returned function
+     * @param <K>       the class of the map argument keys
+     * @param <V>       the class of the map argument values
+     * @return a function returning a serialized list containing the values of the map passed as argument
+     */
+    private static <K, V> NativeScalarFunction makeMapValuesFunction(String name, MapType<K, V> inputType)
+    {
+        ListType<V> outputType = ListType.getInstance(inputType.getValuesType(), false);
+
+        return new NativeScalarFunction(name, outputType, inputType)
+        {
+            @Override
+            public ByteBuffer execute(ProtocolVersion protocolVersion, List<ByteBuffer> parameters)
+            {
+                ByteBuffer value = parameters.get(0);
+                if (value == null)
+                    return null;
+
+                Map<K, V> map = inputType.compose(value);
+                List<V> values = ImmutableList.copyOf(map.values());
+                return outputType.decompose(values);
+            }
+        };
+    }
+
+    /**
+     * Returns a native scalar function for getting the number of elements in a collection.
+     *
+     * @param name      the name of the function
+     * @param inputType the type of the collection argument accepted by the returned function
+     * @param <T>       the type of the elements of the collection argument
+     * @return a function returning the number of elements in the collection passed as argument, as a 32-bit integer
+     */
+    private static <T> NativeScalarFunction makeCollectionCountFunction(String name, CollectionType<T> inputType)
+    {
+        return new NativeScalarFunction(name, Int32Type.instance, inputType)
+        {
+            @Override
+            public ByteBuffer execute(ProtocolVersion protocolVersion, List<ByteBuffer> parameters)
+            {
+                ByteBuffer value = parameters.get(0);
+                if (value == null)
+                    return null;
+
+                int size = inputType.size(value);
+                return Int32Type.instance.decompose(size);
+            }
+        };
+    }
+
+    /**
+     * Returns a native scalar function for getting the min element in a collection.
+     *
+     * @param name      the name of the function
+     * @param inputType the type of the collection argument accepted by the returned function
+     * @param <T>       the type of the elements of the collection argument
+     * @return a function returning the min element in the collection passed as argument
+     */
+    private static <T> NativeScalarFunction makeCollectionMinFunction(String name, CollectionType<T> inputType)
+    {
+        AbstractType<?> elementsType = elementsType(inputType);
+        NativeAggregateFunction function = elementsType.isCounter()
+                                           ? AggregateFcts.minFunctionForCounter
+                                           : AggregateFcts.makeMinFunction(elementsType);
+        return new CollectionAggregationFunction(name, inputType, function);
+    }
+
+    /**
+     * Returns a native scalar function for getting the max element in a collection.
+     *
+     * @param name      the name of the function
+     * @param inputType the type of the collection argument accepted by the returned function
+     * @param <T>       the type of the elements of the collection argument
+     * @return a function returning the max element in the collection passed as argument
+     */
+    private static <T> NativeScalarFunction makeCollectionMaxFunction(String name, CollectionType<T> inputType)
+    {
+        AbstractType<?> elementsType = elementsType(inputType);
+        NativeAggregateFunction function = elementsType.isCounter()
+                                           ? AggregateFcts.maxFunctionForCounter
+                                           : AggregateFcts.makeMaxFunction(elementsType);
+        return new CollectionAggregationFunction(name, inputType, function);
+    }
+
+    /**
+     * Returns a native scalar function for getting the sum of the elements in a numeric collection.
+     * <p>
+     * The value returned by the function is of the same type as elements of its input collection, so there is a risk
+     * of overflow if the sum of the values exceeds the maximum value that the type can represent.
+     *
+     * @param name      the name of the function
+     * @param inputType the type of the collection argument accepted by the returned function
+     * @param <T>       the type of the elements of the collection argument
+     * @return a function returning the sum of the elements in the collection passed as argument
+     */
+    private static <T> NativeScalarFunction makeCollectionSumFunction(String name, CollectionType<T> inputType)
+    {
+        CQL3Type elementsType = elementsType(inputType).asCQL3Type();
+        NativeAggregateFunction function = getSumFunction((CQL3Type.Native) elementsType);
+        return new CollectionAggregationFunction(name, inputType, function);
+    }
+
+    private static NativeAggregateFunction getSumFunction(CQL3Type.Native type)
+    {
+        switch (type)
+        {
+            case TINYINT:
+                return AggregateFcts.sumFunctionForByte;
+            case SMALLINT:
+                return AggregateFcts.sumFunctionForShort;
+            case INT:
+                return AggregateFcts.sumFunctionForInt32;
+            case BIGINT:
+                return AggregateFcts.sumFunctionForLong;
+            case FLOAT:
+                return AggregateFcts.sumFunctionForFloat;
+            case DOUBLE:
+                return AggregateFcts.sumFunctionForDouble;
+            case VARINT:
+                return AggregateFcts.sumFunctionForVarint;
+            case DECIMAL:
+                return AggregateFcts.sumFunctionForDecimal;
+            default:
+                throw new AssertionError("Expected numeric collection but found " + type);
+        }
+    }
+
+    /**
+     * Returns a native scalar function for getting the average of the elements in a numeric collection.
+     * <p>
+     * The average of an empty collection returns zero. The value returned by the function is of the same type as the
+     * elements of its input collection, so if those don't have a decimal part then the returned average won't have a
+     * decimal part either.
+     *
+     * @param name      the name of the function
+     * @param inputType the type of the collection argument accepted by the returned function
+     * @param <T>       the type of the elements of the collection argument
+     * @return a function returning the average value of the elements in the collection passed as argument
+     */
+    private static <T> NativeScalarFunction makeCollectionAvgFunction(String name, CollectionType<T> inputType)
+    {
+        CQL3Type elementsType = elementsType(inputType).asCQL3Type();
+        NativeAggregateFunction function = getAvgFunction((CQL3Type.Native) elementsType);
+        return new CollectionAggregationFunction(name, inputType, function);
+    }
+
+    private static NativeAggregateFunction getAvgFunction(CQL3Type.Native type)
+    {
+        switch (type)
+        {
+            case TINYINT:
+                return AggregateFcts.avgFunctionForByte;
+            case SMALLINT:
+                return AggregateFcts.avgFunctionForShort;
+            case INT:
+                return AggregateFcts.avgFunctionForInt32;
+            case BIGINT:
+                return AggregateFcts.avgFunctionForLong;
+            case FLOAT:
+                return AggregateFcts.avgFunctionForFloat;
+            case DOUBLE:
+                return AggregateFcts.avgFunctionForDouble;
+            case VARINT:
+                return AggregateFcts.avgFunctionForVarint;
+            case DECIMAL:
+                return AggregateFcts.avgFunctionForDecimal;
+            default:
+                throw new AssertionError("Expected numeric collection but found " + type);
+        }
+    }
+
+    /**
+     * @return the type of the elements of the specified collection type.
+     */
+    private static AbstractType<?> elementsType(CollectionType<?> type)
+    {
+        if (type.kind == CollectionType.Kind.LIST)
+        {
+            return ((ListType<?>) type).getElementsType();
+        }
+
+        if (type.kind == CollectionType.Kind.SET)
+        {
+            return ((SetType<?>) type).getElementsType();
+        }
+
+        throw new AssertionError("Cannot get the element type of: " + type);
+    }
+
+    /**
+     * A {@link NativeScalarFunction} for aggregating the elements of a collection according to the aggregator of
+     * a certain {@link NativeAggregateFunction}.
+     * <p>
+     * {@link NativeAggregateFunction} is meant to be used for aggregating values across rows, but here we use that
+     * function to aggregate the elements of a single collection value. That way, functions such as {@code avg} and
+     * {@code collection_avg} should have the same behaviour when applied to row columns or collection elements.
+     */
+    private static class CollectionAggregationFunction extends NativeScalarFunction
+    {
+        private final CollectionType<?> inputType;
+        private final NativeAggregateFunction aggregateFunction;
+
+        public CollectionAggregationFunction(String name,
+                                             CollectionType<?> inputType,
+                                             NativeAggregateFunction aggregateFunction)
+        {
+            super(name, aggregateFunction.returnType, inputType);
+            this.inputType = inputType;
+            this.aggregateFunction = aggregateFunction;
+        }
+
+        @Override
+        public ByteBuffer execute(ProtocolVersion version, List<ByteBuffer> parameters)
+        {
+            ByteBuffer value = parameters.get(0);
+            if (value == null)
+                return null;
+
+            AggregateFunction.Aggregate aggregate = aggregateFunction.newAggregate();
+            inputType.forEach(value, version, element -> aggregate.addInput(version, Collections.singletonList(element)));
+            return aggregate.compute(version);
+        }
+    }
+}
diff --git a/src/java/org/apache/cassandra/cql3/functions/FunctionParameter.java b/src/java/org/apache/cassandra/cql3/functions/FunctionParameter.java
index 0d5a0c4a12..78ad7a33e3 100644
--- a/src/java/org/apache/cassandra/cql3/functions/FunctionParameter.java
+++ b/src/java/org/apache/cassandra/cql3/functions/FunctionParameter.java
@@ -22,6 +22,11 @@ import javax.annotation.Nullable;
 
 import org.apache.cassandra.cql3.AssignmentTestable;
 import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CollectionType;
+import org.apache.cassandra.db.marshal.ListType;
+import org.apache.cassandra.db.marshal.MapType;
+import org.apache.cassandra.db.marshal.NumberType;
+import org.apache.cassandra.db.marshal.SetType;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 
 import static java.lang.String.format;
@@ -42,7 +47,10 @@ public interface FunctionParameter
      * @return the inferred data type of the parameter, or {@link null} it isn't possible to infer it
      */
     @Nullable
-    AbstractType<?> inferType(String keyspace, AssignmentTestable arg, @Nullable AbstractType<?> receiverType);
+    default AbstractType<?> inferType(String keyspace, AssignmentTestable arg, @Nullable AbstractType<?> receiverType)
+    {
+        return arg.getCompatibleTypeIfKnown(keyspace);
+    }
 
     void validateType(FunctionName name, AssignmentTestable arg, AbstractType<?> argType);
 
@@ -106,4 +114,123 @@ public interface FunctionParameter
             }
         };
     }
+
+    /**
+     * @return a function parameter definition that accepts values of type {@link CollectionType}, independently of the
+     * types of its elements.
+     */
+    public static FunctionParameter anyCollection()
+    {
+        return new FunctionParameter()
+        {
+            @Override
+            public void validateType(FunctionName name, AssignmentTestable arg, AbstractType<?> argType)
+            {
+                if (!argType.isCollection())
+                    throw new InvalidRequestException(format("Function %s requires a collection argument, " +
+                                                             "but found argument %s of type %s",
+                                                             name, arg, argType.asCQL3Type()));
+            }
+
+            @Override
+            public String toString()
+            {
+                return "collection";
+            }
+        };
+    }
+
+    /**
+     * @return a function parameter definition that accepts values of type {@link SetType} or {@link ListType}.
+     */
+    public static FunctionParameter setOrList()
+    {
+        return new FunctionParameter()
+        {
+            @Override
+            public void validateType(FunctionName name, AssignmentTestable arg, AbstractType<?> argType)
+            {
+                if (argType.isCollection())
+                {
+                    CollectionType.Kind kind = ((CollectionType<?>) argType).kind;
+                    if (kind == CollectionType.Kind.SET || kind == CollectionType.Kind.LIST)
+                        return;
+                }
+
+                throw new InvalidRequestException(format("Function %s requires a set or list argument, " +
+                                                         "but found argument %s of type %s",
+                                                         name, arg, argType.asCQL3Type()));
+            }
+
+            @Override
+            public String toString()
+            {
+                return "set_or_list";
+            }
+        };
+    }
+
+    /**
+     * @return a function parameter definition that accepts values of type {@link SetType} or {@link ListType},
+     * provided that its elements are numeric.
+     */
+    public static FunctionParameter numericSetOrList()
+    {
+        return new FunctionParameter()
+        {
+            @Override
+            public void validateType(FunctionName name, AssignmentTestable arg, AbstractType<?> argType)
+            {
+                AbstractType<?> elementType = null;
+                if (argType.isCollection())
+                {
+                    CollectionType<?> collectionType = (CollectionType<?>) argType;
+                    if (collectionType.kind == CollectionType.Kind.SET)
+                    {
+                        elementType = ((SetType<?>) argType).getElementsType();
+                    }
+                    else if (collectionType.kind == CollectionType.Kind.LIST)
+                    {
+                        elementType = ((ListType<?>) argType).getElementsType();
+                    }
+                }
+
+                if (!(elementType instanceof NumberType))
+                    throw new InvalidRequestException(format("Function %s requires a numeric set/list argument, " +
+                                                             "but found argument %s of type %s",
+                                                             name, arg, argType.asCQL3Type()));
+            }
+
+            @Override
+            public String toString()
+            {
+                return "numeric_set_or_list";
+            }
+        };
+    }
+
+    /**
+     * @return a function parameter definition that accepts values of type {@link MapType}, independently of the types
+     * of the map keys and values.
+     */
+    public static FunctionParameter anyMap()
+    {
+        return new FunctionParameter()
+        {
+            @Override
+            public void validateType(FunctionName name, AssignmentTestable arg, AbstractType<?> argType)
+            {
+                if (!argType.isUDT() && !(argType instanceof MapType))
+                    throw new InvalidRequestException(format("Function %s requires a map argument, " +
+                                                             "but found argument %s of type %s",
+                                                             name, arg, argType.asCQL3Type()));
+            }
+
+            @Override
+            public String toString()
+            {
+                return "map";
+            }
+        };
+    }
 }
diff --git a/src/java/org/apache/cassandra/cql3/functions/NativeFunctions.java b/src/java/org/apache/cassandra/cql3/functions/NativeFunctions.java
index 6ea5e81ed4..551662e3e5 100644
--- a/src/java/org/apache/cassandra/cql3/functions/NativeFunctions.java
+++ b/src/java/org/apache/cassandra/cql3/functions/NativeFunctions.java
@@ -40,6 +40,7 @@ public class NativeFunctions
             FromJsonFct.addFunctionsTo(this);
             OperationFcts.addFunctionsTo(this);
             AggregateFcts.addFunctionsTo(this);
+            CollectionFcts.addFunctionsTo(this);
             BytesConversionFcts.addFunctionsTo(this);
             MathFcts.addFunctionsTo(this);
         }
diff --git a/src/java/org/apache/cassandra/db/marshal/CollectionType.java b/src/java/org/apache/cassandra/db/marshal/CollectionType.java
index 5e9916e7e4..a8cc9d9231 100644
--- a/src/java/org/apache/cassandra/db/marshal/CollectionType.java
+++ b/src/java/org/apache/cassandra/db/marshal/CollectionType.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Iterator;
+import java.util.function.Consumer;
 
 import org.apache.cassandra.cql3.CQL3Type;
 import org.apache.cassandra.cql3.ColumnSpecification;
@@ -357,4 +358,11 @@ public abstract class CollectionType<T> extends AbstractType<T>
             ByteBufferUtil.skipWithVIntLength(in);
         }
     }
+
+    public int size(ByteBuffer buffer)
+    {
+        return CollectionSerializer.readCollectionSize(buffer.duplicate(), ByteBufferAccessor.instance, ProtocolVersion.V3);
+    }
+
+    public abstract void forEach(ByteBuffer input, ProtocolVersion version, Consumer<ByteBuffer> action);
 }
diff --git a/src/java/org/apache/cassandra/db/marshal/ListType.java b/src/java/org/apache/cassandra/db/marshal/ListType.java
index f795def3a7..73290d5b6c 100644
--- a/src/java/org/apache/cassandra/db/marshal/ListType.java
+++ b/src/java/org/apache/cassandra/db/marshal/ListType.java
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
 
 import org.apache.cassandra.cql3.Json;
 import org.apache.cassandra.cql3.Lists;
@@ -243,4 +244,10 @@ public class ListType<T> extends CollectionType<List<T>>
     {
         return setOrListToJsonString(buffer, elements, protocolVersion);
     }
+
+    @Override
+    public void forEach(ByteBuffer input, ProtocolVersion version, Consumer<ByteBuffer> action)
+    {
+        serializer.forEach(input, version, action);
+    }
 }
diff --git a/src/java/org/apache/cassandra/db/marshal/MapType.java b/src/java/org/apache/cassandra/db/marshal/MapType.java
index be74ff1626..d9bfc8040c 100644
--- a/src/java/org/apache/cassandra/db/marshal/MapType.java
+++ b/src/java/org/apache/cassandra/db/marshal/MapType.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.db.marshal;
 import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
 
 import org.apache.cassandra.cql3.Json;
 import org.apache.cassandra.cql3.Maps;
@@ -377,4 +378,10 @@ public class MapType<K, V> extends CollectionType<Map<K, V>>
         }
         return sb.append("}").toString();
     }
+
+    @Override
+    public void forEach(ByteBuffer input, ProtocolVersion version, Consumer<ByteBuffer> action)
+    {
+        throw new UnsupportedOperationException();
+    }
 }
diff --git a/src/java/org/apache/cassandra/db/marshal/SetType.java b/src/java/org/apache/cassandra/db/marshal/SetType.java
index 67699ac3da..8e33ca6698 100644
--- a/src/java/org/apache/cassandra/db/marshal/SetType.java
+++ b/src/java/org/apache/cassandra/db/marshal/SetType.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.db.marshal;
 import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
 
 import org.apache.cassandra.cql3.Json;
 import org.apache.cassandra.cql3.Sets;
@@ -226,4 +227,10 @@ public class SetType<T> extends CollectionType<Set<T>>
     {
         return setOrListToJsonString(buffer, elements, protocolVersion);
     }
+
+    @Override
+    public void forEach(ByteBuffer input, ProtocolVersion version, Consumer<ByteBuffer> action)
+    {
+        serializer.forEach(input, version, action);
+    }
 }
diff --git a/src/java/org/apache/cassandra/serializers/CollectionSerializer.java b/src/java/org/apache/cassandra/serializers/CollectionSerializer.java
index 36e346c3e7..841a1b0aac 100644
--- a/src/java/org/apache/cassandra/serializers/CollectionSerializer.java
+++ b/src/java/org/apache/cassandra/serializers/CollectionSerializer.java
@@ -18,9 +18,11 @@
 
 package org.apache.cassandra.serializers;
 
+import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.List;
+import java.util.function.Consumer;
 
 import com.google.common.collect.Range;
 
@@ -224,4 +226,25 @@ public abstract class CollectionSerializer<T> extends TypeSerializer<T>
         ByteBufferUtil.copyBytes(input, startPos, output, sizeLen, bodyLen);
         return output;
     }
+
+    public void forEach(ByteBuffer input, ProtocolVersion version, Consumer<ByteBuffer> action)
+    {
+        try
+        {
+            int collectionSize = readCollectionSize(input, ByteBufferAccessor.instance, version);
+            int offset = sizeOfCollectionSize(collectionSize, version);
+
+            for (int i = 0; i < collectionSize; i++)
+            {
+                ByteBuffer value = readValue(input, ByteBufferAccessor.instance, offset, version);
+                offset += sizeOfValue(value, ByteBufferAccessor.instance, version);
+
+                action.accept(value);
+            }
+        }
+        catch (BufferUnderflowException | IndexOutOfBoundsException e)
+        {
+            throw new MarshalException("Not enough bytes to read a collection");
+        }
+    }
 }
diff --git a/src/java/org/apache/cassandra/serializers/MapSerializer.java b/src/java/org/apache/cassandra/serializers/MapSerializer.java
index b3e4cba994..0417d403e6 100644
--- a/src/java/org/apache/cassandra/serializers/MapSerializer.java
+++ b/src/java/org/apache/cassandra/serializers/MapSerializer.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.function.Consumer;
 
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.ByteBufferAccessor;
@@ -197,4 +198,10 @@ public class MapSerializer<K, V> extends AbstractMapSerializer<Map<K, V>>
     {
         return (Class) Map.class;
     }
+
+    @Override
+    public void forEach(ByteBuffer input, ProtocolVersion version, Consumer<ByteBuffer> action)
+    {
+        throw new UnsupportedOperationException();
+    }
 }
diff --git a/test/unit/org/apache/cassandra/cql3/functions/CollectionFctsTest.java b/test/unit/org/apache/cassandra/cql3/functions/CollectionFctsTest.java
new file mode 100644
index 0000000000..0ad299edee
--- /dev/null
+++ b/test/unit/org/apache/cassandra/cql3/functions/CollectionFctsTest.java
@@ -0,0 +1,595 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3.functions;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.math.RoundingMode;
+
+import org.junit.Test;
+
+import org.apache.cassandra.cql3.CQL3Type;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.db.marshal.NumberType;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+
+/**
+ * Tests for the functions defined on {@link CollectionFcts}.
+ */
+public class CollectionFctsTest extends CQLTester
+{
+    private static final BigInteger bigint1 = new BigInteger("12345678901234567890");
+    private static final BigInteger bigint2 = new BigInteger("23456789012345678901");
+    private static final BigDecimal bigdecimal1 = new BigDecimal("1234567890.1234567890");
+    private static final BigDecimal bigdecimal2 = new BigDecimal("2345678901.2345678901");
+
+    @Test
+    public void testNotNumericCollection() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k int PRIMARY KEY, v uuid, l list<text>, s set<boolean>, fl frozen<list<text>>, fs frozen<set<boolean>>)");
+
+        // sum
+        assertInvalidThrowMessage("Function system.collection_sum requires a numeric set/list argument, " +
+                                  "but found argument v of type uuid",
+                                  InvalidRequestException.class,
+                                  "SELECT collection_sum(v) FROM %s");
+        assertInvalidThrowMessage("Function system.collection_sum requires a numeric set/list argument, " +
+                                  "but found argument l of type list<text>",
+                                  InvalidRequestException.class,
+                                  "SELECT collection_sum(l) FROM %s");
+        assertInvalidThrowMessage("Function system.collection_sum requires a numeric set/list argument, " +
+                                  "but found argument s of type set<boolean>",
+                                  InvalidRequestException.class,
+                                  "SELECT collection_sum(s) FROM %s");
+        assertInvalidThrowMessage("Function system.collection_sum requires a numeric set/list argument, " +
+                                  "but found argument fl of type frozen<list<text>>",
+                                  InvalidRequestException.class,
+                                  "SELECT collection_sum(fl) FROM %s");
+        assertInvalidThrowMessage("Function system.collection_sum requires a numeric set/list argument, " +
+                                  "but found argument fs of type frozen<set<boolean>>",
+                                  InvalidRequestException.class,
+                                  "SELECT collection_sum(fs) FROM %s");
+
+        // avg
+        assertInvalidThrowMessage("Function system.collection_avg requires a numeric set/list argument, " +
+                                  "but found argument v of type uuid",
+                                  InvalidRequestException.class,
+                                  "SELECT collection_avg(v) FROM %s");
+        assertInvalidThrowMessage("Function system.collection_avg requires a numeric set/list argument, " +
+                                  "but found argument l of type list<text>",
+                                  InvalidRequestException.class,
+                                  "SELECT collection_avg(l) FROM %s");
+        assertInvalidThrowMessage("Function system.collection_avg requires a numeric set/list argument, " +
+                                  "but found argument s of type set<boolean>",
+                                  InvalidRequestException.class,
+                                  "SELECT collection_avg(s) FROM %s");
+        assertInvalidThrowMessage("Function system.collection_avg requires a numeric set/list argument, " +
+                                  "but found argument fl of type frozen<list<text>>",
+                                  InvalidRequestException.class,
+                                  "SELECT collection_avg(fl) FROM %s");
+        assertInvalidThrowMessage("Function system.collection_avg requires a numeric set/list argument, " +
+                                  "but found argument fs of type frozen<set<boolean>>",
+                                  InvalidRequestException.class,
+                                  "SELECT collection_avg(fs) FROM %s");
+    }
+
+    @Test
+    public void testTinyInt() throws Throwable
+    {
+        createTable(CQL3Type.Native.TINYINT);
+
+        // empty collections
+        assertRows(execute("SELECT map_keys(m), map_values(m), map_keys(fm), map_values(fm) FROM %s"),
+                   row(null, null, set(), list()));
+        assertRows(execute("SELECT collection_count(l), collection_count(s), collection_count(m), " +
+                           "collection_count(fl), collection_count(fs), collection_count(fm) FROM %s"),
+                   row(null, null, null, 0, 0, 0));
+        assertRows(execute("SELECT collection_min(l), collection_min(s), collection_min(fl), collection_min(fs) FROM %s"),
+                   row(null, null, null, null));
+        assertRows(execute("SELECT collection_max(l), collection_max(s), collection_max(fl), collection_max(fs) FROM %s"),
+                   row(null, null, null, null));
+        assertRows(execute("SELECT collection_sum(l), collection_sum(s), collection_sum(fl), collection_sum(fs) FROM %s"),
+                   row(null, null, (byte) 0, (byte) 0));
+        assertRows(execute("SELECT collection_avg(l), collection_avg(s), collection_avg(fl), collection_avg(fs) FROM %s"),
+                   row(null, null, (byte) 0, (byte) 0));
+
+        // not empty collections
+        execute("INSERT INTO %s (k, l, fl, s, fs, m, fm)  VALUES (1, ?, ?, ?, ?, ?, ?)",
+                list((byte) 1, (byte) 2), list((byte) 1, (byte) 2),
+                set((byte) 1, (byte) 2), set((byte) 1, (byte) 2),
+                map((byte) 1, (byte) 2, (byte) 3, (byte) 4),
+                map((byte) 1, (byte) 2, (byte) 3, (byte) 4));
+
+        assertRows(execute("SELECT map_keys(m), map_keys(fm) FROM %s"),
+                   row(set((byte) 1, (byte) 3), set((byte) 1, (byte) 3)));
+        assertRows(execute("SELECT map_values(m), map_values(fm) FROM %s"),
+                   row(list((byte) 2, (byte) 4), list((byte) 2, (byte) 4)));
+        assertRows(execute("SELECT collection_count(l), collection_count(s), collection_count(m) FROM %s"),
+                   row(2, 2, 2));
+        assertRows(execute("SELECT collection_count(fl), collection_count(fs), collection_count(fm) FROM %s"),
+                   row(2, 2, 2));
+        assertRows(execute("SELECT collection_min(l), collection_min(s), collection_min(fl), collection_min(fs) FROM %s"),
+                   row((byte) 1, (byte) 1, (byte) 1, (byte) 1));
+        assertRows(execute("SELECT collection_max(l), collection_max(s), collection_max(fl), collection_max(fs) FROM %s"),
+                   row((byte) 2, (byte) 2, (byte) 2, (byte) 2));
+        assertRows(execute("SELECT collection_sum(l), collection_sum(s), collection_sum(fl), collection_sum(fs) FROM %s"),
+                   row((byte) 3, (byte) 3, (byte) 3, (byte) 3));
+        assertRows(execute("SELECT collection_avg(l), collection_avg(s), collection_avg(fl), collection_avg(fs) FROM %s"),
+                   row((byte) 1, (byte) 1, (byte) 1, (byte) 1));
+    }
+
+    @Test
+    public void testSmallInt() throws Throwable
+    {
+        createTable(CQL3Type.Native.SMALLINT);
+
+        // empty collections
+        assertRows(execute("SELECT map_keys(m), map_values(m), map_keys(fm), map_values(fm) FROM %s"),
+                   row(null, null, set(), list()));
+        assertRows(execute("SELECT collection_count(l), collection_count(s), collection_count(m), " +
+                           "collection_count(fl), collection_count(fs), collection_count(fm) FROM %s"),
+                   row(null, null, null, 0, 0, 0));
+        assertRows(execute("SELECT collection_min(l), collection_min(s), collection_min(fl), collection_min(fs) FROM %s"),
+                   row(null, null, null, null));
+        assertRows(execute("SELECT collection_max(l), collection_max(s), collection_max(fl), collection_max(fs) FROM %s"),
+                   row(null, null, null, null));
+        assertRows(execute("SELECT collection_sum(l), collection_sum(s), collection_sum(fl), collection_sum(fs) FROM %s"),
+                   row(null, null, (short) 0, (short) 0));
+        assertRows(execute("SELECT collection_avg(l), collection_avg(s), collection_avg(fl), collection_avg(fs) FROM %s"),
+                   row(null, null, (short) 0, (short) 0));
+
+        // not empty collections
+        execute("INSERT INTO %s (k, l, fl, s, fs, m, fm)  VALUES (1, ?, ?, ?, ?, ?, ?)",
+                list((short) 1, (short) 2), list((short) 1, (short) 2),
+                set((short) 1, (short) 2), set((short) 1, (short) 2),
+                map((short) 1, (short) 2, (short) 3, (short) 4),
+                map((short) 1, (short) 2, (short) 3, (short) 4));
+
+        assertRows(execute("SELECT map_keys(m), map_keys(fm) FROM %s"),
+                   row(set((short) 1, (short) 3), set((short) 1, (short) 3)));
+        assertRows(execute("SELECT map_values(m), map_values(fm) FROM %s"),
+                   row(list((short) 2, (short) 4), list((short) 2, (short) 4)));
+        assertRows(execute("SELECT collection_count(l), collection_count(s), collection_count(m) FROM %s"),
+                   row(2, 2, 2));
+        assertRows(execute("SELECT collection_count(fl), collection_count(fs), collection_count(fm) FROM %s"),
+                   row(2, 2, 2));
+        assertRows(execute("SELECT collection_min(l), collection_min(s), collection_min(fl), collection_min(fs) FROM %s"),
+                   row((short) 1, (short) 1, (short) 1, (short) 1));
+        assertRows(execute("SELECT collection_max(l), collection_max(s), collection_max(fl), collection_max(fs) FROM %s"),
+                   row((short) 2, (short) 2, (short) 2, (short) 2));
+        assertRows(execute("SELECT collection_sum(l), collection_sum(s), collection_sum(fl), collection_sum(fs) FROM %s"),
+                   row((short) 3, (short) 3, (short) 3, (short) 3));
+        assertRows(execute("SELECT collection_avg(l), collection_avg(s), collection_avg(fl), collection_avg(fs) FROM %s"),
+                   row((short) 1, (short) 1, (short) 1, (short) 1));
+    }
+
+    @Test
+    public void testInt() throws Throwable
+    {
+        createTable(CQL3Type.Native.INT);
+
+        // empty collections
+        assertRows(execute("SELECT map_keys(m), map_values(m), map_keys(fm), map_values(fm) FROM %s"),
+                   row(null, null, set(), list()));
+        assertRows(execute("SELECT collection_count(l), collection_count(s), collection_count(m), " +
+                           "collection_count(fl), collection_count(fs), collection_count(fm) FROM %s"),
+                   row(null, null, null, 0, 0, 0));
+        assertRows(execute("SELECT collection_min(l), collection_min(s), collection_min(fl), collection_min(fs) FROM %s"),
+                   row(null, null, null, null));
+        assertRows(execute("SELECT collection_max(l), collection_max(s), collection_max(fl), collection_max(fs) FROM %s"),
+                   row(null, null, null, null));
+        assertRows(execute("SELECT collection_sum(l), collection_sum(s), collection_sum(fl), collection_sum(fs) FROM %s"),
+                   row(null, null, 0, 0));
+        assertRows(execute("SELECT collection_avg(l), collection_avg(s), collection_avg(fl), collection_avg(fs) FROM %s"),
+                   row(null, null, 0, 0));
+
+        // not empty collections
+        execute("INSERT INTO %s (k, l, fl, s, fs, m, fm)  VALUES (1, ?, ?, ?, ?, ?, ?)",
+                list(1, 2), list(1, 2), set(1, 2), set(1, 2), map(1, 2, 3, 4), map(1, 2, 3, 4));
+
+        assertRows(execute("SELECT map_keys(m), map_keys(fm) FROM %s"),
+                   row(set(1, 3), set(1, 3)));
+        assertRows(execute("SELECT map_values(m), map_values(fm) FROM %s"),
+                   row(list(2, 4), list(2, 4)));
+        assertRows(execute("SELECT collection_count(l), collection_count(s), collection_count(m) FROM %s"),
+                   row(2, 2, 2));
+        assertRows(execute("SELECT collection_count(fl), collection_count(fs), collection_count(fm) FROM %s"),
+                   row(2, 2, 2));
+        assertRows(execute("SELECT collection_min(l), collection_min(s), collection_min(fl), collection_min(fs) FROM %s"),
+                   row(1, 1, 1, 1));
+        assertRows(execute("SELECT collection_max(l), collection_max(s), collection_max(fl), collection_max(fs) FROM %s"),
+                   row(2, 2, 2, 2));
+        assertRows(execute("SELECT collection_sum(l), collection_sum(s), collection_sum(fl), collection_sum(fs) FROM %s"),
+                   row(3, 3, 3, 3));
+        assertRows(execute("SELECT collection_avg(l), collection_avg(s), collection_avg(fl), collection_avg(fs) FROM %s"),
+                   row(1, 1, 1, 1));
+    }
+
+    @Test
+    public void testBigInt() throws Throwable
+    {
+        createTable(CQL3Type.Native.BIGINT);
+
+        // empty collections
+        assertRows(execute("SELECT map_keys(m), map_values(m), map_keys(fm), map_values(fm) FROM %s"),
+                   row(null, null, set(), list()));
+        assertRows(execute("SELECT collection_count(l), collection_count(s), collection_count(m), " +
+                           "collection_count(fl), collection_count(fs), collection_count(fm) FROM %s"),
+                   row(null, null, null, 0, 0, 0));
+        assertRows(execute("SELECT collection_min(l), collection_min(s), collection_min(fl), collection_min(fs) FROM %s"),
+                   row(null, null, null, null));
+        assertRows(execute("SELECT collection_max(l), collection_max(s), collection_max(fl), collection_max(fs) FROM %s"),
+                   row(null, null, null, null));
+        assertRows(execute("SELECT collection_sum(l), collection_sum(s), collection_sum(fl), collection_sum(fs) FROM %s"),
+                   row(null, null, 0L, 0L));
+        assertRows(execute("SELECT collection_avg(l), collection_avg(s), collection_avg(fl), collection_avg(fs) FROM %s"),
+                   row(null, null, 0L, 0L));
+
+        // not empty collections
+        execute("INSERT INTO %s (k, l, fl, s, fs, m, fm)  VALUES (1, ?, ?, ?, ?, ?, ?)",
+                list(1L, 2L), list(1L, 2L), set(1L, 2L), set(1L, 2L), map(1L, 2L, 3L, 4L), map(1L, 2L, 3L, 4L));
+
+        assertRows(execute("SELECT map_keys(m), map_keys(fm) FROM %s"),
+                   row(set(1L, 3L), set(1L, 3L)));
+        assertRows(execute("SELECT map_values(m), map_values(fm) FROM %s"),
+                   row(list(2L, 4L), list(2L, 4L)));
+        assertRows(execute("SELECT collection_count(l), collection_count(s), collection_count(m) FROM %s"),
+                   row(2, 2, 2));
+        assertRows(execute("SELECT collection_count(fl), collection_count(fs), collection_count(fm) FROM %s"),
+                   row(2, 2, 2));
+        assertRows(execute("SELECT collection_min(l), collection_min(s), collection_min(fl), collection_min(fs) FROM %s"),
+                   row(1L, 1L, 1L, 1L));
+        assertRows(execute("SELECT collection_max(l), collection_max(s), collection_max(fl), collection_max(fs) FROM %s"),
+                   row(2L, 2L, 2L, 2L));
+        assertRows(execute("SELECT collection_sum(l), collection_sum(s), collection_sum(fl), collection_sum(fs) FROM %s"),
+                   row(3L, 3L, 3L, 3L));
+        assertRows(execute("SELECT collection_avg(l), collection_avg(s), collection_avg(fl), collection_avg(fs) FROM %s"),
+                   row(1L, 1L, 1L, 1L));
+    }
+
+    @Test
+    public void testFloat() throws Throwable
+    {
+        createTable(CQL3Type.Native.FLOAT);
+
+        // empty collections
+        assertRows(execute("SELECT map_keys(m), map_values(m), map_keys(fm), map_values(fm) FROM %s"),
+                   row(null, null, set(), list()));
+        assertRows(execute("SELECT collection_count(l), collection_count(s), collection_count(m), " +
+                           "collection_count(fl), collection_count(fs), collection_count(fm) FROM %s"),
+                   row(null, null, null, 0, 0, 0));
+        assertRows(execute("SELECT collection_min(l), collection_min(s), collection_min(fl), collection_min(fs) FROM %s"),
+                   row(null, null, null, null));
+        assertRows(execute("SELECT collection_max(l), collection_max(s), collection_max(fl), collection_max(fs) FROM %s"),
+                   row(null, null, null, null));
+        assertRows(execute("SELECT collection_sum(l), collection_sum(s), collection_sum(fl), collection_sum(fs) FROM %s"),
+                   row(null, null, 0f, 0f));
+        assertRows(execute("SELECT collection_avg(l), collection_avg(s), collection_avg(fl), collection_avg(fs) FROM %s"),
+                   row(null, null, 0f, 0f));
+
+        // not empty collections
+        execute("INSERT INTO %s (k, l, fl, s, fs, m, fm)  VALUES (1, ?, ?, ?, ?, ?, ?)",
+                list(1.23f, 2.34f), list(1.23f, 2.34f),
+                set(1.23f, 2.34f), set(1.23f, 2.34f),
+                map(1.23f, 2.34f, 3.45f, 4.56f), map(1.23f, 2.34f, 3.45f, 4.56f));
+
+        assertRows(execute("SELECT map_keys(m), map_keys(fm) FROM %s"),
+                   row(set(1.23f, 3.45f), set(1.23f, 3.45f)));
+        assertRows(execute("SELECT map_values(m), map_values(fm) FROM %s"),
+                   row(list(2.34f, 4.56f), list(2.34f, 4.56f)));
+        assertRows(execute("SELECT collection_count(l), collection_count(s), collection_count(m) FROM %s"),
+                   row(2, 2, 2));
+        assertRows(execute("SELECT collection_count(fl), collection_count(fs), collection_count(fm) FROM %s"),
+                   row(2, 2, 2));
+        assertRows(execute("SELECT collection_min(l), collection_min(s), collection_min(fl), collection_min(fs) FROM %s"),
+                   row(1.23f, 1.23f, 1.23f, 1.23f));
+        assertRows(execute("SELECT collection_max(l), collection_max(s), collection_max(fl), collection_max(fs) FROM %s"),
+                   row(2.34f, 2.34f, 2.34f, 2.34f));
+
+        float sum = 1.23f + 2.34f;
+        assertRows(execute("SELECT collection_sum(l), collection_sum(s), collection_sum(fl), collection_sum(fs) FROM %s"),
+                   row(sum, sum, sum, sum));
+
+        float avg = (1.23f + 2.34f) / 2;
+        assertRows(execute("SELECT collection_avg(l), collection_avg(s), collection_avg(fl), collection_avg(fs) FROM %s"),
+                   row(avg, avg, avg, avg));
+    }
+
+    @Test
+    public void testDouble() throws Throwable
+    {
+        createTable(CQL3Type.Native.DOUBLE);
+
+        // empty collections
+        assertRows(execute("SELECT map_keys(m), map_values(m), map_keys(fm), map_values(fm) FROM %s"),
+                   row(null, null, set(), list()));
+        assertRows(execute("SELECT collection_count(l), collection_count(s), collection_count(m), " +
+                           "collection_count(fl), collection_count(fs), collection_count(fm) FROM %s"),
+                   row(null, null, null, 0, 0, 0));
+        assertRows(execute("SELECT collection_min(l), collection_min(s), collection_min(fl), collection_min(fs) FROM %s"),
+                   row(null, null, null, null));
+        assertRows(execute("SELECT collection_max(l), collection_max(s), collection_max(fl), collection_max(fs) FROM %s"),
+                   row(null, null, null, null));
+        assertRows(execute("SELECT collection_sum(l), collection_sum(s), collection_sum(fl), collection_sum(fs) FROM %s"),
+                   row(null, null, 0d, 0d));
+        assertRows(execute("SELECT collection_avg(l), collection_avg(s), collection_avg(fl), collection_avg(fs) FROM %s"),
+                   row(null, null, 0d, 0d));
+
+        // not empty collections
+        execute("INSERT INTO %s (k, l, fl, s, fs, m, fm)  VALUES (1, ?, ?, ?, ?, ?, ?)",
+                list(1.23d, 2.34d), list(1.23d, 2.34d),
+                set(1.23d, 2.34d), set(1.23d, 2.34d),
+                map(1.23d, 2.34d, 3.45d, 4.56d), map(1.23d, 2.34d, 3.45d, 4.56d));
+
+        assertRows(execute("SELECT map_keys(m), map_keys(fm) FROM %s"),
+                   row(set(1.23d, 3.45d), set(1.23d, 3.45d)));
+        assertRows(execute("SELECT map_values(m), map_values(fm) FROM %s"),
+                   row(list(2.34d, 4.56d), list(2.34d, 4.56d)));
+        assertRows(execute("SELECT collection_count(l), collection_count(s), collection_count(m) FROM %s"),
+                   row(2, 2, 2));
+        assertRows(execute("SELECT collection_count(fl), collection_count(fs), collection_count(fm) FROM %s"),
+                   row(2, 2, 2));
+        assertRows(execute("SELECT collection_min(l), collection_min(s), collection_min(fl), collection_min(fs) FROM %s"),
+                   row(1.23d, 1.23d, 1.23d, 1.23d));
+        assertRows(execute("SELECT collection_max(l), collection_max(s), collection_max(fl), collection_max(fs) FROM %s"),
+                   row(2.34d, 2.34d, 2.34d, 2.34d));
+        assertRows(execute("SELECT collection_sum(l), collection_sum(s), collection_sum(fl), collection_sum(fs) FROM %s"),
+                   row(3.57d, 3.57d, 3.57d, 3.57d));
+        assertRows(execute("SELECT collection_avg(l), collection_avg(s), collection_avg(fl), collection_avg(fs) FROM %s"),
+                   row(1.785d, 1.785d, 1.785d, 1.785d));
+    }
+
+    @Test
+    public void testVarInt() throws Throwable
+    {
+        createTable(CQL3Type.Native.VARINT);
+
+        // empty collections
+        assertRows(execute("SELECT map_keys(m), map_values(m), map_keys(fm), map_values(fm) FROM %s"),
+                   row(null, null, set(), list()));
+        assertRows(execute("SELECT collection_count(l), collection_count(s), collection_count(m), " +
+                           "collection_count(fl), collection_count(fs), collection_count(fm) FROM %s"),
+                   row(null, null, null, 0, 0, 0));
+        assertRows(execute("SELECT collection_min(l), collection_min(s), collection_min(fl), collection_min(fs) FROM %s"),
+                   row(null, null, null, null));
+        assertRows(execute("SELECT collection_max(l), collection_max(s), collection_max(fl), collection_max(fs) FROM %s"),
+                   row(null, null, null, null));
+        assertRows(execute("SELECT collection_sum(l), collection_sum(s), collection_sum(fl), collection_sum(fs) FROM %s"),
+                   row(null, null, BigInteger.ZERO, BigInteger.ZERO));
+        assertRows(execute("SELECT collection_avg(l), collection_avg(s), collection_avg(fl), collection_avg(fs) FROM %s"),
+                   row(null, null, BigInteger.ZERO, BigInteger.ZERO));
+
+        // not empty collections
+        execute("INSERT INTO %s (k, l, fl, s, fs, m, fm)  VALUES (1, ?, ?, ?, ?, ?, ?)",
+                list(bigint1, bigint2), list(bigint1, bigint2),
+                set(bigint1, bigint2), set(bigint1, bigint2),
+                map(bigint1, bigint2, bigint2, bigint1), map(bigint1, bigint2, bigint2, bigint1));
+
+        assertRows(execute("SELECT map_keys(m), map_keys(fm) FROM %s"),
+                   row(set(bigint1, bigint2), set(bigint1, bigint2)));
+        assertRows(execute("SELECT map_values(m), map_values(fm) FROM %s"),
+                   row(list(bigint2, bigint1), list(bigint2, bigint1)));
+        assertRows(execute("SELECT collection_count(l), collection_count(s), collection_count(m) FROM %s"),
+                   row(2, 2, 2));
+        assertRows(execute("SELECT collection_count(fl), collection_count(fs), collection_count(fm) FROM %s"),
+                   row(2, 2, 2));
+        assertRows(execute("SELECT collection_min(l), collection_min(s), collection_min(fl), collection_min(fs) FROM %s"),
+                   row(bigint1, bigint1, bigint1, bigint1));
+        assertRows(execute("SELECT collection_max(l), collection_max(s), collection_max(fl), collection_max(fs) FROM %s"),
+                   row(bigint2, bigint2, bigint2, bigint2));
+
+        BigInteger sum = bigint1.add(bigint2);
+        assertRows(execute("SELECT collection_sum(l), collection_sum(s), collection_sum(fl), collection_sum(fs) FROM %s"),
+                   row(sum, sum, sum, sum));
+
+        BigInteger avg = bigint1.add(bigint2).divide(BigInteger.valueOf(2));
+        assertRows(execute("SELECT collection_avg(l), collection_avg(s), collection_avg(fl), collection_avg(fs) FROM %s"),
+                   row(avg, avg, avg, avg));
+    }
+
+    @Test
+    public void testDecimal() throws Throwable
+    {
+        createTable(CQL3Type.Native.DECIMAL);
+
+        // empty collections
+        assertRows(execute("SELECT map_keys(m), map_values(m), map_keys(fm), map_values(fm) FROM %s"),
+                   row(null, null, set(), list()));
+        assertRows(execute("SELECT collection_count(l), collection_count(s), collection_count(m), " +
+                           "collection_count(fl), collection_count(fs), collection_count(fm) FROM %s"),
+                   row(null, null, null, 0, 0, 0));
+        assertRows(execute("SELECT collection_min(l), collection_min(s), collection_min(fl), collection_min(fs) FROM %s"),
+                   row(null, null, null, null));
+        assertRows(execute("SELECT collection_max(l), collection_max(s), collection_max(fl), collection_max(fs) FROM %s"),
+                   row(null, null, null, null));
+        assertRows(execute("SELECT collection_sum(l), collection_sum(s), collection_sum(fl), collection_sum(fs) FROM %s"),
+                   row(null, null, BigDecimal.ZERO, BigDecimal.ZERO));
+        assertRows(execute("SELECT collection_avg(l), collection_avg(s), collection_avg(fl), collection_avg(fs) FROM %s"),
+                   row(null, null, BigDecimal.ZERO, BigDecimal.ZERO));
+
+        // not empty collections
+        execute("INSERT INTO %s (k, l, fl, s, fs, m, fm)  VALUES (1, ?, ?, ?, ?, ?, ?)",
+                list(bigdecimal1, bigdecimal2), list(bigdecimal1, bigdecimal2),
+                set(bigdecimal1, bigdecimal2), set(bigdecimal1, bigdecimal2),
+                map(bigdecimal1, bigdecimal2, bigdecimal2, bigdecimal1),
+                map(bigdecimal1, bigdecimal2, bigdecimal2, bigdecimal1));
+
+        assertRows(execute("SELECT map_keys(m), map_keys(fm) FROM %s"),
+                   row(set(bigdecimal1, bigdecimal2), set(bigdecimal1, bigdecimal2)));
+        assertRows(execute("SELECT map_values(m), map_values(fm) FROM %s"),
+                   row(list(bigdecimal2, bigdecimal1), list(bigdecimal2, bigdecimal1)));
+        assertRows(execute("SELECT collection_count(l), collection_count(s), collection_count(m) FROM %s"),
+                   row(2, 2, 2));
+        assertRows(execute("SELECT collection_count(fl), collection_count(fs), collection_count(fm) FROM %s"),
+                   row(2, 2, 2));
+        assertRows(execute("SELECT collection_min(l), collection_min(s), collection_min(fl), collection_min(fs) FROM %s"),
+                   row(bigdecimal1, bigdecimal1, bigdecimal1, bigdecimal1));
+        assertRows(execute("SELECT collection_max(l), collection_max(s), collection_max(fl), collection_max(fs) FROM %s"),
+                   row(bigdecimal2, bigdecimal2, bigdecimal2, bigdecimal2));
+
+        BigDecimal sum = bigdecimal1.add(bigdecimal2);
+        assertRows(execute("SELECT collection_sum(l), collection_sum(s), collection_sum(fl), collection_sum(fs) FROM %s"),
+                   row(sum, sum, sum, sum));
+
+        BigDecimal avg = bigdecimal1.add(bigdecimal2).divide(BigDecimal.valueOf(2), RoundingMode.HALF_EVEN);
+        assertRows(execute("SELECT collection_avg(l), collection_avg(s), collection_avg(fl), collection_avg(fs) FROM %s"),
+                   row(avg, avg, avg, avg));
+    }
+
+    @Test
+    public void testAscii() throws Throwable
+    {
+        createTable(CQL3Type.Native.ASCII);
+
+        // empty collections
+        assertRows(execute("SELECT map_keys(m), map_values(m), map_keys(fm), map_values(fm) FROM %s"),
+                   row(null, null, set(), list()));
+        assertRows(execute("SELECT collection_count(l), collection_count(s), collection_count(m), " +
+                           "collection_count(fl), collection_count(fs), collection_count(fm) FROM %s"),
+                   row(null, null, null, 0, 0, 0));
+
+        // not empty collections
+        execute("INSERT INTO %s (k, l, fl, s, fs, m, fm)  VALUES (1, ?, ?, ?, ?, ?, ?)",
+                list("abc", "bcd"), list("abc", "bcd"),
+                set("abc", "bcd"), set("abc", "bcd"),
+                map("abc", "bcd", "cde", "def"), map("abc", "bcd", "cde", "def"));
+
+        assertRows(execute("SELECT map_keys(m), map_keys(fm) FROM %s"),
+                   row(set("abc", "cde"), set("abc", "cde")));
+        assertRows(execute("SELECT map_values(m), map_values(fm) FROM %s"),
+                   row(list("bcd", "def"), list("bcd", "def")));
+        assertRows(execute("SELECT collection_count(l), collection_count(s), collection_count(m) FROM %s"),
+                   row(2, 2, 2));
+        assertRows(execute("SELECT collection_count(fl), collection_count(fs), collection_count(fm) FROM %s"),
+                   row(2, 2, 2));
+        assertRows(execute("SELECT collection_min(l), collection_min(s), collection_min(fl), collection_min(fs) FROM %s"),
+                   row("abc", "abc", "abc", "abc"));
+        assertRows(execute("SELECT collection_max(l), collection_max(s), collection_max(fl), collection_max(fs) FROM %s"),
+                   row("bcd", "bcd", "bcd", "bcd"));
+    }
+
+    @Test
+    public void testText() throws Throwable
+    {
+        createTable(CQL3Type.Native.TEXT);
+
+        // empty collections
+        assertRows(execute("SELECT map_keys(m), map_values(m), map_keys(fm), map_values(fm) FROM %s"),
+                   row(null, null, set(), list()));
+        assertRows(execute("SELECT collection_count(l), collection_count(s), collection_count(m), " +
+                           "collection_count(fl), collection_count(fs), collection_count(fm) FROM %s"),
+                   row(null, null, null, 0, 0, 0));
+
+        // not empty collections
+        execute("INSERT INTO %s (k, l, fl, s, fs, m, fm)  VALUES (1, ?, ?, ?, ?, ?, ?)",
+                list("ábc", "bcd"), list("ábc", "bcd"),
+                set("ábc", "bcd"), set("ábc", "bcd"),
+                map("ábc", "bcd", "cdé", "déf"), map("ábc", "bcd", "cdé", "déf"));
+
+        assertRows(execute("SELECT map_keys(m), map_keys(fm) FROM %s"),
+                   row(set("ábc", "cdé"), set("ábc", "cdé")));
+        assertRows(execute("SELECT map_values(m), map_values(fm) FROM %s"),
+                   row(list("déf", "bcd"), list("déf", "bcd")));
+        assertRows(execute("SELECT collection_count(l), collection_count(s), collection_count(m) FROM %s"),
+                   row(2, 2, 2));
+        assertRows(execute("SELECT collection_count(fl), collection_count(fs), collection_count(fm) FROM %s"),
+                   row(2, 2, 2));
+        assertRows(execute("SELECT collection_min(l), collection_min(s), collection_min(fl), collection_min(fs) FROM %s"),
+                   row("bcd", "bcd", "bcd", "bcd")); // text compares by unsigned UTF-8 bytes, so "bcd" sorts before "ábc"
+        assertRows(execute("SELECT collection_max(l), collection_max(s), collection_max(fl), collection_max(fs) FROM %s"),
+                   row("ábc", "ábc", "ábc", "ábc"));
+    }
+
+    @Test
+    public void testBoolean() throws Throwable
+    {
+        createTable(CQL3Type.Native.BOOLEAN);
+
+        // empty collections
+        assertRows(execute("SELECT map_keys(m), map_values(m), map_keys(fm), map_values(fm) FROM %s"),
+                   row(null, null, set(), list()));
+        assertRows(execute("SELECT collection_count(l), collection_count(s), collection_count(m), " +
+                           "collection_count(fl), collection_count(fs), collection_count(fm) FROM %s"),
+                   row(null, null, null, 0, 0, 0));
+
+        // not empty collections
+        execute("INSERT INTO %s (k, l, fl, s, fs, m, fm)  VALUES (1, ?, ?, ?, ?, ?, ?)",
+                list(true, false), list(true, false),
+                set(true, false), set(true, false),
+                map(true, false, false, true), map(true, false, false, true));
+
+        assertRows(execute("SELECT map_keys(m), map_keys(fm) FROM %s"),
+                   row(set(true, false), set(true, false)));
+        assertRows(execute("SELECT map_values(m), map_values(fm) FROM %s"),
+                   row(list(true, false), list(true, false)));
+        assertRows(execute("SELECT collection_count(l), collection_count(s), collection_count(m) FROM %s"),
+                   row(2, 2, 2));
+        assertRows(execute("SELECT collection_count(fl), collection_count(fs), collection_count(fm) FROM %s"),
+                   row(2, 2, 2));
+        assertRows(execute("SELECT collection_min(l), collection_min(s), collection_min(fl), collection_min(fs) FROM %s"),
+                   row(false, false, false, false));
+        assertRows(execute("SELECT collection_max(l), collection_max(s), collection_max(fl), collection_max(fs) FROM %s"),
+                   row(true, true, true, true));
+    }
+
+    private void createTable(CQL3Type.Native type) throws Throwable
+    {
+        createTable(String.format("CREATE TABLE %%s (" +
+                                  " k int PRIMARY KEY, " +
+                                  " l list<%s>, " +
+                                  " s set<%<s>, " +
+                                  " m map<%<s, %<s>, " +
+                                  " fl frozen<list<%<s>>, " +
+                                  " fs frozen<set<%<s>>, " +
+                                  " fm frozen<map<%<s, %<s>>)", type));
+
+        // test functions with an empty table
+        assertEmpty(execute("SELECT map_keys(m), map_keys(fm), map_values(m), map_values(fm) FROM %s"));
+        assertEmpty(execute("SELECT collection_count(l), collection_count(s), collection_count(m) FROM %s"));
+        assertEmpty(execute("SELECT collection_count(fl), collection_count(fs), collection_count(fm) FROM %s"));
+        assertEmpty(execute("SELECT collection_min(l), collection_min(s), collection_min(fl), collection_min(fs) FROM %s"));
+        assertEmpty(execute("SELECT collection_max(l), collection_max(s), collection_max(fl), collection_max(fs) FROM %s"));
+
+        String errorMsg = "requires a numeric set/list argument";
+        if (type.getType() instanceof NumberType)
+        {
+            assertEmpty(execute("SELECT collection_sum(l), collection_sum(s), collection_sum(fl), collection_sum(fs) FROM %s"));
+            assertEmpty(execute("SELECT collection_avg(l), collection_avg(s), collection_avg(fl), collection_avg(fs) FROM %s"));
+        }
+        else
+        {
+            assertInvalidThrowMessage(errorMsg, InvalidRequestException.class, "SELECT collection_sum(l) FROM %s");
+            assertInvalidThrowMessage(errorMsg, InvalidRequestException.class, "SELECT collection_avg(l) FROM %s");
+            assertInvalidThrowMessage(errorMsg, InvalidRequestException.class, "SELECT collection_sum(s) FROM %s");
+            assertInvalidThrowMessage(errorMsg, InvalidRequestException.class, "SELECT collection_avg(s) FROM %s");
+            assertInvalidThrowMessage(errorMsg, InvalidRequestException.class, "SELECT collection_sum(fl) FROM %s");
+            assertInvalidThrowMessage(errorMsg, InvalidRequestException.class, "SELECT collection_avg(fl) FROM %s");
+            assertInvalidThrowMessage(errorMsg, InvalidRequestException.class, "SELECT collection_sum(fs) FROM %s");
+            assertInvalidThrowMessage(errorMsg, InvalidRequestException.class, "SELECT collection_avg(fs) FROM %s");
+        }
+        assertInvalidThrowMessage(errorMsg, InvalidRequestException.class, "SELECT collection_sum(m) FROM %s");
+        assertInvalidThrowMessage(errorMsg, InvalidRequestException.class, "SELECT collection_avg(m) FROM %s");
+        assertInvalidThrowMessage(errorMsg, InvalidRequestException.class, "SELECT collection_sum(fm) FROM %s");
+        assertInvalidThrowMessage(errorMsg, InvalidRequestException.class, "SELECT collection_avg(fm) FROM %s");
+
+        // prepare empty collections (non-frozen ones read back as null, frozen ones as empty values)
+        execute("INSERT INTO %s (k, l, fl, s, fs, m, fm) VALUES (1, ?, ?, ?, ?, ?, ?)",
+                list(), list(), set(), set(), map(), map());
+    }
+}
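
For readers puzzling over the format string in the createTable helper above, here is a minimal standalone sketch of how it expands. It assumes the element type is passed as a plain String (the test passes a CQL3Type.Native value), and the class name, method name and "ks.tbl" table name are hypothetical. "%%s" is an escaped percent sign that leaves a literal "%s" in the result, while "%<s" reuses the argument of the preceding specifier, so a single argument fills every type slot.

```java
public class CreateTableFormatSketch
{
    // Hypothetical helper mirroring the format string used by the test's createTable method.
    // The element type is a plain String here, which is an assumption made for illustration.
    static String createTableTemplate(String type)
    {
        return String.format("CREATE TABLE %%s (" +        // "%%s" becomes a literal "%s"
                             " k int PRIMARY KEY, " +
                             " l list<%s>, " +             // consumes the single argument
                             " s set<%<s>, " +             // "%<s" reuses the previous argument
                             " m map<%<s, %<s>, " +
                             " fl frozen<list<%<s>>, " +
                             " fs frozen<set<%<s>>, " +
                             " fm frozen<map<%<s, %<s>>)", type);
    }

    public static void main(String[] args)
    {
        // First pass: every type slot is filled, the table placeholder survives as "%s".
        String template = createTableTemplate("float");
        System.out.println(template);

        // Second pass: fill the remaining placeholder with a made-up table name.
        System.out.println(String.format(template, "ks.tbl"));
    }
}
```

The second format call stands in for whatever the test harness does when it substitutes the table name for the remaining "%s"; that step is an assumption here, not something shown in this diff.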

