You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ty...@apache.org on 2014/12/30 19:26:18 UTC

cassandra git commit: Schema change events/results for UDFs and aggregates

Repository: cassandra
Updated Branches:
  refs/heads/trunk cfee3da90 -> dcc3bb054


Schema change events/results for UDFs and aggregates

Patch by Robert Stupp; reviewed by Tyler Hobbs for CASSANDRA-7708


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/dcc3bb05
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/dcc3bb05
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/dcc3bb05

Branch: refs/heads/trunk
Commit: dcc3bb054167eb5f408cea79935855780fd56285
Parents: cfee3da
Author: Robert Stupp <sn...@snazy.de>
Authored: Tue Dec 30 12:25:17 2014 -0600
Committer: Tyler Hobbs <ty...@datastax.com>
Committed: Tue Dec 30 12:25:17 2014 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  7 +-
 doc/native_protocol_v4.spec                     | 33 ++++---
 src/java/org/apache/cassandra/auth/Auth.java    | 58 +-----------
 .../apache/cassandra/cql3/QueryProcessor.java   | 24 ++---
 .../cassandra/cql3/functions/Functions.java     | 20 +---
 .../cassandra/cql3/functions/UDHelper.java      | 23 +----
 .../statements/CreateAggregateStatement.java    | 14 ++-
 .../statements/CreateFunctionStatement.java     | 13 ++-
 .../cql3/statements/DropAggregateStatement.java |  8 +-
 .../cql3/statements/DropFunctionStatement.java  |  8 +-
 .../cassandra/db/marshal/AbstractType.java      |  9 ++
 .../cassandra/schema/LegacySchemaTables.java    | 43 +++++++--
 .../cassandra/service/IMigrationListener.java   | 40 --------
 .../cassandra/service/MigrationListener.java    | 85 +++++++++++++++++
 .../cassandra/service/MigrationManager.java     | 49 +++++-----
 .../org/apache/cassandra/transport/Event.java   | 96 +++++++++++++++++---
 .../org/apache/cassandra/transport/Server.java  | 28 ++++--
 .../apache/cassandra/cql3/AggregationTest.java  | 62 ++++++++++++-
 .../org/apache/cassandra/cql3/CQLTester.java    | 43 +++++++--
 test/unit/org/apache/cassandra/cql3/UFTest.java | 46 +++++++++-
 .../cassandra/transport/SerDeserTest.java       | 14 +++
 21 files changed, 483 insertions(+), 240 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/dcc3bb05/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1468693..ac63fb3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -10,7 +10,8 @@
  * Fix aggregate fn results on empty selection, result column name,
    and cqlsh parsing (CASSANDRA-8229)
  * Mark sstables as repaired after full repair (CASSANDRA-7586)
- * Extend Descriptor to include a format value and refactor reader/writer apis (CASSANDRA-7443)
+ * Extend Descriptor to include a format value and refactor reader/writer
+   APIs (CASSANDRA-7443)
  * Integrate JMH for microbenchmarks (CASSANDRA-8151)
  * Keep sstable levels when bootstrapping (CASSANDRA-7460)
  * Add Sigar library and perform basic OS settings check on startup (CASSANDRA-7838)
@@ -22,8 +23,8 @@
  * Improve compaction logging (CASSANDRA-7818)
  * Remove YamlFileNetworkTopologySnitch (CASSANDRA-7917)
  * Do anticompaction in groups (CASSANDRA-6851)
- * Support pure user-defined functions (CASSANDRA-7395, 7526, 7562, 7740, 7781, 7929,
-   7924, 7812, 8063, 7813)
+ * Support user-defined functions (CASSANDRA-7395, 7526, 7562, 7740, 7781, 7929,
+   7924, 7812, 8063, 7813, 7708)
  * Permit configurable timestamps with cassandra-stress (CASSANDRA-7416)
  * Move sstable RandomAccessReader to nio2, which allows using the
    FILE_SHARE_DELETE flag on Windows (CASSANDRA-4050)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dcc3bb05/doc/native_protocol_v4.spec
----------------------------------------------------------------------
diff --git a/doc/native_protocol_v4.spec b/doc/native_protocol_v4.spec
index 02aac3b..3764e91 100644
--- a/doc/native_protocol_v4.spec
+++ b/doc/native_protocol_v4.spec
@@ -669,18 +669,25 @@ Table of Contents
       the rest of the message will be <change_type><target><options> where:
         - <change_type> is a [string] representing the type of changed involved.
           It will be one of "CREATED", "UPDATED" or "DROPPED".
-        - <target> is a [string] that can be one of "KEYSPACE", "TABLE" or "TYPE"
-          and describes what has been modified ("TYPE" stands for modifications
-          related to user types).
-        - <options> depends on the preceding <target>. If <target> is
-          "KEYSPACE", then <options> will be a single [string] representing the
-          keyspace changed. Otherwise, if <target> is "TABLE" or "TYPE", then
-          <options> will be 2 [string]: the first one will be the keyspace
-          containing the affected object, and the second one will be the name
-          of said affected object (so either the table name or the user type
-          name).
-
-  All EVENT message have a streamId of -1 (Section 2.3).
+        - <target> is a [string] that can be one of "KEYSPACE", "TABLE", "TYPE",
+          "FUNCTION" or "AGGREGATE" and describes what has been modified
+          ("TYPE" stands for modifications related to user types, "FUNCTION"
+          for modifications related to user defined functions, "AGGREGATE"
+          for modifications related to user defined aggregates).
+        - <options> depends on the preceding <target>:
+          - If <target> is "KEYSPACE", then <options> will be a single [string]
+            representing the keyspace changed.
+          - If <target> is "TABLE" or "TYPE", then
+            <options> will be 2 [string]: the first one will be the keyspace
+            containing the affected object, and the second one will be the name
+            of said affected object (either the table, user type, function, or
+            aggregate name).
+          - If <target> is "FUNCTION" or "AGGREGATE", multiple arguments follow:
+            - [string] keyspace containing the user defined function / aggregate
+            - [string] the function/aggregate name
+            - [string list] one string for each argument type (as CQL type)
+
+  All EVENT messages have a streamId of -1 (Section 2.3).
 
   Please note that "NEW_NODE" and "UP" events are sent based on internal Gossip
   communication and as such may be sent a short delay before the binary
@@ -896,4 +903,6 @@ Table of Contents
 
 10. Changes from v3
 
+  * The format of "SCHEMA_CHANGE" events (Section 4.2.6) (and implicitly "Schema_change" results (Section 4.2.5.5))
+    has been modified, and now includes changes related to user defined functions and user defined aggregates.
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dcc3bb05/src/java/org/apache/cassandra/auth/Auth.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/Auth.java b/src/java/org/apache/cassandra/auth/Auth.java
index cdcfa0e..0c3b0fe 100644
--- a/src/java/org/apache/cassandra/auth/Auth.java
+++ b/src/java/org/apache/cassandra/auth/Auth.java
@@ -185,7 +185,7 @@ public class Auth implements AuthMBean
         DatabaseDescriptor.getAuthorizer().setup();
 
         // register a custom MigrationListener for permissions cleanup after dropped keyspaces/cfs.
-        MigrationManager.instance.register(new MigrationListener());
+        MigrationManager.instance.register(new AuthMigrationListener());
 
         // the delay is here to give the node some time to see its peers - to reduce
         // "Skipped default superuser setup: some nodes were not ready" log spam.
@@ -318,9 +318,9 @@ public class Auth implements AuthMBean
     }
 
     /**
-     * IMigrationListener implementation that cleans up permissions on dropped resources.
+     * MigrationListener implementation that cleans up permissions on dropped resources.
      */
-    public static class MigrationListener implements IMigrationListener
+    public static class AuthMigrationListener extends MigrationListener
     {
         public void onDropKeyspace(String ksName)
         {
@@ -331,57 +331,5 @@ public class Auth implements AuthMBean
         {
             DatabaseDescriptor.getAuthorizer().revokeAll(DataResource.columnFamily(ksName, cfName));
         }
-
-        public void onDropUserType(String ksName, String userType)
-        {
-        }
-
-        public void onDropFunction(String ksName, String functionName)
-        {
-        }
-
-        public void onDropAggregate(String ksName, String aggregateName)
-        {
-        }
-
-        public void onCreateKeyspace(String ksName)
-        {
-        }
-
-        public void onCreateColumnFamily(String ksName, String cfName)
-        {
-        }
-
-        public void onCreateUserType(String ksName, String userType)
-        {
-        }
-
-        public void onCreateFunction(String ksName, String functionName)
-        {
-        }
-
-        public void onCreateAggregate(String ksName, String aggregateName)
-        {
-        }
-
-        public void onUpdateKeyspace(String ksName)
-        {
-        }
-
-        public void onUpdateColumnFamily(String ksName, String cfName)
-        {
-        }
-
-        public void onUpdateUserType(String ksName, String userType)
-        {
-        }
-
-        public void onUpdateFunction(String ksName, String functionName)
-        {
-        }
-
-        public void onUpdateAggregate(String ksName, String aggregateName)
-        {
-        }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dcc3bb05/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index ae09972..8531d32 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -31,6 +31,7 @@ import com.googlecode.concurrentlinkedhashmap.EntryWeigher;
 import com.googlecode.concurrentlinkedhashmap.EvictionListener;
 
 import org.antlr.runtime.*;
+import org.apache.cassandra.service.MigrationListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,7 +51,6 @@ import org.apache.cassandra.exceptions.RequestValidationException;
 import org.apache.cassandra.exceptions.SyntaxException;
 import org.apache.cassandra.metrics.CQLMetrics;
 import org.apache.cassandra.service.ClientState;
-import org.apache.cassandra.service.IMigrationListener;
 import org.apache.cassandra.service.MigrationManager;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.service.pager.QueryPager;
@@ -560,7 +560,7 @@ public class QueryProcessor implements QueryHandler
         return meter.measureDeep(key);
     }
 
-    private static class MigrationSubscriber implements IMigrationListener
+    private static class MigrationSubscriber extends MigrationListener
     {
         private void removeInvalidPreparedStatements(String ksName, String cfName)
         {
@@ -602,10 +602,7 @@ public class QueryProcessor implements QueryHandler
             return ksName.equals(statementKsName) && (cfName == null || cfName.equals(statementCfName));
         }
 
-        public void onCreateKeyspace(String ksName) { }
-        public void onCreateColumnFamily(String ksName, String cfName) { }
-        public void onCreateUserType(String ksName, String typeName) { }
-        public void onCreateFunction(String ksName, String functionName) {
+        public void onCreateFunction(String ksName, String functionName, List<AbstractType<?>> argTypes) {
             if (Functions.getOverloadCount(new FunctionName(ksName, functionName)) > 1)
             {
                 // in case there are other overloads, we have to remove all overloads since argument type
@@ -614,7 +611,7 @@ public class QueryProcessor implements QueryHandler
                 removeInvalidPreparedStatementsForFunction(thriftPreparedStatements.values().iterator(), ksName, functionName);
             }
         }
-        public void onCreateAggregate(String ksName, String aggregateName) {
+        public void onCreateAggregate(String ksName, String aggregateName, List<AbstractType<?>> argTypes) {
             if (Functions.getOverloadCount(new FunctionName(ksName, aggregateName)) > 1)
             {
                 // in case there are other overloads, we have to remove all overloads since argument type
@@ -624,12 +621,6 @@ public class QueryProcessor implements QueryHandler
             }
         }
 
-        public void onUpdateKeyspace(String ksName) { }
-        public void onUpdateColumnFamily(String ksName, String cfName) { }
-        public void onUpdateUserType(String ksName, String typeName) { }
-        public void onUpdateFunction(String ksName, String functionName) { }
-        public void onUpdateAggregate(String ksName, String aggregateName) { }
-
         public void onDropKeyspace(String ksName)
         {
             removeInvalidPreparedStatements(ksName, null);
@@ -640,18 +631,17 @@ public class QueryProcessor implements QueryHandler
             removeInvalidPreparedStatements(ksName, cfName);
         }
 
-        public void onDropUserType(String ksName, String typeName) { }
-        public void onDropFunction(String ksName, String functionName) {
+        public void onDropFunction(String ksName, String functionName, List<AbstractType<?>> argTypes) {
             removeInvalidPreparedStatementsForFunction(preparedStatements.values().iterator(), ksName, functionName);
             removeInvalidPreparedStatementsForFunction(thriftPreparedStatements.values().iterator(), ksName, functionName);
         }
-        public void onDropAggregate(String ksName, String aggregateName)
+        public void onDropAggregate(String ksName, String aggregateName, List<AbstractType<?>> argTypes)
         {
             removeInvalidPreparedStatementsForFunction(preparedStatements.values().iterator(), ksName, aggregateName);
             removeInvalidPreparedStatementsForFunction(thriftPreparedStatements.values().iterator(), ksName, aggregateName);
         }
 
-        private void removeInvalidPreparedStatementsForFunction(Iterator<ParsedStatement.Prepared> iterator,
+        private static void removeInvalidPreparedStatementsForFunction(Iterator<ParsedStatement.Prepared> iterator,
                                                                 String ksName, String functionName)
         {
             while (iterator.hasNext())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dcc3bb05/src/java/org/apache/cassandra/cql3/functions/Functions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/Functions.java b/src/java/org/apache/cassandra/cql3/functions/Functions.java
index b55ebc5..09e360b 100644
--- a/src/java/org/apache/cassandra/cql3/functions/Functions.java
+++ b/src/java/org/apache/cassandra/cql3/functions/Functions.java
@@ -27,7 +27,7 @@ import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.exceptions.InvalidRequestException;
-import org.apache.cassandra.service.IMigrationListener;
+import org.apache.cassandra.service.MigrationListener;
 import org.apache.cassandra.service.MigrationManager;
 
 public abstract class Functions
@@ -302,28 +302,12 @@ public abstract class Functions
         return true;
     }
 
-    private static class FunctionsMigrationListener implements IMigrationListener
+    private static class FunctionsMigrationListener extends MigrationListener
     {
-        public void onCreateKeyspace(String ksName) { }
-        public void onCreateColumnFamily(String ksName, String cfName) { }
-        public void onCreateUserType(String ksName, String typeName) { }
-        public void onCreateFunction(String ksName, String functionName) { }
-        public void onCreateAggregate(String ksName, String aggregateName) { }
-
-        public void onUpdateKeyspace(String ksName) { }
-        public void onUpdateColumnFamily(String ksName, String cfName) { }
         public void onUpdateUserType(String ksName, String typeName) {
             for (Function function : all())
                 if (function instanceof UDFunction)
                     ((UDFunction)function).userTypeUpdated(ksName, typeName);
         }
-        public void onUpdateFunction(String ksName, String functionName) { }
-        public void onUpdateAggregate(String ksName, String aggregateName) { }
-
-        public void onDropKeyspace(String ksName) { }
-        public void onDropColumnFamily(String ksName, String cfName) { }
-        public void onDropUserType(String ksName, String typeName) { }
-        public void onDropFunction(String ksName, String functionName) { }
-        public void onDropAggregate(String ksName, String aggregateName) { }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dcc3bb05/src/java/org/apache/cassandra/cql3/functions/UDHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/UDHelper.java b/src/java/org/apache/cassandra/cql3/functions/UDHelper.java
index 0738cbe..f4b3809 100644
--- a/src/java/org/apache/cassandra/cql3/functions/UDHelper.java
+++ b/src/java/org/apache/cassandra/cql3/functions/UDHelper.java
@@ -21,8 +21,6 @@ import java.lang.invoke.MethodHandle;
 import java.lang.invoke.MethodHandles;
 import java.lang.reflect.Method;
 import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.security.MessageDigest;
 import java.util.*;
 
 import org.slf4j.Logger;
@@ -30,9 +28,7 @@ import org.slf4j.LoggerFactory;
 
 import com.datastax.driver.core.DataType;
 import org.apache.cassandra.cql3.*;
-import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.marshal.UTF8Type;
-import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.db.marshal.*;
 
 /**
  * Helper class for User Defined Functions + Aggregates.
@@ -66,7 +62,7 @@ public final class UDHelper
      */
     public static Class<?>[] javaTypes(DataType[] dataTypes)
     {
-        Class<?> paramTypes[] = new Class[dataTypes.length];
+        Class<?>[] paramTypes = new Class[dataTypes.length];
         for (int i = 0; i < paramTypes.length; i++)
             paramTypes[i] = dataTypes[i].asJavaClass();
         return paramTypes;
@@ -107,19 +103,4 @@ public final class UDHelper
             throw new RuntimeException("cannot parse driver type " + cqlType.getType().toString(), e);
         }
     }
-
-    // We allow method overloads, so a function is not uniquely identified by its name only, but
-    // also by its argument types. To distinguish overloads of given function name in the schema
-    // we use a "signature" which is just a SHA-1 of it's argument types (we could replace that by
-    // using a "signature" UDT that would be comprised of the function name and argument types,
-    // which we could then use as clustering column. But as we haven't yet used UDT in system tables,
-    // We'll leave that decision to #6717).
-    public static ByteBuffer calculateSignature(AbstractFunction fun)
-    {
-        MessageDigest digest = FBUtilities.newMessageDigest("SHA-1");
-        digest.update(UTF8Type.instance.decompose(fun.name().name));
-        for (AbstractType<?> type : fun.argTypes())
-            digest.update(UTF8Type.instance.decompose(type.asCQL3Type().toString()));
-        return ByteBuffer.wrap(digest.digest());
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dcc3bb05/src/java/org/apache/cassandra/cql3/statements/CreateAggregateStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateAggregateStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateAggregateStatement.java
index 9816e58..e135ffe 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateAggregateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateAggregateStatement.java
@@ -47,13 +47,16 @@ public final class CreateAggregateStatement extends SchemaAlteringStatement
     private final boolean orReplace;
     private final boolean ifNotExists;
     private FunctionName functionName;
-    private String stateFunc;
-    private String finalFunc;
+    private final String stateFunc;
+    private final String finalFunc;
     private final CQL3Type.Raw stateTypeRaw;
 
     private final List<CQL3Type.Raw> argRawTypes;
     private final Term.Raw ival;
 
+    private UDAggregate udAggregate;
+    private boolean replaced;
+
     public CreateAggregateStatement(FunctionName functionName,
                                     List<CQL3Type.Raw> argRawTypes,
                                     String stateFunc,
@@ -102,7 +105,9 @@ public final class CreateAggregateStatement extends SchemaAlteringStatement
 
     public Event.SchemaChange changeEvent()
     {
-        return null;
+        return new Event.SchemaChange(replaced ? Event.SchemaChange.Change.UPDATED : Event.SchemaChange.Change.CREATED,
+                                      Event.SchemaChange.Target.AGGREGATE,
+                                      udAggregate.name().keyspace, udAggregate.name().name, AbstractType.asCQLTypeStringList(udAggregate.argTypes()));
     }
 
     public boolean announceMigration(boolean isLocalOnly) throws RequestValidationException
@@ -164,10 +169,11 @@ public final class CreateAggregateStatement extends SchemaAlteringStatement
             initcond = ival.prepare(functionName.keyspace, receiver).bindAndGet(QueryOptions.DEFAULT);
         }
 
-        UDAggregate udAggregate = new UDAggregate(functionName, argTypes, returnType,
+        udAggregate = new UDAggregate(functionName, argTypes, returnType,
                                                   fState,
                                                   fFinal,
                                                   initcond);
+        replaced = old != null;
 
         MigrationManager.announceNewAggregate(udAggregate, isLocalOnly);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dcc3bb05/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java
index dbdecf9..c49f80c 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java
@@ -51,6 +51,9 @@ public final class CreateFunctionStatement extends SchemaAlteringStatement
     private final List<CQL3Type.Raw> argRawTypes;
     private final CQL3Type.Raw rawReturnType;
 
+    private UDFunction udFunction;
+    private boolean replaced;
+
     public CreateFunctionStatement(FunctionName functionName,
                                    String language,
                                    String body,
@@ -101,7 +104,9 @@ public final class CreateFunctionStatement extends SchemaAlteringStatement
 
     public Event.SchemaChange changeEvent()
     {
-        return null;
+        return new Event.SchemaChange(replaced ? Event.SchemaChange.Change.UPDATED : Event.SchemaChange.Change.CREATED,
+                                      Event.SchemaChange.Target.FUNCTION,
+                                      udFunction.name().keyspace, udFunction.name().name, AbstractType.asCQLTypeStringList(udFunction.argTypes()));
     }
 
     public boolean announceMigration(boolean isLocalOnly) throws RequestValidationException
@@ -131,7 +136,11 @@ public final class CreateFunctionStatement extends SchemaAlteringStatement
                                                                 functionName, returnType.asCQL3Type(), old.returnType().asCQL3Type()));
         }
 
-        MigrationManager.announceNewFunction(UDFunction.create(functionName, argNames, argTypes, returnType, language, body, deterministic), isLocalOnly);
+        this.udFunction = UDFunction.create(functionName, argNames, argTypes, returnType, language, body, deterministic);
+        this.replaced = old != null;
+
+        MigrationManager.announceNewFunction(udFunction, isLocalOnly);
+
         return true;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dcc3bb05/src/java/org/apache/cassandra/cql3/statements/DropAggregateStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DropAggregateStatement.java b/src/java/org/apache/cassandra/cql3/statements/DropAggregateStatement.java
index 118f89d..97ec196 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DropAggregateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DropAggregateStatement.java
@@ -42,6 +42,8 @@ public final class DropAggregateStatement extends SchemaAlteringStatement
     private final List<CQL3Type.Raw> argRawTypes;
     private final boolean argsPresent;
 
+    private Function old;
+
     public DropAggregateStatement(FunctionName functionName,
                                   List<CQL3Type.Raw> argRawTypes,
                                   boolean argsPresent,
@@ -77,7 +79,8 @@ public final class DropAggregateStatement extends SchemaAlteringStatement
 
     public Event.SchemaChange changeEvent()
     {
-        return null;
+        return new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, Event.SchemaChange.Target.AGGREGATE,
+                                      old.name().keyspace, old.name().name, AbstractType.asCQLTypeStringList(old.argTypes()));
     }
 
     public boolean announceMigration(boolean isLocalOnly) throws RequestValidationException
@@ -130,7 +133,10 @@ public final class DropAggregateStatement extends SchemaAlteringStatement
             throw new InvalidRequestException(String.format("Cannot drop aggregate '%s' because it is a " +
                                                             "native (built-in) function", functionName));
 
+        this.old = old;
+
         MigrationManager.announceAggregateDrop((UDAggregate)old, isLocalOnly);
+
         return true;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dcc3bb05/src/java/org/apache/cassandra/cql3/statements/DropFunctionStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DropFunctionStatement.java b/src/java/org/apache/cassandra/cql3/statements/DropFunctionStatement.java
index 394aca0..083db45 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DropFunctionStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DropFunctionStatement.java
@@ -42,6 +42,8 @@ public final class DropFunctionStatement extends SchemaAlteringStatement
     private final List<CQL3Type.Raw> argRawTypes;
     private final boolean argsPresent;
 
+    private Function old;
+
     public DropFunctionStatement(FunctionName functionName,
                                  List<CQL3Type.Raw> argRawTypes,
                                  boolean argsPresent,
@@ -81,7 +83,8 @@ public final class DropFunctionStatement extends SchemaAlteringStatement
     @Override
     public Event.SchemaChange changeEvent()
     {
-        return null;
+        return new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, Event.SchemaChange.Target.FUNCTION,
+                                      old.name().keyspace, old.name().name, AbstractType.asCQLTypeStringList(old.argTypes()));
     }
 
     @Override
@@ -135,7 +138,10 @@ public final class DropFunctionStatement extends SchemaAlteringStatement
         if (!references.isEmpty())
             throw new InvalidRequestException(String.format("Function '%s' still referenced by %s", functionName, references));
 
+        this.old = old;
+
         MigrationManager.announceFunctionDrop((UDFunction) old, isLocalOnly);
+
         return true;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dcc3bb05/src/java/org/apache/cassandra/db/marshal/AbstractType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/AbstractType.java b/src/java/org/apache/cassandra/db/marshal/AbstractType.java
index 85b6dc7..d3711df 100644
--- a/src/java/org/apache/cassandra/db/marshal/AbstractType.java
+++ b/src/java/org/apache/cassandra/db/marshal/AbstractType.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.db.marshal;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
@@ -63,6 +64,14 @@ public abstract class AbstractType<T> implements Comparator<ByteBuffer>
         };
     }
 
+    public static List<String> asCQLTypeStringList(List<AbstractType<?>> abstractTypes)
+    {
+        List<String> r = new ArrayList<>(abstractTypes.size());
+        for (AbstractType<?> abstractType : abstractTypes)
+            r.add(abstractType.asCQL3Type().toString());
+        return r;
+    }
+
     public T compose(ByteBuffer bytes)
     {
         return getSerializer().deserialize(bytes);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dcc3bb05/src/java/org/apache/cassandra/schema/LegacySchemaTables.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/LegacySchemaTables.java b/src/java/org/apache/cassandra/schema/LegacySchemaTables.java
index 047698c..4d06863 100644
--- a/src/java/org/apache/cassandra/schema/LegacySchemaTables.java
+++ b/src/java/org/apache/cassandra/schema/LegacySchemaTables.java
@@ -157,7 +157,7 @@ public class LegacySchemaTables
                 "CREATE TABLE %s ("
                 + "keyspace_name text,"
                 + "function_name text,"
-                + "signature blob,"
+                + "signature frozen<list<text>>,"
                 + "argument_names list<text>,"
                 + "argument_types list<text>,"
                 + "body text,"
@@ -172,7 +172,7 @@ public class LegacySchemaTables
                 "CREATE TABLE %s ("
                 + "keyspace_name text,"
                 + "aggregate_name text,"
-                + "signature blob,"
+                + "signature frozen<list<text>>,"
                 + "argument_types list<text>,"
                 + "final_func text,"
                 + "initcond blob,"
@@ -1293,7 +1293,7 @@ public class LegacySchemaTables
     private static void addFunctionToSchemaMutation(UDFunction function, long timestamp, Mutation mutation)
     {
         ColumnFamily cells = mutation.addOrGet(Functions);
-        Composite prefix = Functions.comparator.make(function.name().name, UDHelper.calculateSignature(function));
+        Composite prefix = Functions.comparator.make(function.name().name, functionSignatureWithTypes(function));
         CFRowAdder adder = new CFRowAdder(cells, prefix, timestamp);
 
         adder.resetCollection("argument_names");
@@ -1319,7 +1319,7 @@ public class LegacySchemaTables
         ColumnFamily cells = mutation.addOrGet(Functions);
         int ldt = (int) (System.currentTimeMillis() / 1000);
 
-        Composite prefix = Functions.comparator.make(function.name().name, UDHelper.calculateSignature(function));
+        Composite prefix = Functions.comparator.make(function.name().name, functionSignatureWithTypes(function));
         cells.addAtom(new RangeTombstone(prefix, prefix.end(), timestamp, ldt));
 
         return mutation;
@@ -1332,7 +1332,7 @@ public class LegacySchemaTables
         for (UntypedResultSet.Row row : QueryProcessor.resultify(query, partition))
         {
             UDFunction function = createFunctionFromFunctionRow(row);
-            functions.put(UDHelper.calculateSignature(function), function);
+            functions.put(functionSignatureWithNameAndTypes(function), function);
         }
         return functions;
     }
@@ -1385,7 +1385,7 @@ public class LegacySchemaTables
     private static void addAggregateToSchemaMutation(UDAggregate aggregate, long timestamp, Mutation mutation)
     {
         ColumnFamily cells = mutation.addOrGet(Aggregates);
-        Composite prefix = Aggregates.comparator.make(aggregate.name().name, UDHelper.calculateSignature(aggregate));
+        Composite prefix = Aggregates.comparator.make(aggregate.name().name, functionSignatureWithTypes(aggregate));
         CFRowAdder adder = new CFRowAdder(cells, prefix, timestamp);
 
         adder.resetCollection("argument_types");
@@ -1409,7 +1409,7 @@ public class LegacySchemaTables
         for (UntypedResultSet.Row row : QueryProcessor.resultify(query, partition))
         {
             UDAggregate aggregate = createAggregateFromAggregateRow(row);
-            aggregates.put(UDHelper.calculateSignature(aggregate), aggregate);
+            aggregates.put(functionSignatureWithNameAndTypes(aggregate), aggregate);
         }
         return aggregates;
     }
@@ -1459,7 +1459,7 @@ public class LegacySchemaTables
         ColumnFamily cells = mutation.addOrGet(Aggregates);
         int ldt = (int) (System.currentTimeMillis() / 1000);
 
-        Composite prefix = Aggregates.comparator.make(aggregate.name().name, UDHelper.calculateSignature(aggregate));
+        Composite prefix = Aggregates.comparator.make(aggregate.name().name, functionSignatureWithTypes(aggregate));
         cells.addAtom(new RangeTombstone(prefix, prefix.end(), timestamp, ldt));
 
         return mutation;
@@ -1477,4 +1477,31 @@ public class LegacySchemaTables
             throw new RuntimeException(e);
         }
     }
+
+    // We allow method overloads, so a function is not uniquely identified by its name only, but
+    // also by its argument types. To distinguish overloads of given function name in the schema
+    // we use a "signature" which is just a list of it's CQL argument types (we could replace that by
+    // using a "signature" UDT that would be comprised of the function name and argument types,
+    // which we could then use as clustering column. But as we haven't yet used UDT in system tables,
+    // We'll leave that decision to #6717).
+    public static ByteBuffer functionSignatureWithTypes(AbstractFunction fun)
+    {
+        ListType<String> list = ListType.getInstance(UTF8Type.instance, false);
+        List<String> strList = new ArrayList<>(fun.argTypes().size());
+        for (AbstractType<?> argType : fun.argTypes())
+            strList.add(argType.asCQL3Type().toString());
+        return list.decompose(strList);
+    }
+
+    public static ByteBuffer functionSignatureWithNameAndTypes(AbstractFunction fun)
+    {
+        ListType<String> list = ListType.getInstance(UTF8Type.instance, false);
+        List<String> strList = new ArrayList<>(fun.argTypes().size() + 2);
+        strList.add(fun.name().keyspace);
+        strList.add(fun.name().name);
+        for (AbstractType<?> argType : fun.argTypes())
+            strList.add(argType.asCQL3Type().toString());
+        return list.decompose(strList);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dcc3bb05/src/java/org/apache/cassandra/service/IMigrationListener.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/IMigrationListener.java b/src/java/org/apache/cassandra/service/IMigrationListener.java
deleted file mode 100644
index faaffb9..0000000
--- a/src/java/org/apache/cassandra/service/IMigrationListener.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.service;
-
-public interface IMigrationListener
-{
-    public void onCreateKeyspace(String ksName);
-    public void onCreateColumnFamily(String ksName, String cfName);
-    public void onCreateUserType(String ksName, String typeName);
-    public void onCreateFunction(String ksName, String functionName);
-    public void onCreateAggregate(String ksName, String aggregateName);
-
-    public void onUpdateKeyspace(String ksName);
-    public void onUpdateColumnFamily(String ksName, String cfName);
-    public void onUpdateUserType(String ksName, String typeName);
-    public void onUpdateFunction(String ksName, String functionName);
-    public void onUpdateAggregate(String ksName, String aggregateName);
-
-    public void onDropKeyspace(String ksName);
-    public void onDropColumnFamily(String ksName, String cfName);
-    public void onDropUserType(String ksName, String typeName);
-    public void onDropFunction(String ksName, String functionName);
-    public void onDropAggregate(String ksName, String aggregateName);
-
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dcc3bb05/src/java/org/apache/cassandra/service/MigrationListener.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationListener.java b/src/java/org/apache/cassandra/service/MigrationListener.java
new file mode 100644
index 0000000..2b728d9
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/MigrationListener.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service;
+
+import java.util.List;
+
+import org.apache.cassandra.db.marshal.AbstractType;
+
+public abstract class MigrationListener
+{
+    public void onCreateKeyspace(String ksName)
+    {
+    }
+
+    public void onCreateColumnFamily(String ksName, String cfName)
+    {
+    }
+
+    public void onCreateUserType(String ksName, String typeName)
+    {
+    }
+
+    public void onCreateFunction(String ksName, String functionName, List<AbstractType<?>> argTypes)
+    {
+    }
+
+    public void onCreateAggregate(String ksName, String aggregateName, List<AbstractType<?>> argTypes)
+    {
+    }
+
+    public void onUpdateKeyspace(String ksName)
+    {
+    }
+
+    public void onUpdateColumnFamily(String ksName, String cfName)
+    {
+    }
+
+    public void onUpdateUserType(String ksName, String typeName)
+    {
+    }
+
+    public void onUpdateFunction(String ksName, String functionName, List<AbstractType<?>> argTypes)
+    {
+    }
+
+    public void onUpdateAggregate(String ksName, String aggregateName, List<AbstractType<?>> argTypes)
+    {
+    }
+
+    public void onDropKeyspace(String ksName)
+    {
+    }
+
+    public void onDropColumnFamily(String ksName, String cfName)
+    {
+    }
+
+    public void onDropUserType(String ksName, String typeName)
+    {
+    }
+
+    public void onDropFunction(String ksName, String functionName, List<AbstractType<?>> argTypes)
+    {
+    }
+
+    public void onDropAggregate(String ksName, String aggregateName, List<AbstractType<?>> argTypes)
+    {
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dcc3bb05/src/java/org/apache/cassandra/service/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java
index fe32559..ef1adc6 100644
--- a/src/java/org/apache/cassandra/service/MigrationManager.java
+++ b/src/java/org/apache/cassandra/service/MigrationManager.java
@@ -63,16 +63,16 @@ public class MigrationManager
 
     public static final int MIGRATION_DELAY_IN_MS = 60000;
 
-    private final List<IMigrationListener> listeners = new CopyOnWriteArrayList<>();
+    private final List<MigrationListener> listeners = new CopyOnWriteArrayList<>();
     
     private MigrationManager() {}
 
-    public void register(IMigrationListener listener)
+    public void register(MigrationListener listener)
     {
         listeners.add(listener);
     }
 
-    public void unregister(IMigrationListener listener)
+    public void unregister(MigrationListener listener)
     {
         listeners.remove(listener);
     }
@@ -160,92 +160,93 @@ public class MigrationManager
 
     public void notifyCreateKeyspace(KSMetaData ksm)
     {
-        for (IMigrationListener listener : listeners)
+        for (MigrationListener listener : listeners)
             listener.onCreateKeyspace(ksm.name);
     }
 
     public void notifyCreateColumnFamily(CFMetaData cfm)
     {
-        for (IMigrationListener listener : listeners)
+        for (MigrationListener listener : listeners)
             listener.onCreateColumnFamily(cfm.ksName, cfm.cfName);
     }
 
     public void notifyCreateUserType(UserType ut)
     {
-        for (IMigrationListener listener : listeners)
+        for (MigrationListener listener : listeners)
             listener.onCreateUserType(ut.keyspace, ut.getNameAsString());
     }
 
     public void notifyCreateFunction(UDFunction udf)
     {
-        for (IMigrationListener listener : listeners)
-            listener.onCreateFunction(udf.name().keyspace, udf.name().name);
+        for (MigrationListener listener : listeners)
+            listener.onCreateFunction(udf.name().keyspace, udf.name().name, udf.argTypes());
     }
 
+
     public void notifyCreateAggregate(UDAggregate udf)
     {
-        for (IMigrationListener listener : listeners)
-            listener.onCreateAggregate(udf.name().keyspace, udf.name().name);
+        for (MigrationListener listener : listeners)
+            listener.onCreateAggregate(udf.name().keyspace, udf.name().name, udf.argTypes());
     }
 
     public void notifyUpdateKeyspace(KSMetaData ksm)
     {
-        for (IMigrationListener listener : listeners)
+        for (MigrationListener listener : listeners)
             listener.onUpdateKeyspace(ksm.name);
     }
 
     public void notifyUpdateColumnFamily(CFMetaData cfm)
     {
-        for (IMigrationListener listener : listeners)
+        for (MigrationListener listener : listeners)
             listener.onUpdateColumnFamily(cfm.ksName, cfm.cfName);
     }
 
     public void notifyUpdateUserType(UserType ut)
     {
-        for (IMigrationListener listener : listeners)
+        for (MigrationListener listener : listeners)
             listener.onUpdateUserType(ut.keyspace, ut.getNameAsString());
     }
 
     public void notifyUpdateFunction(UDFunction udf)
     {
-        for (IMigrationListener listener : listeners)
-            listener.onUpdateFunction(udf.name().keyspace, udf.name().name);
+        for (MigrationListener listener : listeners)
+            listener.onUpdateFunction(udf.name().keyspace, udf.name().name, udf.argTypes());
     }
 
     public void notifyUpdateAggregate(UDAggregate udf)
     {
-        for (IMigrationListener listener : listeners)
-            listener.onUpdateAggregate(udf.name().keyspace, udf.name().name);
+        for (MigrationListener listener : listeners)
+            listener.onUpdateAggregate(udf.name().keyspace, udf.name().name, udf.argTypes());
     }
 
     public void notifyDropKeyspace(KSMetaData ksm)
     {
-        for (IMigrationListener listener : listeners)
+        for (MigrationListener listener : listeners)
             listener.onDropKeyspace(ksm.name);
     }
 
     public void notifyDropColumnFamily(CFMetaData cfm)
     {
-        for (IMigrationListener listener : listeners)
+        for (MigrationListener listener : listeners)
             listener.onDropColumnFamily(cfm.ksName, cfm.cfName);
     }
 
     public void notifyDropUserType(UserType ut)
     {
-        for (IMigrationListener listener : listeners)
+        for (MigrationListener listener : listeners)
             listener.onDropUserType(ut.keyspace, ut.getNameAsString());
     }
 
     public void notifyDropFunction(UDFunction udf)
     {
-        for (IMigrationListener listener : listeners)
-            listener.onDropFunction(udf.name().keyspace, udf.name().name);
+        for (MigrationListener listener : listeners)
+            listener.onDropFunction(udf.name().keyspace, udf.name().name, udf.argTypes());
     }
 
     public void notifyDropAggregate(UDAggregate udf)
     {
-        for (IMigrationListener listener : listeners)
-            listener.onDropAggregate(udf.name().keyspace, udf.name().name);
+        for (MigrationListener listener : listeners)
+            listener.onDropAggregate(udf.name().keyspace, udf.name().name, udf.argTypes());
     }
 
     public static void announceNewKeyspace(KSMetaData ksm) throws ConfigurationException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dcc3bb05/src/java/org/apache/cassandra/transport/Event.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Event.java b/src/java/org/apache/cassandra/transport/Event.java
index 9962599..5e9c6b7 100644
--- a/src/java/org/apache/cassandra/transport/Event.java
+++ b/src/java/org/apache/cassandra/transport/Event.java
@@ -19,6 +19,8 @@ package org.apache.cassandra.transport;
 
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.util.Iterator;
+import java.util.List;
 
 import com.google.common.base.Objects;
 import io.netty.buffer.ByteBuf;
@@ -204,22 +206,29 @@ public abstract class Event
     public static class SchemaChange extends Event
     {
         public enum Change { CREATED, UPDATED, DROPPED }
-        public enum Target { KEYSPACE, TABLE, TYPE }
+        public enum Target { KEYSPACE, TABLE, TYPE, FUNCTION, AGGREGATE }
 
         public final Change change;
         public final Target target;
         public final String keyspace;
-        public final String tableOrTypeOrFunction;
+        public final String name;
+        public final List<String> argTypes;
 
-        public SchemaChange(Change change, Target target, String keyspace, String tableOrTypeOrFunction)
+        public SchemaChange(Change change, Target target, String keyspace, String name, List<String> argTypes)
         {
             super(Type.SCHEMA_CHANGE);
             this.change = change;
             this.target = target;
             this.keyspace = keyspace;
-            this.tableOrTypeOrFunction = tableOrTypeOrFunction;
+            this.name = name;
             if (target != Target.KEYSPACE)
-                assert this.tableOrTypeOrFunction != null : "Table or type should be set for non-keyspace schema change events";
+                assert this.name != null : "Table, type, function or aggregate name should be set for non-keyspace schema change events";
+            this.argTypes = argTypes;
+        }
+
+        public SchemaChange(Change change, Target target, String keyspace, String name)
+        {
+            this(change, target, keyspace, name, null);
         }
 
         public SchemaChange(Change change, String keyspace)
@@ -236,7 +245,11 @@ public abstract class Event
                 Target target = CBUtil.readEnumValue(Target.class, cb);
                 String keyspace = CBUtil.readString(cb);
                 String tableOrType = target == Target.KEYSPACE ? null : CBUtil.readString(cb);
-                return new SchemaChange(change, target, keyspace, tableOrType);
+                List<String> argTypes = null;
+                if (target == Target.FUNCTION || target == Target.AGGREGATE)
+                    argTypes = CBUtil.readStringList(cb);
+
+                return new SchemaChange(change, target, keyspace, tableOrType, argTypes);
             }
             else
             {
@@ -248,13 +261,36 @@ public abstract class Event
 
         public void serializeEvent(ByteBuf dest, int version)
         {
+            if (target == Target.FUNCTION || target == Target.AGGREGATE)
+            {
+                if (version >= 4)
+                {
+                    // available since protocol version 4
+                    CBUtil.writeEnumValue(change, dest);
+                    CBUtil.writeEnumValue(target, dest);
+                    CBUtil.writeString(keyspace, dest);
+                    CBUtil.writeString(name, dest);
+                    CBUtil.writeStringList(argTypes, dest);
+                }
+                else
+                {
+                    // not available in protocol versions < 4 - just say the keyspace was updated.
+                    CBUtil.writeEnumValue(Change.UPDATED, dest);
+                    if (version >= 3)
+                        CBUtil.writeEnumValue(Target.KEYSPACE, dest);
+                    CBUtil.writeString(keyspace, dest);
+                    CBUtil.writeString("", dest);
+                }
+                return;
+            }
+
             if (version >= 3)
             {
                 CBUtil.writeEnumValue(change, dest);
                 CBUtil.writeEnumValue(target, dest);
                 CBUtil.writeString(keyspace, dest);
                 if (target != Target.KEYSPACE)
-                    CBUtil.writeString(tableOrTypeOrFunction, dest);
+                    CBUtil.writeString(name, dest);
             }
             else
             {
@@ -270,13 +306,30 @@ public abstract class Event
                 {
                     CBUtil.writeEnumValue(change, dest);
                     CBUtil.writeString(keyspace, dest);
-                    CBUtil.writeString(target == Target.KEYSPACE ? "" : tableOrTypeOrFunction, dest);
+                    CBUtil.writeString(target == Target.KEYSPACE ? "" : name, dest);
                 }
             }
         }
 
         public int eventSerializedSize(int version)
         {
+            if (target == Target.FUNCTION || target == Target.AGGREGATE)
+            {
+                if (version >= 4)
+                    return CBUtil.sizeOfEnumValue(change)
+                               + CBUtil.sizeOfEnumValue(target)
+                               + CBUtil.sizeOfString(keyspace)
+                               + CBUtil.sizeOfString(name)
+                               + CBUtil.sizeOfStringList(argTypes);
+                if (version >= 3)
+                    return CBUtil.sizeOfEnumValue(Change.UPDATED)
+                           + CBUtil.sizeOfEnumValue(Target.KEYSPACE)
+                           + CBUtil.sizeOfString(keyspace);
+                return CBUtil.sizeOfEnumValue(Change.UPDATED)
+                       + CBUtil.sizeOfString(keyspace)
+                       + CBUtil.sizeOfString("");
+            }
+
             if (version >= 3)
             {
                 int size = CBUtil.sizeOfEnumValue(change)
@@ -284,7 +337,7 @@ public abstract class Event
                          + CBUtil.sizeOfString(keyspace);
 
                 if (target != Target.KEYSPACE)
-                    size += CBUtil.sizeOfString(tableOrTypeOrFunction);
+                    size += CBUtil.sizeOfString(name);
 
                 return size;
             }
@@ -298,20 +351,36 @@ public abstract class Event
                 }
                 return CBUtil.sizeOfEnumValue(change)
                      + CBUtil.sizeOfString(keyspace)
-                     + CBUtil.sizeOfString(target == Target.KEYSPACE ? "" : tableOrTypeOrFunction);
+                     + CBUtil.sizeOfString(target == Target.KEYSPACE ? "" : name);
             }
         }
 
         @Override
         public String toString()
         {
-            return change + " " + target + " " + keyspace + (tableOrTypeOrFunction == null ? "" : "." + tableOrTypeOrFunction);
+            StringBuilder sb = new StringBuilder().append(change)
+                                                  .append(' ').append(target)
+                                                  .append(' ').append(keyspace);
+            if (name != null)
+                sb.append('.').append(name);
+            if (argTypes != null)
+            {
+                sb.append(" (");
+                for (Iterator<String> iter = argTypes.iterator(); iter.hasNext(); )
+                {
+                    sb.append(iter.next());
+                    if (iter.hasNext())
+                        sb.append(',');
+                }
+                sb.append(')');
+            }
+            return sb.toString();
         }
 
         @Override
         public int hashCode()
         {
-            return Objects.hashCode(change, target, keyspace, tableOrTypeOrFunction);
+            return Objects.hashCode(change, target, keyspace, name, argTypes);
         }
 
         @Override
@@ -324,7 +393,8 @@ public abstract class Event
             return Objects.equal(change, scc.change)
                 && Objects.equal(target, scc.target)
                 && Objects.equal(keyspace, scc.keyspace)
-                && Objects.equal(tableOrTypeOrFunction, scc.tableOrTypeOrFunction);
+                && Objects.equal(name, scc.name)
+                && Objects.equal(argTypes, scc.argTypes);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dcc3bb05/src/java/org/apache/cassandra/transport/Server.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java
index 5202a94..147d729 100644
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@ -22,6 +22,7 @@ import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.util.EnumMap;
+import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicBoolean;
 import javax.net.ssl.SSLContext;
@@ -44,6 +45,7 @@ import org.apache.cassandra.auth.IAuthenticator;
 import org.apache.cassandra.auth.ISaslAwareAuthenticator;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.EncryptionOptions;
+import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.metrics.ClientMetrics;
 import org.apache.cassandra.security.SSLFactory;
 import org.apache.cassandra.service.*;
@@ -330,7 +332,7 @@ public class Server implements CassandraDaemon.Server
         }
     }
 
-    private static class EventNotifier implements IEndpointLifecycleSubscriber, IMigrationListener
+    private static class EventNotifier extends MigrationListener implements IEndpointLifecycleSubscriber
     {
         private final Server server;
         private static final InetAddress bindAll;
@@ -410,12 +412,16 @@ public class Server implements CassandraDaemon.Server
             server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, Event.SchemaChange.Target.TYPE, ksName, typeName));
         }
 
-        public void onCreateFunction(String ksName, String functionName)
+        public void onCreateFunction(String ksName, String functionName, List<AbstractType<?>> argTypes)
         {
+            server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, Event.SchemaChange.Target.FUNCTION,
+                                                                 ksName, functionName, AbstractType.asCQLTypeStringList(argTypes)));
         }
 
-        public void onCreateAggregate(String ksName, String aggregateName)
+        public void onCreateAggregate(String ksName, String aggregateName, List<AbstractType<?>> argTypes)
         {
+            server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, Event.SchemaChange.Target.AGGREGATE,
+                                                                 ksName, aggregateName, AbstractType.asCQLTypeStringList(argTypes)));
         }
 
         public void onUpdateKeyspace(String ksName)
@@ -433,12 +439,16 @@ public class Server implements CassandraDaemon.Server
             server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, Event.SchemaChange.Target.TYPE, ksName, typeName));
         }
 
-        public void onUpdateFunction(String ksName, String functionName)
+        public void onUpdateFunction(String ksName, String functionName, List<AbstractType<?>> argTypes)
         {
+            server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, Event.SchemaChange.Target.FUNCTION,
+                                                                 ksName, functionName, AbstractType.asCQLTypeStringList(argTypes)));
         }
 
-        public void onUpdateAggregate(String ksName, String aggregateName)
+        public void onUpdateAggregate(String ksName, String aggregateName, List<AbstractType<?>> argTypes)
         {
+            server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, Event.SchemaChange.Target.AGGREGATE,
+                                                                 ksName, aggregateName, AbstractType.asCQLTypeStringList(argTypes)));
         }
 
         public void onDropKeyspace(String ksName)
@@ -456,12 +466,16 @@ public class Server implements CassandraDaemon.Server
             server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, Event.SchemaChange.Target.TYPE, ksName, typeName));
         }
 
-        public void onDropFunction(String ksName, String functionName)
+        public void onDropFunction(String ksName, String functionName, List<AbstractType<?>> argTypes)
         {
+            server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, Event.SchemaChange.Target.FUNCTION,
+                                                                 ksName, functionName, AbstractType.asCQLTypeStringList(argTypes)));
         }
 
-        public void onDropAggregate(String ksName, String aggregateName)
+        public void onDropAggregate(String ksName, String aggregateName, List<AbstractType<?>> argTypes)
         {
+            server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, Event.SchemaChange.Target.AGGREGATE,
+                                                                 ksName, aggregateName, AbstractType.asCQLTypeStringList(argTypes)));
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dcc3bb05/test/unit/org/apache/cassandra/cql3/AggregationTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/AggregationTest.java b/test/unit/org/apache/cassandra/cql3/AggregationTest.java
index 940e87f..1ddd1f1 100644
--- a/test/unit/org/apache/cassandra/cql3/AggregationTest.java
+++ b/test/unit/org/apache/cassandra/cql3/AggregationTest.java
@@ -28,6 +28,7 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.transport.Event;
 import org.apache.cassandra.transport.messages.ResultMessage;
 
 public class AggregationTest extends CQLTester
@@ -41,7 +42,7 @@ public class AggregationTest extends CQLTester
         assertColumnNames(execute("SELECT COUNT(*) FROM %s"), "count");
         assertRows(execute("SELECT COUNT(*) FROM %s"), row(0L));
         assertColumnNames(execute("SELECT max(b), min(b), sum(b), avg(b) , max(c), sum(c), avg(c), sum(d), avg(d) FROM %s"),
-                          "system.max(b)", "system.min(b)", "system.sum(b)", "system.avg(b)" , "system.max(c)", "system.sum(c)", "system.avg(c)", "system.sum(d)", "system.avg(d)");
+                          "system.max(b)", "system.min(b)", "system.sum(b)", "system.avg(b)", "system.max(c)", "system.sum(c)", "system.avg(c)", "system.sum(d)", "system.avg(d)");
         assertRows(execute("SELECT max(b), min(b), sum(b), avg(b) , max(c), sum(c), avg(c), sum(d), avg(d) FROM %s"),
                    row(null, null, 0, 0, null, 0.0, 0.0, new BigDecimal("0"), new BigDecimal("0")));
 
@@ -133,7 +134,7 @@ public class AggregationTest extends CQLTester
     }
 
     @Test
-    public void testDropStatements() throws Throwable
+    public void testSchemaChange() throws Throwable
     {
         String f = createFunction(KEYSPACE,
                                   "double, double",
@@ -141,13 +142,66 @@ public class AggregationTest extends CQLTester
                                   "RETURNS double " +
                                   "LANGUAGE javascript " +
                                   "AS '\"string\";';");
+
         createFunctionOverload(f,
+                               "double, double",
+                               "CREATE OR REPLACE FUNCTION %s(state int, val int) " +
+                               "RETURNS int " +
+                               "LANGUAGE javascript " +
+                               "AS '\"string\";';");
+
+        String a = createAggregate(KEYSPACE,
+                                   "double",
+                                   "CREATE OR REPLACE AGGREGATE %s(double) " +
+                                   "SFUNC " + shortFunctionName(f) + " " +
+                                   "STYPE double");
+
+        assertLastSchemaChange(Event.SchemaChange.Change.CREATED, Event.SchemaChange.Target.AGGREGATE,
+                               KEYSPACE, parseFunctionName(a).name,
+                               "double");
+
+        schemaChange("CREATE OR REPLACE AGGREGATE " + a + "(double) " +
+                     "SFUNC " + shortFunctionName(f) + " " +
+                     "STYPE double");
+
+        assertLastSchemaChange(Event.SchemaChange.Change.UPDATED, Event.SchemaChange.Target.AGGREGATE,
+                               KEYSPACE, parseFunctionName(a).name,
+                               "double");
+
+        createAggregateOverload(a,
+                                "int",
+                                "CREATE OR REPLACE AGGREGATE %s(int) " +
+                                "SFUNC " + shortFunctionName(f) + " " +
+                                "STYPE int");
+
+        assertLastSchemaChange(Event.SchemaChange.Change.CREATED, Event.SchemaChange.Target.AGGREGATE,
+                               KEYSPACE, parseFunctionName(a).name,
+                               "int");
+
+        schemaChange("DROP AGGREGATE " + a + "(double)");
+
+        assertLastSchemaChange(Event.SchemaChange.Change.DROPPED, Event.SchemaChange.Target.AGGREGATE,
+                               KEYSPACE, parseFunctionName(a).name,
+                               "double");
+    }
+
+    @Test
+    public void testDropStatements() throws Throwable
+    {
+        String f = createFunction(KEYSPACE,
                                   "double, double",
-                                  "CREATE OR REPLACE FUNCTION %s(state int, val int) " +
-                                  "RETURNS int " +
+                                  "CREATE OR REPLACE FUNCTION %s(state double, val double) " +
+                                  "RETURNS double " +
                                   "LANGUAGE javascript " +
                                   "AS '\"string\";';");
 
+        createFunctionOverload(f,
+                               "double, double",
+                               "CREATE OR REPLACE FUNCTION %s(state int, val int) " +
+                               "RETURNS int " +
+                               "LANGUAGE javascript " +
+                               "AS '\"string\";';");
+
         // DROP AGGREGATE must not succeed against a scalar
         assertInvalid("DROP AGGREGATE " + f);
         assertInvalid("DROP AGGREGATE " + f + "(double, double)");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dcc3bb05/test/unit/org/apache/cassandra/cql3/CQLTester.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java
index 36fe957..5611ac6 100644
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@ -42,21 +42,25 @@ import org.slf4j.LoggerFactory;
 
 import com.datastax.driver.core.*;
 import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.cql3.functions.FunctionName;
-import org.apache.cassandra.db.Directories;
-import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.cql3.statements.ParsedStatement;
+import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.db.marshal.TupleType;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.serializers.TypeSerializer;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.transport.Event;
 import org.apache.cassandra.transport.Server;
+import org.apache.cassandra.transport.messages.ResultMessage;
 
 /**
  * Base class for CQL tests.
@@ -94,6 +98,8 @@ public abstract class CQLTester
         }
     }
 
+    public static ResultMessage lastSchemaChangeResult;
+
     private List<String> tables = new ArrayList<>();
     private List<String> types = new ArrayList<>();
     private List<String> functions = new ArrayList<>();
@@ -327,7 +333,7 @@ public abstract class CQLTester
         String fullQuery = String.format(query, functionName);
         functions.add(functionName + '(' + argTypes + ')');
         logger.info(fullQuery);
-        execute(fullQuery);
+        schemaChange(fullQuery);
     }
 
     protected String createAggregate(String keyspace, String argTypes, String query) throws Throwable
@@ -342,7 +348,7 @@ public abstract class CQLTester
         String fullQuery = String.format(query, aggregateName);
         aggregates.add(aggregateName + '(' + argTypes + ')');
         logger.info(fullQuery);
-        execute(fullQuery);
+        schemaChange(fullQuery);
     }
 
     protected void createTable(String query)
@@ -426,12 +432,33 @@ public abstract class CQLTester
         schemaChange(fullQuery);
     }
 
-    private static void schemaChange(String query)
+    protected void assertLastSchemaChange(Event.SchemaChange.Change change, Event.SchemaChange.Target target,
+                                          String keyspace, String name,
+                                          String... argTypes)
+    {
+        Assert.assertTrue(lastSchemaChangeResult instanceof ResultMessage.SchemaChange);
+        ResultMessage.SchemaChange schemaChange = (ResultMessage.SchemaChange) lastSchemaChangeResult;
+        Assert.assertSame(change, schemaChange.change.change);
+        Assert.assertSame(target, schemaChange.change.target);
+        Assert.assertEquals(keyspace, schemaChange.change.keyspace);
+        Assert.assertEquals(name, schemaChange.change.name);
+        Assert.assertEquals(argTypes != null ? Arrays.asList(argTypes) : null, schemaChange.change.argTypes);
+    }
+
+    protected static void schemaChange(String query)
     {
         try
         {
-            // executeOnceInternal don't work for schema changes
-            QueryProcessor.executeOnceInternal(query);
+            ClientState state = ClientState.forInternalCalls();
+            state.setKeyspace(SystemKeyspace.NAME);
+            QueryState queryState = new QueryState(state);
+
+            ParsedStatement.Prepared prepared = QueryProcessor.parseStatement(query, queryState);
+            prepared.statement.validate(state);
+
+            QueryOptions options = QueryOptions.forInternalCalls(Collections.<ByteBuffer>emptyList());
+
+            lastSchemaChangeResult = prepared.statement.executeInternal(queryState, options);
         }
         catch (Exception e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dcc3bb05/test/unit/org/apache/cassandra/cql3/UFTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/UFTest.java b/test/unit/org/apache/cassandra/cql3/UFTest.java
index fa28126..ea1b2da 100644
--- a/test/unit/org/apache/cassandra/cql3/UFTest.java
+++ b/test/unit/org/apache/cassandra/cql3/UFTest.java
@@ -29,6 +29,7 @@ import org.apache.cassandra.cql3.functions.FunctionName;
 import org.apache.cassandra.cql3.functions.Functions;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.transport.Event;
 import org.apache.cassandra.transport.Server;
 import org.apache.cassandra.transport.messages.ResultMessage;
 
@@ -36,6 +37,47 @@ public class UFTest extends CQLTester
 {
 
     @Test
+    public void testSchemaChange() throws Throwable
+    {
+        String f = createFunction(KEYSPACE,
+                                  "double, double",
+                                  "CREATE OR REPLACE FUNCTION %s(state double, val double) " +
+                                  "RETURNS double " +
+                                  "LANGUAGE javascript " +
+                                  "AS '\"string\";';");
+
+        assertLastSchemaChange(Event.SchemaChange.Change.CREATED, Event.SchemaChange.Target.FUNCTION,
+                               KEYSPACE, parseFunctionName(f).name,
+                               "double", "double");
+
+        createFunctionOverload(f,
+                               "double, double",
+                               "CREATE OR REPLACE FUNCTION %s(state int, val int) " +
+                               "RETURNS int " +
+                               "LANGUAGE javascript " +
+                               "AS '\"string\";';");
+
+        assertLastSchemaChange(Event.SchemaChange.Change.CREATED, Event.SchemaChange.Target.FUNCTION,
+                               KEYSPACE, parseFunctionName(f).name,
+                               "int", "int");
+
+        schemaChange("CREATE OR REPLACE FUNCTION " + f + "(state int, val int) " +
+                     "RETURNS int " +
+                     "LANGUAGE javascript " +
+                     "AS '\"string\";';");
+
+        assertLastSchemaChange(Event.SchemaChange.Change.UPDATED, Event.SchemaChange.Target.FUNCTION,
+                               KEYSPACE, parseFunctionName(f).name,
+                               "int", "int");
+
+        schemaChange("DROP FUNCTION " + f + "(double, double)");
+
+        assertLastSchemaChange(Event.SchemaChange.Change.DROPPED, Event.SchemaChange.Target.FUNCTION,
+                               KEYSPACE, parseFunctionName(f).name,
+                               "double", "double");
+    }
+
+    @Test
     public void testFunctionDropOnKeyspaceDrop() throws Throwable
     {
         String fSin = createFunction(KEYSPACE_PER_TEST, "double",
@@ -245,7 +287,7 @@ public class UFTest extends CQLTester
         // single-int-overload must still work
         assertRows(execute("SELECT v FROM %s WHERE k = " + fOverload + "((int)?)", 3), row(1));
         // overloaded has just one overload now - so the following DROP FUNCTION is not ambigious (CASSANDRA-7812)
-        execute("DROP FUNCTION " + fOverload + "");
+        execute("DROP FUNCTION " + fOverload);
     }
 
     @Test
@@ -360,7 +402,7 @@ public class UFTest extends CQLTester
         createTable("CREATE TABLE %s (key int primary key, val bigint)");
 
         String fName = createFunction(KEYSPACE, "double",
-                                      "CREATE OR REPLACE FUNCTION " + KEYSPACE + ".jft(val double)" +
+                                      "CREATE OR REPLACE FUNCTION %s(val double)" +
                                       "RETURNS double LANGUAGE JAVA " +
                                       "AS 'return val;';");
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dcc3bb05/test/unit/org/apache/cassandra/transport/SerDeserTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/transport/SerDeserTest.java b/test/unit/org/apache/cassandra/transport/SerDeserTest.java
index 649f7a2..39bd58b 100644
--- a/test/unit/org/apache/cassandra/transport/SerDeserTest.java
+++ b/test/unit/org/apache/cassandra/transport/SerDeserTest.java
@@ -94,6 +94,7 @@ public class SerDeserTest
     {
         eventSerDeserTest(2);
         eventSerDeserTest(3);
+        eventSerDeserTest(4);
     }
 
     public void eventSerDeserTest(int version) throws Exception
@@ -122,6 +123,19 @@ public class SerDeserTest
             events.add(new SchemaChange(SchemaChange.Change.DROPPED, SchemaChange.Target.TYPE, "ks", "type"));
         }
 
+        if (version >= 4)
+        {
+            List<String> moreTypes = Arrays.asList("text", "bigint");
+
+            events.add(new SchemaChange(SchemaChange.Change.CREATED, SchemaChange.Target.FUNCTION, "ks", "func", Collections.<String>emptyList()));
+            events.add(new SchemaChange(SchemaChange.Change.UPDATED, SchemaChange.Target.FUNCTION, "ks", "func", moreTypes));
+            events.add(new SchemaChange(SchemaChange.Change.DROPPED, SchemaChange.Target.FUNCTION, "ks", "func", moreTypes));
+
+            events.add(new SchemaChange(SchemaChange.Change.CREATED, SchemaChange.Target.AGGREGATE, "ks", "aggr", Collections.<String>emptyList()));
+            events.add(new SchemaChange(SchemaChange.Change.UPDATED, SchemaChange.Target.AGGREGATE, "ks", "aggr", moreTypes));
+            events.add(new SchemaChange(SchemaChange.Change.DROPPED, SchemaChange.Target.AGGREGATE, "ks", "aggr", moreTypes));
+        }
+
         for (Event ev : events)
         {
             ByteBuf buf = Unpooled.buffer(ev.serializedSize(version));