You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ty...@apache.org on 2014/12/11 18:47:14 UTC

[1/2] cassandra git commit: Support for user-defined aggregate functions

Repository: cassandra
Updated Branches:
  refs/heads/trunk 857de5540 -> e2f35c767


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2f35c76/src/java/org/apache/cassandra/cql3/statements/DropAggregateStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DropAggregateStatement.java b/src/java/org/apache/cassandra/cql3/statements/DropAggregateStatement.java
new file mode 100644
index 0000000..118f89d
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/statements/DropAggregateStatement.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.statements;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.cassandra.auth.Permission;
+import org.apache.cassandra.cql3.CQL3Type;
+import org.apache.cassandra.cql3.functions.*;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.exceptions.RequestValidationException;
+import org.apache.cassandra.exceptions.UnauthorizedException;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.MigrationManager;
+import org.apache.cassandra.thrift.ThriftValidation;
+import org.apache.cassandra.transport.Event;
+
+/**
+ * A <code>DROP AGGREGATE</code> statement parsed from a CQL query.
+ */
+public final class DropAggregateStatement extends SchemaAlteringStatement
+{
+    private FunctionName functionName;
+    private final boolean ifExists;
+    private final List<CQL3Type.Raw> argRawTypes;
+    private final boolean argsPresent;
+
+    public DropAggregateStatement(FunctionName functionName,
+                                  List<CQL3Type.Raw> argRawTypes,
+                                  boolean argsPresent,
+                                  boolean ifExists)
+    {
+        this.functionName = functionName;
+        this.argRawTypes = argRawTypes;
+        this.argsPresent = argsPresent;
+        this.ifExists = ifExists;
+    }
+
+    public void prepareKeyspace(ClientState state) throws InvalidRequestException
+    {
+        if (!functionName.hasKeyspace() && state.getRawKeyspace() != null)
+            functionName = new FunctionName(state.getKeyspace(), functionName.name);
+
+        if (!functionName.hasKeyspace())
+            throw new InvalidRequestException("Functions must be fully qualified with a keyspace name if a keyspace is not set for the session");
+
+        ThriftValidation.validateKeyspaceNotSystem(functionName.keyspace);
+    }
+
+    public void checkAccess(ClientState state) throws UnauthorizedException, InvalidRequestException
+    {
+        // TODO CASSANDRA-7557 (function DDL permission)
+
+        state.hasKeyspaceAccess(functionName.keyspace, Permission.DROP);
+    }
+
+    public void validate(ClientState state) throws RequestValidationException
+    {
+    }
+
+    public Event.SchemaChange changeEvent()
+    {
+        return null;
+    }
+
+    public boolean announceMigration(boolean isLocalOnly) throws RequestValidationException
+    {
+        List<Function> olds = Functions.find(functionName);
+
+        if (!argsPresent && olds != null && olds.size() > 1)
+            throw new InvalidRequestException(String.format("'DROP AGGREGATE %s' matches multiple function definitions; " +
+                                                            "specify the argument types by issuing a statement like " +
+                                                            "'DROP AGGREGATE %s (type, type, ...)'. Hint: use cqlsh " +
+                                                            "'DESCRIBE AGGREGATE %s' command to find all overloads",
+                                                            functionName, functionName, functionName));
+
+        List<AbstractType<?>> argTypes = new ArrayList<>(argRawTypes.size());
+        for (CQL3Type.Raw rawType : argRawTypes)
+            argTypes.add(rawType.prepare(functionName.keyspace).getType());
+
+        Function old;
+        if (argsPresent)
+        {
+            old = Functions.find(functionName, argTypes);
+            if (old == null || !(old instanceof AggregateFunction))
+            {
+                if (ifExists)
+                    return false;
+                // just build a nicer error message
+                StringBuilder sb = new StringBuilder();
+                for (CQL3Type.Raw rawType : argRawTypes)
+                {
+                    if (sb.length() > 0)
+                        sb.append(", ");
+                    sb.append(rawType);
+                }
+                throw new InvalidRequestException(String.format("Cannot drop non existing aggregate '%s(%s)'",
+                                                                functionName, sb));
+            }
+        }
+        else
+        {
+            if (olds == null || olds.isEmpty() || !(olds.get(0) instanceof AggregateFunction))
+            {
+                if (ifExists)
+                    return false;
+                throw new InvalidRequestException(String.format("Cannot drop non existing aggregate '%s'", functionName));
+            }
+            old = olds.get(0);
+        }
+
+        if (old.isNative())
+            throw new InvalidRequestException(String.format("Cannot drop aggregate '%s' because it is a " +
+                                                            "native (built-in) function", functionName));
+
+        MigrationManager.announceAggregateDrop((UDAggregate)old, isLocalOnly);
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2f35c76/src/java/org/apache/cassandra/cql3/statements/DropFunctionStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DropFunctionStatement.java b/src/java/org/apache/cassandra/cql3/statements/DropFunctionStatement.java
index 0ba3721..394aca0 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DropFunctionStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DropFunctionStatement.java
@@ -60,7 +60,7 @@ public final class DropFunctionStatement extends SchemaAlteringStatement
             functionName = new FunctionName(state.getKeyspace(), functionName.name);
 
         if (!functionName.hasKeyspace())
-            throw new InvalidRequestException("You need to be logged in a keyspace or use a fully qualified function name");
+            throw new InvalidRequestException("Functions must be fully qualified with a keyspace name if a keyspace is not set for the session");
 
         ThriftValidation.validateKeyspaceNotSystem(functionName.keyspace);
     }
@@ -73,11 +73,6 @@ public final class DropFunctionStatement extends SchemaAlteringStatement
         state.hasKeyspaceAccess(functionName.keyspace, Permission.DROP);
     }
 
-    /**
-     * The <code>CqlParser</code> only goes as far as extracting the keyword arguments
-     * from these statements, so this method is responsible for processing and
-     * validating.
-     */
     @Override
     public void validate(ClientState state)
     {
@@ -109,7 +104,7 @@ public final class DropFunctionStatement extends SchemaAlteringStatement
         if (argsPresent)
         {
             old = Functions.find(functionName, argTypes);
-            if (old == null)
+            if (old == null || !(old instanceof ScalarFunction))
             {
                 if (ifExists)
                     return false;
@@ -127,7 +122,7 @@ public final class DropFunctionStatement extends SchemaAlteringStatement
         }
         else
         {
-            if (olds == null || olds.isEmpty())
+            if (olds == null || olds.isEmpty() || !(olds.get(0) instanceof ScalarFunction))
             {
                 if (ifExists)
                     return false;
@@ -136,7 +131,11 @@ public final class DropFunctionStatement extends SchemaAlteringStatement
             old = olds.get(0);
         }
 
-        MigrationManager.announceFunctionDrop((UDFunction)old, isLocalOnly);
+        List<Function> references = Functions.getReferencesTo(old);
+        if (!references.isEmpty())
+            throw new InvalidRequestException(String.format("Function '%s' still referenced by %s", functionName, references));
+
+        MigrationManager.announceFunctionDrop((UDFunction) old, isLocalOnly);
         return true;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2f35c76/src/java/org/apache/cassandra/db/DefsTables.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DefsTables.java b/src/java/org/apache/cassandra/db/DefsTables.java
index bcb0893..82a5dd1 100644
--- a/src/java/org/apache/cassandra/db/DefsTables.java
+++ b/src/java/org/apache/cassandra/db/DefsTables.java
@@ -34,6 +34,7 @@ import org.apache.cassandra.config.KSMetaData;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.config.UTMetaData;
 import org.apache.cassandra.cql3.functions.Functions;
+import org.apache.cassandra.cql3.functions.UDAggregate;
 import org.apache.cassandra.cql3.functions.UDFunction;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.compaction.CompactionManager;
@@ -118,7 +119,8 @@ public class DefsTables
         Map<DecoratedKey, ColumnFamily> oldKeyspaces = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_KEYSPACES_TABLE, keyspaces);
         Map<DecoratedKey, ColumnFamily> oldColumnFamilies = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_COLUMNFAMILIES_TABLE, keyspaces);
         Map<DecoratedKey, ColumnFamily> oldTypes = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_USER_TYPES_TABLE, keyspaces);
-        Map<DecoratedKey, ColumnFamily> oldFunctions = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_FUNCTIONS_TABLE);
+        Map<DecoratedKey, ColumnFamily> oldFunctions = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_FUNCTIONS_TABLE, keyspaces);
+        Map<DecoratedKey, ColumnFamily> oldAggregates = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_AGGREGATES_TABLE, keyspaces);
 
         for (Mutation mutation : mutations)
             mutation.apply();
@@ -130,12 +132,14 @@ public class DefsTables
         Map<DecoratedKey, ColumnFamily> newKeyspaces = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_KEYSPACES_TABLE, keyspaces);
         Map<DecoratedKey, ColumnFamily> newColumnFamilies = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_COLUMNFAMILIES_TABLE, keyspaces);
         Map<DecoratedKey, ColumnFamily> newTypes = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_USER_TYPES_TABLE, keyspaces);
-        Map<DecoratedKey, ColumnFamily> newFunctions = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_FUNCTIONS_TABLE);
+        Map<DecoratedKey, ColumnFamily> newFunctions = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_FUNCTIONS_TABLE, keyspaces);
+        Map<DecoratedKey, ColumnFamily> newAggregates = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_AGGREGATES_TABLE, keyspaces);
 
         Set<String> keyspacesToDrop = mergeKeyspaces(oldKeyspaces, newKeyspaces);
         mergeColumnFamilies(oldColumnFamilies, newColumnFamilies);
         mergeTypes(oldTypes, newTypes);
         mergeFunctions(oldFunctions, newFunctions);
+        mergeAggregates(oldAggregates, newAggregates);
 
         // it is safe to drop a keyspace only when all nested ColumnFamilies where deleted
         for (String keyspaceToDrop : keyspacesToDrop)
@@ -348,6 +352,59 @@ public class DefsTables
             dropFunction(udf);
     }
 
+    // see the comments for mergeKeyspaces()
+    private static void mergeAggregates(Map<DecoratedKey, ColumnFamily> before, Map<DecoratedKey, ColumnFamily> after)
+    {
+        List<UDAggregate> created = new ArrayList<>();
+        List<UDAggregate> altered = new ArrayList<>();
+        List<UDAggregate> dropped = new ArrayList<>();
+
+        MapDifference<DecoratedKey, ColumnFamily> diff = Maps.difference(before, after);
+
+        // New keyspace with functions
+        for (Map.Entry<DecoratedKey, ColumnFamily> entry : diff.entriesOnlyOnRight().entrySet())
+            if (entry.getValue().hasColumns())
+                created.addAll(UDAggregate.fromSchema(new Row(entry.getKey(), entry.getValue())).values());
+
+        for (Map.Entry<DecoratedKey, MapDifference.ValueDifference<ColumnFamily>> entry : diff.entriesDiffering().entrySet())
+        {
+            ColumnFamily pre = entry.getValue().leftValue();
+            ColumnFamily post = entry.getValue().rightValue();
+
+            if (pre.hasColumns() && post.hasColumns())
+            {
+                MapDifference<Composite, UDAggregate> delta =
+                        Maps.difference(UDAggregate.fromSchema(new Row(entry.getKey(), pre)),
+                                        UDAggregate.fromSchema(new Row(entry.getKey(), post)));
+
+                dropped.addAll(delta.entriesOnlyOnLeft().values());
+                created.addAll(delta.entriesOnlyOnRight().values());
+                Iterables.addAll(altered, Iterables.transform(delta.entriesDiffering().values(), new Function<MapDifference.ValueDifference<UDAggregate>, UDAggregate>()
+                {
+                    public UDAggregate apply(MapDifference.ValueDifference<UDAggregate> pair)
+                    {
+                        return pair.rightValue();
+                    }
+                }));
+            }
+            else if (pre.hasColumns())
+            {
+                dropped.addAll(UDAggregate.fromSchema(new Row(entry.getKey(), pre)).values());
+            }
+            else if (post.hasColumns())
+            {
+                created.addAll(UDAggregate.fromSchema(new Row(entry.getKey(), post)).values());
+            }
+        }
+
+        for (UDAggregate udf : created)
+            addAggregate(udf);
+        for (UDAggregate udf : altered)
+            updateAggregate(udf);
+        for (UDAggregate udf : dropped)
+            dropAggregate(udf);
+    }
+
     private static void addKeyspace(KSMetaData ksm)
     {
         assert Schema.instance.getKSMetaData(ksm.name) == null;
@@ -397,6 +454,15 @@ public class DefsTables
         MigrationManager.instance.notifyCreateFunction(udf);
     }
 
+    private static void addAggregate(UDAggregate udf)
+    {
+        logger.info("Loading {}", udf);
+
+        Functions.addFunction(udf);
+
+        MigrationManager.instance.notifyCreateAggregate(udf);
+    }
+
     private static void updateKeyspace(String ksName)
     {
         KSMetaData oldKsm = Schema.instance.getKSMetaData(ksName);
@@ -441,6 +507,15 @@ public class DefsTables
         MigrationManager.instance.notifyUpdateFunction(udf);
     }
 
+    private static void updateAggregate(UDAggregate udf)
+    {
+        logger.info("Updating {}", udf);
+
+        Functions.replaceFunction(udf);
+
+        MigrationManager.instance.notifyUpdateAggregate(udf);
+    }
+
     private static void dropKeyspace(String ksName)
     {
         KSMetaData ksm = Schema.instance.getKSMetaData(ksName);
@@ -520,6 +595,16 @@ public class DefsTables
         MigrationManager.instance.notifyDropFunction(udf);
     }
 
+    private static void dropAggregate(UDAggregate udf)
+    {
+        logger.info("Drop {}", udf);
+
+        // TODO: this is kind of broken as this remove all overloads of the function name
+        Functions.removeFunction(udf.name(), udf.argTypes());
+
+        MigrationManager.instance.notifyDropAggregate(udf);
+    }
+
     private static KSMetaData makeNewKeyspaceDefinition(KSMetaData ksm, CFMetaData toExclude)
     {
         // clone ksm but do not include the new def

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2f35c76/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index ddf6fa0..3e8b0a2 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -76,6 +76,7 @@ public final class SystemKeyspace
     public static final String SCHEMA_TRIGGERS_TABLE = "schema_triggers";
     public static final String SCHEMA_USER_TYPES_TABLE = "schema_usertypes";
     public static final String SCHEMA_FUNCTIONS_TABLE = "schema_functions";
+    public static final String SCHEMA_AGGREGATES_TABLE = "schema_aggregates";
 
     public static final String BUILT_INDEXES_TABLE = "IndexInfo";
     public static final String HINTS_TABLE = "hints";
@@ -95,7 +96,8 @@ public final class SystemKeyspace
                       SCHEMA_COLUMNS_TABLE,
                       SCHEMA_TRIGGERS_TABLE,
                       SCHEMA_USER_TYPES_TABLE,
-                      SCHEMA_FUNCTIONS_TABLE);
+                      SCHEMA_FUNCTIONS_TABLE,
+                      SCHEMA_AGGREGATES_TABLE);
 
     private static int WEEK = (int) TimeUnit.DAYS.toSeconds(7);
 
@@ -177,7 +179,6 @@ public final class SystemKeyspace
                 + "PRIMARY KEY ((keyspace_name), type_name))")
                 .gcGraceSeconds(WEEK);
 
-
     public static final CFMetaData SchemaFunctionsTable =
         compile(SCHEMA_FUNCTIONS_TABLE, "user defined function definitions",
                 "CREATE TABLE %s ("
@@ -193,6 +194,21 @@ public final class SystemKeyspace
                 + "PRIMARY KEY ((keyspace_name), function_name, signature))")
                 .gcGraceSeconds(WEEK);
 
+    public static final CFMetaData SchemaAggregatesTable =
+        compile(SCHEMA_AGGREGATES_TABLE, "user defined aggregate definitions",
+                "CREATE TABLE %s ("
+                + "keyspace_name text,"
+                + "aggregate_name text,"
+                + "signature blob,"
+                + "argument_types list<text>,"
+                + "return_type text,"
+                + "state_func text,"
+                + "state_type text,"
+                + "final_func text,"
+                + "initcond blob,"
+                + "PRIMARY KEY ((keyspace_name), aggregate_name, signature))")
+                .gcGraceSeconds(WEEK);
+
     public static final CFMetaData BuiltIndexesTable =
         compile(BUILT_INDEXES_TABLE, "built column indexes",
                 "CREATE TABLE \"%s\" ("
@@ -331,6 +347,7 @@ public final class SystemKeyspace
                           SchemaTriggersTable,
                           SchemaUserTypesTable,
                           SchemaFunctionsTable,
+                          SchemaAggregatesTable,
                           BuiltIndexesTable,
                           HintsTable,
                           BatchlogTable,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2f35c76/src/java/org/apache/cassandra/service/IMigrationListener.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/IMigrationListener.java b/src/java/org/apache/cassandra/service/IMigrationListener.java
index bc67e8a..faaffb9 100644
--- a/src/java/org/apache/cassandra/service/IMigrationListener.java
+++ b/src/java/org/apache/cassandra/service/IMigrationListener.java
@@ -23,15 +23,18 @@ public interface IMigrationListener
     public void onCreateColumnFamily(String ksName, String cfName);
     public void onCreateUserType(String ksName, String typeName);
     public void onCreateFunction(String ksName, String functionName);
+    public void onCreateAggregate(String ksName, String aggregateName);
 
     public void onUpdateKeyspace(String ksName);
     public void onUpdateColumnFamily(String ksName, String cfName);
     public void onUpdateUserType(String ksName, String typeName);
     public void onUpdateFunction(String ksName, String functionName);
+    public void onUpdateAggregate(String ksName, String aggregateName);
 
     public void onDropKeyspace(String ksName);
     public void onDropColumnFamily(String ksName, String cfName);
     public void onDropUserType(String ksName, String typeName);
     public void onDropFunction(String ksName, String functionName);
+    public void onDropAggregate(String ksName, String aggregateName);
 
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2f35c76/src/java/org/apache/cassandra/service/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java
index 73bab66..c3fe1fa 100644
--- a/src/java/org/apache/cassandra/service/MigrationManager.java
+++ b/src/java/org/apache/cassandra/service/MigrationManager.java
@@ -39,11 +39,9 @@ import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.KSMetaData;
 import org.apache.cassandra.config.UTMetaData;
 import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.cql3.functions.AggregateFunction;
-import org.apache.cassandra.cql3.functions.ScalarFunction;
+import org.apache.cassandra.cql3.functions.UDAggregate;
 import org.apache.cassandra.cql3.functions.UDFunction;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.UserType;
 import org.apache.cassandra.exceptions.AlreadyExistsException;
 import org.apache.cassandra.exceptions.ConfigurationException;
@@ -196,21 +194,22 @@ public class MigrationManager
             listener.onDropFunction(udf.name().keyspace, udf.name().name);
     }
 
-    private List<String> asString(List<AbstractType<?>> abstractTypes)
+    public void notifyCreateAggregate(UDAggregate udf)
     {
-        List<String> r = new ArrayList<>(abstractTypes.size());
-        for (AbstractType<?> abstractType : abstractTypes)
-            r.add(abstractType.asCQL3Type().toString());
-        return r;
+        for (IMigrationListener listener : listeners)
+            listener.onCreateAggregate(udf.name().keyspace, udf.name().name);
+    }
+
+    public void notifyUpdateAggregate(UDAggregate udf)
+    {
+        for (IMigrationListener listener : listeners)
+            listener.onUpdateAggregate(udf.name().keyspace, udf.name().name);
     }
 
-    private String udType(UDFunction udf)
+    public void notifyDropAggregate(UDAggregate udf)
     {
-        if (udf instanceof ScalarFunction)
-            return "scalar";
-        if (udf instanceof AggregateFunction)
-            return "aggregate";
-        return "";
+        for (IMigrationListener listener : listeners)
+            listener.onDropAggregate(udf.name().keyspace, udf.name().name);
     }
 
     public void notifyUpdateKeyspace(KSMetaData ksm)
@@ -395,14 +394,28 @@ public class MigrationManager
     public static void announceFunctionDrop(UDFunction udf, boolean announceLocally)
     {
         Mutation mutation = udf.toSchemaDrop(FBUtilities.timestampMicros());
-        logger.info(String.format("Drop Function overload '%s' args '%s'", udf.name(), udf.argTypes()));
+        logger.info(String.format("Drop scalar function overload '%s' args '%s'", udf.name(), udf.argTypes()));
+        announce(mutation, announceLocally);
+    }
+
+    public static void announceAggregateDrop(UDAggregate udf, boolean announceLocally)
+    {
+        Mutation mutation = udf.toSchemaDrop(FBUtilities.timestampMicros());
+        logger.info(String.format("Drop aggregate function overload '%s' args '%s'", udf.name(), udf.argTypes()));
         announce(mutation, announceLocally);
     }
 
     public static void announceNewFunction(UDFunction udf, boolean announceLocally)
     {
         Mutation mutation = udf.toSchemaUpdate(FBUtilities.timestampMicros());
-        logger.info(String.format("Create Function '%s'", udf.name()));
+        logger.info(String.format("Create scalar function '%s'", udf.name()));
+        announce(mutation, announceLocally);
+    }
+
+    public static void announceNewAggregate(UDAggregate udf, boolean announceLocally)
+    {
+        Mutation mutation = udf.toSchemaUpdate(FBUtilities.timestampMicros());
+        logger.info(String.format("Create aggregate function '%s'", udf.name()));
         announce(mutation, announceLocally);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2f35c76/src/java/org/apache/cassandra/transport/Server.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java
index cc071b1..5202a94 100644
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@ -414,6 +414,10 @@ public class Server implements CassandraDaemon.Server
         {
         }
 
+        public void onCreateAggregate(String ksName, String aggregateName)
+        {
+        }
+
         public void onUpdateKeyspace(String ksName)
         {
             server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, ksName));
@@ -433,6 +437,10 @@ public class Server implements CassandraDaemon.Server
         {
         }
 
+        public void onUpdateAggregate(String ksName, String aggregateName)
+        {
+        }
+
         public void onDropKeyspace(String ksName)
         {
             server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, ksName));
@@ -451,5 +459,9 @@ public class Server implements CassandraDaemon.Server
         public void onDropFunction(String ksName, String functionName)
         {
         }
+
+        public void onDropAggregate(String ksName, String aggregateName)
+        {
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2f35c76/test/unit/org/apache/cassandra/cql3/AggregationTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/AggregationTest.java b/test/unit/org/apache/cassandra/cql3/AggregationTest.java
index 859fe65..940e87f 100644
--- a/test/unit/org/apache/cassandra/cql3/AggregationTest.java
+++ b/test/unit/org/apache/cassandra/cql3/AggregationTest.java
@@ -24,8 +24,12 @@ import java.util.Date;
 import java.util.TimeZone;
 
 import org.apache.commons.lang3.time.DateUtils;
+import org.junit.Assert;
 import org.junit.Test;
 
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.transport.messages.ResultMessage;
+
 public class AggregationTest extends CQLTester
 {
     @Test
@@ -94,16 +98,20 @@ public class AggregationTest extends CQLTester
     {
         createTable("CREATE TABLE %s (a int primary key, b timeuuid, c double, d double)");
 
-        execute("CREATE OR REPLACE FUNCTION "+KEYSPACE+".copySign(magnitude double, sign double) RETURNS double LANGUAGE JAVA\n" +
-                "AS 'return Double.valueOf(Math.copySign(magnitude.doubleValue(), sign.doubleValue()));';");
+        String copySign = createFunction(KEYSPACE,
+                                         "double, double",
+                                         "CREATE OR REPLACE FUNCTION %s(magnitude double, sign double) " +
+                                         "RETURNS double " +
+                                         "LANGUAGE JAVA " +
+                                         "AS 'return Double.valueOf(Math.copySign(magnitude.doubleValue(), sign.doubleValue()));';");
 
         assertColumnNames(execute("SELECT max(a), max(unixTimestampOf(b)) FROM %s"), "system.max(a)", "system.max(system.unixtimestampof(b))");
         assertRows(execute("SELECT max(a), max(unixTimestampOf(b)) FROM %s"), row(null, null));
         assertColumnNames(execute("SELECT max(a), unixTimestampOf(max(b)) FROM %s"), "system.max(a)", "system.unixtimestampof(system.max(b))");
         assertRows(execute("SELECT max(a), unixTimestampOf(max(b)) FROM %s"), row(null, null));
 
-        assertColumnNames(execute("SELECT max(copySign(c, d)) FROM %s"), "system.max("+KEYSPACE+".copysign(c, d))");
-        assertRows(execute("SELECT max(copySign(c, d)) FROM %s"), row((Object) null));
+        assertColumnNames(execute("SELECT max(" + copySign + "(c, d)) FROM %s"), "system.max(" + copySign + "(c, d))");
+        assertRows(execute("SELECT max(" + copySign + "(c, d)) FROM %s"), row((Object) null));
 
         execute("INSERT INTO %s (a, b, c, d) VALUES (1, maxTimeuuid('2011-02-03 04:05:00+0000'), -1.2, 2.1)");
         execute("INSERT INTO %s (a, b, c, d) VALUES (2, maxTimeuuid('2011-02-03 04:06:00+0000'), 1.3, -3.4)");
@@ -117,10 +125,624 @@ public class AggregationTest extends CQLTester
         assertRows(execute("SELECT max(a), max(unixTimestampOf(b)) FROM %s"), row(3, date.getTime()));
         assertRows(execute("SELECT max(a), unixTimestampOf(max(b)) FROM %s"), row(3, date.getTime()));
 
-        assertRows(execute("SELECT copySign(max(c), min(c)) FROM %s"), row(-1.4));
-        assertRows(execute("SELECT copySign(c, d) FROM %s"), row(1.2), row(-1.3), row(1.4));
-        assertRows(execute("SELECT max(copySign(c, d)) FROM %s"), row(1.4));
-        assertInvalid("SELECT copySign(c, max(c)) FROM %s");
-        assertInvalid("SELECT copySign(max(c), c) FROM %s");
+        assertRows(execute("SELECT " + copySign + "(max(c), min(c)) FROM %s"), row(-1.4));
+        assertRows(execute("SELECT " + copySign + "(c, d) FROM %s"), row(1.2), row(-1.3), row(1.4));
+        assertRows(execute("SELECT max(" + copySign + "(c, d)) FROM %s"), row(1.4));
+        assertInvalid("SELECT " + copySign + "(c, max(c)) FROM %s");
+        assertInvalid("SELECT " + copySign + "(max(c), c) FROM %s");
+    }
+
+    @Test
+    public void testDropStatements() throws Throwable
+    {
+        String f = createFunction(KEYSPACE,
+                                  "double, double",
+                                  "CREATE OR REPLACE FUNCTION %s(state double, val double) " +
+                                  "RETURNS double " +
+                                  "LANGUAGE javascript " +
+                                  "AS '\"string\";';");
+        createFunctionOverload(f,
+                                  "double, double",
+                                  "CREATE OR REPLACE FUNCTION %s(state int, val int) " +
+                                  "RETURNS int " +
+                                  "LANGUAGE javascript " +
+                                  "AS '\"string\";';");
+
+        // DROP AGGREGATE must not succeed against a scalar
+        assertInvalid("DROP AGGREGATE " + f);
+        assertInvalid("DROP AGGREGATE " + f + "(double, double)");
+
+        String a = createAggregate(KEYSPACE,
+                                   "double",
+                                   "CREATE OR REPLACE AGGREGATE %s(double) " +
+                                   "SFUNC " + shortFunctionName(f) + " " +
+                                   "STYPE double");
+        createAggregateOverload(a,
+                                "int",
+                                "CREATE OR REPLACE AGGREGATE %s(int) " +
+                                "SFUNC " + shortFunctionName(f) + " " +
+                                "STYPE int");
+
+        // DROP FUNCTION must not succeed against an aggregate
+        assertInvalid("DROP FUNCTION " + a);
+        assertInvalid("DROP FUNCTION " + a + "(double)");
+
+        // ambigious
+        assertInvalid("DROP AGGREGATE " + a);
+        assertInvalid("DROP AGGREGATE IF EXISTS " + a);
+
+        execute("DROP AGGREGATE IF EXISTS " + KEYSPACE + ".non_existing");
+        execute("DROP AGGREGATE IF EXISTS " + a + "(int, text)");
+
+        execute("DROP AGGREGATE " + a + "(double)");
+
+        execute("DROP AGGREGATE IF EXISTS " + a + "(double)");
+    }
+
+    @Test
+    public void testDropReferenced() throws Throwable
+    {
+        String f = createFunction(KEYSPACE,
+                                  "double, double",
+                                  "CREATE OR REPLACE FUNCTION %s(state double, val double) " +
+                                  "RETURNS double " +
+                                  "LANGUAGE javascript " +
+                                  "AS '\"string\";';");
+
+        String a = createAggregate(KEYSPACE,
+                                   "double",
+                                   "CREATE OR REPLACE AGGREGATE %s(double) " +
+                                   "SFUNC " + shortFunctionName(f) + " " +
+                                   "STYPE double");
+
+        // DROP FUNCTION must not succeed because the function is still referenced by the aggregate
+        assertInvalid("DROP FUNCTION " + f);
+
+        execute("DROP AGGREGATE " + a + "(double)");
+    }
+
+    @Test
+    public void testJavaAggregateNoInit() throws Throwable
+    {
+        createTable("CREATE TABLE %s (a int primary key, b int)");
+        execute("INSERT INTO %s (a, b) VALUES (1, 1)");
+        execute("INSERT INTO %s (a, b) VALUES (2, 2)");
+        execute("INSERT INTO %s (a, b) VALUES (3, 3)");
+
+        String fState = createFunction(KEYSPACE,
+                                       "int, int",
+                                       "CREATE FUNCTION %s(a int, b int) " +
+                                       "RETURNS int " +
+                                       "LANGUAGE java " +
+                                       "AS 'return Integer.valueOf((a!=null?a.intValue():0) + b.intValue());'");
+
+        String fFinal = createFunction(KEYSPACE,
+                                       "int",
+                                       "CREATE FUNCTION %s(a int) " +
+                                       "RETURNS text " +
+                                       "LANGUAGE java " +
+                                       "AS 'return a.toString();'");
+
+        String a = createAggregate(KEYSPACE,
+                                   "int",
+                                   "CREATE AGGREGATE %s(int) " +
+                                   "SFUNC " + shortFunctionName(fState) + " " +
+                                   "STYPE int " +
+                                   "FINALFUNC " + shortFunctionName(fFinal));
+
+        // 1 + 2 + 3 = 6
+        assertRows(execute("SELECT " + a + "(b) FROM %s"), row("6"));
+
+        execute("DROP AGGREGATE " + a + "(int)");
+
+        assertInvalid("SELECT " + a + "(b) FROM %s");
+    }
+
+    @Test
+    public void testJavaAggregateNullInitcond() throws Throwable
+    {
+        createTable("CREATE TABLE %s (a int primary key, b int)");
+        execute("INSERT INTO %s (a, b) VALUES (1, 1)");
+        execute("INSERT INTO %s (a, b) VALUES (2, 2)");
+        execute("INSERT INTO %s (a, b) VALUES (3, 3)");
+
+        String fState = createFunction(KEYSPACE,
+                                       "int, int",
+                                       "CREATE FUNCTION %s(a int, b int) " +
+                                       "RETURNS int " +
+                                       "LANGUAGE java " +
+                                       "AS 'return Integer.valueOf((a!=null?a.intValue():0) + b.intValue());'");
+
+        String fFinal = createFunction(KEYSPACE,
+                                       "int",
+                                       "CREATE FUNCTION %s(a int) " +
+                                       "RETURNS text " +
+                                       "LANGUAGE java " +
+                                       "AS 'return a.toString();'");
+
+        String a = createAggregate(KEYSPACE,
+                                   "int",
+                                   "CREATE AGGREGATE %s(int) " +
+                                   "SFUNC " + shortFunctionName(fState) + " " +
+                                   "STYPE int " +
+                                   "FINALFUNC " + shortFunctionName(fFinal) + " " +
+                                   "INITCOND null");
+
+        // 1 + 2 + 3 = 6
+        assertRows(execute("SELECT " + a + "(b) FROM %s"), row("6"));
+
+        execute("DROP AGGREGATE " + a + "(int)");
+
+        assertInvalid("SELECT " + a + "(b) FROM %s");
+    }
+
+    @Test
+    public void testJavaAggregateInvalidInitcond() throws Throwable
+    {
+        String fState = createFunction(KEYSPACE,
+                                       "int, int",
+                                       "CREATE FUNCTION %s(a int, b int) " +
+                                       "RETURNS int " +
+                                       "LANGUAGE java " +
+                                       "AS 'return Integer.valueOf((a!=null?a.intValue():0) + b.intValue());'");
+
+        String fFinal = createFunction(KEYSPACE,
+                                       "int",
+                                       "CREATE FUNCTION %s(a int) " +
+                                       "RETURNS text " +
+                                       "LANGUAGE java " +
+                                       "AS 'return a.toString();'");
+
+        assertInvalid("CREATE AGGREGATE " + KEYSPACE + ".aggrInvalid(int)" +
+                      "SFUNC " + shortFunctionName(fState) + " " +
+                      "STYPE int " +
+                      "FINALFUNC " + shortFunctionName(fFinal) + " " +
+                      "INITCOND 'foobar'");
+    }
+
+    @Test
+    public void testJavaAggregateIncompatibleTypes() throws Throwable
+    {
+        String fState = createFunction(KEYSPACE,
+                                       "int, int",
+                                       "CREATE FUNCTION %s(a int, b int) " +
+                                       "RETURNS int " +
+                                       "LANGUAGE java " +
+                                       "AS 'return Integer.valueOf((a!=null?a.intValue():0) + b.intValue());'");
+
+        String fFinal = createFunction(KEYSPACE,
+                                       "int",
+                                       "CREATE FUNCTION %s(a int) " +
+                                       "RETURNS text " +
+                                       "LANGUAGE java " +
+                                       "AS 'return a.toString();'");
+
+        String fState2 = createFunction(KEYSPACE,
+                                        "int, int",
+                                        "CREATE FUNCTION %s(a double, b double) " +
+                                        "RETURNS double " +
+                                        "LANGUAGE java " +
+                                        "AS 'return Double.valueOf((a!=null?a.doubleValue():0d) + b.doubleValue());'");
+
+        String fFinal2 = createFunction(KEYSPACE,
+                                        "int",
+                                        "CREATE FUNCTION %s(a double) " +
+                                        "RETURNS text " +
+                                        "LANGUAGE java " +
+                                        "AS 'return a.toString();'");
+
+        assertInvalid("CREATE AGGREGATE " + KEYSPACE + ".aggrInvalid(double)" +
+                      "SFUNC " + shortFunctionName(fState) + " " +
+                      "STYPE double " +
+                      "FINALFUNC " + shortFunctionName(fFinal));
+        assertInvalid("CREATE AGGREGATE " + KEYSPACE + ".aggrInvalid(int)" +
+                      "SFUNC " + shortFunctionName(fState) + " " +
+                      "STYPE double " +
+                      "FINALFUNC " + shortFunctionName(fFinal));
+        assertInvalid("CREATE AGGREGATE " + KEYSPACE + ".aggrInvalid(double)" +
+                      "SFUNC " + shortFunctionName(fState) + " " +
+                      "STYPE int " +
+                      "FINALFUNC " + shortFunctionName(fFinal));
+        assertInvalid("CREATE AGGREGATE " + KEYSPACE + ".aggrInvalid(double)" +
+                      "SFUNC " + shortFunctionName(fState) + " " +
+                      "STYPE int");
+        assertInvalid("CREATE AGGREGATE " + KEYSPACE + ".aggrInvalid(int)" +
+                      "SFUNC " + shortFunctionName(fState) + " " +
+                      "STYPE double");
+
+        assertInvalid("CREATE AGGREGATE " + KEYSPACE + ".aggrInvalid(double)" +
+                      "SFUNC " + shortFunctionName(fState2) + " " +
+                      "STYPE double " +
+                      "FINALFUNC " + shortFunctionName(fFinal));
+
+        assertInvalid("CREATE AGGREGATE " + KEYSPACE + ".aggrInvalid(double)" +
+                      "SFUNC " + shortFunctionName(fState) + " " +
+                      "STYPE double " +
+                      "FINALFUNC " + shortFunctionName(fFinal2));
+    }
+
+    @Test
+    public void testJavaAggregateNonExistingFuncs() throws Throwable
+    {
+        String fState = createFunction(KEYSPACE,
+                                       "int, int",
+                                       "CREATE FUNCTION %s(a int, b int) " +
+                                       "RETURNS int " +
+                                       "LANGUAGE java " +
+                                       "AS 'return Integer.valueOf((a!=null?a.intValue():0) + b.intValue());'");
+
+        String fFinal = createFunction(KEYSPACE,
+                                       "int",
+                                       "CREATE FUNCTION %s(a int) " +
+                                       "RETURNS text " +
+                                       "LANGUAGE java " +
+                                       "AS 'return a.toString();'");
+
+        assertInvalid("CREATE AGGREGATE " + KEYSPACE + ".aggrInvalid(int)" +
+                      "SFUNC " + shortFunctionName(fState) + "_not_there " +
+                      "STYPE int " +
+                      "FINALFUNC " + shortFunctionName(fFinal));
+
+        assertInvalid("CREATE AGGREGATE " + KEYSPACE + ".aggrInvalid(int)" +
+                      "SFUNC " + shortFunctionName(fState) + " " +
+                      "STYPE int " +
+                      "FINALFUNC " + shortFunctionName(fFinal) + "_not_there");
+
+        execute("CREATE AGGREGATE " + KEYSPACE + ".aggrInvalid(int)" +
+                "SFUNC " + shortFunctionName(fState) + " " +
+                "STYPE int " +
+                "FINALFUNC " + shortFunctionName(fFinal));
+        execute("DROP AGGREGATE " + KEYSPACE + ".aggrInvalid(int)");
+    }
+
+    @Test
+    public void testJavaAggregateFailingFuncs() throws Throwable
+    {
+        createTable("CREATE TABLE %s (a int primary key, b int)");
+        execute("INSERT INTO %s (a, b) VALUES (1, 1)");
+        execute("INSERT INTO %s (a, b) VALUES (2, 2)");
+        execute("INSERT INTO %s (a, b) VALUES (3, 3)");
+
+        String fState = createFunction(KEYSPACE,
+                                       "int, int",
+                                       "CREATE FUNCTION %s(a int, b int) " +
+                                       "RETURNS int " +
+                                       "LANGUAGE java " +
+                                       "AS 'throw new RuntimeException();'");
+
+        String fStateOK = createFunction(KEYSPACE,
+                                       "int, int",
+                                       "CREATE FUNCTION %s(a int, b int) " +
+                                       "RETURNS int " +
+                                       "LANGUAGE java " +
+                                       "AS 'return Integer.valueOf(42);'");
+
+        String fFinal = createFunction(KEYSPACE,
+                                       "int",
+                                       "CREATE FUNCTION %s(a int) " +
+                                       "RETURNS text " +
+                                       "LANGUAGE java " +
+                                       "AS 'throw new RuntimeException();'");
+
+        String fFinalOK = createFunction(KEYSPACE,
+                                       "int",
+                                       "CREATE FUNCTION %s(a int) " +
+                                       "RETURNS text " +
+                                       "LANGUAGE java " +
+                                       "AS 'return \"foobar\";'");
+
+        String a0 = createAggregate(KEYSPACE,
+                                    "int",
+                                    "CREATE AGGREGATE %s(int) " +
+                                    "SFUNC " + shortFunctionName(fState) + " " +
+                                    "STYPE int " +
+                                    "FINALFUNC " + shortFunctionName(fFinal) + " " +
+                                    "INITCOND null");
+        String a1 = createAggregate(KEYSPACE,
+                                    "int",
+                                    "CREATE AGGREGATE %s(int) " +
+                                    "SFUNC " + shortFunctionName(fStateOK) + " " +
+                                    "STYPE int " +
+                                    "FINALFUNC " + shortFunctionName(fFinal) + " " +
+                                    "INITCOND null");
+        String a2 = createAggregate(KEYSPACE,
+                                    "int",
+                                    "CREATE AGGREGATE %s(int) " +
+                                    "SFUNC " + shortFunctionName(fStateOK) + " " +
+                                    "STYPE int " +
+                                    "FINALFUNC " + shortFunctionName(fFinalOK) + " " +
+                                    "INITCOND null");
+
+        assertInvalid("SELECT " + a0 + "(b) FROM %s");
+        assertInvalid("SELECT " + a1 + "(b) FROM %s");
+        assertRows(execute("SELECT " + a2 + "(b) FROM %s"), row("foobar"));
+    }
+
+    @Test
+    public void testJavaAggregateWithoutStateOrFinal() throws Throwable
+    {
+        assertInvalid("CREATE AGGREGATE " + KEYSPACE + ".jSumFooNE1(int) " +
+                      "SFUNC jSumFooNEstate " +
+                      "STYPE int");
+
+        String f = createFunction(KEYSPACE,
+                                  "int, int",
+                                  "CREATE FUNCTION %s(a int, b int) " +
+                                  "RETURNS int " +
+                                  "LANGUAGE java " +
+                                  "AS 'return Integer.valueOf((a!=null?a.intValue():0) + b.intValue());'");
+
+        assertInvalid("CREATE AGGREGATE " + KEYSPACE + ".jSumFooNE2(int) " +
+                      "SFUNC " + shortFunctionName(f) + " " +
+                      "STYPE int " +
+                      "FINALFUNC jSumFooNEfinal");
+
+        execute("DROP FUNCTION " + f + "(int, int)");
+    }
+
+    @Test
+    public void testJavaAggregate() throws Throwable
+    {
+        createTable("CREATE TABLE %s (a int primary key, b int)");
+        execute("INSERT INTO %s (a, b) VALUES (1, 1)");
+        execute("INSERT INTO %s (a, b) VALUES (2, 2)");
+        execute("INSERT INTO %s (a, b) VALUES (3, 3)");
+
+        String fState = createFunction(KEYSPACE,
+                                       "int, int",
+                                       "CREATE FUNCTION %s(a int, b int) " +
+                                       "RETURNS int " +
+                                       "LANGUAGE java " +
+                                       "AS 'return Integer.valueOf((a!=null?a.intValue():0) + b.intValue());'");
+
+        String fFinal = createFunction(KEYSPACE,
+                                       "int",
+                                       "CREATE FUNCTION %s(a int) " +
+                                       "RETURNS text " +
+                                       "LANGUAGE java " +
+                                       "AS 'return a.toString();'");
+
+        String a = createAggregate(KEYSPACE,
+                                   "int",
+                                   "CREATE AGGREGATE %s(int) " +
+                                   "SFUNC " + shortFunctionName(fState) + " " +
+                                   "STYPE int " +
+                                   "FINALFUNC " + shortFunctionName(fFinal) + " " +
+                                   "INITCOND 42");
+
+        // 42 + 1 + 2 + 3 = 48
+        assertRows(execute("SELECT " + a + "(b) FROM %s"), row("48"));
+
+        execute("DROP AGGREGATE " + a + "(int)");
+
+        execute("DROP FUNCTION " + fFinal + "(int)");
+        execute("DROP FUNCTION " + fState + "(int, int)");
+
+        assertInvalid("SELECT " + a + "(b) FROM %s");
+    }
+
+    @Test
+    public void testJavaAggregateSimple() throws Throwable
+    {
+        createTable("CREATE TABLE %s (a int primary key, b int)");
+        execute("INSERT INTO %s (a, b) VALUES (1, 1)");
+        execute("INSERT INTO %s (a, b) VALUES (2, 2)");
+        execute("INSERT INTO %s (a, b) VALUES (3, 3)");
+
+        String fState = createFunction(KEYSPACE,
+                                       "int, int",
+                                       "CREATE FUNCTION %s(a int, b int) " +
+                                       "RETURNS int " +
+                                       "LANGUAGE java " +
+                                       "AS 'return Integer.valueOf((a!=null?a.intValue():0) + b.intValue());'");
+
+        String a = createAggregate(KEYSPACE,
+                                   "int, int",
+                                   "CREATE AGGREGATE %s(int) " +
+                                   "SFUNC " + shortFunctionName(fState) + " " +
+                                   "STYPE int");
+
+        // 1 + 2 + 3 = 6
+        assertRows(execute("SELECT " + a + "(b) FROM %s"), row(6));
+
+        execute("DROP AGGREGATE " + a + "(int)");
+
+        execute("DROP FUNCTION " + fState + "(int, int)");
+
+        assertInvalid("SELECT " + a + "(b) FROM %s");
+    }
+
+    @Test
+    public void testJavaAggregateComplex() throws Throwable
+    {
+        createTable("CREATE TABLE %s (a int primary key, b int)");
+        execute("INSERT INTO %s (a, b) VALUES (1, 1)");
+        execute("INSERT INTO %s (a, b) VALUES (2, 2)");
+        execute("INSERT INTO %s (a, b) VALUES (3, 3)");
+
+        // build an average aggregation function using
+        // tuple<bigint,int> as state
+        // double as finaltype
+
+        String fState = createFunction(KEYSPACE,
+                                       "frozen<tuple<bigint, int>>, int",
+                                       "CREATE FUNCTION %s(a frozen<tuple<bigint, int>>, b int) " +
+                                       "RETURNS frozen<tuple<bigint, int>> " +
+                                       "LANGUAGE java " +
+                                       "AS '" +
+                                       "a.setLong(0, a.getLong(0) + b.intValue());" +
+                                       "a.setInt(1, a.getInt(1) + 1);" +
+                                       "return a;" +
+                                       "'");
+
+        String fFinal = createFunction(KEYSPACE,
+                                       "frozen<tuple<bigint, int>>",
+                                       "CREATE FUNCTION %s(a frozen<tuple<bigint, int>>) " +
+                                       "RETURNS double " +
+                                       "LANGUAGE java " +
+                                       "AS '" +
+                                       "double r = a.getLong(0);" +
+                                       "r /= a.getInt(1);" +
+                                       "return Double.valueOf(r);" +
+                                       "'");
+
+        String a = createAggregate(KEYSPACE,
+                                   "int",
+                                   "CREATE AGGREGATE %s(int) " +
+                                   "SFUNC " + shortFunctionName(fState) + " " +
+                                   "STYPE frozen<tuple<bigint, int>> "+
+                                   "FINALFUNC " + shortFunctionName(fFinal) + " " +
+                                   "INITCOND (0, 0)");
+
+        // 1 + 2 + 3 = 6 / 3 = 2
+        assertRows(execute("SELECT " + a + "(b) FROM %s"), row(2d));
+
+    }
+
+    @Test
+    public void testJavascriptAggregate() throws Throwable
+    {
+        createTable("CREATE TABLE %s (a int primary key, b int)");
+        execute("INSERT INTO %s (a, b) VALUES (1, 1)");
+        execute("INSERT INTO %s (a, b) VALUES (2, 2)");
+        execute("INSERT INTO %s (a, b) VALUES (3, 3)");
+
+        String fState = createFunction(KEYSPACE,
+                                       "int, int",
+                                       "CREATE FUNCTION %s(a int, b int) " +
+                                       "RETURNS int " +
+                                       "LANGUAGE javascript " +
+                                       "AS 'a + b;'");
+
+        String fFinal = createFunction(KEYSPACE,
+                                       "int",
+                                       "CREATE FUNCTION %s(a int) " +
+                                       "RETURNS text " +
+                                       "LANGUAGE javascript " +
+                                       "AS '\"\"+a'");
+
+        String a = createFunction(KEYSPACE,
+                                  "int",
+                                  "CREATE AGGREGATE %s(int) " +
+                                  "SFUNC " + shortFunctionName(fState) + " " +
+                                  "STYPE int " +
+                                  "FINALFUNC " + shortFunctionName(fFinal) + " " +
+                                  "INITCOND 42");
+
+        // 42 + 1 + 2 + 3 = 48
+        assertRows(execute("SELECT " + a + "(b) FROM %s"), row("48"));
+
+        execute("DROP AGGREGATE " + a + "(int)");
+
+        execute("DROP FUNCTION " + fFinal + "(int)");
+        execute("DROP FUNCTION " + fState + "(int, int)");
+
+        assertInvalid("SELECT " + a + "(b) FROM %s");
+    }
+
+    @Test
+    public void testJavascriptAggregateSimple() throws Throwable
+    {
+        createTable("CREATE TABLE %s (a int primary key, b int)");
+        execute("INSERT INTO %s (a, b) VALUES (1, 1)");
+        execute("INSERT INTO %s (a, b) VALUES (2, 2)");
+        execute("INSERT INTO %s (a, b) VALUES (3, 3)");
+
+        String fState = createFunction(KEYSPACE,
+                                       "int, int",
+                                       "CREATE FUNCTION %s(a int, b int) " +
+                                       "RETURNS int " +
+                                       "LANGUAGE javascript " +
+                                       "AS 'a + b;'");
+
+        String a = createAggregate(KEYSPACE,
+                                   "int, int",
+                                   "CREATE AGGREGATE %s(int) " +
+                                   "SFUNC " + shortFunctionName(fState) + " " +
+                                   "STYPE int ");
+
+        // 1 + 2 + 3 = 6
+        assertRows(execute("SELECT " + a + "(b) FROM %s"), row(6));
+
+        execute("DROP AGGREGATE " + a + "(int)");
+
+        execute("DROP FUNCTION " + fState + "(int, int)");
+
+        assertInvalid("SELECT " + a + "(b) FROM %s");
+    }
+
+    @Test
+    public void testFunctionDropPreparedStatement() throws Throwable
+    {
+        String otherKS = "cqltest_foo";
+
+        execute("CREATE KEYSPACE IF NOT EXISTS " + otherKS + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};");
+        try
+        {
+            execute("CREATE TABLE " + otherKS + ".jsdp (a int primary key, b int)");
+
+            String fState = createFunction(otherKS,
+                                           "int, int",
+                                           "CREATE FUNCTION %s(a int, b int) " +
+                                           "RETURNS int " +
+                                           "LANGUAGE javascript " +
+                                           "AS 'a + b;'");
+
+            String a = createAggregate(otherKS,
+                                       "int",
+                                       "CREATE AGGREGATE %s(int) " +
+                                       "SFUNC " + shortFunctionName(fState) + " " +
+                                       "STYPE int");
+
+            ResultMessage.Prepared prepared = QueryProcessor.prepare("SELECT " + a + "(b) FROM " + otherKS + ".jsdp", ClientState.forInternalCalls(), false);
+            Assert.assertNotNull(QueryProcessor.instance.getPrepared(prepared.statementId));
+
+            execute("DROP AGGREGATE " + a + "(int)");
+            Assert.assertNull(QueryProcessor.instance.getPrepared(prepared.statementId));
+
+            //
+
+            execute("CREATE AGGREGATE " + a + "(int) " +
+                    "SFUNC " + shortFunctionName(fState) + " " +
+                    "STYPE int");
+
+            prepared = QueryProcessor.prepare("SELECT " + a + "(b) FROM " + otherKS + ".jsdp", ClientState.forInternalCalls(), false);
+            Assert.assertNotNull(QueryProcessor.instance.getPrepared(prepared.statementId));
+
+            execute("DROP KEYSPACE " + otherKS + ";");
+
+            Assert.assertNull(QueryProcessor.instance.getPrepared(prepared.statementId));
+        }
+        finally
+        {
+            execute("DROP KEYSPACE IF EXISTS " + otherKS + ";");
+        }
+    }
+
+    @Test
+    public void testAggregatesReferencedInAggregates() throws Throwable
+    {
+
+        String fState = createFunction(KEYSPACE,
+                                       "int, int",
+                                       "CREATE FUNCTION %s(a int, b int) " +
+                                       "RETURNS int " +
+                                       "LANGUAGE javascript " +
+                                       "AS 'a + b;'");
+
+        String a = createAggregate(KEYSPACE,
+                                   "int, int",
+                                   "CREATE AGGREGATE %s(int) " +
+                                   "SFUNC " + shortFunctionName(fState) + " " +
+                                   "STYPE int ");
+
+        assertInvalid("CREATE AGGREGATE " + KEYSPACE + ".aggInv(int) " +
+                      "SFUNC " + shortFunctionName(a) + " " +
+                      "STYPE int ");
+
+        assertInvalid("CREATE AGGREGATE " + KEYSPACE + ".aggInv(int) " +
+                      "SFUNC " + shortFunctionName(fState) + " " +
+                      "STYPE int " +
+                      "FINALFUNC " + shortFunctionName(a));
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2f35c76/test/unit/org/apache/cassandra/cql3/CQLTester.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java
index 2404237..883da3a 100644
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@ -46,6 +46,7 @@ import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.cql3.functions.FunctionName;
 import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.SystemKeyspace;
@@ -243,6 +244,19 @@ public abstract class CQLTester
         return USE_PREPARED_VALUES;
     }
 
+    public static FunctionName parseFunctionName(String qualifiedName)
+    {
+        int i = qualifiedName.indexOf('.');
+        return i == -1
+               ? FunctionName.nativeFunction(qualifiedName)
+               : new FunctionName(qualifiedName.substring(0, i).trim(), qualifiedName.substring(i+1).trim());
+    }
+
+    public static String shortFunctionName(String f)
+    {
+        return parseFunctionName(f).name;
+    }
+
     private static void removeAllSSTables(String ks, String table)
     {
         // clean up data directory which are stored as data directory/keyspace/data files

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2f35c76/test/unit/org/apache/cassandra/cql3/UFTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/UFTest.java b/test/unit/org/apache/cassandra/cql3/UFTest.java
index 824719b..fa28126 100644
--- a/test/unit/org/apache/cassandra/cql3/UFTest.java
+++ b/test/unit/org/apache/cassandra/cql3/UFTest.java
@@ -35,14 +35,6 @@ import org.apache.cassandra.transport.messages.ResultMessage;
 public class UFTest extends CQLTester
 {
 
-    public static FunctionName parseFunctionName(String qualifiedName)
-    {
-        int i = qualifiedName.indexOf('.');
-        return i == -1
-               ? FunctionName.nativeFunction(qualifiedName)
-               : new FunctionName(qualifiedName.substring(0, i).trim(), qualifiedName.substring(i+1).trim());
-    }
-
     @Test
     public void testFunctionDropOnKeyspaceDrop() throws Throwable
     {


[2/2] cassandra git commit: Support for user-defined aggregate functions

Posted by ty...@apache.org.
Support for user-defined aggregate functions

Patch by Robert Stupp; reviewed by Tyler Hobbs for CASSANDRA-8053


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e2f35c76
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e2f35c76
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e2f35c76

Branch: refs/heads/trunk
Commit: e2f35c767e479da9761628578299b54872d7eea9
Parents: 857de55
Author: Robert Stupp <sn...@snazy.de>
Authored: Thu Dec 11 11:46:28 2014 -0600
Committer: Tyler Hobbs <ty...@datastax.com>
Committed: Thu Dec 11 11:46:28 2014 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 pylib/cqlshlib/cql3handling.py                  |  28 +-
 src/java/org/apache/cassandra/auth/Auth.java    |  12 +
 .../org/apache/cassandra/config/KSMetaData.java |   1 +
 src/java/org/apache/cassandra/cql3/Cql.g        |  61 ++
 .../apache/cassandra/cql3/QueryProcessor.java   |  15 +
 .../cql3/functions/AbstractFunction.java        |  10 +
 .../cassandra/cql3/functions/AggregateFcts.java |  64 +-
 .../cql3/functions/AggregateFunction.java       |  10 +-
 .../cassandra/cql3/functions/Function.java      |   4 +
 .../cassandra/cql3/functions/FunctionCall.java  |   2 +-
 .../cassandra/cql3/functions/Functions.java     |  24 +-
 .../cql3/functions/JavaSourceUDFFactory.java    |   6 +-
 .../cassandra/cql3/functions/UDAggregate.java   | 280 ++++++++
 .../cassandra/cql3/functions/UDFunction.java    | 193 ++----
 .../cassandra/cql3/functions/UDHelper.java      | 123 ++++
 .../selection/AbstractFunctionSelector.java     |   4 +-
 .../selection/AggregateFunctionSelector.java    |   6 +-
 .../cassandra/cql3/selection/FieldSelector.java |   2 +-
 .../cassandra/cql3/selection/Selection.java     |   8 +-
 .../cassandra/cql3/selection/Selector.java      |   2 +-
 .../cql3/selection/SelectorFactories.java       |   2 +-
 .../statements/CreateAggregateStatement.java    | 194 ++++++
 .../statements/CreateFunctionStatement.java     |  11 +-
 .../cql3/statements/DropAggregateStatement.java | 136 ++++
 .../cql3/statements/DropFunctionStatement.java  |  17 +-
 .../org/apache/cassandra/db/DefsTables.java     |  89 ++-
 .../org/apache/cassandra/db/SystemKeyspace.java |  21 +-
 .../cassandra/service/IMigrationListener.java   |   3 +
 .../cassandra/service/MigrationManager.java     |  45 +-
 .../org/apache/cassandra/transport/Server.java  |  12 +
 .../apache/cassandra/cql3/AggregationTest.java  | 640 ++++++++++++++++++-
 .../org/apache/cassandra/cql3/CQLTester.java    |  14 +
 test/unit/org/apache/cassandra/cql3/UFTest.java |   8 -
 34 files changed, 1795 insertions(+), 253 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2f35c76/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 34e740e..6ff61e7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0
+ * Support for user-defined aggregation functions (CASSANDRA-8053)
  * Fix NPE in SelectStatement with empty IN values (CASSANDRA-8419)
  * Refactor SelectStatement, return IN results in natural order instead
    of IN value list order (CASSANDRA-7981)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2f35c76/pylib/cqlshlib/cql3handling.py
----------------------------------------------------------------------
diff --git a/pylib/cqlshlib/cql3handling.py b/pylib/cqlshlib/cql3handling.py
index f8a3069..84af796 100644
--- a/pylib/cqlshlib/cql3handling.py
+++ b/pylib/cqlshlib/cql3handling.py
@@ -41,7 +41,7 @@ class Cql3ParsingRuleSet(CqlParsingRuleSet):
         'select', 'from', 'where', 'and', 'key', 'insert', 'update', 'with',
         'limit', 'using', 'use', 'set',
         'begin', 'apply', 'batch', 'truncate', 'delete', 'in', 'create',
-        'function', 'keyspace', 'schema', 'columnfamily', 'table', 'index', 'on', 'drop',
+        'function', 'aggregate', 'keyspace', 'schema', 'columnfamily', 'table', 'index', 'on', 'drop',
         'primary', 'into', 'values', 'timestamp', 'ttl', 'alter', 'add', 'type',
         'compact', 'storage', 'order', 'by', 'asc', 'desc', 'clustering',
         'token', 'writetime', 'map', 'list', 'to', 'custom', 'if', 'not'
@@ -209,7 +209,10 @@ JUNK ::= /([ \t\r\f\v]+|(--|[/][/])[^\n\r]*([\n\r]|$)|[/][*].*?[*][/])/ ;
 <mapLiteral> ::= "{" <term> ":" <term> ( "," <term> ":" <term> )* "}"
                ;
 
-<functionName> ::= <identifier> ( "." <identifier> )?
+<userFunctionName> ::= <identifier> ( "." <identifier> )?
+               ;
+
+<functionName> ::= <userFunctionName>
                  | "TOKEN"
                  ;
 
@@ -233,12 +236,14 @@ JUNK ::= /([ \t\r\f\v]+|(--|[/][/])[^\n\r]*([\n\r]|$)|[/][*].*?[*][/])/ ;
                           | <createIndexStatement>
                           | <createUserTypeStatement>
                           | <createFunctionStatement>
+                          | <createAggregateStatement>
                           | <createTriggerStatement>
                           | <dropKeyspaceStatement>
                           | <dropColumnFamilyStatement>
                           | <dropIndexStatement>
                           | <dropUserTypeStatement>
                           | <dropFunctionStatement>
+                          | <dropAggregateStatement>
                           | <dropTriggerStatement>
                           | <alterTableStatement>
                           | <alterKeyspaceStatement>
@@ -1010,7 +1015,7 @@ syntax_rules += r'''
 <createFunctionStatement> ::= "CREATE" ("OR" "REPLACE")? "FUNCTION"
                             ("IF" "NOT" "EXISTS")?
                             ("NON"? "DETERMINISTIC")?
-                            <functionName>
+                            <userFunctionName>
                             ( "(" ( newcol=<cident> <storageType>
                               ( "," [newcolname]=<cident> <storageType> )* )?
                             ")" )?
@@ -1018,6 +1023,18 @@ syntax_rules += r'''
                             "LANGUAGE" <cident> "AS" <stringLiteral>
                          ;
 
+<createAggregateStatement> ::= "CREATE" ("OR" "REPLACE")? "AGGREGATE"
+                            ("IF" "NOT" "EXISTS")?
+                            <userFunctionName>
+                            ( "("
+                                 ( <storageType> ( "," <storageType> )* )?
+                              ")" )?
+                            "SFUNC" <identifier>
+                            "STYPE" <storageType>
+                            ( "FINALFUNC" <identifier> )?
+                            ( "INITCOND" <term> )?
+                         ;
+
 '''
 
 explain_completion('createIndexStatement', 'indexname', '<new_index_name>')
@@ -1049,7 +1066,10 @@ syntax_rules += r'''
 <dropUserTypeStatement> ::= "DROP" "TYPE" ut=<userTypeName>
                           ;
 
-<dropFunctionStatement> ::= "DROP" "FUNCTION" ( "IF" "EXISTS" )? <functionName>
+<dropFunctionStatement> ::= "DROP" "FUNCTION" ( "IF" "EXISTS" )? <userFunctionName>
+                          ;
+
+<dropAggregateStatement> ::= "DROP" "AGGREGATE" ( "IF" "EXISTS" )? <userFunctionName>
                           ;
 
 '''

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2f35c76/src/java/org/apache/cassandra/auth/Auth.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/Auth.java b/src/java/org/apache/cassandra/auth/Auth.java
index 041ce2b..cdcfa0e 100644
--- a/src/java/org/apache/cassandra/auth/Auth.java
+++ b/src/java/org/apache/cassandra/auth/Auth.java
@@ -340,6 +340,10 @@ public class Auth implements AuthMBean
         {
         }
 
+        public void onDropAggregate(String ksName, String aggregateName)
+        {
+        }
+
         public void onCreateKeyspace(String ksName)
         {
         }
@@ -356,6 +360,10 @@ public class Auth implements AuthMBean
         {
         }
 
+        public void onCreateAggregate(String ksName, String aggregateName)
+        {
+        }
+
         public void onUpdateKeyspace(String ksName)
         {
         }
@@ -371,5 +379,9 @@ public class Auth implements AuthMBean
         public void onUpdateFunction(String ksName, String functionName)
         {
         }
+
+        public void onUpdateAggregate(String ksName, String aggregateName)
+        {
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2f35c76/src/java/org/apache/cassandra/config/KSMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/KSMetaData.java b/src/java/org/apache/cassandra/config/KSMetaData.java
index 494f98b..e5576ad 100644
--- a/src/java/org/apache/cassandra/config/KSMetaData.java
+++ b/src/java/org/apache/cassandra/config/KSMetaData.java
@@ -186,6 +186,7 @@ public final class KSMetaData
         mutation.delete(SystemKeyspace.SCHEMA_TRIGGERS_TABLE, timestamp);
         mutation.delete(SystemKeyspace.SCHEMA_USER_TYPES_TABLE, timestamp);
         mutation.delete(SystemKeyspace.SCHEMA_FUNCTIONS_TABLE, timestamp);
+        mutation.delete(SystemKeyspace.SCHEMA_AGGREGATES_TABLE, timestamp);
         mutation.delete(SystemKeyspace.BUILT_INDEXES_TABLE, timestamp);
 
         return mutation;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2f35c76/src/java/org/apache/cassandra/cql3/Cql.g
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Cql.g b/src/java/org/apache/cassandra/cql3/Cql.g
index 1997544..ed133e7 100644
--- a/src/java/org/apache/cassandra/cql3/Cql.g
+++ b/src/java/org/apache/cassandra/cql3/Cql.g
@@ -245,6 +245,8 @@ cqlStatement returns [ParsedStatement stmt]
     | st27=dropTypeStatement           { $stmt = st27; }
     | st28=createFunctionStatement     { $stmt = st28; }
     | st29=dropFunctionStatement       { $stmt = st29; }
+    | st30=createAggregateStatement    { $stmt = st30; }
+    | st31=dropAggregateStatement      { $stmt = st31; }
     ;
 
 /*
@@ -488,6 +490,55 @@ batchStatementObjective returns [ModificationStatement.Parsed statement]
     | d=deleteStatement  { $statement = d; }
     ;
 
+createAggregateStatement returns [CreateAggregateStatement expr]
+    @init {
+        boolean orReplace = false;
+        boolean ifNotExists = false;
+
+        List<CQL3Type.Raw> argsTypes = new ArrayList<>();
+    }
+    : K_CREATE (K_OR K_REPLACE { orReplace = true; })?
+      K_AGGREGATE
+      (K_IF K_NOT K_EXISTS { ifNotExists = true; })?
+      fn=functionName
+      '('
+        (
+          v=comparatorType { argsTypes.add(v); }
+          ( ',' v=comparatorType { argsTypes.add(v); } )*
+        )?
+      ')'
+      K_SFUNC sfunc = allowedFunctionName
+      K_STYPE stype = comparatorType
+      (
+        K_FINALFUNC ffunc = allowedFunctionName
+      )?
+      (
+        K_INITCOND ival = term
+      )?
+      { $expr = new CreateAggregateStatement(fn, argsTypes, sfunc, stype, ffunc, ival, orReplace, ifNotExists); }
+    ;
+
+dropAggregateStatement returns [DropAggregateStatement expr]
+    @init {
+        boolean ifExists = false;
+        List<CQL3Type.Raw> argsTypes = new ArrayList<>();
+        boolean argsPresent = false;
+    }
+    : K_DROP K_AGGREGATE
+      (K_IF K_EXISTS { ifExists = true; } )?
+      fn=functionName
+      (
+        '('
+          (
+            v=comparatorType { argsTypes.add(v); }
+            ( ',' v=comparatorType { argsTypes.add(v); } )*
+          )?
+        ')'
+        { argsPresent = true; }
+      )?
+      { $expr = new DropAggregateStatement(fn, argsTypes, argsPresent, ifExists); }
+    ;
+
 createFunctionStatement returns [CreateFunctionStatement expr]
     @init {
         boolean orReplace = false;
@@ -1271,6 +1322,11 @@ basic_unreserved_keyword returns [String str]
         | K_CONTAINS
         | K_STATIC
         | K_FUNCTION
+        | K_AGGREGATE
+        | K_SFUNC
+        | K_STYPE
+        | K_FINALFUNC
+        | K_INITCOND
         | K_RETURNS
         | K_LANGUAGE
         | K_NON
@@ -1384,6 +1440,11 @@ K_STATIC:      S T A T I C;
 K_FROZEN:      F R O Z E N;
 
 K_FUNCTION:    F U N C T I O N;
+K_AGGREGATE:   A G G R E G A T E;
+K_SFUNC:       S F U N C;
+K_STYPE:       S T Y P E;
+K_FINALFUNC:   F I N A L F U N C;
+K_INITCOND:    I N I T C O N D;
 K_RETURNS:     R E T U R N S;
 K_LANGUAGE:    L A N G U A G E;
 K_NON:         N O N;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2f35c76/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index 82b354e..8bd5daa 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -613,11 +613,21 @@ public class QueryProcessor implements QueryHandler
                 removeInvalidPreparedStatementsForFunction(thriftPreparedStatements.values().iterator(), ksName, functionName);
             }
         }
+        public void onCreateAggregate(String ksName, String aggregateName) {
+            if (Functions.getOverloadCount(new FunctionName(ksName, aggregateName)) > 1)
+            {
+                // in case there are other overloads, we have to remove all overloads since argument type
+                // matching may change (due to type casting)
+                removeInvalidPreparedStatementsForFunction(preparedStatements.values().iterator(), ksName, aggregateName);
+                removeInvalidPreparedStatementsForFunction(thriftPreparedStatements.values().iterator(), ksName, aggregateName);
+            }
+        }
 
         public void onUpdateKeyspace(String ksName) { }
         public void onUpdateColumnFamily(String ksName, String cfName) { }
         public void onUpdateUserType(String ksName, String typeName) { }
         public void onUpdateFunction(String ksName, String functionName) { }
+        public void onUpdateAggregate(String ksName, String aggregateName) { }
 
         public void onDropKeyspace(String ksName)
         {
@@ -634,6 +644,11 @@ public class QueryProcessor implements QueryHandler
             removeInvalidPreparedStatementsForFunction(preparedStatements.values().iterator(), ksName, functionName);
             removeInvalidPreparedStatementsForFunction(thriftPreparedStatements.values().iterator(), ksName, functionName);
         }
+        public void onDropAggregate(String ksName, String aggregateName)
+        {
+            removeInvalidPreparedStatementsForFunction(preparedStatements.values().iterator(), ksName, aggregateName);
+            removeInvalidPreparedStatementsForFunction(thriftPreparedStatements.values().iterator(), ksName, aggregateName);
+        }
 
         private void removeInvalidPreparedStatementsForFunction(Iterator<ParsedStatement.Prepared> iterator,
                                                                 String ksName, String functionName)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2f35c76/src/java/org/apache/cassandra/cql3/functions/AbstractFunction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/AbstractFunction.java b/src/java/org/apache/cassandra/cql3/functions/AbstractFunction.java
index d5a40a0..e2d69b8 100644
--- a/src/java/org/apache/cassandra/cql3/functions/AbstractFunction.java
+++ b/src/java/org/apache/cassandra/cql3/functions/AbstractFunction.java
@@ -66,6 +66,16 @@ public abstract class AbstractFunction implements Function
             && Objects.equal(this.returnType, that.returnType);
     }
 
+    public boolean usesFunction(String ksName, String functionName)
+    {
+        return name.keyspace.equals(ksName) && name.name.equals(functionName);
+    }
+
+    public boolean hasReferenceTo(Function function)
+    {
+        return false;
+    }
+
     @Override
     public int hashCode()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2f35c76/src/java/org/apache/cassandra/cql3/functions/AggregateFcts.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/AggregateFcts.java b/src/java/org/apache/cassandra/cql3/functions/AggregateFcts.java
index f72ed44..865dfbf 100644
--- a/src/java/org/apache/cassandra/cql3/functions/AggregateFcts.java
+++ b/src/java/org/apache/cassandra/cql3/functions/AggregateFcts.java
@@ -53,12 +53,12 @@ public abstract class AggregateFcts
                             count = 0;
                         }
 
-                        public ByteBuffer compute()
+                        public ByteBuffer compute(int protocolVersion)
                         {
                             return ((LongType) returnType()).decompose(Long.valueOf(count));
                         }
 
-                        public void addInput(List<ByteBuffer> values)
+                        public void addInput(int protocolVersion, List<ByteBuffer> values)
                         {
                             count++;
                         }
@@ -84,12 +84,12 @@ public abstract class AggregateFcts
                             sum = BigDecimal.ZERO;
                         }
 
-                        public ByteBuffer compute()
+                        public ByteBuffer compute(int protocolVersion)
                         {
                             return ((DecimalType) returnType()).decompose(sum);
                         }
 
-                        public void addInput(List<ByteBuffer> values)
+                        public void addInput(int protocolVersion, List<ByteBuffer> values)
                         {
                             ByteBuffer value = values.get(0);
 
@@ -123,7 +123,7 @@ public abstract class AggregateFcts
                             sum = BigDecimal.ZERO;
                         }
 
-                        public ByteBuffer compute()
+                        public ByteBuffer compute(int protocolVersion)
                         {
                             if (count == 0)
                                 return ((DecimalType) returnType()).decompose(BigDecimal.ZERO);
@@ -131,7 +131,7 @@ public abstract class AggregateFcts
                             return ((DecimalType) returnType()).decompose(sum.divide(BigDecimal.valueOf(count)));
                         }
 
-                        public void addInput(List<ByteBuffer> values)
+                        public void addInput(int protocolVersion, List<ByteBuffer> values)
                         {
                             ByteBuffer value = values.get(0);
 
@@ -163,12 +163,12 @@ public abstract class AggregateFcts
                             sum = BigInteger.ZERO;
                         }
 
-                        public ByteBuffer compute()
+                        public ByteBuffer compute(int protocolVersion)
                         {
                             return ((IntegerType) returnType()).decompose(sum);
                         }
 
-                        public void addInput(List<ByteBuffer> values)
+                        public void addInput(int protocolVersion, List<ByteBuffer> values)
                         {
                             ByteBuffer value = values.get(0);
 
@@ -202,7 +202,7 @@ public abstract class AggregateFcts
                             sum = BigInteger.ZERO;
                         }
 
-                        public ByteBuffer compute()
+                        public ByteBuffer compute(int protocolVersion)
                         {
                             if (count == 0)
                                 return ((IntegerType) returnType()).decompose(BigInteger.ZERO);
@@ -210,7 +210,7 @@ public abstract class AggregateFcts
                             return ((IntegerType) returnType()).decompose(sum.divide(BigInteger.valueOf(count)));
                         }
 
-                        public void addInput(List<ByteBuffer> values)
+                        public void addInput(int protocolVersion, List<ByteBuffer> values)
                         {
                             ByteBuffer value = values.get(0);
 
@@ -242,12 +242,12 @@ public abstract class AggregateFcts
                             sum = 0;
                         }
 
-                        public ByteBuffer compute()
+                        public ByteBuffer compute(int protocolVersion)
                         {
                             return ((Int32Type) returnType()).decompose(sum);
                         }
 
-                        public void addInput(List<ByteBuffer> values)
+                        public void addInput(int protocolVersion, List<ByteBuffer> values)
                         {
                             ByteBuffer value = values.get(0);
 
@@ -281,14 +281,14 @@ public abstract class AggregateFcts
                             sum = 0;
                         }
 
-                        public ByteBuffer compute()
+                        public ByteBuffer compute(int protocolVersion)
                         {
                             int avg = count == 0 ? 0 : sum / count;
 
                             return ((Int32Type) returnType()).decompose(avg);
                         }
 
-                        public void addInput(List<ByteBuffer> values)
+                        public void addInput(int protocolVersion, List<ByteBuffer> values)
                         {
                             ByteBuffer value = values.get(0);
 
@@ -320,12 +320,12 @@ public abstract class AggregateFcts
                             sum = 0;
                         }
 
-                        public ByteBuffer compute()
+                        public ByteBuffer compute(int protocolVersion)
                         {
                             return ((LongType) returnType()).decompose(sum);
                         }
 
-                        public void addInput(List<ByteBuffer> values)
+                        public void addInput(int protocolVersion, List<ByteBuffer> values)
                         {
                             ByteBuffer value = values.get(0);
 
@@ -359,14 +359,14 @@ public abstract class AggregateFcts
                             sum = 0;
                         }
 
-                        public ByteBuffer compute()
+                        public ByteBuffer compute(int protocolVersion)
                         {
                             long avg = count == 0 ? 0 : sum / count;
 
                             return ((LongType) returnType()).decompose(avg);
                         }
 
-                        public void addInput(List<ByteBuffer> values)
+                        public void addInput(int protocolVersion, List<ByteBuffer> values)
                         {
                             ByteBuffer value = values.get(0);
 
@@ -398,12 +398,12 @@ public abstract class AggregateFcts
                             sum = 0;
                         }
 
-                        public ByteBuffer compute()
+                        public ByteBuffer compute(int protocolVersion)
                         {
                             return ((FloatType) returnType()).decompose(sum);
                         }
 
-                        public void addInput(List<ByteBuffer> values)
+                        public void addInput(int protocolVersion, List<ByteBuffer> values)
                         {
                             ByteBuffer value = values.get(0);
 
@@ -437,14 +437,14 @@ public abstract class AggregateFcts
                             sum = 0;
                         }
 
-                        public ByteBuffer compute()
+                        public ByteBuffer compute(int protocolVersion)
                         {
                             float avg = count == 0 ? 0 : sum / count;
 
                             return ((FloatType) returnType()).decompose(avg);
                         }
 
-                        public void addInput(List<ByteBuffer> values)
+                        public void addInput(int protocolVersion, List<ByteBuffer> values)
                         {
                             ByteBuffer value = values.get(0);
 
@@ -476,12 +476,12 @@ public abstract class AggregateFcts
                             sum = 0;
                         }
 
-                        public ByteBuffer compute()
+                        public ByteBuffer compute(int protocolVersion)
                         {
                             return ((DoubleType) returnType()).decompose(sum);
                         }
 
-                        public void addInput(List<ByteBuffer> values)
+                        public void addInput(int protocolVersion, List<ByteBuffer> values)
                         {
                             ByteBuffer value = values.get(0);
 
@@ -515,14 +515,14 @@ public abstract class AggregateFcts
                             sum = 0;
                         }
 
-                        public ByteBuffer compute()
+                        public ByteBuffer compute(int protocolVersion)
                         {
                             double avg = count == 0 ? 0 : sum / count;
 
                             return ((DoubleType) returnType()).decompose(avg);
                         }
 
-                        public void addInput(List<ByteBuffer> values)
+                        public void addInput(int protocolVersion, List<ByteBuffer> values)
                         {
                             ByteBuffer value = values.get(0);
 
@@ -558,12 +558,12 @@ public abstract class AggregateFcts
                         max = null;
                     }
 
-                    public ByteBuffer compute()
+                    public ByteBuffer compute(int protocolVersion)
                     {
                         return max;
                     }
 
-                    public void addInput(List<ByteBuffer> values)
+                    public void addInput(int protocolVersion, List<ByteBuffer> values)
                     {
                         ByteBuffer value = values.get(0);
 
@@ -599,12 +599,12 @@ public abstract class AggregateFcts
                         min = null;
                     }
 
-                    public ByteBuffer compute()
+                    public ByteBuffer compute(int protocolVersion)
                     {
                         return min;
                     }
 
-                    public void addInput(List<ByteBuffer> values)
+                    public void addInput(int protocolVersion, List<ByteBuffer> values)
                     {
                         ByteBuffer value = values.get(0);
 
@@ -640,12 +640,12 @@ public abstract class AggregateFcts
                         count = 0;
                     }
 
-                    public ByteBuffer compute()
+                    public ByteBuffer compute(int protocolVersion)
                     {
                         return ((LongType) returnType()).decompose(count);
                     }
 
-                    public void addInput(List<ByteBuffer> values)
+                    public void addInput(int protocolVersion, List<ByteBuffer> values)
                     {
                         ByteBuffer value = values.get(0);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2f35c76/src/java/org/apache/cassandra/cql3/functions/AggregateFunction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/AggregateFunction.java b/src/java/org/apache/cassandra/cql3/functions/AggregateFunction.java
index 47eee4b..ddbc9d1 100644
--- a/src/java/org/apache/cassandra/cql3/functions/AggregateFunction.java
+++ b/src/java/org/apache/cassandra/cql3/functions/AggregateFunction.java
@@ -20,6 +20,8 @@ package org.apache.cassandra.cql3.functions;
 import java.nio.ByteBuffer;
 import java.util.List;
 
+import org.apache.cassandra.exceptions.InvalidRequestException;
+
 /**
  * Performs a calculation on a set of values and return a single value.
  */
@@ -30,7 +32,7 @@ public interface AggregateFunction extends Function
      *
      * @return a new <code>Aggregate</code> instance.
      */
-    public Aggregate newAggregate();
+    public Aggregate newAggregate() throws InvalidRequestException;
 
     /**
      * An aggregation operation.
@@ -40,16 +42,18 @@ public interface AggregateFunction extends Function
         /**
          * Adds the specified input to this aggregate.
          *
+         * @param protocolVersion native protocol version
          * @param values the values to add to the aggregate.
          */
-        public void addInput(List<ByteBuffer> values);
+        public void addInput(int protocolVersion, List<ByteBuffer> values) throws InvalidRequestException;
 
         /**
          * Computes and returns the aggregate current value.
          *
+         * @param protocolVersion native protocol version
          * @return the aggregate current value.
          */
-        public ByteBuffer compute();
+        public ByteBuffer compute(int protocolVersion) throws InvalidRequestException;
 
         /**
          * Reset this aggregate.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2f35c76/src/java/org/apache/cassandra/cql3/functions/Function.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/Function.java b/src/java/org/apache/cassandra/cql3/functions/Function.java
index 9e41fe4..4d2b993 100644
--- a/src/java/org/apache/cassandra/cql3/functions/Function.java
+++ b/src/java/org/apache/cassandra/cql3/functions/Function.java
@@ -51,4 +51,8 @@ public interface Function
      * @return <code>true</code> if the function is an aggregate function, <code>false</code> otherwise.
      */
     public boolean isAggregate();
+
+    boolean usesFunction(String ksName, String functionName);
+
+    boolean hasReferenceTo(Function function);
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2f35c76/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java b/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java
index 01443d2..72ac63e 100644
--- a/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java
+++ b/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java
@@ -44,7 +44,7 @@ public class FunctionCall extends Term.NonTerminal
 
     public boolean usesFunction(String ksName, String functionName)
     {
-        return fun.name().keyspace.equals(ksName) && fun.name().name.equals(functionName);
+        return fun.usesFunction(ksName, functionName);
     }
 
     public void collectMarkerSpecification(VariableSpecifications boundNames)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2f35c76/src/java/org/apache/cassandra/cql3/functions/Functions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/Functions.java b/src/java/org/apache/cassandra/cql3/functions/Functions.java
index a8fdf0f..7d94e47 100644
--- a/src/java/org/apache/cassandra/cql3/functions/Functions.java
+++ b/src/java/org/apache/cassandra/cql3/functions/Functions.java
@@ -42,7 +42,8 @@ public abstract class Functions
     // to handle it as a special case.
     private static final FunctionName TOKEN_FUNCTION_NAME = FunctionName.nativeFunction("token");
 
-    private static final String SELECT_UDFS = "SELECT * FROM " + SystemKeyspace.NAME + '.' + SystemKeyspace.SCHEMA_FUNCTIONS_TABLE;
+    private static final String SELECT_UD_FUNCTION = "SELECT * FROM " + SystemKeyspace.NAME + '.' + SystemKeyspace.SCHEMA_FUNCTIONS_TABLE;
+    private static final String SELECT_UD_AGGREGATE = "SELECT * FROM " + SystemKeyspace.NAME + '.' + SystemKeyspace.SCHEMA_AGGREGATES_TABLE;
 
     private Functions() {}
 
@@ -101,8 +102,10 @@ public abstract class Functions
     public static void loadUDFFromSchema()
     {
         logger.debug("Loading UDFs");
-        for (UntypedResultSet.Row row : QueryProcessor.executeOnceInternal(SELECT_UDFS))
+        for (UntypedResultSet.Row row : QueryProcessor.executeOnceInternal(SELECT_UD_FUNCTION))
             addFunction(UDFunction.fromSchema(row));
+        for (UntypedResultSet.Row row : QueryProcessor.executeOnceInternal(SELECT_UD_AGGREGATE))
+            addFunction(UDAggregate.fromSchema(row));
     }
 
     public static ColumnSpecification makeArgSpec(String receiverKs, String receiverCf, Function fun, int i)
@@ -268,7 +271,7 @@ public abstract class Functions
     }
 
     // This is *not* thread safe but is only called in DefsTables that is synchronized.
-    public static void addFunction(UDFunction fun)
+    public static void addFunction(AbstractFunction fun)
     {
         // We shouldn't get there unless that function don't exist
         assert find(fun.name(), fun.argTypes()) == null;
@@ -284,12 +287,21 @@ public abstract class Functions
     }
 
     // Same remarks than for addFunction
-    public static void replaceFunction(UDFunction fun)
+    public static void replaceFunction(AbstractFunction fun)
     {
         removeFunction(fun.name(), fun.argTypes());
         addFunction(fun);
     }
 
+    public static List<Function> getReferencesTo(Function old)
+    {
+        List<Function> references = new ArrayList<>();
+        for (Function function : declared.values())
+            if (function.hasReferenceTo(old))
+                references.add(function);
+        return references;
+    }
+
     public static Collection<Function> all()
     {
         return declared.values();
@@ -316,6 +328,7 @@ public abstract class Functions
         public void onCreateColumnFamily(String ksName, String cfName) { }
         public void onCreateUserType(String ksName, String typeName) { }
         public void onCreateFunction(String ksName, String functionName) { }
+        public void onCreateAggregate(String ksName, String aggregateName) { }
 
         public void onUpdateKeyspace(String ksName) { }
         public void onUpdateColumnFamily(String ksName, String cfName) { }
@@ -325,11 +338,12 @@ public abstract class Functions
                     ((UDFunction)function).userTypeUpdated(ksName, typeName);
         }
         public void onUpdateFunction(String ksName, String functionName) { }
+        public void onUpdateAggregate(String ksName, String aggregateName) { }
 
         public void onDropKeyspace(String ksName) { }
         public void onDropColumnFamily(String ksName, String cfName) { }
         public void onDropUserType(String ksName, String typeName) { }
         public void onDropFunction(String ksName, String functionName) { }
-
+        public void onDropAggregate(String ksName, String aggregateName) { }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2f35c76/src/java/org/apache/cassandra/cql3/functions/JavaSourceUDFFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/JavaSourceUDFFactory.java b/src/java/org/apache/cassandra/cql3/functions/JavaSourceUDFFactory.java
index 560f077..5b1f5bd 100644
--- a/src/java/org/apache/cassandra/cql3/functions/JavaSourceUDFFactory.java
+++ b/src/java/org/apache/cassandra/cql3/functions/JavaSourceUDFFactory.java
@@ -57,11 +57,11 @@ public final class JavaSourceUDFFactory
     throws InvalidRequestException
     {
         // argDataTypes is just the C* internal argTypes converted to the Java Driver DataType
-        DataType[] argDataTypes = UDFunction.driverTypes(argTypes);
+        DataType[] argDataTypes = UDHelper.driverTypes(argTypes);
         // returnDataType is just the C* internal returnType converted to the Java Driver DataType
-        DataType returnDataType = UDFunction.driverType(returnType);
+        DataType returnDataType = UDHelper.driverType(returnType);
         // javaParamTypes is just the Java representation for argTypes resp. argDataTypes
-        Class<?>[] javaParamTypes = UDFunction.javaTypes(argDataTypes);
+        Class<?>[] javaParamTypes = UDHelper.javaTypes(argDataTypes);
         // javaReturnType is just the Java representation for returnType resp. returnDataType
         Class<?> javaReturnType = returnDataType.asJavaClass();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2f35c76/src/java/org/apache/cassandra/cql3/functions/UDAggregate.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/UDAggregate.java b/src/java/org/apache/cassandra/cql3/functions/UDAggregate.java
new file mode 100644
index 0000000..f259265
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/functions/UDAggregate.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.functions;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import com.google.common.base.Objects;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.composites.Composite;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.TypeParser;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.exceptions.*;
+
+/**
+ * Base class for user-defined-aggregates.
+ */
+public class UDAggregate extends AbstractFunction implements AggregateFunction
+{
+    protected static final Logger logger = LoggerFactory.getLogger(UDAggregate.class);
+
+    protected final AbstractType<?> stateType;
+    protected final ByteBuffer initcond;
+    private final ScalarFunction stateFunction;
+    private final ScalarFunction finalFunction;
+
+    public UDAggregate(FunctionName name,
+                       List<AbstractType<?>> argTypes,
+                       AbstractType<?> returnType,
+                       ScalarFunction stateFunc,
+                       ScalarFunction finalFunc,
+                       ByteBuffer initcond)
+    {
+        super(name, argTypes, returnType);
+        this.stateFunction = stateFunc;
+        this.finalFunction = finalFunc;
+        this.stateType = stateFunc != null ? stateFunc.returnType() : null;
+        this.initcond = initcond;
+    }
+
+    public boolean hasReferenceTo(Function function)
+    {
+        return stateFunction == function || finalFunction == function;
+    }
+
+    public boolean usesFunction(String ksName, String functionName)
+    {
+        return super.usesFunction(ksName, functionName)
+            || stateFunction != null && stateFunction.name().keyspace.equals(ksName) && stateFunction.name().name.equals(functionName)
+            || finalFunction != null && finalFunction.name().keyspace.equals(ksName) && finalFunction.name().name.equals(functionName);
+    }
+
+    public boolean isAggregate()
+    {
+        return true;
+    }
+
+    public boolean isPure()
+    {
+        return false;
+    }
+
+    public boolean isNative()
+    {
+        return false;
+    }
+
+    public Aggregate newAggregate() throws InvalidRequestException
+    {
+        return new Aggregate()
+        {
+            private ByteBuffer state;
+            {
+                reset();
+            }
+
+            public void addInput(int protocolVersion, List<ByteBuffer> values) throws InvalidRequestException
+            {
+                List<ByteBuffer> copy = new ArrayList<>(values.size() + 1);
+                copy.add(state);
+                copy.addAll(values);
+                state = stateFunction.execute(protocolVersion, copy);
+            }
+
+            public ByteBuffer compute(int protocolVersion) throws InvalidRequestException
+            {
+                if (finalFunction == null)
+                    return state;
+                return finalFunction.execute(protocolVersion, Collections.singletonList(state));
+            }
+
+            public void reset()
+            {
+                state = initcond != null ? initcond.duplicate() : null;
+            }
+        };
+    }
+
+    private static ScalarFunction resolveScalar(FunctionName aName, FunctionName fName, List<AbstractType<?>> argTypes) throws InvalidRequestException
+    {
+        Function func = Functions.find(fName, argTypes);
+        if (func == null)
+            throw new InvalidRequestException(String.format("Referenced state function '%s %s' for aggregate '%s' does not exist",
+                                                            fName, Arrays.toString(UDHelper.driverTypes(argTypes)), aName));
+        if (!(func instanceof ScalarFunction))
+            throw new InvalidRequestException(String.format("Referenced state function '%s %s' for aggregate '%s' is not a scalar function",
+                                                            fName, Arrays.toString(UDHelper.driverTypes(argTypes)), aName));
+        return (ScalarFunction) func;
+    }
+
+    private static Mutation makeSchemaMutation(FunctionName name)
+    {
+        UTF8Type kv = (UTF8Type)SystemKeyspace.SchemaAggregatesTable.getKeyValidator();
+        return new Mutation(SystemKeyspace.NAME, kv.decompose(name.keyspace));
+    }
+
+    public Mutation toSchemaDrop(long timestamp)
+    {
+        Mutation mutation = makeSchemaMutation(name);
+        ColumnFamily cf = mutation.addOrGet(SystemKeyspace.SCHEMA_AGGREGATES_TABLE);
+
+        Composite prefix = SystemKeyspace.SchemaAggregatesTable.comparator.make(name.name, UDHelper.computeSignature(argTypes));
+        int ldt = (int) (System.currentTimeMillis() / 1000);
+        cf.addAtom(new RangeTombstone(prefix, prefix.end(), timestamp, ldt));
+
+        return mutation;
+    }
+
+    public static Map<Composite, UDAggregate> fromSchema(Row row)
+    {
+        UntypedResultSet results = QueryProcessor.resultify("SELECT * FROM system." + SystemKeyspace.SCHEMA_AGGREGATES_TABLE, row);
+        Map<Composite, UDAggregate> udfs = new HashMap<>(results.size());
+        for (UntypedResultSet.Row result : results)
+            udfs.put(SystemKeyspace.SchemaAggregatesTable.comparator.make(result.getString("aggregate_name"), result.getBlob("signature")),
+                     fromSchema(result));
+        return udfs;
+    }
+
+    public Mutation toSchemaUpdate(long timestamp)
+    {
+        Mutation mutation = makeSchemaMutation(name);
+        ColumnFamily cf = mutation.addOrGet(SystemKeyspace.SCHEMA_AGGREGATES_TABLE);
+
+        Composite prefix = SystemKeyspace.SchemaAggregatesTable.comparator.make(name.name, UDHelper.computeSignature(argTypes));
+        CFRowAdder adder = new CFRowAdder(cf, prefix, timestamp);
+
+        adder.resetCollection("argument_types");
+        adder.add("return_type", returnType.toString());
+        adder.add("state_func", stateFunction.name().name);
+        if (stateType != null)
+            adder.add("state_type", stateType.toString());
+        if (finalFunction != null)
+            adder.add("final_func", finalFunction.name().name);
+        if (initcond != null)
+            adder.add("initcond", initcond);
+
+        for (AbstractType<?> argType : argTypes)
+            adder.addListEntry("argument_types", argType.toString());
+
+        return mutation;
+    }
+
+    public static UDAggregate fromSchema(UntypedResultSet.Row row)
+    {
+        String ksName = row.getString("keyspace_name");
+        String functionName = row.getString("aggregate_name");
+        FunctionName name = new FunctionName(ksName, functionName);
+
+        List<String> types = row.getList("argument_types", UTF8Type.instance);
+
+        List<AbstractType<?>> argTypes;
+        if (types == null)
+        {
+            argTypes = Collections.emptyList();
+        }
+        else
+        {
+            argTypes = new ArrayList<>(types.size());
+            for (String type : types)
+                argTypes.add(parseType(type));
+        }
+
+        AbstractType<?> returnType = parseType(row.getString("return_type"));
+
+        FunctionName stateFunc = new FunctionName(ksName, row.getString("state_func"));
+        FunctionName finalFunc = row.has("final_func") ? new FunctionName(ksName, row.getString("final_func")) : null;
+        AbstractType<?> stateType = row.has("state_type") ? parseType(row.getString("state_type")) : null;
+        ByteBuffer initcond = row.has("initcond") ? row.getBytes("initcond") : null;
+
+        try
+        {
+            return create(name, argTypes, returnType, stateFunc, finalFunc, stateType, initcond);
+        }
+        catch (InvalidRequestException reason)
+        {
+            return createBroken(name, argTypes, returnType, initcond, reason);
+        }
+    }
+
+    private static UDAggregate createBroken(FunctionName name, List<AbstractType<?>> argTypes, AbstractType<?> returnType,
+                                            ByteBuffer initcond, final InvalidRequestException reason)
+    {
+        return new UDAggregate(name, argTypes, returnType, null, null, initcond) {
+            public Aggregate newAggregate() throws InvalidRequestException
+            {
+                throw new InvalidRequestException(String.format("Aggregate '%s' exists but hasn't been loaded successfully for the following reason: %s. "
+                                                                + "Please see the server log for more details", this, reason.getMessage()));
+            }
+        };
+    }
+
+    private static UDAggregate create(FunctionName name, List<AbstractType<?>> argTypes, AbstractType<?> returnType,
+                                      FunctionName stateFunc, FunctionName finalFunc, AbstractType<?> stateType, ByteBuffer initcond)
+    throws InvalidRequestException
+    {
+        List<AbstractType<?>> stateTypes = new ArrayList<>(argTypes.size() + 1);
+        stateTypes.add(stateType);
+        stateTypes.addAll(argTypes);
+        List<AbstractType<?>> finalTypes = Collections.<AbstractType<?>>singletonList(stateType);
+        return new UDAggregate(name, argTypes, returnType,
+                               resolveScalar(name, stateFunc, stateTypes),
+                               finalFunc != null ? resolveScalar(name, finalFunc, finalTypes) : null,
+                               initcond);
+    }
+
+    private static AbstractType<?> parseType(String str)
+    {
+        // We only use this when reading the schema where we shouldn't get an error
+        try
+        {
+            return TypeParser.parse(str);
+        }
+        catch (SyntaxException | ConfigurationException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (!(o instanceof UDAggregate))
+            return false;
+
+        UDAggregate that = (UDAggregate) o;
+        return Objects.equal(this.name, that.name)
+               && Functions.typeEquals(this.argTypes, that.argTypes)
+               && Functions.typeEquals(this.returnType, that.returnType)
+               && Objects.equal(this.stateFunction, that.stateFunction)
+               && Objects.equal(this.finalFunction, that.finalFunction)
+               && Objects.equal(this.stateType, that.stateType)
+               && Objects.equal(this.initcond, that.initcond);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hashCode(name, argTypes, returnType, stateFunction, finalFunction, stateType, initcond);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2f35c76/src/java/org/apache/cassandra/cql3/functions/UDFunction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/UDFunction.java b/src/java/org/apache/cassandra/cql3/functions/UDFunction.java
index 973c70a..8b42e51 100644
--- a/src/java/org/apache/cassandra/cql3/functions/UDFunction.java
+++ b/src/java/org/apache/cassandra/cql3/functions/UDFunction.java
@@ -17,12 +17,7 @@
  */
 package org.apache.cassandra.cql3.functions;
 
-import java.lang.invoke.MethodHandle;
-import java.lang.invoke.MethodHandles;
-import java.lang.reflect.Method;
 import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.security.MessageDigest;
 import java.util.*;
 
 import com.google.common.base.Objects;
@@ -43,7 +38,6 @@ import org.apache.cassandra.db.marshal.TypeParser;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.service.MigrationManager;
 import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
 
 /**
  * Base class for User Defined Functions.
@@ -52,80 +46,10 @@ public abstract class UDFunction extends AbstractFunction implements ScalarFunct
 {
     protected static final Logger logger = LoggerFactory.getLogger(UDFunction.class);
 
-    // TODO make these c'tors and methods public in Java-Driver - see https://datastax-oss.atlassian.net/browse/JAVA-502
-    static final MethodHandle methodParseOne;
-    static
-    {
-        try
-        {
-            Class<?> cls = Class.forName("com.datastax.driver.core.CassandraTypeParser");
-            Method m = cls.getDeclaredMethod("parseOne", String.class);
-            m.setAccessible(true);
-            methodParseOne = MethodHandles.lookup().unreflect(m);
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException(e);
-        }
-    }
-
-    /**
-     * Construct an array containing the Java classes for the given Java Driver {@link com.datastax.driver.core.DataType}s.
-     *
-     * @param dataTypes array with UDF argument types
-     * @return array of same size with UDF arguments
-     */
-    public static Class<?>[] javaTypes(DataType[] dataTypes)
-    {
-        Class<?> paramTypes[] = new Class[dataTypes.length];
-        for (int i = 0; i < paramTypes.length; i++)
-            paramTypes[i] = dataTypes[i].asJavaClass();
-        return paramTypes;
-    }
-
-    /**
-     * Construct an array containing the Java Driver {@link com.datastax.driver.core.DataType}s for the
-     * C* internal types.
-     *
-     * @param abstractTypes list with UDF argument types
-     * @return array with argument types as {@link com.datastax.driver.core.DataType}
-     */
-    public static DataType[] driverTypes(List<AbstractType<?>> abstractTypes)
-    {
-        DataType[] argDataTypes = new DataType[abstractTypes.size()];
-        for (int i = 0; i < argDataTypes.length; i++)
-            argDataTypes[i] = driverType(abstractTypes.get(i));
-        return argDataTypes;
-    }
-
-    /**
-     * Returns the Java Driver {@link com.datastax.driver.core.DataType} for the C* internal type.
-     */
-    public static DataType driverType(AbstractType abstractType)
-    {
-        CQL3Type cqlType = abstractType.asCQL3Type();
-        try
-        {
-            return (DataType) methodParseOne.invoke(cqlType.getType().toString());
-        }
-        catch (RuntimeException | Error e)
-        {
-            // immediately rethrow these...
-            throw e;
-        }
-        catch (Throwable e)
-        {
-            throw new RuntimeException("cannot parse driver type " + cqlType.getType().toString(), e);
-        }
-    }
-
-    // instance vars
-
     protected final List<ColumnIdentifier> argNames;
-
     protected final String language;
     protected final String body;
-    protected final boolean deterministic;
+    private final boolean deterministic;
 
     protected final DataType[] argDataTypes;
     protected final DataType returnDataType;
@@ -138,8 +62,8 @@ public abstract class UDFunction extends AbstractFunction implements ScalarFunct
                          String body,
                          boolean deterministic)
     {
-        this(name, argNames, argTypes, driverTypes(argTypes), returnType,
-             driverType(returnType), language, body, deterministic);
+        this(name, argNames, argTypes, UDHelper.driverTypes(argTypes), returnType,
+             UDHelper.driverType(returnType), language, body, deterministic);
     }
 
     protected UDFunction(FunctionName name,
@@ -151,7 +75,7 @@ public abstract class UDFunction extends AbstractFunction implements ScalarFunct
                          String language,
                          String body,
                          boolean deterministic)
-        {
+    {
         super(name, argTypes, returnType);
         assert new HashSet<>(argNames).size() == argNames.size() : "duplicate argument names";
         this.argNames = argNames;
@@ -162,36 +86,6 @@ public abstract class UDFunction extends AbstractFunction implements ScalarFunct
         this.returnDataType = returnDataType;
     }
 
-    /**
-     * Used by UDF implementations (both Java code generated by {@link org.apache.cassandra.cql3.functions.JavaSourceUDFFactory}
-     * and script executor {@link org.apache.cassandra.cql3.functions.ScriptBasedUDF}) to convert the C*
-     * serialized representation to the Java object representation.
-     *
-     * @param protocolVersion the native protocol version used for serialization
-     * @param argIndex        index of the UDF input argument
-     */
-    protected Object compose(int protocolVersion, int argIndex, ByteBuffer value)
-    {
-        return value == null ? null : argDataTypes[argIndex].deserialize(value, ProtocolVersion.fromInt(protocolVersion));
-    }
-
-    /**
-     * Used by UDF implementations (both Java code generated by {@link org.apache.cassandra.cql3.functions.JavaSourceUDFFactory}
-     * and script executor {@link org.apache.cassandra.cql3.functions.ScriptBasedUDF}) to convert the Java
-     * object representation for the return value to the C* serialized representation.
-     *
-     * @param protocolVersion the native protocol version used for serialization
-     */
-    protected ByteBuffer decompose(int protocolVersion, Object value)
-    {
-        return value == null ? null : returnDataType.serialize(value, ProtocolVersion.fromInt(protocolVersion));
-    }
-
-    public boolean isAggregate()
-    {
-        return false;
-    }
-
     public static UDFunction create(FunctionName name,
                                     List<ColumnIdentifier> argNames,
                                     List<AbstractType<?>> argTypes,
@@ -218,12 +112,12 @@ public abstract class UDFunction extends AbstractFunction implements ScalarFunct
      *     than saying that the function doesn't exist)
      */
     private static UDFunction createBrokenFunction(FunctionName name,
-                                                  List<ColumnIdentifier> argNames,
-                                                  List<AbstractType<?>> argTypes,
-                                                  AbstractType<?> returnType,
-                                                  String language,
-                                                  String body,
-                                                  final InvalidRequestException reason)
+                                                   List<ColumnIdentifier> argNames,
+                                                   List<AbstractType<?>> argTypes,
+                                                   AbstractType<?> returnType,
+                                                   String language,
+                                                   String body,
+                                                   final InvalidRequestException reason)
     {
         return new UDFunction(name, argNames, argTypes, returnType, language, body, true)
         {
@@ -235,18 +129,9 @@ public abstract class UDFunction extends AbstractFunction implements ScalarFunct
         };
     }
 
-    // We allow method overloads, so a function is not uniquely identified by its name only, but
-    // also by its argument types. To distinguish overloads of given function name in the schema 
-    // we use a "signature" which is just a SHA-1 of it's argument types (we could replace that by
-    // using a "signature" UDT that would be comprised of the function name and argument types,
-    // which we could then use as clustering column. But as we haven't yet used UDT in system tables,
-    // We'll left that decision to #6717).
-    private static ByteBuffer computeSignature(List<AbstractType<?>> argTypes)
+    public boolean isAggregate()
     {
-        MessageDigest digest = FBUtilities.newMessageDigest("SHA-1");
-        for (AbstractType<?> type : argTypes)
-            digest.update(type.asCQL3Type().toString().getBytes(StandardCharsets.UTF_8));
-        return ByteBuffer.wrap(digest.digest());
+        return false;
     }
 
     public boolean isPure()
@@ -259,6 +144,31 @@ public abstract class UDFunction extends AbstractFunction implements ScalarFunct
         return false;
     }
 
+    /**
+     * Used by UDF implementations (both Java code generated by {@link org.apache.cassandra.cql3.functions.JavaSourceUDFFactory}
+     * and script executor {@link org.apache.cassandra.cql3.functions.ScriptBasedUDF}) to convert the C*
+     * serialized representation to the Java object representation.
+     *
+     * @param protocolVersion the native protocol version used for serialization
+     * @param argIndex        index of the UDF input argument
+     */
+    protected Object compose(int protocolVersion, int argIndex, ByteBuffer value)
+    {
+        return value == null ? null : argDataTypes[argIndex].deserialize(value, ProtocolVersion.fromInt(protocolVersion));
+    }
+
+    /**
+     * Used by UDF implementations (both Java code generated by {@link org.apache.cassandra.cql3.functions.JavaSourceUDFFactory}
+     * and script executor {@link org.apache.cassandra.cql3.functions.ScriptBasedUDF}) to convert the Java
+     * object representation for the return value to the C* serialized representation.
+     *
+     * @param protocolVersion the native protocol version used for serialization
+     */
+    protected ByteBuffer decompose(int protocolVersion, Object value)
+    {
+        return value == null ? null : returnDataType.serialize(value, ProtocolVersion.fromInt(protocolVersion));
+    }
+
     private static Mutation makeSchemaMutation(FunctionName name)
     {
         UTF8Type kv = (UTF8Type)SystemKeyspace.SchemaFunctionsTable.getKeyValidator();
@@ -270,19 +180,29 @@ public abstract class UDFunction extends AbstractFunction implements ScalarFunct
         Mutation mutation = makeSchemaMutation(name);
         ColumnFamily cf = mutation.addOrGet(SystemKeyspace.SCHEMA_FUNCTIONS_TABLE);
 
-        Composite prefix = SystemKeyspace.SchemaFunctionsTable.comparator.make(name.name, computeSignature(argTypes));
+        Composite prefix = SystemKeyspace.SchemaFunctionsTable.comparator.make(name.name, UDHelper.computeSignature(argTypes));
         int ldt = (int) (System.currentTimeMillis() / 1000);
         cf.addAtom(new RangeTombstone(prefix, prefix.end(), timestamp, ldt));
 
         return mutation;
     }
 
+    public static Map<Composite, UDFunction> fromSchema(Row row)
+    {
+        UntypedResultSet results = QueryProcessor.resultify("SELECT * FROM system." + SystemKeyspace.SCHEMA_FUNCTIONS_TABLE, row);
+        Map<Composite, UDFunction> udfs = new HashMap<>(results.size());
+        for (UntypedResultSet.Row result : results)
+            udfs.put(SystemKeyspace.SchemaFunctionsTable.comparator.make(result.getString("function_name"), result.getBlob("signature")),
+                     fromSchema(result));
+        return udfs;
+    }
+
     public Mutation toSchemaUpdate(long timestamp)
     {
         Mutation mutation = makeSchemaMutation(name);
         ColumnFamily cf = mutation.addOrGet(SystemKeyspace.SCHEMA_FUNCTIONS_TABLE);
 
-        Composite prefix = SystemKeyspace.SchemaFunctionsTable.comparator.make(name.name, computeSignature(argTypes));
+        Composite prefix = SystemKeyspace.SchemaFunctionsTable.comparator.make(name.name, UDHelper.computeSignature(argTypes));
         CFRowAdder adder = new CFRowAdder(cf, prefix, timestamp);
 
         adder.resetCollection("argument_names");
@@ -360,15 +280,6 @@ public abstract class UDFunction extends AbstractFunction implements ScalarFunct
         }
     }
 
-    public static Map<Composite, UDFunction> fromSchema(Row row)
-    {
-        UntypedResultSet results = QueryProcessor.resultify("SELECT * FROM system." + SystemKeyspace.SCHEMA_FUNCTIONS_TABLE, row);
-        Map<Composite, UDFunction> udfs = new HashMap<>(results.size());
-        for (UntypedResultSet.Row result : results)
-            udfs.put(SystemKeyspace.SchemaFunctionsTable.comparator.make(result.getString("function_name"), result.getBlob("signature")), fromSchema(result));
-        return udfs;
-    }
-
     @Override
     public boolean equals(Object o)
     {
@@ -377,9 +288,9 @@ public abstract class UDFunction extends AbstractFunction implements ScalarFunct
 
         UDFunction that = (UDFunction)o;
         return Objects.equal(this.name, that.name)
-            && Objects.equal(this.argNames, that.argNames)
             && Functions.typeEquals(this.argTypes, that.argTypes)
             && Functions.typeEquals(this.returnType, that.returnType)
+            && Objects.equal(this.argNames, that.argNames)
             && Objects.equal(this.language, that.language)
             && Objects.equal(this.body, that.body)
             && Objects.equal(this.deterministic, that.deterministic);
@@ -388,7 +299,7 @@ public abstract class UDFunction extends AbstractFunction implements ScalarFunct
     @Override
     public int hashCode()
     {
-        return Objects.hashCode(name, argNames, argTypes, returnType, language, body, deterministic);
+        return Objects.hashCode(name, argTypes, returnType, argNames, language, body, deterministic);
     }
 
     public void userTypeUpdated(String ksName, String typeName)
@@ -408,7 +319,7 @@ public abstract class UDFunction extends AbstractFunction implements ScalarFunct
 
                     org.apache.cassandra.db.marshal.UserType ut = ksm.userTypes.getType(ByteBufferUtil.bytes(typeName));
 
-                    DataType newUserType = driverType(ut);
+                    DataType newUserType = UDHelper.driverType(ut);
                     argDataTypes[i] = newUserType;
 
                     argTypes.set(i, ut);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2f35c76/src/java/org/apache/cassandra/cql3/functions/UDHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/UDHelper.java b/src/java/org/apache/cassandra/cql3/functions/UDHelper.java
new file mode 100644
index 0000000..2a17c75
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/functions/UDHelper.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.functions;
+
+import java.lang.invoke.MethodHandle;
+import java.lang.invoke.MethodHandles;
+import java.lang.reflect.Method;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.security.MessageDigest;
+import java.util.*;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.DataType;
+import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * Helper class for User Defined Functions + Aggregates.
+ */
+final class UDHelper
+{
+    protected static final Logger logger = LoggerFactory.getLogger(UDHelper.class);
+
+    // TODO make these c'tors and methods public in Java-Driver - see https://datastax-oss.atlassian.net/browse/JAVA-502
+    static final MethodHandle methodParseOne;
+    static
+    {
+        try
+        {
+            Class<?> cls = Class.forName("com.datastax.driver.core.CassandraTypeParser");
+            Method m = cls.getDeclaredMethod("parseOne", String.class);
+            m.setAccessible(true);
+            methodParseOne = MethodHandles.lookup().unreflect(m);
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Construct an array containing the Java classes for the given Java Driver {@link com.datastax.driver.core.DataType}s.
+     *
+     * @param dataTypes array with UDF argument types
+     * @return array of same size with UDF arguments
+     */
+    public static Class<?>[] javaTypes(DataType[] dataTypes)
+    {
+        Class<?> paramTypes[] = new Class[dataTypes.length];
+        for (int i = 0; i < paramTypes.length; i++)
+            paramTypes[i] = dataTypes[i].asJavaClass();
+        return paramTypes;
+    }
+
+    /**
+     * Construct an array containing the Java Driver {@link com.datastax.driver.core.DataType}s for the
+     * C* internal types.
+     *
+     * @param abstractTypes list with UDF argument types
+     * @return array with argument types as {@link com.datastax.driver.core.DataType}
+     */
+    public static DataType[] driverTypes(List<AbstractType<?>> abstractTypes)
+    {
+        DataType[] argDataTypes = new DataType[abstractTypes.size()];
+        for (int i = 0; i < argDataTypes.length; i++)
+            argDataTypes[i] = driverType(abstractTypes.get(i));
+        return argDataTypes;
+    }
+
+    /**
+     * Returns the Java Driver {@link com.datastax.driver.core.DataType} for the C* internal type.
+     */
+    public static DataType driverType(AbstractType abstractType)
+    {
+        CQL3Type cqlType = abstractType.asCQL3Type();
+        try
+        {
+            return (DataType) methodParseOne.invoke(cqlType.getType().toString());
+        }
+        catch (RuntimeException | Error e)
+        {
+            // immediately rethrow these...
+            throw e;
+        }
+        catch (Throwable e)
+        {
+            throw new RuntimeException("cannot parse driver type " + cqlType.getType().toString(), e);
+        }
+    }
+
+    // We allow method overloads, so a function is not uniquely identified by its name only, but
+    // also by its argument types. To distinguish overloads of given function name in the schema
+    // we use a "signature" which is just a SHA-1 of it's argument types (we could replace that by
+    // using a "signature" UDT that would be comprised of the function name and argument types,
+    // which we could then use as clustering column. But as we haven't yet used UDT in system tables,
+    // We'll left that decision to #6717).
+    protected static ByteBuffer computeSignature(List<AbstractType<?>> argTypes)
+    {
+        MessageDigest digest = FBUtilities.newMessageDigest("SHA-1");
+        for (AbstractType<?> type : argTypes)
+            digest.update(type.asCQL3Type().toString().getBytes(StandardCharsets.UTF_8));
+        return ByteBuffer.wrap(digest.digest());
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2f35c76/src/java/org/apache/cassandra/cql3/selection/AbstractFunctionSelector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/selection/AbstractFunctionSelector.java b/src/java/org/apache/cassandra/cql3/selection/AbstractFunctionSelector.java
index 3778d41..2bf169d 100644
--- a/src/java/org/apache/cassandra/cql3/selection/AbstractFunctionSelector.java
+++ b/src/java/org/apache/cassandra/cql3/selection/AbstractFunctionSelector.java
@@ -69,10 +69,10 @@ abstract class AbstractFunctionSelector<T extends Function> extends Selector
 
             public boolean usesFunction(String ksName, String functionName)
             {
-                return fun.name().keyspace.equals(ksName) && fun.name().name.equals(functionName);
+                return fun.usesFunction(ksName, functionName);
             }
 
-            public Selector newInstance()
+            public Selector newInstance() throws InvalidRequestException
             {
                 return fun.isAggregate() ? new AggregateFunctionSelector(fun, factories.newInstances())
                                          : new ScalarFunctionSelector(fun, factories.newInstances());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2f35c76/src/java/org/apache/cassandra/cql3/selection/AggregateFunctionSelector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/selection/AggregateFunctionSelector.java b/src/java/org/apache/cassandra/cql3/selection/AggregateFunctionSelector.java
index 7702796..27a8294 100644
--- a/src/java/org/apache/cassandra/cql3/selection/AggregateFunctionSelector.java
+++ b/src/java/org/apache/cassandra/cql3/selection/AggregateFunctionSelector.java
@@ -44,12 +44,12 @@ final class AggregateFunctionSelector extends AbstractFunctionSelector<Aggregate
             args.set(i, s.getOutput(protocolVersion));
             s.reset();
         }
-        this.aggregate.addInput(args);
+        this.aggregate.addInput(protocolVersion, args);
     }
 
     public ByteBuffer getOutput(int protocolVersion) throws InvalidRequestException
     {
-        return aggregate.compute();
+        return aggregate.compute(protocolVersion);
     }
 
     public void reset()
@@ -57,7 +57,7 @@ final class AggregateFunctionSelector extends AbstractFunctionSelector<Aggregate
         aggregate.reset();
     }
 
-    AggregateFunctionSelector(Function fun, List<Selector> argSelectors)
+    AggregateFunctionSelector(Function fun, List<Selector> argSelectors) throws InvalidRequestException
     {
         super((AggregateFunction) fun, argSelectors);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2f35c76/src/java/org/apache/cassandra/cql3/selection/FieldSelector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/selection/FieldSelector.java b/src/java/org/apache/cassandra/cql3/selection/FieldSelector.java
index d695598..76dbb22 100644
--- a/src/java/org/apache/cassandra/cql3/selection/FieldSelector.java
+++ b/src/java/org/apache/cassandra/cql3/selection/FieldSelector.java
@@ -47,7 +47,7 @@ final class FieldSelector extends Selector
                 return type.fieldType(field);
             }
 
-            public Selector newInstance()
+            public Selector newInstance() throws InvalidRequestException
             {
                 return new FieldSelector(type, field, factory.newInstance());
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2f35c76/src/java/org/apache/cassandra/cql3/selection/Selection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/selection/Selection.java b/src/java/org/apache/cassandra/cql3/selection/Selection.java
index e44a39f..58e994a 100644
--- a/src/java/org/apache/cassandra/cql3/selection/Selection.java
+++ b/src/java/org/apache/cassandra/cql3/selection/Selection.java
@@ -213,7 +213,7 @@ public abstract class Selection
         return metadata;
     }
 
-    protected abstract Selectors newSelectors();
+    protected abstract Selectors newSelectors() throws InvalidRequestException;
 
     /**
      * @return the list of CQL3 columns value this SelectionClause needs.
@@ -223,7 +223,7 @@ public abstract class Selection
         return columns;
     }
 
-    public ResultSetBuilder resultSetBuilder(long now)
+    public ResultSetBuilder resultSetBuilder(long now) throws InvalidRequestException
     {
         return new ResultSetBuilder(now);
     }
@@ -273,7 +273,7 @@ public abstract class Selection
         final int[] ttls;
         final long now;
 
-        private ResultSetBuilder(long now)
+        private ResultSetBuilder(long now) throws InvalidRequestException
         {
             this.resultSet = new ResultSet(getResultMetadata().copy(), new ArrayList<List<ByteBuffer>>());
             this.selectors = newSelectors();
@@ -468,7 +468,7 @@ public abstract class Selection
             return factories.containsOnlyAggregateFunctions();
         }
 
-        protected Selectors newSelectors()
+        protected Selectors newSelectors() throws InvalidRequestException
         {
             return new Selectors()
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2f35c76/src/java/org/apache/cassandra/cql3/selection/Selector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/selection/Selector.java b/src/java/org/apache/cassandra/cql3/selection/Selector.java
index 0c1933f..3ed773b 100644
--- a/src/java/org/apache/cassandra/cql3/selection/Selector.java
+++ b/src/java/org/apache/cassandra/cql3/selection/Selector.java
@@ -65,7 +65,7 @@ public abstract class Selector implements AssignmentTestable
          *
          * @return a new <code>Selector</code> instance
          */
-        public abstract Selector newInstance();
+        public abstract Selector newInstance() throws InvalidRequestException;
 
         /**
          * Checks if this factory creates selectors instances that creates aggregates.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2f35c76/src/java/org/apache/cassandra/cql3/selection/SelectorFactories.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/selection/SelectorFactories.java b/src/java/org/apache/cassandra/cql3/selection/SelectorFactories.java
index 9f6025c..3afd1ec 100644
--- a/src/java/org/apache/cassandra/cql3/selection/SelectorFactories.java
+++ b/src/java/org/apache/cassandra/cql3/selection/SelectorFactories.java
@@ -155,7 +155,7 @@ final class SelectorFactories implements Iterable<Selector.Factory>
      * Creates a list of new <code>Selector</code> instances.
      * @return a list of new <code>Selector</code> instances.
      */
-    public List<Selector> newInstances()
+    public List<Selector> newInstances() throws InvalidRequestException
     {
         List<Selector> selectors = new ArrayList<>(factories.size());
         for (Selector.Factory factory : factories)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2f35c76/src/java/org/apache/cassandra/cql3/statements/CreateAggregateStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateAggregateStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateAggregateStatement.java
new file mode 100644
index 0000000..9816e58
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateAggregateStatement.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.statements;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.cassandra.auth.Permission;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.cql3.CQL3Type;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.cql3.ColumnSpecification;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.Term;
+import org.apache.cassandra.cql3.functions.*;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.exceptions.RequestValidationException;
+import org.apache.cassandra.exceptions.UnauthorizedException;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.MigrationManager;
+import org.apache.cassandra.thrift.ThriftValidation;
+import org.apache.cassandra.transport.Event;
+
+/**
+ * A <code>CREATE AGGREGATE</code> statement parsed from a CQL query.
+ */
+public final class CreateAggregateStatement extends SchemaAlteringStatement
+{
+    private final boolean orReplace;
+    private final boolean ifNotExists;
+    private FunctionName functionName;
+    private String stateFunc;
+    private String finalFunc;
+    private final CQL3Type.Raw stateTypeRaw;
+
+    private final List<CQL3Type.Raw> argRawTypes;
+    private final Term.Raw ival;
+
+    public CreateAggregateStatement(FunctionName functionName,
+                                    List<CQL3Type.Raw> argRawTypes,
+                                    String stateFunc,
+                                    CQL3Type.Raw stateType,
+                                    String finalFunc,
+                                    Term.Raw ival,
+                                    boolean orReplace,
+                                    boolean ifNotExists)
+    {
+        this.functionName = functionName;
+        this.argRawTypes = argRawTypes;
+        this.stateFunc = stateFunc;
+        this.finalFunc = finalFunc;
+        this.stateTypeRaw = stateType;
+        this.ival = ival;
+        this.orReplace = orReplace;
+        this.ifNotExists = ifNotExists;
+    }
+
+    public void prepareKeyspace(ClientState state) throws InvalidRequestException
+    {
+        if (!functionName.hasKeyspace() && state.getRawKeyspace() != null)
+            functionName = new FunctionName(state.getKeyspace(), functionName.name);
+
+        if (!functionName.hasKeyspace())
+            throw new InvalidRequestException("Functions must be fully qualified with a keyspace name if a keyspace is not set for the session");
+
+        ThriftValidation.validateKeyspaceNotSystem(functionName.keyspace);
+    }
+
+    public void checkAccess(ClientState state) throws UnauthorizedException, InvalidRequestException
+    {
+        // TODO CASSANDRA-7557 (function DDL permission)
+
+        state.hasKeyspaceAccess(functionName.keyspace, Permission.CREATE);
+    }
+
+    public void validate(ClientState state) throws InvalidRequestException
+    {
+        if (ifNotExists && orReplace)
+            throw new InvalidRequestException("Cannot use both 'OR REPLACE' and 'IF NOT EXISTS' directives");
+
+        if (Schema.instance.getKSMetaData(functionName.keyspace) == null)
+            throw new InvalidRequestException(String.format("Cannot add aggregate '%s' to non existing keyspace '%s'.", functionName.name, functionName.keyspace));
+    }
+
+    public Event.SchemaChange changeEvent()
+    {
+        return null;
+    }
+
+    public boolean announceMigration(boolean isLocalOnly) throws RequestValidationException
+    {
+        List<AbstractType<?>> argTypes = new ArrayList<>(argRawTypes.size());
+        for (CQL3Type.Raw rawType : argRawTypes)
+            argTypes.add(rawType.prepare(functionName.keyspace).getType());
+
+        FunctionName stateFuncName = new FunctionName(functionName.keyspace, stateFunc);
+        FunctionName finalFuncName;
+
+        ScalarFunction fFinal = null;
+        AbstractType<?> stateType = stateTypeRaw.prepare(functionName.keyspace).getType();
+        Function f = Functions.find(stateFuncName, stateArguments(stateType, argTypes));
+        if (!(f instanceof ScalarFunction))
+            throw new InvalidRequestException("State function " + stateFuncSig(stateFuncName, stateTypeRaw, argRawTypes) + " does not exist or is not a scalar function");
+        ScalarFunction fState = (ScalarFunction)f;
+
+        AbstractType<?> returnType;
+        if (finalFunc != null)
+        {
+            finalFuncName = new FunctionName(functionName.keyspace, finalFunc);
+            f = Functions.find(finalFuncName, Collections.<AbstractType<?>>singletonList(stateType));
+            if (!(f instanceof ScalarFunction))
+                throw new InvalidRequestException("Final function " + finalFuncName + "(" + stateTypeRaw + ") does not exist");
+            fFinal = (ScalarFunction) f;
+            returnType = fFinal.returnType();
+        }
+        else
+        {
+            returnType = fState.returnType();
+            if (!returnType.equals(stateType))
+                throw new InvalidRequestException("State function " + stateFuncSig(stateFuncName, stateTypeRaw, argRawTypes) + " return type must be the same as the first argument type (if no final function is used)");
+        }
+
+        Function old = Functions.find(functionName, argTypes);
+        if (old != null)
+        {
+            if (ifNotExists)
+                return false;
+            if (!orReplace)
+                throw new InvalidRequestException(String.format("Function %s already exists", old));
+            if (!(old instanceof AggregateFunction))
+                throw new InvalidRequestException(String.format("Aggregate %s can only replace an aggregate", old));
+
+            // Means we're replacing the function. We still need to validate that 1) it's not a native function and 2) that the return type
+            // matches (or that could break existing code badly)
+            if (old.isNative())
+                throw new InvalidRequestException(String.format("Cannot replace native aggregate %s", old));
+            if (!old.returnType().isValueCompatibleWith(returnType))
+                throw new InvalidRequestException(String.format("Cannot replace aggregate %s, the new return type %s is not compatible with the return type %s of existing function",
+                                                                functionName, returnType.asCQL3Type(), old.returnType().asCQL3Type()));
+        }
+
+        ByteBuffer initcond = null;
+        if (ival != null)
+        {
+            ColumnSpecification receiver = new ColumnSpecification(functionName.keyspace, "--dummy--", new ColumnIdentifier("(aggregate_initcond)", true), stateType);
+            initcond = ival.prepare(functionName.keyspace, receiver).bindAndGet(QueryOptions.DEFAULT);
+        }
+
+        UDAggregate udAggregate = new UDAggregate(functionName, argTypes, returnType,
+                                                  fState,
+                                                  fFinal,
+                                                  initcond);
+
+        MigrationManager.announceNewAggregate(udAggregate, isLocalOnly);
+
+        return true;
+    }
+
+    private String stateFuncSig(FunctionName stateFuncName, CQL3Type.Raw stateTypeRaw, List<CQL3Type.Raw> argRawTypes)
+    {
+        StringBuilder sb = new StringBuilder();
+        sb.append(stateFuncName.toString()).append('(').append(stateTypeRaw);
+        for (CQL3Type.Raw argRawType : argRawTypes)
+            sb.append(", ").append(argRawType);
+        sb.append(')');
+        return sb.toString();
+    }
+
+    private List<AbstractType<?>> stateArguments(AbstractType<?> stateType, List<AbstractType<?>> argTypes)
+    {
+        List<AbstractType<?>> r = new ArrayList<>(argTypes.size() + 1);
+        r.add(stateType);
+        r.addAll(argTypes);
+        return r;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2f35c76/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java
index 8d8c27a..dbdecf9 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java
@@ -50,7 +50,6 @@ public final class CreateFunctionStatement extends SchemaAlteringStatement
     private final List<ColumnIdentifier> argNames;
     private final List<CQL3Type.Raw> argRawTypes;
     private final CQL3Type.Raw rawReturnType;
-    private String currentKeyspace;
 
     public CreateFunctionStatement(FunctionName functionName,
                                    String language,
@@ -75,13 +74,11 @@ public final class CreateFunctionStatement extends SchemaAlteringStatement
 
     public void prepareKeyspace(ClientState state) throws InvalidRequestException
     {
-        currentKeyspace = state.getRawKeyspace();
-
-        if (!functionName.hasKeyspace() && currentKeyspace != null)
-            functionName = new FunctionName(currentKeyspace, functionName.name);
+        if (!functionName.hasKeyspace() && state.getRawKeyspace() != null)
+            functionName = new FunctionName(state.getRawKeyspace(), functionName.name);
 
         if (!functionName.hasKeyspace())
-            throw new InvalidRequestException("You need to be logged in a keyspace or use a fully qualified function name");
+            throw new InvalidRequestException("Functions must be fully qualified with a keyspace name if a keyspace is not set for the session");
 
         ThriftValidation.validateKeyspaceNotSystem(functionName.keyspace);
     }
@@ -126,6 +123,8 @@ public final class CreateFunctionStatement extends SchemaAlteringStatement
                 return false;
             if (!orReplace)
                 throw new InvalidRequestException(String.format("Function %s already exists", old));
+            if (!(old instanceof ScalarFunction))
+                throw new InvalidRequestException(String.format("Function %s can only replace a function", old));
 
             if (!Functions.typeEquals(old.returnType(), returnType))
                 throw new InvalidRequestException(String.format("Cannot replace function %s, the new return type %s is not compatible with the return type %s of existing function",