You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ty...@apache.org on 2014/12/30 19:26:18 UTC
cassandra git commit: Schema change events/results for UDFs and
aggregates
Repository: cassandra
Updated Branches:
refs/heads/trunk cfee3da90 -> dcc3bb054
Schema change events/results for UDFs and aggregates
Patch by Robert Stupp; reviewed by Tyler Hobbs for CASSANDRA-7708
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/dcc3bb05
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/dcc3bb05
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/dcc3bb05
Branch: refs/heads/trunk
Commit: dcc3bb054167eb5f408cea79935855780fd56285
Parents: cfee3da
Author: Robert Stupp <sn...@snazy.de>
Authored: Tue Dec 30 12:25:17 2014 -0600
Committer: Tyler Hobbs <ty...@datastax.com>
Committed: Tue Dec 30 12:25:17 2014 -0600
----------------------------------------------------------------------
CHANGES.txt | 7 +-
doc/native_protocol_v4.spec | 33 ++++---
src/java/org/apache/cassandra/auth/Auth.java | 58 +-----------
.../apache/cassandra/cql3/QueryProcessor.java | 24 ++---
.../cassandra/cql3/functions/Functions.java | 20 +---
.../cassandra/cql3/functions/UDHelper.java | 23 +----
.../statements/CreateAggregateStatement.java | 14 ++-
.../statements/CreateFunctionStatement.java | 13 ++-
.../cql3/statements/DropAggregateStatement.java | 8 +-
.../cql3/statements/DropFunctionStatement.java | 8 +-
.../cassandra/db/marshal/AbstractType.java | 9 ++
.../cassandra/schema/LegacySchemaTables.java | 43 +++++++--
.../cassandra/service/IMigrationListener.java | 40 --------
.../cassandra/service/MigrationListener.java | 85 +++++++++++++++++
.../cassandra/service/MigrationManager.java | 49 +++++-----
.../org/apache/cassandra/transport/Event.java | 96 +++++++++++++++++---
.../org/apache/cassandra/transport/Server.java | 28 ++++--
.../apache/cassandra/cql3/AggregationTest.java | 62 ++++++++++++-
.../org/apache/cassandra/cql3/CQLTester.java | 43 +++++++--
test/unit/org/apache/cassandra/cql3/UFTest.java | 46 +++++++++-
.../cassandra/transport/SerDeserTest.java | 14 +++
21 files changed, 483 insertions(+), 240 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dcc3bb05/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1468693..ac63fb3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -10,7 +10,8 @@
* Fix aggregate fn results on empty selection, result column name,
and cqlsh parsing (CASSANDRA-8229)
* Mark sstables as repaired after full repair (CASSANDRA-7586)
- * Extend Descriptor to include a format value and refactor reader/writer apis (CASSANDRA-7443)
+ * Extend Descriptor to include a format value and refactor reader/writer
+ APIs (CASSANDRA-7443)
* Integrate JMH for microbenchmarks (CASSANDRA-8151)
* Keep sstable levels when bootstrapping (CASSANDRA-7460)
* Add Sigar library and perform basic OS settings check on startup (CASSANDRA-7838)
@@ -22,8 +23,8 @@
* Improve compaction logging (CASSANDRA-7818)
* Remove YamlFileNetworkTopologySnitch (CASSANDRA-7917)
* Do anticompaction in groups (CASSANDRA-6851)
- * Support pure user-defined functions (CASSANDRA-7395, 7526, 7562, 7740, 7781, 7929,
- 7924, 7812, 8063, 7813)
+ * Support user-defined functions (CASSANDRA-7395, 7526, 7562, 7740, 7781, 7929,
+ 7924, 7812, 8063, 7813, 7708)
* Permit configurable timestamps with cassandra-stress (CASSANDRA-7416)
* Move sstable RandomAccessReader to nio2, which allows using the
FILE_SHARE_DELETE flag on Windows (CASSANDRA-4050)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dcc3bb05/doc/native_protocol_v4.spec
----------------------------------------------------------------------
diff --git a/doc/native_protocol_v4.spec b/doc/native_protocol_v4.spec
index 02aac3b..3764e91 100644
--- a/doc/native_protocol_v4.spec
+++ b/doc/native_protocol_v4.spec
@@ -669,18 +669,25 @@ Table of Contents
the rest of the message will be <change_type><target><options> where:
- <change_type> is a [string] representing the type of changed involved.
It will be one of "CREATED", "UPDATED" or "DROPPED".
- - <target> is a [string] that can be one of "KEYSPACE", "TABLE" or "TYPE"
- and describes what has been modified ("TYPE" stands for modifications
- related to user types).
- - <options> depends on the preceding <target>. If <target> is
- "KEYSPACE", then <options> will be a single [string] representing the
- keyspace changed. Otherwise, if <target> is "TABLE" or "TYPE", then
- <options> will be 2 [string]: the first one will be the keyspace
- containing the affected object, and the second one will be the name
- of said affected object (so either the table name or the user type
- name).
-
- All EVENT message have a streamId of -1 (Section 2.3).
+ - <target> is a [string] that can be one of "KEYSPACE", "TABLE", "TYPE",
+ "FUNCTION" or "AGGREGATE" and describes what has been modified
+ ("TYPE" stands for modifications related to user types, "FUNCTION"
+ for modifications related to user defined functions, "AGGREGATE"
+ for modifications related to user defined aggregates).
+ - <options> depends on the preceding <target>:
+ - If <target> is "KEYSPACE", then <options> will be a single [string]
+ representing the keyspace changed.
+ - If <target> is "TABLE" or "TYPE", then
+ <options> will be 2 [string]: the first one will be the keyspace
+ containing the affected object, and the second one will be the name
+ of said affected object (either the table name or the user type
+ name).
+ - If <target> is "FUNCTION" or "AGGREGATE", multiple arguments follow:
+ - [string] keyspace containing the user defined function / aggregate
+ - [string] the function/aggregate name
+ - [string list] one string for each argument type (as CQL type)
+
+ All EVENT messages have a streamId of -1 (Section 2.3).
Please note that "NEW_NODE" and "UP" events are sent based on internal Gossip
communication and as such may be sent a short delay before the binary
@@ -896,4 +903,6 @@ Table of Contents
10. Changes from v3
+ * The format of "SCHEMA_CHANGE" events (Section 4.2.6) (and implicitly "Schema_change" results (Section 4.2.5.5))
+ has been modified, and now includes changes related to user defined functions and user defined aggregates.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dcc3bb05/src/java/org/apache/cassandra/auth/Auth.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/Auth.java b/src/java/org/apache/cassandra/auth/Auth.java
index cdcfa0e..0c3b0fe 100644
--- a/src/java/org/apache/cassandra/auth/Auth.java
+++ b/src/java/org/apache/cassandra/auth/Auth.java
@@ -185,7 +185,7 @@ public class Auth implements AuthMBean
DatabaseDescriptor.getAuthorizer().setup();
// register a custom MigrationListener for permissions cleanup after dropped keyspaces/cfs.
- MigrationManager.instance.register(new MigrationListener());
+ MigrationManager.instance.register(new AuthMigrationListener());
// the delay is here to give the node some time to see its peers - to reduce
// "Skipped default superuser setup: some nodes were not ready" log spam.
@@ -318,9 +318,9 @@ public class Auth implements AuthMBean
}
/**
- * IMigrationListener implementation that cleans up permissions on dropped resources.
+ * MigrationListener implementation that cleans up permissions on dropped resources.
*/
- public static class MigrationListener implements IMigrationListener
+ public static class AuthMigrationListener extends MigrationListener
{
public void onDropKeyspace(String ksName)
{
@@ -331,57 +331,5 @@ public class Auth implements AuthMBean
{
DatabaseDescriptor.getAuthorizer().revokeAll(DataResource.columnFamily(ksName, cfName));
}
-
- public void onDropUserType(String ksName, String userType)
- {
- }
-
- public void onDropFunction(String ksName, String functionName)
- {
- }
-
- public void onDropAggregate(String ksName, String aggregateName)
- {
- }
-
- public void onCreateKeyspace(String ksName)
- {
- }
-
- public void onCreateColumnFamily(String ksName, String cfName)
- {
- }
-
- public void onCreateUserType(String ksName, String userType)
- {
- }
-
- public void onCreateFunction(String ksName, String functionName)
- {
- }
-
- public void onCreateAggregate(String ksName, String aggregateName)
- {
- }
-
- public void onUpdateKeyspace(String ksName)
- {
- }
-
- public void onUpdateColumnFamily(String ksName, String cfName)
- {
- }
-
- public void onUpdateUserType(String ksName, String userType)
- {
- }
-
- public void onUpdateFunction(String ksName, String functionName)
- {
- }
-
- public void onUpdateAggregate(String ksName, String aggregateName)
- {
- }
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dcc3bb05/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index ae09972..8531d32 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -31,6 +31,7 @@ import com.googlecode.concurrentlinkedhashmap.EntryWeigher;
import com.googlecode.concurrentlinkedhashmap.EvictionListener;
import org.antlr.runtime.*;
+import org.apache.cassandra.service.MigrationListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,7 +51,6 @@ import org.apache.cassandra.exceptions.RequestValidationException;
import org.apache.cassandra.exceptions.SyntaxException;
import org.apache.cassandra.metrics.CQLMetrics;
import org.apache.cassandra.service.ClientState;
-import org.apache.cassandra.service.IMigrationListener;
import org.apache.cassandra.service.MigrationManager;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.service.pager.QueryPager;
@@ -560,7 +560,7 @@ public class QueryProcessor implements QueryHandler
return meter.measureDeep(key);
}
- private static class MigrationSubscriber implements IMigrationListener
+ private static class MigrationSubscriber extends MigrationListener
{
private void removeInvalidPreparedStatements(String ksName, String cfName)
{
@@ -602,10 +602,7 @@ public class QueryProcessor implements QueryHandler
return ksName.equals(statementKsName) && (cfName == null || cfName.equals(statementCfName));
}
- public void onCreateKeyspace(String ksName) { }
- public void onCreateColumnFamily(String ksName, String cfName) { }
- public void onCreateUserType(String ksName, String typeName) { }
- public void onCreateFunction(String ksName, String functionName) {
+ public void onCreateFunction(String ksName, String functionName, List<AbstractType<?>> argTypes) {
if (Functions.getOverloadCount(new FunctionName(ksName, functionName)) > 1)
{
// in case there are other overloads, we have to remove all overloads since argument type
@@ -614,7 +611,7 @@ public class QueryProcessor implements QueryHandler
removeInvalidPreparedStatementsForFunction(thriftPreparedStatements.values().iterator(), ksName, functionName);
}
}
- public void onCreateAggregate(String ksName, String aggregateName) {
+ public void onCreateAggregate(String ksName, String aggregateName, List<AbstractType<?>> argTypes) {
if (Functions.getOverloadCount(new FunctionName(ksName, aggregateName)) > 1)
{
// in case there are other overloads, we have to remove all overloads since argument type
@@ -624,12 +621,6 @@ public class QueryProcessor implements QueryHandler
}
}
- public void onUpdateKeyspace(String ksName) { }
- public void onUpdateColumnFamily(String ksName, String cfName) { }
- public void onUpdateUserType(String ksName, String typeName) { }
- public void onUpdateFunction(String ksName, String functionName) { }
- public void onUpdateAggregate(String ksName, String aggregateName) { }
-
public void onDropKeyspace(String ksName)
{
removeInvalidPreparedStatements(ksName, null);
@@ -640,18 +631,17 @@ public class QueryProcessor implements QueryHandler
removeInvalidPreparedStatements(ksName, cfName);
}
- public void onDropUserType(String ksName, String typeName) { }
- public void onDropFunction(String ksName, String functionName) {
+ public void onDropFunction(String ksName, String functionName, List<AbstractType<?>> argTypes) {
removeInvalidPreparedStatementsForFunction(preparedStatements.values().iterator(), ksName, functionName);
removeInvalidPreparedStatementsForFunction(thriftPreparedStatements.values().iterator(), ksName, functionName);
}
- public void onDropAggregate(String ksName, String aggregateName)
+ public void onDropAggregate(String ksName, String aggregateName, List<AbstractType<?>> argTypes)
{
removeInvalidPreparedStatementsForFunction(preparedStatements.values().iterator(), ksName, aggregateName);
removeInvalidPreparedStatementsForFunction(thriftPreparedStatements.values().iterator(), ksName, aggregateName);
}
- private void removeInvalidPreparedStatementsForFunction(Iterator<ParsedStatement.Prepared> iterator,
+ private static void removeInvalidPreparedStatementsForFunction(Iterator<ParsedStatement.Prepared> iterator,
String ksName, String functionName)
{
while (iterator.hasNext())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dcc3bb05/src/java/org/apache/cassandra/cql3/functions/Functions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/Functions.java b/src/java/org/apache/cassandra/cql3/functions/Functions.java
index b55ebc5..09e360b 100644
--- a/src/java/org/apache/cassandra/cql3/functions/Functions.java
+++ b/src/java/org/apache/cassandra/cql3/functions/Functions.java
@@ -27,7 +27,7 @@ import org.apache.cassandra.config.Schema;
import org.apache.cassandra.cql3.*;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.exceptions.InvalidRequestException;
-import org.apache.cassandra.service.IMigrationListener;
+import org.apache.cassandra.service.MigrationListener;
import org.apache.cassandra.service.MigrationManager;
public abstract class Functions
@@ -302,28 +302,12 @@ public abstract class Functions
return true;
}
- private static class FunctionsMigrationListener implements IMigrationListener
+ private static class FunctionsMigrationListener extends MigrationListener
{
- public void onCreateKeyspace(String ksName) { }
- public void onCreateColumnFamily(String ksName, String cfName) { }
- public void onCreateUserType(String ksName, String typeName) { }
- public void onCreateFunction(String ksName, String functionName) { }
- public void onCreateAggregate(String ksName, String aggregateName) { }
-
- public void onUpdateKeyspace(String ksName) { }
- public void onUpdateColumnFamily(String ksName, String cfName) { }
public void onUpdateUserType(String ksName, String typeName) {
for (Function function : all())
if (function instanceof UDFunction)
((UDFunction)function).userTypeUpdated(ksName, typeName);
}
- public void onUpdateFunction(String ksName, String functionName) { }
- public void onUpdateAggregate(String ksName, String aggregateName) { }
-
- public void onDropKeyspace(String ksName) { }
- public void onDropColumnFamily(String ksName, String cfName) { }
- public void onDropUserType(String ksName, String typeName) { }
- public void onDropFunction(String ksName, String functionName) { }
- public void onDropAggregate(String ksName, String aggregateName) { }
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dcc3bb05/src/java/org/apache/cassandra/cql3/functions/UDHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/UDHelper.java b/src/java/org/apache/cassandra/cql3/functions/UDHelper.java
index 0738cbe..f4b3809 100644
--- a/src/java/org/apache/cassandra/cql3/functions/UDHelper.java
+++ b/src/java/org/apache/cassandra/cql3/functions/UDHelper.java
@@ -21,8 +21,6 @@ import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.security.MessageDigest;
import java.util.*;
import org.slf4j.Logger;
@@ -30,9 +28,7 @@ import org.slf4j.LoggerFactory;
import com.datastax.driver.core.DataType;
import org.apache.cassandra.cql3.*;
-import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.marshal.UTF8Type;
-import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.db.marshal.*;
/**
* Helper class for User Defined Functions + Aggregates.
@@ -66,7 +62,7 @@ public final class UDHelper
*/
public static Class<?>[] javaTypes(DataType[] dataTypes)
{
- Class<?> paramTypes[] = new Class[dataTypes.length];
+ Class<?>[] paramTypes = new Class[dataTypes.length];
for (int i = 0; i < paramTypes.length; i++)
paramTypes[i] = dataTypes[i].asJavaClass();
return paramTypes;
@@ -107,19 +103,4 @@ public final class UDHelper
throw new RuntimeException("cannot parse driver type " + cqlType.getType().toString(), e);
}
}
-
- // We allow method overloads, so a function is not uniquely identified by its name only, but
- // also by its argument types. To distinguish overloads of given function name in the schema
- // we use a "signature" which is just a SHA-1 of it's argument types (we could replace that by
- // using a "signature" UDT that would be comprised of the function name and argument types,
- // which we could then use as clustering column. But as we haven't yet used UDT in system tables,
- // We'll leave that decision to #6717).
- public static ByteBuffer calculateSignature(AbstractFunction fun)
- {
- MessageDigest digest = FBUtilities.newMessageDigest("SHA-1");
- digest.update(UTF8Type.instance.decompose(fun.name().name));
- for (AbstractType<?> type : fun.argTypes())
- digest.update(UTF8Type.instance.decompose(type.asCQL3Type().toString()));
- return ByteBuffer.wrap(digest.digest());
- }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dcc3bb05/src/java/org/apache/cassandra/cql3/statements/CreateAggregateStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateAggregateStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateAggregateStatement.java
index 9816e58..e135ffe 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateAggregateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateAggregateStatement.java
@@ -47,13 +47,16 @@ public final class CreateAggregateStatement extends SchemaAlteringStatement
private final boolean orReplace;
private final boolean ifNotExists;
private FunctionName functionName;
- private String stateFunc;
- private String finalFunc;
+ private final String stateFunc;
+ private final String finalFunc;
private final CQL3Type.Raw stateTypeRaw;
private final List<CQL3Type.Raw> argRawTypes;
private final Term.Raw ival;
+ private UDAggregate udAggregate;
+ private boolean replaced;
+
public CreateAggregateStatement(FunctionName functionName,
List<CQL3Type.Raw> argRawTypes,
String stateFunc,
@@ -102,7 +105,9 @@ public final class CreateAggregateStatement extends SchemaAlteringStatement
public Event.SchemaChange changeEvent()
{
- return null;
+ return new Event.SchemaChange(replaced ? Event.SchemaChange.Change.UPDATED : Event.SchemaChange.Change.CREATED,
+ Event.SchemaChange.Target.AGGREGATE,
+ udAggregate.name().keyspace, udAggregate.name().name, AbstractType.asCQLTypeStringList(udAggregate.argTypes()));
}
public boolean announceMigration(boolean isLocalOnly) throws RequestValidationException
@@ -164,10 +169,11 @@ public final class CreateAggregateStatement extends SchemaAlteringStatement
initcond = ival.prepare(functionName.keyspace, receiver).bindAndGet(QueryOptions.DEFAULT);
}
- UDAggregate udAggregate = new UDAggregate(functionName, argTypes, returnType,
+ udAggregate = new UDAggregate(functionName, argTypes, returnType,
fState,
fFinal,
initcond);
+ replaced = old != null;
MigrationManager.announceNewAggregate(udAggregate, isLocalOnly);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dcc3bb05/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java
index dbdecf9..c49f80c 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java
@@ -51,6 +51,9 @@ public final class CreateFunctionStatement extends SchemaAlteringStatement
private final List<CQL3Type.Raw> argRawTypes;
private final CQL3Type.Raw rawReturnType;
+ private UDFunction udFunction;
+ private boolean replaced;
+
public CreateFunctionStatement(FunctionName functionName,
String language,
String body,
@@ -101,7 +104,9 @@ public final class CreateFunctionStatement extends SchemaAlteringStatement
public Event.SchemaChange changeEvent()
{
- return null;
+ return new Event.SchemaChange(replaced ? Event.SchemaChange.Change.UPDATED : Event.SchemaChange.Change.CREATED,
+ Event.SchemaChange.Target.FUNCTION,
+ udFunction.name().keyspace, udFunction.name().name, AbstractType.asCQLTypeStringList(udFunction.argTypes()));
}
public boolean announceMigration(boolean isLocalOnly) throws RequestValidationException
@@ -131,7 +136,11 @@ public final class CreateFunctionStatement extends SchemaAlteringStatement
functionName, returnType.asCQL3Type(), old.returnType().asCQL3Type()));
}
- MigrationManager.announceNewFunction(UDFunction.create(functionName, argNames, argTypes, returnType, language, body, deterministic), isLocalOnly);
+ this.udFunction = UDFunction.create(functionName, argNames, argTypes, returnType, language, body, deterministic);
+ this.replaced = old != null;
+
+ MigrationManager.announceNewFunction(udFunction, isLocalOnly);
+
return true;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dcc3bb05/src/java/org/apache/cassandra/cql3/statements/DropAggregateStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DropAggregateStatement.java b/src/java/org/apache/cassandra/cql3/statements/DropAggregateStatement.java
index 118f89d..97ec196 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DropAggregateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DropAggregateStatement.java
@@ -42,6 +42,8 @@ public final class DropAggregateStatement extends SchemaAlteringStatement
private final List<CQL3Type.Raw> argRawTypes;
private final boolean argsPresent;
+ private Function old;
+
public DropAggregateStatement(FunctionName functionName,
List<CQL3Type.Raw> argRawTypes,
boolean argsPresent,
@@ -77,7 +79,8 @@ public final class DropAggregateStatement extends SchemaAlteringStatement
public Event.SchemaChange changeEvent()
{
- return null;
+ return new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, Event.SchemaChange.Target.AGGREGATE,
+ old.name().keyspace, old.name().name, AbstractType.asCQLTypeStringList(old.argTypes()));
}
public boolean announceMigration(boolean isLocalOnly) throws RequestValidationException
@@ -130,7 +133,10 @@ public final class DropAggregateStatement extends SchemaAlteringStatement
throw new InvalidRequestException(String.format("Cannot drop aggregate '%s' because it is a " +
"native (built-in) function", functionName));
+ this.old = old;
+
MigrationManager.announceAggregateDrop((UDAggregate)old, isLocalOnly);
+
return true;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dcc3bb05/src/java/org/apache/cassandra/cql3/statements/DropFunctionStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DropFunctionStatement.java b/src/java/org/apache/cassandra/cql3/statements/DropFunctionStatement.java
index 394aca0..083db45 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DropFunctionStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DropFunctionStatement.java
@@ -42,6 +42,8 @@ public final class DropFunctionStatement extends SchemaAlteringStatement
private final List<CQL3Type.Raw> argRawTypes;
private final boolean argsPresent;
+ private Function old;
+
public DropFunctionStatement(FunctionName functionName,
List<CQL3Type.Raw> argRawTypes,
boolean argsPresent,
@@ -81,7 +83,8 @@ public final class DropFunctionStatement extends SchemaAlteringStatement
@Override
public Event.SchemaChange changeEvent()
{
- return null;
+ return new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, Event.SchemaChange.Target.FUNCTION,
+ old.name().keyspace, old.name().name, AbstractType.asCQLTypeStringList(old.argTypes()));
}
@Override
@@ -135,7 +138,10 @@ public final class DropFunctionStatement extends SchemaAlteringStatement
if (!references.isEmpty())
throw new InvalidRequestException(String.format("Function '%s' still referenced by %s", functionName, references));
+ this.old = old;
+
MigrationManager.announceFunctionDrop((UDFunction) old, isLocalOnly);
+
return true;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dcc3bb05/src/java/org/apache/cassandra/db/marshal/AbstractType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/AbstractType.java b/src/java/org/apache/cassandra/db/marshal/AbstractType.java
index 85b6dc7..d3711df 100644
--- a/src/java/org/apache/cassandra/db/marshal/AbstractType.java
+++ b/src/java/org/apache/cassandra/db/marshal/AbstractType.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.db.marshal;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
@@ -63,6 +64,14 @@ public abstract class AbstractType<T> implements Comparator<ByteBuffer>
};
}
+ public static List<String> asCQLTypeStringList(List<AbstractType<?>> abstractTypes)
+ {
+ List<String> r = new ArrayList<>(abstractTypes.size());
+ for (AbstractType<?> abstractType : abstractTypes)
+ r.add(abstractType.asCQL3Type().toString());
+ return r;
+ }
+
public T compose(ByteBuffer bytes)
{
return getSerializer().deserialize(bytes);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dcc3bb05/src/java/org/apache/cassandra/schema/LegacySchemaTables.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/LegacySchemaTables.java b/src/java/org/apache/cassandra/schema/LegacySchemaTables.java
index 047698c..4d06863 100644
--- a/src/java/org/apache/cassandra/schema/LegacySchemaTables.java
+++ b/src/java/org/apache/cassandra/schema/LegacySchemaTables.java
@@ -157,7 +157,7 @@ public class LegacySchemaTables
"CREATE TABLE %s ("
+ "keyspace_name text,"
+ "function_name text,"
- + "signature blob,"
+ + "signature frozen<list<text>>,"
+ "argument_names list<text>,"
+ "argument_types list<text>,"
+ "body text,"
@@ -172,7 +172,7 @@ public class LegacySchemaTables
"CREATE TABLE %s ("
+ "keyspace_name text,"
+ "aggregate_name text,"
- + "signature blob,"
+ + "signature frozen<list<text>>,"
+ "argument_types list<text>,"
+ "final_func text,"
+ "initcond blob,"
@@ -1293,7 +1293,7 @@ public class LegacySchemaTables
private static void addFunctionToSchemaMutation(UDFunction function, long timestamp, Mutation mutation)
{
ColumnFamily cells = mutation.addOrGet(Functions);
- Composite prefix = Functions.comparator.make(function.name().name, UDHelper.calculateSignature(function));
+ Composite prefix = Functions.comparator.make(function.name().name, functionSignatureWithTypes(function));
CFRowAdder adder = new CFRowAdder(cells, prefix, timestamp);
adder.resetCollection("argument_names");
@@ -1319,7 +1319,7 @@ public class LegacySchemaTables
ColumnFamily cells = mutation.addOrGet(Functions);
int ldt = (int) (System.currentTimeMillis() / 1000);
- Composite prefix = Functions.comparator.make(function.name().name, UDHelper.calculateSignature(function));
+ Composite prefix = Functions.comparator.make(function.name().name, functionSignatureWithTypes(function));
cells.addAtom(new RangeTombstone(prefix, prefix.end(), timestamp, ldt));
return mutation;
@@ -1332,7 +1332,7 @@ public class LegacySchemaTables
for (UntypedResultSet.Row row : QueryProcessor.resultify(query, partition))
{
UDFunction function = createFunctionFromFunctionRow(row);
- functions.put(UDHelper.calculateSignature(function), function);
+ functions.put(functionSignatureWithNameAndTypes(function), function);
}
return functions;
}
@@ -1385,7 +1385,7 @@ public class LegacySchemaTables
private static void addAggregateToSchemaMutation(UDAggregate aggregate, long timestamp, Mutation mutation)
{
ColumnFamily cells = mutation.addOrGet(Aggregates);
- Composite prefix = Aggregates.comparator.make(aggregate.name().name, UDHelper.calculateSignature(aggregate));
+ Composite prefix = Aggregates.comparator.make(aggregate.name().name, functionSignatureWithTypes(aggregate));
CFRowAdder adder = new CFRowAdder(cells, prefix, timestamp);
adder.resetCollection("argument_types");
@@ -1409,7 +1409,7 @@ public class LegacySchemaTables
for (UntypedResultSet.Row row : QueryProcessor.resultify(query, partition))
{
UDAggregate aggregate = createAggregateFromAggregateRow(row);
- aggregates.put(UDHelper.calculateSignature(aggregate), aggregate);
+ aggregates.put(functionSignatureWithNameAndTypes(aggregate), aggregate);
}
return aggregates;
}
@@ -1459,7 +1459,7 @@ public class LegacySchemaTables
ColumnFamily cells = mutation.addOrGet(Aggregates);
int ldt = (int) (System.currentTimeMillis() / 1000);
- Composite prefix = Aggregates.comparator.make(aggregate.name().name, UDHelper.calculateSignature(aggregate));
+ Composite prefix = Aggregates.comparator.make(aggregate.name().name, functionSignatureWithTypes(aggregate));
cells.addAtom(new RangeTombstone(prefix, prefix.end(), timestamp, ldt));
return mutation;
@@ -1477,4 +1477,31 @@ public class LegacySchemaTables
throw new RuntimeException(e);
}
}
+
+ // We allow method overloads, so a function is not uniquely identified by its name only, but
+ // also by its argument types. To distinguish overloads of a given function name in the schema
+ // we use a "signature" which is just a list of its CQL argument types (we could replace that by
+ // using a "signature" UDT that would be comprised of the function name and argument types,
+ // which we could then use as a clustering column. But as we haven't yet used UDTs in system tables,
+ // we'll leave that decision to #6717).
+ public static ByteBuffer functionSignatureWithTypes(AbstractFunction fun)
+ {
+ ListType<String> list = ListType.getInstance(UTF8Type.instance, false);
+ List<String> strList = new ArrayList<>(fun.argTypes().size());
+ for (AbstractType<?> argType : fun.argTypes())
+ strList.add(argType.asCQL3Type().toString());
+ return list.decompose(strList);
+ }
+
+ public static ByteBuffer functionSignatureWithNameAndTypes(AbstractFunction fun)
+ {
+ ListType<String> list = ListType.getInstance(UTF8Type.instance, false);
+ List<String> strList = new ArrayList<>(fun.argTypes().size() + 2);
+ strList.add(fun.name().keyspace);
+ strList.add(fun.name().name);
+ for (AbstractType<?> argType : fun.argTypes())
+ strList.add(argType.asCQL3Type().toString());
+ return list.decompose(strList);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dcc3bb05/src/java/org/apache/cassandra/service/IMigrationListener.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/IMigrationListener.java b/src/java/org/apache/cassandra/service/IMigrationListener.java
deleted file mode 100644
index faaffb9..0000000
--- a/src/java/org/apache/cassandra/service/IMigrationListener.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.service;
-
-public interface IMigrationListener
-{
- public void onCreateKeyspace(String ksName);
- public void onCreateColumnFamily(String ksName, String cfName);
- public void onCreateUserType(String ksName, String typeName);
- public void onCreateFunction(String ksName, String functionName);
- public void onCreateAggregate(String ksName, String aggregateName);
-
- public void onUpdateKeyspace(String ksName);
- public void onUpdateColumnFamily(String ksName, String cfName);
- public void onUpdateUserType(String ksName, String typeName);
- public void onUpdateFunction(String ksName, String functionName);
- public void onUpdateAggregate(String ksName, String aggregateName);
-
- public void onDropKeyspace(String ksName);
- public void onDropColumnFamily(String ksName, String cfName);
- public void onDropUserType(String ksName, String typeName);
- public void onDropFunction(String ksName, String functionName);
- public void onDropAggregate(String ksName, String aggregateName);
-
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dcc3bb05/src/java/org/apache/cassandra/service/MigrationListener.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationListener.java b/src/java/org/apache/cassandra/service/MigrationListener.java
new file mode 100644
index 0000000..2b728d9
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/MigrationListener.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service;
+
+import java.util.List;
+
+import org.apache.cassandra.db.marshal.AbstractType;
+
+/**
+ * Base class for listeners that are notified of schema changes.
+ * <p>
+ * Every callback has an empty default body, so a subclass overrides only the events it cares about.
+ * The function and aggregate callbacks also receive the argument types, because functions can be
+ * overloaded and a name alone does not identify a single one.
+ */
+public abstract class MigrationListener
+{
+ public void onCreateKeyspace(String ksName)
+ {
+ }
+
+ public void onCreateColumnFamily(String ksName, String cfName)
+ {
+ }
+
+ public void onCreateUserType(String ksName, String typeName)
+ {
+ }
+
+ public void onCreateFunction(String ksName, String functionName, List<AbstractType<?>> argTypes)
+ {
+ }
+
+ public void onCreateAggregate(String ksName, String aggregateName, List<AbstractType<?>> argTypes)
+ {
+ }
+
+ public void onUpdateKeyspace(String ksName)
+ {
+ }
+
+ public void onUpdateColumnFamily(String ksName, String cfName)
+ {
+ }
+
+ public void onUpdateUserType(String ksName, String typeName)
+ {
+ }
+
+ public void onUpdateFunction(String ksName, String functionName, List<AbstractType<?>> argTypes)
+ {
+ }
+
+ public void onUpdateAggregate(String ksName, String aggregateName, List<AbstractType<?>> argTypes)
+ {
+ }
+
+ public void onDropKeyspace(String ksName)
+ {
+ }
+
+ public void onDropColumnFamily(String ksName, String cfName)
+ {
+ }
+
+ public void onDropUserType(String ksName, String typeName)
+ {
+ }
+
+ public void onDropFunction(String ksName, String functionName, List<AbstractType<?>> argTypes)
+ {
+ }
+
+ public void onDropAggregate(String ksName, String aggregateName, List<AbstractType<?>> argTypes)
+ {
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dcc3bb05/src/java/org/apache/cassandra/service/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java
index fe32559..ef1adc6 100644
--- a/src/java/org/apache/cassandra/service/MigrationManager.java
+++ b/src/java/org/apache/cassandra/service/MigrationManager.java
@@ -63,16 +63,16 @@ public class MigrationManager
public static final int MIGRATION_DELAY_IN_MS = 60000;
- private final List<IMigrationListener> listeners = new CopyOnWriteArrayList<>();
+ private final List<MigrationListener> listeners = new CopyOnWriteArrayList<>();
private MigrationManager() {}
- public void register(IMigrationListener listener)
+ public void register(MigrationListener listener)
{
listeners.add(listener);
}
- public void unregister(IMigrationListener listener)
+ public void unregister(MigrationListener listener)
{
listeners.remove(listener);
}
@@ -160,92 +160,93 @@ public class MigrationManager
public void notifyCreateKeyspace(KSMetaData ksm)
{
- for (IMigrationListener listener : listeners)
+ for (MigrationListener listener : listeners)
listener.onCreateKeyspace(ksm.name);
}
public void notifyCreateColumnFamily(CFMetaData cfm)
{
- for (IMigrationListener listener : listeners)
+ for (MigrationListener listener : listeners)
listener.onCreateColumnFamily(cfm.ksName, cfm.cfName);
}
public void notifyCreateUserType(UserType ut)
{
- for (IMigrationListener listener : listeners)
+ for (MigrationListener listener : listeners)
listener.onCreateUserType(ut.keyspace, ut.getNameAsString());
}
public void notifyCreateFunction(UDFunction udf)
{
- for (IMigrationListener listener : listeners)
- listener.onCreateFunction(udf.name().keyspace, udf.name().name);
+ for (MigrationListener listener : listeners)
+ listener.onCreateFunction(udf.name().keyspace, udf.name().name, udf.argTypes());
}
+
public void notifyCreateAggregate(UDAggregate udf)
{
- for (IMigrationListener listener : listeners)
- listener.onCreateAggregate(udf.name().keyspace, udf.name().name);
+ for (MigrationListener listener : listeners)
+ listener.onCreateAggregate(udf.name().keyspace, udf.name().name, udf.argTypes());
}
public void notifyUpdateKeyspace(KSMetaData ksm)
{
- for (IMigrationListener listener : listeners)
+ for (MigrationListener listener : listeners)
listener.onUpdateKeyspace(ksm.name);
}
public void notifyUpdateColumnFamily(CFMetaData cfm)
{
- for (IMigrationListener listener : listeners)
+ for (MigrationListener listener : listeners)
listener.onUpdateColumnFamily(cfm.ksName, cfm.cfName);
}
public void notifyUpdateUserType(UserType ut)
{
- for (IMigrationListener listener : listeners)
+ for (MigrationListener listener : listeners)
listener.onUpdateUserType(ut.keyspace, ut.getNameAsString());
}
public void notifyUpdateFunction(UDFunction udf)
{
- for (IMigrationListener listener : listeners)
- listener.onUpdateFunction(udf.name().keyspace, udf.name().name);
+ for (MigrationListener listener : listeners)
+ listener.onUpdateFunction(udf.name().keyspace, udf.name().name, udf.argTypes());
}
public void notifyUpdateAggregate(UDAggregate udf)
{
- for (IMigrationListener listener : listeners)
- listener.onUpdateAggregate(udf.name().keyspace, udf.name().name);
+ for (MigrationListener listener : listeners)
+ listener.onUpdateAggregate(udf.name().keyspace, udf.name().name, udf.argTypes());
}
public void notifyDropKeyspace(KSMetaData ksm)
{
- for (IMigrationListener listener : listeners)
+ for (MigrationListener listener : listeners)
listener.onDropKeyspace(ksm.name);
}
public void notifyDropColumnFamily(CFMetaData cfm)
{
- for (IMigrationListener listener : listeners)
+ for (MigrationListener listener : listeners)
listener.onDropColumnFamily(cfm.ksName, cfm.cfName);
}
public void notifyDropUserType(UserType ut)
{
- for (IMigrationListener listener : listeners)
+ for (MigrationListener listener : listeners)
listener.onDropUserType(ut.keyspace, ut.getNameAsString());
}
public void notifyDropFunction(UDFunction udf)
{
- for (IMigrationListener listener : listeners)
- listener.onDropFunction(udf.name().keyspace, udf.name().name);
+ for (MigrationListener listener : listeners)
+ listener.onDropFunction(udf.name().keyspace, udf.name().name, udf.argTypes());
}
public void notifyDropAggregate(UDAggregate udf)
{
- for (IMigrationListener listener : listeners)
- listener.onDropAggregate(udf.name().keyspace, udf.name().name);
+ for (MigrationListener listener : listeners)
+ listener.onDropAggregate(udf.name().keyspace, udf.name().name, udf.argTypes());
}
public static void announceNewKeyspace(KSMetaData ksm) throws ConfigurationException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dcc3bb05/src/java/org/apache/cassandra/transport/Event.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Event.java b/src/java/org/apache/cassandra/transport/Event.java
index 9962599..5e9c6b7 100644
--- a/src/java/org/apache/cassandra/transport/Event.java
+++ b/src/java/org/apache/cassandra/transport/Event.java
@@ -19,6 +19,8 @@ package org.apache.cassandra.transport;
import java.net.InetAddress;
import java.net.InetSocketAddress;
+import java.util.Iterator;
+import java.util.List;
import com.google.common.base.Objects;
import io.netty.buffer.ByteBuf;
@@ -204,22 +206,29 @@ public abstract class Event
public static class SchemaChange extends Event
{
public enum Change { CREATED, UPDATED, DROPPED }
- public enum Target { KEYSPACE, TABLE, TYPE }
+ public enum Target { KEYSPACE, TABLE, TYPE, FUNCTION, AGGREGATE }
public final Change change;
public final Target target;
public final String keyspace;
- public final String tableOrTypeOrFunction;
+ public final String name;
+ public final List<String> argTypes;
- public SchemaChange(Change change, Target target, String keyspace, String tableOrTypeOrFunction)
+ public SchemaChange(Change change, Target target, String keyspace, String name, List<String> argTypes)
{
super(Type.SCHEMA_CHANGE);
this.change = change;
this.target = target;
this.keyspace = keyspace;
- this.tableOrTypeOrFunction = tableOrTypeOrFunction;
+ this.name = name;
if (target != Target.KEYSPACE)
- assert this.tableOrTypeOrFunction != null : "Table or type should be set for non-keyspace schema change events";
+ assert this.name != null : "Table, type, function or aggregate name should be set for non-keyspace schema change events";
+ this.argTypes = argTypes;
+ }
+
+ public SchemaChange(Change change, Target target, String keyspace, String name)
+ {
+ this(change, target, keyspace, name, null);
}
public SchemaChange(Change change, String keyspace)
@@ -236,7 +245,11 @@ public abstract class Event
Target target = CBUtil.readEnumValue(Target.class, cb);
String keyspace = CBUtil.readString(cb);
String tableOrType = target == Target.KEYSPACE ? null : CBUtil.readString(cb);
- return new SchemaChange(change, target, keyspace, tableOrType);
+ List<String> argTypes = null;
+ if (target == Target.FUNCTION || target == Target.AGGREGATE)
+ argTypes = CBUtil.readStringList(cb);
+
+ return new SchemaChange(change, target, keyspace, tableOrType, argTypes);
}
else
{
@@ -248,13 +261,36 @@ public abstract class Event
public void serializeEvent(ByteBuf dest, int version)
{
+ // FUNCTION and AGGREGATE targets are only written as such from protocol version 4 on;
+ // older versions get a generic "keyspace updated" event instead.
+ if (target == Target.FUNCTION || target == Target.AGGREGATE)
+ {
+ if (version >= 4)
+ {
+ // available since protocol version 4
+ CBUtil.writeEnumValue(change, dest);
+ CBUtil.writeEnumValue(target, dest);
+ CBUtil.writeString(keyspace, dest);
+ CBUtil.writeString(name, dest);
+ CBUtil.writeStringList(argTypes, dest);
+ }
+ else
+ {
+ // not available in protocol versions < 4 - just say the keyspace was updated.
+ CBUtil.writeEnumValue(Change.UPDATED, dest);
+ if (version >= 3)
+ CBUtil.writeEnumValue(Target.KEYSPACE, dest);
+ CBUtil.writeString(keyspace, dest);
+ // Only versions below 3 carry a (here empty) name after the keyspace. A v3 KEYSPACE event
+ // ends with the keyspace, which is also what eventSerializedSize() accounts for.
+ if (version < 3)
+ CBUtil.writeString("", dest);
+ }
+ return;
+ }
+
if (version >= 3)
{
CBUtil.writeEnumValue(change, dest);
CBUtil.writeEnumValue(target, dest);
CBUtil.writeString(keyspace, dest);
if (target != Target.KEYSPACE)
- CBUtil.writeString(tableOrTypeOrFunction, dest);
+ CBUtil.writeString(name, dest);
}
else
{
@@ -270,13 +306,30 @@ public abstract class Event
{
CBUtil.writeEnumValue(change, dest);
CBUtil.writeString(keyspace, dest);
- CBUtil.writeString(target == Target.KEYSPACE ? "" : tableOrTypeOrFunction, dest);
+ CBUtil.writeString(target == Target.KEYSPACE ? "" : name, dest);
}
}
}
public int eventSerializedSize(int version)
{
+ if (target == Target.FUNCTION || target == Target.AGGREGATE)
+ {
+ // v4 and later: the full event - change, target, keyspace, name and the list of argument types.
+ if (version >= 4)
+ return CBUtil.sizeOfEnumValue(change)
+ + CBUtil.sizeOfEnumValue(target)
+ + CBUtil.sizeOfString(keyspace)
+ + CBUtil.sizeOfString(name)
+ + CBUtil.sizeOfStringList(argTypes);
+ // v3: sized as an UPDATED event on a KEYSPACE target (change, target, keyspace).
+ if (version >= 3)
+ return CBUtil.sizeOfEnumValue(Change.UPDATED)
+ + CBUtil.sizeOfEnumValue(Target.KEYSPACE)
+ + CBUtil.sizeOfString(keyspace);
+ // below v3: sized as an UPDATED event with the keyspace and an empty name string.
+ return CBUtil.sizeOfEnumValue(Change.UPDATED)
+ + CBUtil.sizeOfString(keyspace)
+ + CBUtil.sizeOfString("");
+ }
+
if (version >= 3)
{
int size = CBUtil.sizeOfEnumValue(change)
@@ -284,7 +337,7 @@ public abstract class Event
+ CBUtil.sizeOfString(keyspace);
if (target != Target.KEYSPACE)
- size += CBUtil.sizeOfString(tableOrTypeOrFunction);
+ size += CBUtil.sizeOfString(name);
return size;
}
@@ -298,20 +351,36 @@ public abstract class Event
}
return CBUtil.sizeOfEnumValue(change)
+ CBUtil.sizeOfString(keyspace)
- + CBUtil.sizeOfString(target == Target.KEYSPACE ? "" : tableOrTypeOrFunction);
+ + CBUtil.sizeOfString(target == Target.KEYSPACE ? "" : name);
}
}
@Override
public String toString()
{
- return change + " " + target + " " + keyspace + (tableOrTypeOrFunction == null ? "" : "." + tableOrTypeOrFunction);
+ StringBuilder sb = new StringBuilder().append(change)
+ .append(' ').append(target)
+ .append(' ').append(keyspace);
+ if (name != null)
+ sb.append('.').append(name);
+ if (argTypes != null)
+ {
+ sb.append(" (");
+ for (Iterator<String> iter = argTypes.iterator(); iter.hasNext(); )
+ {
+ sb.append(iter.next());
+ if (iter.hasNext())
+ sb.append(',');
+ }
+ sb.append(')');
+ }
+ return sb.toString();
}
@Override
public int hashCode()
{
- return Objects.hashCode(change, target, keyspace, tableOrTypeOrFunction);
+ return Objects.hashCode(change, target, keyspace, name, argTypes);
}
@Override
@@ -324,7 +393,8 @@ public abstract class Event
return Objects.equal(change, scc.change)
&& Objects.equal(target, scc.target)
&& Objects.equal(keyspace, scc.keyspace)
- && Objects.equal(tableOrTypeOrFunction, scc.tableOrTypeOrFunction);
+ && Objects.equal(name, scc.name)
+ && Objects.equal(argTypes, scc.argTypes);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dcc3bb05/src/java/org/apache/cassandra/transport/Server.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java
index 5202a94..147d729 100644
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@ -22,6 +22,7 @@ import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.EnumMap;
+import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.net.ssl.SSLContext;
@@ -44,6 +45,7 @@ import org.apache.cassandra.auth.IAuthenticator;
import org.apache.cassandra.auth.ISaslAwareAuthenticator;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.EncryptionOptions;
+import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.metrics.ClientMetrics;
import org.apache.cassandra.security.SSLFactory;
import org.apache.cassandra.service.*;
@@ -330,7 +332,7 @@ public class Server implements CassandraDaemon.Server
}
}
- private static class EventNotifier implements IEndpointLifecycleSubscriber, IMigrationListener
+ private static class EventNotifier extends MigrationListener implements IEndpointLifecycleSubscriber
{
private final Server server;
private static final InetAddress bindAll;
@@ -410,12 +412,16 @@ public class Server implements CassandraDaemon.Server
server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, Event.SchemaChange.Target.TYPE, ksName, typeName));
}
- public void onCreateFunction(String ksName, String functionName)
+ public void onCreateFunction(String ksName, String functionName, List<AbstractType<?>> argTypes)
{
+ server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, Event.SchemaChange.Target.FUNCTION,
+ ksName, functionName, AbstractType.asCQLTypeStringList(argTypes)));
}
- public void onCreateAggregate(String ksName, String aggregateName)
+ public void onCreateAggregate(String ksName, String aggregateName, List<AbstractType<?>> argTypes)
{
+ server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, Event.SchemaChange.Target.AGGREGATE,
+ ksName, aggregateName, AbstractType.asCQLTypeStringList(argTypes)));
}
public void onUpdateKeyspace(String ksName)
@@ -433,12 +439,16 @@ public class Server implements CassandraDaemon.Server
server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, Event.SchemaChange.Target.TYPE, ksName, typeName));
}
- public void onUpdateFunction(String ksName, String functionName)
+ public void onUpdateFunction(String ksName, String functionName, List<AbstractType<?>> argTypes)
{
+ server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, Event.SchemaChange.Target.FUNCTION,
+ ksName, functionName, AbstractType.asCQLTypeStringList(argTypes)));
}
- public void onUpdateAggregate(String ksName, String aggregateName)
+ public void onUpdateAggregate(String ksName, String aggregateName, List<AbstractType<?>> argTypes)
{
+ server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, Event.SchemaChange.Target.AGGREGATE,
+ ksName, aggregateName, AbstractType.asCQLTypeStringList(argTypes)));
}
public void onDropKeyspace(String ksName)
@@ -456,12 +466,16 @@ public class Server implements CassandraDaemon.Server
server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, Event.SchemaChange.Target.TYPE, ksName, typeName));
}
- public void onDropFunction(String ksName, String functionName)
+ public void onDropFunction(String ksName, String functionName, List<AbstractType<?>> argTypes)
{
+ server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, Event.SchemaChange.Target.FUNCTION,
+ ksName, functionName, AbstractType.asCQLTypeStringList(argTypes)));
}
- public void onDropAggregate(String ksName, String aggregateName)
+ public void onDropAggregate(String ksName, String aggregateName, List<AbstractType<?>> argTypes)
{
+ server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, Event.SchemaChange.Target.AGGREGATE,
+ ksName, aggregateName, AbstractType.asCQLTypeStringList(argTypes)));
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dcc3bb05/test/unit/org/apache/cassandra/cql3/AggregationTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/AggregationTest.java b/test/unit/org/apache/cassandra/cql3/AggregationTest.java
index 940e87f..1ddd1f1 100644
--- a/test/unit/org/apache/cassandra/cql3/AggregationTest.java
+++ b/test/unit/org/apache/cassandra/cql3/AggregationTest.java
@@ -28,6 +28,7 @@ import org.junit.Assert;
import org.junit.Test;
import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.transport.Event;
import org.apache.cassandra.transport.messages.ResultMessage;
public class AggregationTest extends CQLTester
@@ -41,7 +42,7 @@ public class AggregationTest extends CQLTester
assertColumnNames(execute("SELECT COUNT(*) FROM %s"), "count");
assertRows(execute("SELECT COUNT(*) FROM %s"), row(0L));
assertColumnNames(execute("SELECT max(b), min(b), sum(b), avg(b) , max(c), sum(c), avg(c), sum(d), avg(d) FROM %s"),
- "system.max(b)", "system.min(b)", "system.sum(b)", "system.avg(b)" , "system.max(c)", "system.sum(c)", "system.avg(c)", "system.sum(d)", "system.avg(d)");
+ "system.max(b)", "system.min(b)", "system.sum(b)", "system.avg(b)", "system.max(c)", "system.sum(c)", "system.avg(c)", "system.sum(d)", "system.avg(d)");
assertRows(execute("SELECT max(b), min(b), sum(b), avg(b) , max(c), sum(c), avg(c), sum(d), avg(d) FROM %s"),
row(null, null, 0, 0, null, 0.0, 0.0, new BigDecimal("0"), new BigDecimal("0")));
@@ -133,7 +134,7 @@ public class AggregationTest extends CQLTester
}
@Test
- public void testDropStatements() throws Throwable
+ public void testSchemaChange() throws Throwable
{
String f = createFunction(KEYSPACE,
"double, double",
@@ -141,13 +142,66 @@ public class AggregationTest extends CQLTester
"RETURNS double " +
"LANGUAGE javascript " +
"AS '\"string\";';");
+
createFunctionOverload(f,
+ "double, double",
+ "CREATE OR REPLACE FUNCTION %s(state int, val int) " +
+ "RETURNS int " +
+ "LANGUAGE javascript " +
+ "AS '\"string\";';");
+
+ String a = createAggregate(KEYSPACE,
+ "double",
+ "CREATE OR REPLACE AGGREGATE %s(double) " +
+ "SFUNC " + shortFunctionName(f) + " " +
+ "STYPE double");
+
+ assertLastSchemaChange(Event.SchemaChange.Change.CREATED, Event.SchemaChange.Target.AGGREGATE,
+ KEYSPACE, parseFunctionName(a).name,
+ "double");
+
+ schemaChange("CREATE OR REPLACE AGGREGATE " + a + "(double) " +
+ "SFUNC " + shortFunctionName(f) + " " +
+ "STYPE double");
+
+ assertLastSchemaChange(Event.SchemaChange.Change.UPDATED, Event.SchemaChange.Target.AGGREGATE,
+ KEYSPACE, parseFunctionName(a).name,
+ "double");
+
+ createAggregateOverload(a,
+ "int",
+ "CREATE OR REPLACE AGGREGATE %s(int) " +
+ "SFUNC " + shortFunctionName(f) + " " +
+ "STYPE int");
+
+ assertLastSchemaChange(Event.SchemaChange.Change.CREATED, Event.SchemaChange.Target.AGGREGATE,
+ KEYSPACE, parseFunctionName(a).name,
+ "int");
+
+ schemaChange("DROP AGGREGATE " + a + "(double)");
+
+ assertLastSchemaChange(Event.SchemaChange.Change.DROPPED, Event.SchemaChange.Target.AGGREGATE,
+ KEYSPACE, parseFunctionName(a).name,
+ "double");
+ }
+
+ @Test
+ public void testDropStatements() throws Throwable
+ {
+ String f = createFunction(KEYSPACE,
"double, double",
- "CREATE OR REPLACE FUNCTION %s(state int, val int) " +
- "RETURNS int " +
+ "CREATE OR REPLACE FUNCTION %s(state double, val double) " +
+ "RETURNS double " +
"LANGUAGE javascript " +
"AS '\"string\";';");
+ createFunctionOverload(f,
+ "double, double",
+ "CREATE OR REPLACE FUNCTION %s(state int, val int) " +
+ "RETURNS int " +
+ "LANGUAGE javascript " +
+ "AS '\"string\";';");
+
// DROP AGGREGATE must not succeed against a scalar
assertInvalid("DROP AGGREGATE " + f);
assertInvalid("DROP AGGREGATE " + f + "(double, double)");
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dcc3bb05/test/unit/org/apache/cassandra/cql3/CQLTester.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java
index 36fe957..5611ac6 100644
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@ -42,21 +42,25 @@ import org.slf4j.LoggerFactory;
import com.datastax.driver.core.*;
import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.cql3.functions.FunctionName;
-import org.apache.cassandra.db.Directories;
-import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.cql3.statements.ParsedStatement;
+import org.apache.cassandra.db.*;
import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.db.marshal.TupleType;
import org.apache.cassandra.exceptions.*;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.serializers.TypeSerializer;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.transport.Event;
import org.apache.cassandra.transport.Server;
+import org.apache.cassandra.transport.messages.ResultMessage;
/**
* Base class for CQL tests.
@@ -94,6 +98,8 @@ public abstract class CQLTester
}
}
+ public static ResultMessage lastSchemaChangeResult;
+
private List<String> tables = new ArrayList<>();
private List<String> types = new ArrayList<>();
private List<String> functions = new ArrayList<>();
@@ -327,7 +333,7 @@ public abstract class CQLTester
String fullQuery = String.format(query, functionName);
functions.add(functionName + '(' + argTypes + ')');
logger.info(fullQuery);
- execute(fullQuery);
+ schemaChange(fullQuery);
}
protected String createAggregate(String keyspace, String argTypes, String query) throws Throwable
@@ -342,7 +348,7 @@ public abstract class CQLTester
String fullQuery = String.format(query, aggregateName);
aggregates.add(aggregateName + '(' + argTypes + ')');
logger.info(fullQuery);
- execute(fullQuery);
+ schemaChange(fullQuery);
}
protected void createTable(String query)
@@ -426,12 +432,33 @@ public abstract class CQLTester
schemaChange(fullQuery);
}
- private static void schemaChange(String query)
+ /**
+ * Asserts that the result stored by the most recent {@code schemaChange()} call is a schema change
+ * result with the given change kind, target, keyspace, name and argument types.
+ */
+ protected void assertLastSchemaChange(Event.SchemaChange.Change change, Event.SchemaChange.Target target,
+ String keyspace, String name,
+ String... argTypes)
+ {
+ Assert.assertTrue(lastSchemaChangeResult instanceof ResultMessage.SchemaChange);
+ ResultMessage.SchemaChange schemaChange = (ResultMessage.SchemaChange) lastSchemaChangeResult;
+ Assert.assertSame(change, schemaChange.change.change);
+ Assert.assertSame(target, schemaChange.change.target);
+ Assert.assertEquals(keyspace, schemaChange.change.keyspace);
+ Assert.assertEquals(name, schemaChange.change.name);
+ // NOTE(review): when no varargs are passed, argTypes is an empty array rather than null, so the
+ // expected value is an empty list; the null branch is only reached with an explicit null argument.
+ // Confirm this is what is wanted for targets whose event carries no argument types.
+ Assert.assertEquals(argTypes != null ? Arrays.asList(argTypes) : null, schemaChange.change.argTypes);
+ }
+
+ protected static void schemaChange(String query)
{
try
{
- // executeOnceInternal don't work for schema changes
- QueryProcessor.executeOnceInternal(query);
+ ClientState state = ClientState.forInternalCalls();
+ state.setKeyspace(SystemKeyspace.NAME);
+ QueryState queryState = new QueryState(state);
+
+ ParsedStatement.Prepared prepared = QueryProcessor.parseStatement(query, queryState);
+ prepared.statement.validate(state);
+
+ QueryOptions options = QueryOptions.forInternalCalls(Collections.<ByteBuffer>emptyList());
+
+ lastSchemaChangeResult = prepared.statement.executeInternal(queryState, options);
}
catch (Exception e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dcc3bb05/test/unit/org/apache/cassandra/cql3/UFTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/UFTest.java b/test/unit/org/apache/cassandra/cql3/UFTest.java
index fa28126..ea1b2da 100644
--- a/test/unit/org/apache/cassandra/cql3/UFTest.java
+++ b/test/unit/org/apache/cassandra/cql3/UFTest.java
@@ -29,6 +29,7 @@ import org.apache.cassandra.cql3.functions.FunctionName;
import org.apache.cassandra.cql3.functions.Functions;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.transport.Event;
import org.apache.cassandra.transport.Server;
import org.apache.cassandra.transport.messages.ResultMessage;
@@ -36,6 +37,47 @@ public class UFTest extends CQLTester
{
@Test
+ public void testSchemaChange() throws Throwable
+ {
+ String f = createFunction(KEYSPACE,
+ "double, double",
+ "CREATE OR REPLACE FUNCTION %s(state double, val double) " +
+ "RETURNS double " +
+ "LANGUAGE javascript " +
+ "AS '\"string\";';");
+
+ assertLastSchemaChange(Event.SchemaChange.Change.CREATED, Event.SchemaChange.Target.FUNCTION,
+ KEYSPACE, parseFunctionName(f).name,
+ "double", "double");
+
+ createFunctionOverload(f,
+ "double, double",
+ "CREATE OR REPLACE FUNCTION %s(state int, val int) " +
+ "RETURNS int " +
+ "LANGUAGE javascript " +
+ "AS '\"string\";';");
+
+ assertLastSchemaChange(Event.SchemaChange.Change.CREATED, Event.SchemaChange.Target.FUNCTION,
+ KEYSPACE, parseFunctionName(f).name,
+ "int", "int");
+
+ schemaChange("CREATE OR REPLACE FUNCTION " + f + "(state int, val int) " +
+ "RETURNS int " +
+ "LANGUAGE javascript " +
+ "AS '\"string\";';");
+
+ assertLastSchemaChange(Event.SchemaChange.Change.UPDATED, Event.SchemaChange.Target.FUNCTION,
+ KEYSPACE, parseFunctionName(f).name,
+ "int", "int");
+
+ schemaChange("DROP FUNCTION " + f + "(double, double)");
+
+ assertLastSchemaChange(Event.SchemaChange.Change.DROPPED, Event.SchemaChange.Target.FUNCTION,
+ KEYSPACE, parseFunctionName(f).name,
+ "double", "double");
+ }
+
+ @Test
public void testFunctionDropOnKeyspaceDrop() throws Throwable
{
String fSin = createFunction(KEYSPACE_PER_TEST, "double",
@@ -245,7 +287,7 @@ public class UFTest extends CQLTester
// single-int-overload must still work
assertRows(execute("SELECT v FROM %s WHERE k = " + fOverload + "((int)?)", 3), row(1));
// overloaded has just one overload now - so the following DROP FUNCTION is not ambigious (CASSANDRA-7812)
- execute("DROP FUNCTION " + fOverload + "");
+ execute("DROP FUNCTION " + fOverload);
}
@Test
@@ -360,7 +402,7 @@ public class UFTest extends CQLTester
createTable("CREATE TABLE %s (key int primary key, val bigint)");
String fName = createFunction(KEYSPACE, "double",
- "CREATE OR REPLACE FUNCTION " + KEYSPACE + ".jft(val double)" +
+ "CREATE OR REPLACE FUNCTION %s(val double)" +
"RETURNS double LANGUAGE JAVA " +
"AS 'return val;';");
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dcc3bb05/test/unit/org/apache/cassandra/transport/SerDeserTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/transport/SerDeserTest.java b/test/unit/org/apache/cassandra/transport/SerDeserTest.java
index 649f7a2..39bd58b 100644
--- a/test/unit/org/apache/cassandra/transport/SerDeserTest.java
+++ b/test/unit/org/apache/cassandra/transport/SerDeserTest.java
@@ -94,6 +94,7 @@ public class SerDeserTest
{
eventSerDeserTest(2);
eventSerDeserTest(3);
+ eventSerDeserTest(4);
}
public void eventSerDeserTest(int version) throws Exception
@@ -122,6 +123,19 @@ public class SerDeserTest
events.add(new SchemaChange(SchemaChange.Change.DROPPED, SchemaChange.Target.TYPE, "ks", "type"));
}
+ if (version >= 4)
+ {
+ List<String> moreTypes = Arrays.asList("text", "bigint");
+
+ events.add(new SchemaChange(SchemaChange.Change.CREATED, SchemaChange.Target.FUNCTION, "ks", "func", Collections.<String>emptyList()));
+ events.add(new SchemaChange(SchemaChange.Change.UPDATED, SchemaChange.Target.FUNCTION, "ks", "func", moreTypes));
+ events.add(new SchemaChange(SchemaChange.Change.DROPPED, SchemaChange.Target.FUNCTION, "ks", "func", moreTypes));
+
+ events.add(new SchemaChange(SchemaChange.Change.CREATED, SchemaChange.Target.AGGREGATE, "ks", "aggr", Collections.<String>emptyList()));
+ events.add(new SchemaChange(SchemaChange.Change.UPDATED, SchemaChange.Target.AGGREGATE, "ks", "aggr", moreTypes));
+ events.add(new SchemaChange(SchemaChange.Change.DROPPED, SchemaChange.Target.AGGREGATE, "ks", "aggr", moreTypes));
+ }
+
for (Event ev : events)
{
ByteBuf buf = Unpooled.buffer(ev.serializedSize(version));