You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ty...@apache.org on 2014/08/08 21:13:13 UTC
[2/2] git commit: Support pure user-defined functions
Support pure user-defined functions
Patch by Robert Stupp; reviewed by Tyler Hobbs for CASSANDRA-7395
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/25411bf1
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/25411bf1
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/25411bf1
Branch: refs/heads/trunk
Commit: 25411bf1d15a35bf17002cf7664173357c6dc6cf
Parents: 2f25e6e
Author: Robert Stupp <sn...@snazy.de>
Authored: Fri Aug 8 14:11:01 2014 -0500
Committer: Tyler Hobbs <ty...@datastax.com>
Committed: Fri Aug 8 14:11:01 2014 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
build.xml | 2 +-
pylib/cqlshlib/cql3handling.py | 2 +-
src/java/org/apache/cassandra/auth/Auth.java | 12 +
.../org/apache/cassandra/config/CFMetaData.java | 14 +-
.../org/apache/cassandra/config/KSMetaData.java | 1 +
.../org/apache/cassandra/config/UFMetaData.java | 309 +++++++++++++++++++
.../cassandra/cql3/AssignementTestable.java | 4 +-
src/java/org/apache/cassandra/cql3/Cql.g | 73 ++++-
.../org/apache/cassandra/cql3/TypeCast.java | 2 +-
.../cassandra/cql3/functions/FunctionCall.java | 26 +-
.../cassandra/cql3/functions/Functions.java | 7 +-
.../statements/CreateFunctionStatement.java | 180 +++++++++++
.../cql3/statements/DropFunctionStatement.java | 94 ++++++
.../statements/SchemaAlteringStatement.java | 6 +-
.../cassandra/cql3/statements/Selectable.java | 9 +-
.../cassandra/cql3/statements/Selection.java | 21 +-
.../cql3/udf/UDFFunctionOverloads.java | 87 ++++++
.../apache/cassandra/cql3/udf/UDFRegistry.java | 146 +++++++++
.../apache/cassandra/cql3/udf/UDFunction.java | 178 +++++++++++
.../org/apache/cassandra/db/DefsTables.java | 83 +++++
.../org/apache/cassandra/db/SystemKeyspace.java | 14 +-
.../cassandra/service/CassandraDaemon.java | 4 +
.../cassandra/service/IMigrationListener.java | 4 +
.../cassandra/service/MigrationManager.java | 42 +++
.../org/apache/cassandra/transport/Event.java | 40 +--
.../org/apache/cassandra/transport/Server.java | 12 +
test/unit/org/apache/cassandra/cql3/UFTest.java | 186 +++++++++++
28 files changed, 1517 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/25411bf1/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b33399b..f6285fe 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0
+ * Support pure user-defined functions (CASSANDRA-7395)
* Permit configurable timestamps with cassandra-stress (CASSANDRA-7416)
* Move sstable RandomAccessReader to nio2, which allows using the
FILE_SHARE_DELETE flag on Windows (CASSANDRA-4050)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/25411bf1/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index e904ca2..057bf3d 100644
--- a/build.xml
+++ b/build.xml
@@ -1461,7 +1461,7 @@
</java>
</target>
- <target name="javadoc" depends="init" description="Create javadoc">
+ <target name="javadoc" depends="init" description="Create javadoc" unless="no-javadoc">
<create-javadoc destdir="${javadoc.dir}">
<filesets>
<fileset dir="${build.src.java}" defaultexcludes="yes">
http://git-wip-us.apache.org/repos/asf/cassandra/blob/25411bf1/pylib/cqlshlib/cql3handling.py
----------------------------------------------------------------------
diff --git a/pylib/cqlshlib/cql3handling.py b/pylib/cqlshlib/cql3handling.py
index 72461db..d912c67 100644
--- a/pylib/cqlshlib/cql3handling.py
+++ b/pylib/cqlshlib/cql3handling.py
@@ -212,7 +212,7 @@ JUNK ::= /([ \t\r\f\v]+|(--|[/][/])[^\n\r]*([\n\r]|$)|[/][*].*?[*][/])/ ;
<mapLiteral> ::= "{" <term> ":" <term> ( "," <term> ":" <term> )* "}"
;
-<functionName> ::= <identifier>
+<functionName> ::= ( <identifier> ":" ":" )? <identifier>
;
<statementBody> ::= <useStatement>
http://git-wip-us.apache.org/repos/asf/cassandra/blob/25411bf1/src/java/org/apache/cassandra/auth/Auth.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/Auth.java b/src/java/org/apache/cassandra/auth/Auth.java
index 7c532b0..bc23d05 100644
--- a/src/java/org/apache/cassandra/auth/Auth.java
+++ b/src/java/org/apache/cassandra/auth/Auth.java
@@ -291,6 +291,10 @@ public class Auth
{
}
+ public void onDropFunction(String namespace, String functionName)
+ {
+ }
+
public void onCreateKeyspace(String ksName)
{
}
@@ -303,6 +307,10 @@ public class Auth
{
}
+ public void onCreateFunction(String namespace, String functionName)
+ {
+ }
+
public void onUpdateKeyspace(String ksName)
{
}
@@ -314,5 +322,9 @@ public class Auth
public void onUpdateUserType(String ksName, String userType)
{
}
+
+ public void onUpdateFunction(String namespace, String functionName)
+ {
+ }
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/25411bf1/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index 5a347f7..37f586d 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -213,6 +213,19 @@ public final class CFMetaData
+ "PRIMARY KEY (keyspace_name, type_name)"
+ ") WITH COMMENT='Defined user types' AND gc_grace_seconds=604800");
+ public static final CFMetaData SchemaFunctionsCf = compile("CREATE TABLE " + SystemKeyspace.SCHEMA_FUNCTIONS_CF + " ("
+ + "namespace text,"
+ + "name text,"
+ + "signature text,"
+ + "argument_names list<text>,"
+ + "argument_types list<text>,"
+ + "return_type text,"
+ + "deterministic boolean,"
+ + "language text,"
+ + "body text,"
+ + "primary key ((namespace, name), signature)"
+ + ") WITH COMMENT='user defined functions' AND gc_grace_seconds=604800");
+
public static final CFMetaData HintsCf = compile("CREATE TABLE " + SystemKeyspace.HINTS_CF + " ("
+ "target_id uuid,"
+ "hint_id timeuuid,"
@@ -331,7 +344,6 @@ public final class CFMetaData
+ "PRIMARY KEY (id)"
+ ") WITH COMMENT='show all compaction history' AND DEFAULT_TIME_TO_LIVE=604800");
-
public static class SpeculativeRetry
{
public enum RetryType
http://git-wip-us.apache.org/repos/asf/cassandra/blob/25411bf1/src/java/org/apache/cassandra/config/KSMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/KSMetaData.java b/src/java/org/apache/cassandra/config/KSMetaData.java
index 8c99191..64ac3ff 100644
--- a/src/java/org/apache/cassandra/config/KSMetaData.java
+++ b/src/java/org/apache/cassandra/config/KSMetaData.java
@@ -101,6 +101,7 @@ public final class KSMetaData
CFMetaData.SchemaColumnsCf,
CFMetaData.SchemaTriggersCf,
CFMetaData.SchemaUserTypesCf,
+ CFMetaData.SchemaFunctionsCf,
CFMetaData.CompactionLogCf,
CFMetaData.CompactionHistoryCf,
CFMetaData.PaxosCf,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/25411bf1/src/java/org/apache/cassandra/config/UFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/UFMetaData.java b/src/java/org/apache/cassandra/config/UFMetaData.java
new file mode 100644
index 0000000..18484f3
--- /dev/null
+++ b/src/java/org/apache/cassandra/config/UFMetaData.java
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.config;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang3.builder.ToStringBuilder;
+
+import org.antlr.runtime.ANTLRStringStream;
+import org.antlr.runtime.CharStream;
+import org.antlr.runtime.CommonTokenStream;
+import org.antlr.runtime.RecognitionException;
+import org.antlr.runtime.TokenStream;
+import org.apache.cassandra.cql3.AssignementTestable;
+import org.apache.cassandra.cql3.CQL3Type;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.cql3.ColumnSpecification;
+import org.apache.cassandra.cql3.CqlLexer;
+import org.apache.cassandra.cql3.CqlParser;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.cql3.udf.UDFFunctionOverloads;
+import org.apache.cassandra.cql3.udf.UDFRegistry;
+import org.apache.cassandra.db.CFRowAdder;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.RangeTombstone;
+import org.apache.cassandra.db.Row;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.composites.Composite;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.exceptions.SyntaxException;
+
+/**
+ * Defined (and loaded) user functions.
+ * <p/>
+ * In practice, because user functions are global, there is only one instance of
+ * this class, which is retrieved through the Schema class.
+ */
+public final class UFMetaData
+{
+ public final String namespace;
+ public final String functionName;
+ public final String qualifiedName;
+ public final String returnType;
+ public final List<String> argumentNames;
+ public final List<String> argumentTypes;
+ public final String language;
+ public final String body;
+ public final boolean deterministic;
+
+ public final String signature;
+ public final List<CQL3Type> cqlArgumentTypes;
+ public final CQL3Type cqlReturnType;
+
+ static final CompositeType partKey = (CompositeType) CFMetaData.SchemaFunctionsCf.getKeyValidator();
+
+ // TODO tracking "valid" status via an exception field is really bad style - but we need some way to mark a function as "dead"
+ public InvalidRequestException invalid;
+
+    /**
+     * Builds the metadata describing one overload of a user-defined function.
+     * <p/>
+     * Namespace, function name and language are normalized to lower case. A {@code null} namespace
+     * is stored as the empty string and a {@code null} language is stored as {@code "class"}.
+     * The return type and argument type strings are parsed into CQL types; a parse failure is not
+     * thrown but recorded in {@link #invalid}, in which case {@link #cqlArgumentTypes} may be
+     * incomplete and {@link #cqlReturnType} may be {@code null}.
+     *
+     * @param namespace     namespace of the function, or {@code null} for none
+     * @param functionName  name of the function; must not be {@code null}
+     * @param deterministic value stored in {@link #deterministic}
+     * @param argumentNames names of the arguments; stored as given
+     * @param argumentTypes CQL type strings of the arguments; must not be {@code null}
+     * @param returnType    CQL type string of the return value
+     * @param language      implementation language, or {@code null} for {@code "class"}
+     * @param body          function body; stored as given
+     */
+    public UFMetaData(String namespace, String functionName, boolean deterministic, List<String> argumentNames,
+                      List<String> argumentTypes, String returnType, String language, String body)
+    {
+        this.namespace = namespace != null ? namespace.toLowerCase() : "";
+        this.functionName = functionName.toLowerCase();
+        // Derive the qualified name from the normalized parts so that it (and the signature built
+        // from it below) is lower case even when no namespace was given.
+        this.qualifiedName = qualifiedName(this.namespace, this.functionName);
+        this.returnType = returnType;
+        this.argumentNames = argumentNames;
+        this.argumentTypes = argumentTypes;
+        this.language = language == null ? "class" : language.toLowerCase();
+        this.body = body;
+        this.deterministic = deterministic;
+
+        // Parse the declared types; the first failure stops parsing and marks the function as invalid.
+        this.cqlArgumentTypes = new ArrayList<>(argumentTypes.size());
+        InvalidRequestException inv = null;
+        CQL3Type rt = null;
+        try
+        {
+            rt = parseCQLType(returnType);
+            for (String argumentType : argumentTypes)
+                cqlArgumentTypes.add(parseCQLType(argumentType));
+        }
+        catch (InvalidRequestException e)
+        {
+            inv = e;
+        }
+        this.invalid = inv;
+        this.cqlReturnType = rt;
+
+        // The signature is the qualified name followed by the comma-separated argument type strings.
+        StringBuilder signature = new StringBuilder();
+        signature.append(qualifiedName);
+        for (String argumentType : argumentTypes)
+        {
+            signature.append(',');
+            signature.append(argumentType);
+        }
+        this.signature = signature.toString();
+    }
+
+    /**
+     * Tests whether the provided arguments can be passed to this function.
+     * <p/>
+     * The number of provided arguments must equal the number of parsed argument types, and every
+     * non-{@code null} provided argument must be assignable to the corresponding argument type.
+     * {@code null} entries in {@code providedArgs} are not checked.
+     *
+     * @param ksName       keyspace used for the assignability test and the argument specification
+     * @param cfName       column family name used for the argument specification
+     * @param providedArgs the arguments to test
+     * @return {@code true} if all checks pass, {@code false} otherwise
+     */
+    public boolean compatibleArgs(String ksName, String cfName, List<? extends AssignementTestable> providedArgs)
+    {
+        int expectedCount = cqlArgumentTypes.size();
+        if (providedArgs.size() != expectedCount)
+            return false;
+
+        for (int idx = 0; idx < expectedCount; idx++)
+        {
+            AssignementTestable candidate = providedArgs.get(idx);
+            if (candidate != null)
+            {
+                ColumnSpecification receiver = makeArgSpec(ksName, cfName, cqlArgumentTypes.get(idx).getType(), idx);
+                if (!candidate.isAssignableTo(ksName, receiver))
+                    return false;
+            }
+        }
+
+        return true;
+    }
+
+    /**
+     * Creates the column specification that stands for the {@code i}-th argument of this function.
+     * The specification is named {@code arg<i>(<qualified function name>)}.
+     *
+     * @param ksName  keyspace name of the specification
+     * @param cfName  column family name of the specification
+     * @param argType type of the argument
+     * @param i       index of the argument, used in the generated name
+     * @return the specification for the argument
+     */
+    public ColumnSpecification makeArgSpec(String ksName, String cfName, AbstractType<?> argType, int i)
+    {
+        String label = "arg" + i + "(" + qualifiedName + ")";
+        ColumnIdentifier identifier = new ColumnIdentifier(label, true);
+        return new ColumnSpecification(ksName, cfName, identifier, argType);
+    }
+
+    /**
+     * Parses a CQL type string and returns the corresponding type.
+     * <p/>
+     * Only native types are accepted for now. Every failure (a recognition error, a non-native type,
+     * an error or a {@code NullPointerException} raised while preparing the raw type without a
+     * keyspace) is reported with the same "invalid CQL type" message; the underlying failure is
+     * attached as the cause of the thrown exception.
+     *
+     * @param cqlType the CQL type string to parse
+     * @return the parsed native type
+     * @throws InvalidRequestException if the string is not a supported CQL type
+     */
+    private static CQL3Type parseCQLType(String cqlType)
+    throws InvalidRequestException
+    {
+        CharStream stream = new ANTLRStringStream(cqlType);
+        CqlLexer lexer = new CqlLexer(stream);
+
+        TokenStream tokenStream = new CommonTokenStream(lexer);
+        CqlParser parser = new CqlParser(tokenStream);
+        try
+        {
+            CQL3Type.Raw rawType = parser.comparatorType();
+            // TODO CASSANDRA-7563 use appropriate keyspace here ... keyspace must be fully qualified
+            CQL3Type t = rawType.prepare(null);
+            // TODO CASSANDRA-7563 support "complex" types (UDT, tuples, collections), remove catch-NPE below
+            if (!(t instanceof CQL3Type.Native))
+                throw new InvalidRequestException("non-native CQL type '" + cqlType + "' not supported");
+            return t;
+        }
+        catch (NullPointerException | InvalidRequestException | RecognitionException e)
+        {
+            // Keep the message callers already see, but leave the real reason reachable through the
+            // cause chain instead of dropping it.
+            InvalidRequestException failure = new InvalidRequestException("invalid CQL type '" + cqlType + "'");
+            failure.initCause(e);
+            throw failure;
+        }
+    }
+
+    /**
+     * Returns the lower-cased qualified name {@code <namespace>::<functionName>} of a function.
+     * A {@code null} namespace is treated like the empty namespace, giving {@code ::<functionName>}.
+     *
+     * @param namespace    namespace of the function, or {@code null} for none
+     * @param functionName name of the function
+     * @return the qualified name in lower case
+     */
+    public static String qualifiedName(String namespace, String functionName)
+    {
+        // Lower-case the result on every path: a missing namespace must not leave the function
+        // name in the caller's casing.
+        String ns = namespace == null ? "" : namespace;
+        return (ns + "::" + functionName).toLowerCase();
+    }
+
+    /**
+     * Builds the mutation that removes every registered overload of a function from the schema
+     * functions table of the system keyspace.
+     *
+     * @param timestamp    timestamp of the tombstones
+     * @param namespace    namespace of the function, or {@code null} for none
+     * @param functionName name of the function
+     * @return the mutation, or {@code null} if the registry knows no overload of the function
+     */
+    public static Mutation dropFunction(long timestamp, String namespace, String functionName)
+    {
+        UDFFunctionOverloads sigMap = UDFRegistry.getFunctionSigMap(UFMetaData.qualifiedName(namespace, functionName));
+        if (sigMap == null || sigMap.isEmpty())
+            return null;
+
+        // Normalize the key parts the same way the constructor does, so that the partition key
+        // matches the one createOrReplaceFunction() wrote the rows under.
+        String keyNamespace = namespace != null ? namespace.toLowerCase() : "";
+        String keyName = functionName.toLowerCase();
+
+        Mutation mutation = new Mutation(Keyspace.SYSTEM_KS, partKey.decompose(keyNamespace, keyName));
+        ColumnFamily cf = mutation.addOrGet(SystemKeyspace.SCHEMA_FUNCTIONS_CF);
+
+        // local deletion time, in seconds
+        int ldt = (int) (System.currentTimeMillis() / 1000);
+        for (UFMetaData f : sigMap.values())
+            udfRemove(timestamp, cf, ldt, f);
+
+        return mutation;
+    }
+
+    /**
+     * Returns the composite built from the function's signature with the comparator of the
+     * schema functions table.
+     */
+    private static Composite udfSignatureKey(UFMetaData function)
+    {
+        String signature = function.signature;
+        return CFMetaData.SchemaFunctionsCf.comparator.make(signature);
+    }
+
+    /**
+     * Adds to {@code cf} a range tombstone that starts at the signature key of {@code f} and
+     * stops at the end of that key.
+     *
+     * @param timestamp timestamp of the tombstone
+     * @param cf        column family the tombstone is added to
+     * @param ldt       local deletion time of the tombstone
+     * @param f         function whose row is removed
+     */
+    private static void udfRemove(long timestamp, ColumnFamily cf, int ldt, UFMetaData f)
+    {
+        Composite start = udfSignatureKey(f);
+        Composite stop = start.end();
+        cf.addAtom(new RangeTombstone(start, stop, timestamp, ldt));
+    }
+
+    /**
+     * Builds the mutation that writes the row describing {@code f} into the schema functions
+     * table of the system keyspace. The partition key is made of the function's namespace and
+     * name, the row is identified by the function's signature.
+     *
+     * @param timestamp timestamp used for the written row
+     * @param f         function to persist
+     * @return the mutation holding the row
+     */
+    public static Mutation createOrReplaceFunction(long timestamp, UFMetaData f)
+    throws ConfigurationException, SyntaxException
+    {
+        Mutation result = new Mutation(Keyspace.SYSTEM_KS, partKey.decompose(f.namespace, f.functionName));
+        ColumnFamily functionsCf = result.addOrGet(SystemKeyspace.SCHEMA_FUNCTIONS_CF);
+
+        CFRowAdder row = new CFRowAdder(functionsCf, udfSignatureKey(f), timestamp);
+
+        // The two list columns are reset before the scalar columns and the list entries are added.
+        row.resetCollection("argument_names");
+        row.resetCollection("argument_types");
+        row.add("name", f.functionName);
+        row.add("return_type", f.returnType);
+        row.add("language", f.language);
+        row.add("body", f.body);
+        row.add("deterministic", f.deterministic);
+
+        for (String name : f.argumentNames)
+            row.addListEntry("argument_names", name);
+        for (String type : f.argumentTypes)
+            row.addListEntry("argument_types", type);
+
+        return result;
+    }
+
+    /**
+     * Creates the function metadata stored in one row of the schema functions table.
+     *
+     * @param row the row to read the columns from
+     * @return the metadata built from the row's columns
+     */
+    public static UFMetaData fromSchema(UntypedResultSet.Row row)
+    {
+        String ns = row.getString("namespace");
+        String fnName = row.getString("name");
+        List<String> argNames = row.getList("argument_names", UTF8Type.instance);
+        List<String> argTypes = row.getList("argument_types", UTF8Type.instance);
+        String retType = row.getString("return_type");
+        boolean isDeterministic = row.getBoolean("deterministic");
+        String lang = row.getString("language");
+        String source = row.getString("body");
+
+        return new UFMetaData(ns, fnName, isDeterministic, argNames, argTypes, retType, lang, source);
+    }
+
+    /**
+     * Reads all functions contained in a row of the schema functions table.
+     *
+     * @param row the row to convert
+     * @return the functions found, keyed by their signature
+     */
+    public static Map<String, UFMetaData> fromSchema(Row row)
+    {
+        String query = "SELECT * FROM system." + SystemKeyspace.SCHEMA_FUNCTIONS_CF;
+        UntypedResultSet rows = QueryProcessor.resultify(query, row);
+
+        Map<String, UFMetaData> bySignature = new HashMap<>(rows.size());
+        for (UntypedResultSet.Row functionRow : rows)
+        {
+            UFMetaData function = fromSchema(functionRow);
+            bySignature.put(function.signature, function);
+        }
+        return bySignature;
+    }
+
+    /**
+     * Two instances are equal when they are of the same class and have equal signature,
+     * deterministic flag, argument names, argument types, body, namespace, language and return type.
+     * The {@link #invalid} marker and the parsed CQL types do not take part in the comparison.
+     */
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        UFMetaData that = (UFMetaData) o;
+        return signature.equals(that.signature)
+            && deterministic == that.deterministic
+            && (argumentNames != null ? argumentNames.equals(that.argumentNames) : that.argumentNames == null)
+            // hashCode() hashes argumentTypes, so equal objects must agree on it as well
+            && argumentTypes.equals(that.argumentTypes)
+            && (body != null ? body.equals(that.body) : that.body == null)
+            && namespace.equals(that.namespace)
+            && language.equals(that.language)
+            && (returnType != null ? returnType.equals(that.returnType) : that.returnType == null);
+    }
+
+    /**
+     * Combines, with the usual factor of 31, the hash codes of the signature, return type,
+     * argument names, argument types, language, body and deterministic flag, in that order.
+     * A {@code null} return type, argument name list or body contributes 0.
+     */
+    public int hashCode()
+    {
+        int hash = signature.hashCode();
+        int[] components = {
+            returnType != null ? returnType.hashCode() : 0,
+            argumentNames != null ? argumentNames.hashCode() : 0,
+            argumentTypes.hashCode(),
+            language.hashCode(),
+            body != null ? body.hashCode() : 0,
+            deterministic ? 1 : 0
+        };
+        for (int component : components)
+            hash = 31 * hash + component;
+        return hash;
+    }
+
+    /**
+     * Returns a description listing the signature, return type, deterministic flag, language
+     * and body of this function.
+     */
+    public String toString()
+    {
+        ToStringBuilder description = new ToStringBuilder(this);
+        description.append("signature", signature);
+        description.append("returnType", returnType);
+        description.append("deterministic", deterministic);
+        description.append("language", language);
+        description.append("body", body);
+        return description.toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/25411bf1/src/java/org/apache/cassandra/cql3/AssignementTestable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/AssignementTestable.java b/src/java/org/apache/cassandra/cql3/AssignementTestable.java
index 2253cf7..02b3013 100644
--- a/src/java/org/apache/cassandra/cql3/AssignementTestable.java
+++ b/src/java/org/apache/cassandra/cql3/AssignementTestable.java
@@ -17,12 +17,10 @@
*/
package org.apache.cassandra.cql3;
-import org.apache.cassandra.exceptions.InvalidRequestException;
-
public interface AssignementTestable
{
/**
* @return whether this object can be assigned to the provided receiver
*/
- public boolean isAssignableTo(String keyspace, ColumnSpecification receiver) throws InvalidRequestException;
+ public boolean isAssignableTo(String keyspace, ColumnSpecification receiver);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/25411bf1/src/java/org/apache/cassandra/cql3/Cql.g
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Cql.g b/src/java/org/apache/cassandra/cql3/Cql.g
index 268bce5..96a668b 100644
--- a/src/java/org/apache/cassandra/cql3/Cql.g
+++ b/src/java/org/apache/cassandra/cql3/Cql.g
@@ -242,6 +242,8 @@ cqlStatement returns [ParsedStatement stmt]
| st25=createTypeStatement { $stmt = st25; }
| st26=alterTypeStatement { $stmt = st26; }
| st27=dropTypeStatement { $stmt = st27; }
+ | st28=createFunctionStatement { $stmt = st28; }
+ | st29=dropFunctionStatement { $stmt = st29; }
;
/*
@@ -298,7 +300,8 @@ unaliasedSelector returns [Selectable s]
: ( c=cident { tmp = c; }
| K_WRITETIME '(' c=cident ')' { tmp = new Selectable.WritetimeOrTTL(c, true); }
| K_TTL '(' c=cident ')' { tmp = new Selectable.WritetimeOrTTL(c, false); }
- | f=functionName args=selectionFunctionArgs { tmp = new Selectable.WithFunction(f, args); }
+ | f=functionName args=selectionFunctionArgs { tmp = new Selectable.WithFunction("", f, args); }
+ | bn=udfName '::' fn=udfName args=selectionFunctionArgs { tmp = new Selectable.WithFunction(bn, fn, args); }
) ( '.' fi=cident { tmp = new Selectable.WithFieldSelection(tmp, fi); } )* { $s = tmp; }
;
@@ -485,6 +488,48 @@ batchStatementObjective returns [ModificationStatement.Parsed statement]
| d=deleteStatement { $statement = d; }
;
+createFunctionStatement returns [CreateFunctionStatement expr]
+ @init {
+ boolean orReplace = false;
+ boolean ifNotExists = false;
+
+ boolean deterministic = true;
+ String language = "CLASS";
+ String bodyOrClassName = null;
+ List<CreateFunctionStatement.Argument> args = new ArrayList<CreateFunctionStatement.Argument>();
+ }
+ : K_CREATE (K_OR K_REPLACE { orReplace = true; })?
+ ((K_NON { deterministic = false; })? K_DETERMINISTIC)?
+ K_FUNCTION
+ (K_IF K_NOT K_EXISTS { ifNotExists = true; })?
+ ( bn=udfName '::' )?
+ fn=udfName
+ '('
+ (
+ k=cident v=comparatorType { args.add(new CreateFunctionStatement.Argument(k, v)); }
+ ( ',' k=cident v=comparatorType { args.add(new CreateFunctionStatement.Argument(k, v)); } )*
+ )?
+ ')'
+ K_RETURNS
+ rt=comparatorType
+ (
+ ( { language="CLASS"; } cls = STRING_LITERAL { bodyOrClassName = $cls.text; } )
+ | ( K_LANGUAGE l = IDENT { language=$l.text; } K_BODY body = ((~K_END_BODY)*) { bodyOrClassName = $body.text; } K_END_BODY )
+ )
+ { $expr = new CreateFunctionStatement(bn, fn, language, bodyOrClassName, deterministic, rt, args, orReplace, ifNotExists); }
+ ;
+
+dropFunctionStatement returns [DropFunctionStatement expr]
+ @init {
+ boolean ifExists = false;
+ }
+ : K_DROP K_FUNCTION
+ (K_IF K_EXISTS { ifExists = true; } )?
+ ( bn=udfName '::' )?
+ fn=udfName
+ { $expr = new DropFunctionStatement(bn, fn, ifExists); }
+ ;
+
/**
* CREATE KEYSPACE [IF NOT EXISTS] <KEYSPACE> WITH attr1 = value1 AND attr2 = value2;
*/
@@ -917,6 +962,11 @@ functionName returns [String s]
| K_TOKEN { $s = "token"; }
;
+udfName returns [String s]
+ : f=IDENT { $s = $f.text; }
+ | u=unreserved_function_keyword { $s = u; }
+ ;
+
functionArgs returns [List<Term.Raw> a]
: '(' ')' { $a = Collections.emptyList(); }
| '(' t1=term { List<Term.Raw> args = new ArrayList<Term.Raw>(); args.add(t1); }
@@ -926,7 +976,8 @@ functionArgs returns [List<Term.Raw> a]
term returns [Term.Raw term]
: v=value { $term = v; }
- | f=functionName args=functionArgs { $term = new FunctionCall.Raw(f, args); }
+ | f=functionName args=functionArgs { $term = new FunctionCall.Raw("", f, args); }
+ | bn=udfName '::' fn=udfName args=functionArgs { $term = new FunctionCall.Raw(bn, fn, args); }
| '(' c=comparatorType ')' t=term { $term = new TypeCast(c, t); }
;
@@ -1180,10 +1231,16 @@ basic_unreserved_keyword returns [String str]
| K_DISTINCT
| K_CONTAINS
| K_STATIC
+ | K_FUNCTION
+ | K_RETURNS
+ | K_LANGUAGE
+ | K_NON
+ | K_DETERMINISTIC
+ | K_BODY
+ | K_END_BODY
) { $str = $k.text; }
;
-
// Case-insensitive keywords
K_SELECT: S E L E C T;
K_FROM: F R O M;
@@ -1287,6 +1344,16 @@ K_TUPLE: T U P L E;
K_TRIGGER: T R I G G E R;
K_STATIC: S T A T I C;
+K_FUNCTION: F U N C T I O N;
+K_RETURNS: R E T U R N S;
+K_LANGUAGE: L A N G U A G E;
+K_NON: N O N;
+K_OR: O R;
+K_REPLACE: R E P L A C E;
+K_DETERMINISTIC: D E T E R M I N I S T I C;
+K_END_BODY: E N D '_' B O D Y;
+K_BODY: B O D Y;
+
// Case-insensitive alpha characters
fragment A: ('a'|'A');
fragment B: ('b'|'B');
http://git-wip-us.apache.org/repos/asf/cassandra/blob/25411bf1/src/java/org/apache/cassandra/cql3/TypeCast.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/TypeCast.java b/src/java/org/apache/cassandra/cql3/TypeCast.java
index e325e4d..3250e3b 100644
--- a/src/java/org/apache/cassandra/cql3/TypeCast.java
+++ b/src/java/org/apache/cassandra/cql3/TypeCast.java
@@ -46,7 +46,7 @@ public class TypeCast implements Term.Raw
return new ColumnSpecification(receiver.ksName, receiver.cfName, new ColumnIdentifier(toString(), true), type.prepare(keyspace).getType());
}
- public boolean isAssignableTo(String keyspace, ColumnSpecification receiver) throws InvalidRequestException
+ public boolean isAssignableTo(String keyspace, ColumnSpecification receiver)
{
try
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/25411bf1/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java b/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java
index a0c7447..fe2c2ee 100644
--- a/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java
+++ b/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java
@@ -22,6 +22,8 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.cql3.udf.UDFunction;
+import org.apache.cassandra.cql3.udf.UDFRegistry;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.CollectionType;
import org.apache.cassandra.db.marshal.ListType;
@@ -93,18 +95,33 @@ public class FunctionCall extends Term.NonTerminal
public static class Raw implements Term.Raw
{
+ private final String namespace;
private final String functionName;
private final List<Term.Raw> terms;
- public Raw(String functionName, List<Term.Raw> terms)
+ public Raw(String namespace, String functionName, List<Term.Raw> terms)
{
+ this.namespace = namespace;
this.functionName = functionName;
this.terms = terms;
}
public Term prepare(String keyspace, ColumnSpecification receiver) throws InvalidRequestException
{
- Function fun = Functions.get(keyspace, functionName, terms, receiver);
+ Function fun = null;
+ if (namespace.isEmpty())
+ fun = Functions.get(keyspace, functionName, terms, receiver);
+
+ if (fun == null)
+ {
+ UDFunction udf = UDFRegistry.resolveFunction(namespace, functionName, receiver.ksName, receiver.cfName, terms);
+ if (udf != null)
+ // got a user defined function to call
+ fun = udf.create(terms);
+ }
+
+ if (fun == null)
+ throw new InvalidRequestException(String.format("Unknown function %s called", namespace.isEmpty() ? functionName : namespace + "::" + functionName));
List<Term> parameters = new ArrayList<Term>(terms.size());
boolean allTerminal = true;
@@ -149,10 +166,13 @@ public class FunctionCall extends Term.NonTerminal
public String toString()
{
StringBuilder sb = new StringBuilder();
+ if (!namespace.isEmpty())
+ sb.append(namespace).append("::");
sb.append(functionName).append("(");
for (int i = 0; i < terms.size(); i++)
{
- if (i > 0) sb.append(", ");
+ if (i > 0)
+ sb.append(", ");
sb.append(terms.get(i));
}
return sb.append(")").toString();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/25411bf1/src/java/org/apache/cassandra/cql3/functions/Functions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/Functions.java b/src/java/org/apache/cassandra/cql3/functions/Functions.java
index 605e7b3..03dd13d 100644
--- a/src/java/org/apache/cassandra/cql3/functions/Functions.java
+++ b/src/java/org/apache/cassandra/cql3/functions/Functions.java
@@ -62,6 +62,11 @@ public abstract class Functions
declared.put("blobasvarchar", AbstractFunction.factory(BytesConversionFcts.BlobAsVarcharFact));
}
+    /**
+     * Returns whether a function is declared under the given name.
+     * The name is lower-cased before the lookup, as the other lookups in this class do.
+     *
+     * @param functionName name of the function to look for; must not be {@code null}
+     * @return {@code true} if at least one function is declared under that name
+     */
+    public static boolean contains(String functionName)
+    {
+        return declared.containsKey(functionName.toLowerCase());
+    }
+
public static AbstractType<?> getReturnType(String functionName, String ksName, String cfName)
{
List<Function.Factory> factories = declared.get(functionName.toLowerCase());
@@ -82,7 +87,7 @@ public abstract class Functions
{
List<Function.Factory> factories = declared.get(name.toLowerCase());
if (factories.isEmpty())
- throw new InvalidRequestException(String.format("Unknown CQL3 function %s called", name));
+ return null;
// Fast path if there is not choice
if (factories.size() == 1)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/25411bf1/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java
new file mode 100644
index 0000000..094c318
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.statements;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.cassandra.auth.Permission;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.config.UFMetaData;
+import org.apache.cassandra.cql3.CQL3Type;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.functions.Functions;
+import org.apache.cassandra.cql3.udf.UDFRegistry;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.exceptions.RequestValidationException;
+import org.apache.cassandra.exceptions.UnauthorizedException;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.MigrationManager;
+import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.transport.Event;
+import org.apache.cassandra.transport.messages.ResultMessage;
+
+/**
+ * A <code>CREATE FUNCTION</code> statement parsed from a CQL query.
+ */
+public final class CreateFunctionStatement extends SchemaAlteringStatement
+{
+ final boolean orReplace;
+ final boolean ifNotExists;
+ final String namespace;
+ final String functionName;
+ final String qualifiedName;
+ final String language;
+ final String body;
+ final boolean deterministic;
+ final CQL3Type.Raw returnType;
+ final List<Argument> arguments;
+
+ private UFMetaData ufMeta;
+
+ /**
+  * Creates the statement from the values extracted by the parser.
+  *
+  * @param namespace     function namespace; {@code null} is stored as the empty string
+  * @param functionName  unqualified function name, must not be {@code null}
+  * @param language      function language, must not be {@code null}
+  * @param body          function body, must not be {@code null}
+  * @param deterministic whether the function is declared deterministic
+  * @param returnType    raw CQL return type, must not be {@code null}
+  * @param arguments     declared arguments, must not be {@code null}
+  * @param orReplace     whether an existing function may be replaced
+  * @param ifNotExists   whether IF NOT EXISTS was given
+  */
+ public CreateFunctionStatement(String namespace, String functionName, String language, String body, boolean deterministic,
+                                CQL3Type.Raw returnType, List<Argument> arguments, boolean orReplace, boolean ifNotExists)
+ {
+     super();
+     // check the mandatory values before any of them is used
+     assert functionName != null : "null function name";
+     assert language != null : "null function language";
+     assert body != null : "null function body";
+     assert returnType != null : "null function returnType";
+     assert arguments != null : "null function arguments";
+     this.namespace = namespace != null ? namespace : "";
+     this.functionName = functionName;
+     // build the qualified name from the normalized namespace (never null), i.e. from the
+     // same namespace value that is later handed to UFMetaData in doExecute()
+     this.qualifiedName = UFMetaData.qualifiedName(this.namespace, functionName);
+     this.language = language;
+     this.body = body;
+     this.deterministic = deterministic;
+     this.returnType = returnType;
+     this.arguments = arguments;
+     this.orReplace = orReplace;
+     this.ifNotExists = ifNotExists;
+ }
+
+ public void checkAccess(ClientState state) throws UnauthorizedException
+ {
+ // TODO CASSANDRA-7557 (function DDL permission)
+
+ state.hasAllKeyspacesAccess(Permission.CREATE);
+ }
+
+ /**
+ * The <code>CqlParser</code> only goes as far as extracting the keyword arguments
+ * from these statements, so this method is responsible for processing and
+ * validating.
+ *
+ * @throws org.apache.cassandra.exceptions.InvalidRequestException if arguments are missing or unacceptable
+ */
+ public void validate(ClientState state) throws RequestValidationException
+ {
+ if (!namespace.isEmpty() && !namespace.matches("\\w+"))
+ throw new InvalidRequestException(String.format("\"%s\" is not a valid function name", qualifiedName));
+ if (!functionName.matches("\\w+"))
+ throw new InvalidRequestException(String.format("\"%s\" is not a valid function name", qualifiedName));
+ if (namespace.length() > Schema.NAME_LENGTH)
+ throw new InvalidRequestException(String.format("UDF namespace names shouldn't be more than %s characters long (got \"%s\")", Schema.NAME_LENGTH, qualifiedName));
+ if (functionName.length() > Schema.NAME_LENGTH)
+ throw new InvalidRequestException(String.format("UDF function names shouldn't be more than %s characters long (got \"%s\")", Schema.NAME_LENGTH, qualifiedName));
+ }
+
+ public Event.SchemaChange changeEvent()
+ {
+ return null;
+ }
+
+ /**
+  * Internal execution path: registers the function locally and lets the superclass announce the
+  * migration. Validation failures are rethrown wrapped in a {@link RuntimeException}.
+  */
+ public ResultMessage executeInternal(QueryState state, QueryOptions options)
+ {
+     try
+     {
+         // IF NOT EXISTS on an existing function: nothing was built, so nothing may be announced
+         if (!doExecute())
+             return new ResultMessage.Void();
+         return super.executeInternal(state, options);
+     }
+     catch (RequestValidationException e)
+     {
+         throw new RuntimeException(e);
+     }
+ }
+
+ /**
+  * Regular execution path: registers the function locally and lets the superclass announce the migration.
+  *
+  * @throws RequestValidationException if the function cannot be created
+  */
+ public ResultMessage execute(QueryState state, QueryOptions options) throws RequestValidationException
+ {
+     // IF NOT EXISTS on an existing function: nothing was built, so nothing may be announced
+     if (!doExecute())
+         return new ResultMessage.Void();
+     return super.execute(state, options);
+ }
+
+ /**
+  * Checks the request against the registry, builds the function metadata and registers it with
+  * {@link UDFRegistry}.
+  *
+  * @return {@code true} if {@code ufMeta} was built and registered, so the migration has to be announced;
+  *         {@code false} if the function already exists and IF NOT EXISTS was given (silent no-op)
+  * @throws RequestValidationException if the function already exists and may not be replaced, if the
+  *         name is reserved by a built-in function, or if the registry rejects the function
+  */
+ private boolean doExecute() throws RequestValidationException
+ {
+     if (UDFRegistry.hasFunction(qualifiedName))
+     {
+         // IF NOT EXISTS must not fail when the function is already there
+         if (ifNotExists)
+             return false;
+         if (!orReplace)
+             throw new InvalidRequestException(String.format("Function '%s' already exists.", qualifiedName));
+     }
+
+     // only namespace-less functions can collide with built-in functions
+     if (namespace.isEmpty() && Functions.contains(functionName))
+         throw new InvalidRequestException(String.format("Function name '%s' is reserved by CQL.", qualifiedName));
+
+     List<String> argumentNames = new ArrayList<>(arguments.size());
+     List<String> argumentTypes = new ArrayList<>(arguments.size());
+     for (Argument arg : arguments)
+     {
+         argumentNames.add(arg.getName().toString());
+         argumentTypes.add(arg.getType().toString());
+     }
+     this.ufMeta = new UFMetaData(namespace, functionName, deterministic, argumentNames, argumentTypes,
+                                  returnType.toString(), language, body);
+
+     UDFRegistry.tryCreateFunction(ufMeta);
+     return true;
+ }
+
+ public void announceMigration(boolean isLocalOnly) throws RequestValidationException
+ {
+ MigrationManager.announceNewFunction(ufMeta, isLocalOnly);
+ }
+
+ public static final class Argument
+ {
+ final ColumnIdentifier name;
+ final CQL3Type.Raw type;
+
+ public Argument(ColumnIdentifier name, CQL3Type.Raw type)
+ {
+ this.name = name;
+ this.type = type;
+ }
+
+ public ColumnIdentifier getName()
+ {
+ return name;
+ }
+
+ public CQL3Type.Raw getType()
+ {
+ return type;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/25411bf1/src/java/org/apache/cassandra/cql3/statements/DropFunctionStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DropFunctionStatement.java b/src/java/org/apache/cassandra/cql3/statements/DropFunctionStatement.java
new file mode 100644
index 0000000..7627ab4
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/statements/DropFunctionStatement.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.statements;
+
+import org.apache.cassandra.auth.Permission;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.config.UFMetaData;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.exceptions.RequestValidationException;
+import org.apache.cassandra.exceptions.UnauthorizedException;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.MigrationManager;
+import org.apache.cassandra.transport.Event;
+
+/**
+ * A <code>DROP FUNCTION</code> statement parsed from a CQL query.
+ */
+public final class DropFunctionStatement extends SchemaAlteringStatement
+{
+ private final String namespace;
+ private final String functionName;
+ private final String qualifiedName;
+ private final boolean ifExists;
+
+ /**
+  * Creates the statement from the values extracted by the parser.
+  *
+  * @param namespace    function namespace; {@code null} is stored as the empty string
+  * @param functionName unqualified function name
+  * @param ifExists     whether IF EXISTS was given
+  */
+ public DropFunctionStatement(String namespace, String functionName, boolean ifExists)
+ {
+     super();
+     this.namespace = namespace == null ? "" : namespace;
+     this.functionName = functionName;
+     // build the qualified name from the normalized namespace (never null)
+     this.qualifiedName = UFMetaData.qualifiedName(this.namespace, functionName);
+     this.ifExists = ifExists;
+ }
+
+ public void checkAccess(ClientState state) throws UnauthorizedException
+ {
+ // TODO CASSANDRA-7557 (function DDL permission)
+
+ state.hasAllKeyspacesAccess(Permission.DROP);
+ }
+
+ /**
+ * The <code>CqlParser</code> only goes as far as extracting the keyword arguments
+ * from these statements, so this method is responsible for processing and
+ * validating.
+ *
+ * @throws org.apache.cassandra.exceptions.InvalidRequestException if arguments are missing or unacceptable
+ */
+ public void validate(ClientState state) throws RequestValidationException
+ {
+ if (!namespace.isEmpty() && !namespace.matches("\\w+"))
+ throw new InvalidRequestException(String.format("\"%s\" is not a valid function name", qualifiedName));
+ if (!functionName.matches("\\w+"))
+ throw new InvalidRequestException(String.format("\"%s\" is not a valid function name", qualifiedName));
+ if (namespace.length() > Schema.NAME_LENGTH)
+ throw new InvalidRequestException(String.format("UDF namespace names shouldn't be more than %s characters long (got \"%s\")", Schema.NAME_LENGTH, qualifiedName));
+ if (functionName.length() > Schema.NAME_LENGTH)
+ throw new InvalidRequestException(String.format("UDF function names shouldn't be more than %s characters long (got \"%s\")", Schema.NAME_LENGTH, qualifiedName));
+ }
+
+ public Event.SchemaChange changeEvent()
+ {
+ return null;
+ }
+
+ // no execute() - drop propagated via MigrationManager
+
+ public void announceMigration(boolean isLocalOnly) throws RequestValidationException
+ {
+ try
+ {
+ MigrationManager.announceFunctionDrop(namespace, functionName, isLocalOnly);
+ }
+ catch (InvalidRequestException e)
+ {
+ if (!ifExists)
+ throw e;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/25411bf1/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java b/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java
index e70aac9..876568a 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java
@@ -70,7 +70,8 @@ public abstract class SchemaAlteringStatement extends CFStatement implements CQL
public ResultMessage execute(QueryState state, QueryOptions options) throws RequestValidationException
{
announceMigration(false);
- return new ResultMessage.SchemaChange(changeEvent());
+ Event.SchemaChange ce = changeEvent();
+ return ce == null ? new ResultMessage.Void() : new ResultMessage.SchemaChange(ce);
}
public ResultMessage executeInternal(QueryState state, QueryOptions options)
@@ -78,7 +79,8 @@ public abstract class SchemaAlteringStatement extends CFStatement implements CQL
try
{
announceMigration(true);
- return new ResultMessage.SchemaChange(changeEvent());
+ Event.SchemaChange ce = changeEvent();
+ return ce == null ? new ResultMessage.Void() : new ResultMessage.SchemaChange(ce);
}
catch (RequestValidationException e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/25411bf1/src/java/org/apache/cassandra/cql3/statements/Selectable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/Selectable.java b/src/java/org/apache/cassandra/cql3/statements/Selectable.java
index 448301c..ab0a5a3 100644
--- a/src/java/org/apache/cassandra/cql3/statements/Selectable.java
+++ b/src/java/org/apache/cassandra/cql3/statements/Selectable.java
@@ -44,11 +44,13 @@ public interface Selectable
public static class WithFunction implements Selectable
{
+ public final String namespace;
public final String functionName;
public final List<Selectable> args;
- public WithFunction(String functionName, List<Selectable> args)
+ public WithFunction(String namespace, String functionName, List<Selectable> args)
{
+ this.namespace = namespace;
this.functionName = functionName;
this.args = args;
}
@@ -57,10 +59,13 @@ public interface Selectable
public String toString()
{
StringBuilder sb = new StringBuilder();
+ if (!namespace.isEmpty())
+ sb.append(namespace).append("::");
sb.append(functionName).append("(");
for (int i = 0; i < args.size(); i++)
{
- if (i > 0) sb.append(", ");
+ if (i > 0)
+ sb.append(", ");
sb.append(args.get(i));
}
return sb.append(")").toString();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/25411bf1/src/java/org/apache/cassandra/cql3/statements/Selection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/Selection.java b/src/java/org/apache/cassandra/cql3/statements/Selection.java
index 0f0cb62..325ef15 100644
--- a/src/java/org/apache/cassandra/cql3/statements/Selection.java
+++ b/src/java/org/apache/cassandra/cql3/statements/Selection.java
@@ -29,6 +29,8 @@ import org.apache.cassandra.cql3.functions.Function;
import org.apache.cassandra.cql3.functions.Functions;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.cql3.udf.UDFunction;
+import org.apache.cassandra.cql3.udf.UDFRegistry;
import org.apache.cassandra.db.Cell;
import org.apache.cassandra.db.CounterCell;
import org.apache.cassandra.db.ExpiringCell;
@@ -156,13 +158,26 @@ public abstract class Selection
else
{
Selectable.WithFunction withFun = (Selectable.WithFunction)raw.selectable;
- List<Selector> args = new ArrayList<Selector>(withFun.args.size());
+ List<Selector> args = new ArrayList<>(withFun.args.size());
for (Selectable rawArg : withFun.args)
args.add(makeSelector(cfm, new RawSelector(rawArg, null), defs, null));
+ // resolve built-in functions before user defined functions
AbstractType<?> returnType = Functions.getReturnType(withFun.functionName, cfm.ksName, cfm.cfName);
if (returnType == null)
- throw new InvalidRequestException(String.format("Unknown function '%s'", withFun.functionName));
+ {
+ UDFunction userFun = UDFRegistry.resolveFunction(withFun.namespace, withFun.functionName, cfm.ksName, cfm.cfName, args);
+ if (userFun != null)
+ {
+ // got a user defined function to call
+ Function fun = userFun.create(args);
+ ColumnSpecification spec = makeFunctionSpec(cfm, withFun, fun.returnType(), raw.alias);
+ if (metadata != null)
+ metadata.add(spec);
+ return new FunctionSelector(userFun.create(args), args);
+ }
+ throw new InvalidRequestException(String.format("Unknown function '%s'", withFun.namespace.isEmpty() ? withFun.functionName : withFun.namespace + "::" + withFun.functionName));
+ }
ColumnSpecification spec = makeFunctionSpec(cfm, withFun, returnType, raw.alias);
Function fun = Functions.get(cfm.ksName, withFun.functionName, args, spec);
if (metadata != null)
@@ -193,7 +208,7 @@ public abstract class Selection
ColumnIdentifier alias) throws InvalidRequestException
{
if (returnType == null)
- throw new InvalidRequestException(String.format("Unknown function %s called in selection clause", fun.functionName));
+ throw new InvalidRequestException(String.format("Unknown function %s called in selection clause", fun.namespace.isEmpty() ? fun.functionName : fun.namespace +"::"+fun.functionName));
return new ColumnSpecification(cfm.ksName,
cfm.cfName,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/25411bf1/src/java/org/apache/cassandra/cql3/udf/UDFFunctionOverloads.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/udf/UDFFunctionOverloads.java b/src/java/org/apache/cassandra/cql3/udf/UDFFunctionOverloads.java
new file mode 100644
index 0000000..aa6892a
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/udf/UDFFunctionOverloads.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.udf;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.cassandra.config.UFMetaData;
+import org.apache.cassandra.cql3.AssignementTestable;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+
+public final class UDFFunctionOverloads
+{
+ final Map<String, UFMetaData> signatureMap = new ConcurrentHashMap<>();
+ final Map<String, UDFunction> udfInstances = new ConcurrentHashMap<>();
+
+ /**
+  * Creates the invokable {@link UDFunction} for the given metadata and stores it under the
+  * metadata's signature. If creation fails, the exception is recorded in {@code uf.invalid}.
+  * The metadata itself is stored when it is valid, or when {@code addIfInvalid} is set.
+  *
+  * @param uf           metadata of the function to add
+  * @param addIfInvalid whether to keep the metadata even though the function could not be created
+  */
+ public void addAndInit(UFMetaData uf, boolean addIfInvalid)
+ {
+     try
+     {
+         UDFunction instance = new UDFunction(uf);
+         udfInstances.put(uf.signature, instance);
+     }
+     catch (InvalidRequestException failure)
+     {
+         uf.invalid = failure;
+     }
+
+     // invalid metadata is only kept on request
+     if (uf.invalid != null && !addIfInvalid)
+         return;
+
+     signatureMap.put(uf.signature, uf);
+ }
+
+ public void remove(UFMetaData uf)
+ {
+ signatureMap.remove(uf.signature);
+ udfInstances.remove(uf.signature);
+ }
+
+ public Collection<UFMetaData> values()
+ {
+ return signatureMap.values();
+ }
+
+ public boolean isEmpty()
+ {
+ return signatureMap.isEmpty();
+ }
+
+ /**
+ * Picks the overload to call for the given arguments.
+ * The registered signatures are scanned and the first one whose {@code compatibleArgs} check
+ * accepts the arguments is used.
+ *
+ * @param ksName keyspace name passed on to the compatibility check
+ * @param cfName table name passed on to the compatibility check
+ * @param args the arguments of the call
+ * @return the function instance stored for the matching signature, or {@code null} if no
+ * registered signature accepts the arguments
+ * @throws InvalidRequestException if the matching function is marked invalid
+ */
+ public UDFunction resolveFunction(String ksName, String cfName, List<? extends AssignementTestable> args)
+ throws InvalidRequestException
+ {
+ for (UFMetaData candidate : signatureMap.values())
+ {
+ // Currently the UDF implementation must use concrete types (like Double, Integer) instead of base types (like Number).
+ // To support handling of base types it is necessary to construct new, temporary instances of UDFunction with the
+ // signature for the current request in UDFunction#argsType + UDFunction#returnType.
+ // Additionally we need the requested return type (AssignementTestable) as a parameter for this method.
+ if (candidate.compatibleArgs(ksName, cfName, args))
+ {
+
+ // TODO CASSANDRA-7557 (specific per-function EXECUTE permission ??)
+
+ // a function that failed to initialize reports its original error to the caller
+ if (candidate.invalid != null)
+ throw new InvalidRequestException(candidate.invalid.getMessage());
+ return udfInstances.get(candidate.signature);
+ }
+ }
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/25411bf1/src/java/org/apache/cassandra/cql3/udf/UDFRegistry.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/udf/UDFRegistry.java b/src/java/org/apache/cassandra/cql3/udf/UDFRegistry.java
new file mode 100644
index 0000000..cb3f1a1
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/udf/UDFRegistry.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3.udf;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.UFMetaData;
+import org.apache.cassandra.cql3.AssignementTestable;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.cql3.functions.Functions;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+
+/**
+ * Central registry for user defined functions (CASSANDRA-7395).
+ * <p/>
+ * UDFs are maintained in {@code system.schema_functions} table and distributed to all nodes.
+ * <p/>
+ * UDFs are not maintained in {@link org.apache.cassandra.cql3.functions.Functions} class to have a strict
+ * distinction between 'core CQL' functions provided by Cassandra and functions provided by the user.
+ * 'Core CQL' functions have precedence over UDFs.
+ */
+public class UDFRegistry
+{
+ private static final Logger logger = LoggerFactory.getLogger(UDFRegistry.class);
+
+ static final String SELECT_CQL = "SELECT namespace, name, signature, deterministic, argument_names, argument_types, " +
+ "return_type, language, body FROM " +
+ Keyspace.SYSTEM_KS + '.' + SystemKeyspace.SCHEMA_FUNCTIONS_CF;
+
+ private static final Map<String, UDFFunctionOverloads> functions = new ConcurrentHashMap<>();
+
+ public static void init()
+ {
+ refreshInitial();
+ }
+
+ /**
+ * Initial loading of all existing UDFs.
+ * <p/>
+ * Runs {@link #SELECT_CQL} internally and registers every returned function under its qualified
+ * name. Functions that fail to initialize are registered as well and logged as errors.
+ */
+ public static void refreshInitial()
+ {
+ logger.debug("Refreshing UDFs");
+ for (UntypedResultSet.Row row : QueryProcessor.executeOnceInternal(SELECT_CQL))
+ {
+ UFMetaData uf = UFMetaData.fromSchema(row);
+ // one overload container per qualified name, created on first use
+ UDFFunctionOverloads sigMap = functions.get(uf.qualifiedName);
+ if (sigMap == null)
+ functions.put(uf.qualifiedName, sigMap = new UDFFunctionOverloads());
+
+ // NOTE(review): this check passes the qualified name, whereas CreateFunctionStatement passes the
+ // bare function name (and only for namespace-less functions) - confirm which key is intended
+ if (Functions.contains(uf.qualifiedName))
+ logger.warn("The UDF '" + uf.functionName + "' cannot be used because it uses the same name as the CQL " +
+ "function with the same name. You should drop this function but can do a " +
+ "'DESCRIBE FUNCTION "+uf.functionName+";' in cqlsh before to get more information about it.");
+
+ // add the function to the registry even if it is invalid (to be able to drop it)
+ sigMap.addAndInit(uf, true);
+
+ if (uf.invalid != null)
+ logger.error("Loaded invalid UDF : " + uf.invalid.getMessage());
+ }
+ }
+
+ /**
+ * Tells whether at least one overload is registered under the given qualified name.
+ *
+ * @param qualifiedName qualified function name; it is lower-cased before the lookup
+ * @return {@code true} if an overload container exists for the name and is not empty
+ */
+ public static boolean hasFunction(String qualifiedName)
+ {
+ // NOTE(review): the key is lower-cased here, but the other lookups in this class use the
+ // qualified name as-is - confirm that qualified names are always lower-case
+ UDFFunctionOverloads sigMap = functions.get(qualifiedName.toLowerCase());
+ return sigMap != null && !sigMap.isEmpty();
+ }
+
+ public static UDFunction resolveFunction(String namespace, String functionName, String ksName, String cfName,
+ List<? extends AssignementTestable> args)
+ throws InvalidRequestException
+ {
+ UDFFunctionOverloads sigMap = functions.get(UFMetaData.qualifiedName(namespace, functionName));
+ if (sigMap != null)
+ return sigMap.resolveFunction(ksName, cfName, args);
+ return null;
+ }
+
+ public static void migrateDropFunction(UFMetaData uf)
+ {
+ UDFFunctionOverloads sigMap = functions.get(uf.qualifiedName);
+ if (sigMap == null)
+ return;
+
+ sigMap.remove(uf);
+ }
+
+ public static void migrateUpdateFunction(UFMetaData uf)
+ {
+ migrateAddFunction(uf);
+ }
+
+ public static void migrateAddFunction(UFMetaData uf)
+ {
+ addFunction(uf, true);
+ }
+
+ /**
+ * Used by {@link org.apache.cassandra.cql3.statements.CreateFunctionStatement} to create or replace a new function.
+ */
+ public static void tryCreateFunction(UFMetaData ufMeta) throws InvalidRequestException
+ {
+ addFunction(ufMeta, false);
+
+ if (ufMeta.invalid != null)
+ throw ufMeta.invalid;
+ }
+
+ private static void addFunction(UFMetaData uf, boolean addIfInvalid)
+ {
+ UDFFunctionOverloads sigMap = functions.get(uf.qualifiedName);
+ if (sigMap == null)
+ functions.put(uf.qualifiedName, sigMap = new UDFFunctionOverloads());
+
+ sigMap.addAndInit(uf, addIfInvalid);
+ }
+
+ public static UDFFunctionOverloads getFunctionSigMap(String qualifiedName)
+ {
+ return functions.get(qualifiedName);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/25411bf1/src/java/org/apache/cassandra/cql3/udf/UDFunction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/udf/UDFunction.java b/src/java/org/apache/cassandra/cql3/udf/UDFunction.java
new file mode 100644
index 0000000..4866c22
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/udf/UDFunction.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3.udf;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.UFMetaData;
+import org.apache.cassandra.cql3.AssignementTestable;
+import org.apache.cassandra.cql3.functions.Function;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+
+/**
+ * UDFunction contains the <i>invokable</i> instance of a user defined function.
+ * Currently (as of CASSANDRA-7395) only {@code public static} methods in a {@code public} class
+ * can be invoked.
+ * CASSANDRA-7562 will introduce Java source code UDFs and CASSANDRA-7526 will introduce JSR-223 scripting languages.
+ * Invocations of UDFs are routed via this class.
+ */
+public class UDFunction
+{
+ private static final Logger logger = LoggerFactory.getLogger(UDFunction.class);
+
+ public final UFMetaData meta;
+
+ public final Method method;
+
+ UDFunction(UFMetaData meta) throws InvalidRequestException
+ {
+ this.meta = meta;
+
+ Method m;
+ switch (meta.language)
+ {
+ case "class":
+ m = resolveClassMethod();
+ break;
+ default:
+ throw new InvalidRequestException("Invalid UDF language " + meta.language + " for '" + meta.qualifiedName + '\'');
+ }
+ this.method = m;
+ }
+
+ /**
+  * Resolves the Java method that implements this function for the {@code class} language.
+  * <p/>
+  * The function body is either {@code className#methodName} or just {@code className}; in the
+  * latter case the method is expected to have the same name as the function. The parameter and
+  * return types are derived from the serializers of the declared CQL types.
+  *
+  * @return the public static method to invoke
+  * @throws InvalidRequestException if the class or the method does not exist, if the method is not
+  *         static, or if its return type is not assignable to the declared return type
+  */
+ private Method resolveClassMethod() throws InvalidRequestException
+ {
+     Class<?> jReturnType = meta.cqlReturnType.getType().getSerializer().getType();
+     Class<?>[] paramTypes = new Class<?>[meta.cqlArgumentTypes.size()];
+     for (int i = 0; i < paramTypes.length; i++)
+         paramTypes[i] = meta.cqlArgumentTypes.get(i).getType().getSerializer().getType();
+
+     String className;
+     String methodName;
+     int separator = meta.body.indexOf('#');
+     if (separator != -1)
+     {
+         methodName = meta.body.substring(separator + 1);
+         className = meta.body.substring(0, separator);
+     }
+     else
+     {
+         methodName = meta.functionName;
+         className = meta.body;
+     }
+     try
+     {
+         Class<?> cls = Class.forName(className, false, Thread.currentThread().getContextClassLoader());
+
+         Method method = cls.getMethod(methodName, paramTypes);
+
+         // getMethod() also finds instance methods, but the function is invoked without a target
+         // object; reject those here instead of failing with a NullPointerException at query time
+         if (!java.lang.reflect.Modifier.isStatic(method.getModifiers()))
+         {
+             throw new InvalidRequestException("Method " + className + '.' + methodName + '(' + Arrays.toString(paramTypes) + ") " +
+                                               "is not static");
+         }
+
+         if (!jReturnType.isAssignableFrom(method.getReturnType()))
+         {
+             throw new InvalidRequestException("Method " + className + '.' + methodName + '(' + Arrays.toString(paramTypes) + ") " +
+                                               "has incompatible return type " + method.getReturnType() + " (not assignable to " + jReturnType + ')');
+         }
+
+         return method;
+     }
+     catch (ClassNotFoundException e)
+     {
+         throw new InvalidRequestException("Class " + className + " does not exist");
+     }
+     catch (NoSuchMethodException e)
+     {
+         throw new InvalidRequestException("Method " + className + '.' + methodName + '(' + Arrays.toString(paramTypes) + ") does not exist");
+     }
+ }
+
+ public Function create(List<? extends AssignementTestable> providedArgs) throws InvalidRequestException
+ {
+ final int argCount = providedArgs.size();
+ final List<AbstractType<?>> argsType = new ArrayList<>(argCount);
+ final AbstractType<?> returnType = meta.cqlReturnType.getType();
+ for (int i = 0; i < argCount; i++)
+ {
+ AbstractType<?> argType = meta.cqlArgumentTypes.get(i).getType();
+ argsType.add(argType);
+ }
+
+ return new Function()
+ {
+ public String name()
+ {
+ return meta.qualifiedName;
+ }
+
+ public List<AbstractType<?>> argsType()
+ {
+ return argsType;
+ }
+
+ public AbstractType<?> returnType()
+ {
+ return returnType;
+ }
+
+ public ByteBuffer execute(List<ByteBuffer> parameters) throws InvalidRequestException
+ {
+ Object[] parms = new Object[argCount];
+ for (int i = 0; i < parms.length; i++)
+ {
+ ByteBuffer bb = parameters.get(i);
+ if (bb != null)
+ {
+ AbstractType<?> argType = argsType.get(i);
+ parms[i] = argType.compose(bb);
+ }
+ }
+
+ Object result;
+ try
+ {
+ result = method.invoke(null, parms);
+ @SuppressWarnings("unchecked") ByteBuffer r = result != null ? ((AbstractType) returnType).decompose(result) : null;
+ return r;
+ }
+ catch (InvocationTargetException e)
+ {
+ Throwable c = e.getCause();
+ logger.error("Invocation of UDF {} failed", meta.qualifiedName, c);
+ throw new InvalidRequestException("Invocation of UDF " + meta.qualifiedName + " failed: " + c);
+ }
+ catch (IllegalAccessException e)
+ {
+ throw new InvalidRequestException("UDF " + meta.qualifiedName + " invocation failed: " + e);
+ }
+ }
+
+ public boolean isPure()
+ {
+ return meta.deterministic;
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/25411bf1/src/java/org/apache/cassandra/db/DefsTables.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DefsTables.java b/src/java/org/apache/cassandra/db/DefsTables.java
index fc43c27..33a112a 100644
--- a/src/java/org/apache/cassandra/db/DefsTables.java
+++ b/src/java/org/apache/cassandra/db/DefsTables.java
@@ -24,6 +24,8 @@ import java.util.*;
import com.google.common.collect.Iterables;
import com.google.common.collect.MapDifference;
import com.google.common.collect.Maps;
+import org.apache.cassandra.config.UFMetaData;
+import org.apache.cassandra.cql3.udf.UDFRegistry;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -177,6 +179,7 @@ public class DefsTables
Map<DecoratedKey, ColumnFamily> oldKeyspaces = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_KEYSPACES_CF, keyspaces);
Map<DecoratedKey, ColumnFamily> oldColumnFamilies = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_COLUMNFAMILIES_CF, keyspaces);
Map<DecoratedKey, ColumnFamily> oldTypes = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_USER_TYPES_CF, keyspaces);
+ Map<DecoratedKey, ColumnFamily> oldFunctions = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_FUNCTIONS_CF);
for (Mutation mutation : mutations)
mutation.apply();
@@ -188,10 +191,12 @@ public class DefsTables
Map<DecoratedKey, ColumnFamily> newKeyspaces = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_KEYSPACES_CF, keyspaces);
Map<DecoratedKey, ColumnFamily> newColumnFamilies = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_COLUMNFAMILIES_CF, keyspaces);
Map<DecoratedKey, ColumnFamily> newTypes = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_USER_TYPES_CF, keyspaces);
+ Map<DecoratedKey, ColumnFamily> newFunctions = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_FUNCTIONS_CF);
Set<String> keyspacesToDrop = mergeKeyspaces(oldKeyspaces, newKeyspaces);
mergeColumnFamilies(oldColumnFamilies, newColumnFamilies);
mergeTypes(oldTypes, newTypes);
+ mergeFunctions(oldFunctions, newFunctions);
// it is safe to drop a keyspace only when all nested ColumnFamilies where deleted
for (String keyspaceToDrop : keyspacesToDrop)
@@ -377,6 +382,54 @@ public class DefsTables
}
}
+ /**
+  * Applies the difference between two snapshots of the function schema rows by
+  * calling {@code addFunction}, {@code dropFunction} or {@code updateFunction}
+  * for every function that was created, removed or changed.
+  *
+  * @param old     function schema rows (keyed by namespace) read before the change
+  * @param updated function schema rows (keyed by namespace) read after the change
+  */
+ private static void mergeFunctions(Map<DecoratedKey, ColumnFamily> old, Map<DecoratedKey, ColumnFamily> updated)
+ {
+     MapDifference<DecoratedKey, ColumnFamily> rowsDiff = Maps.difference(old, updated);
+
+     // Rows that exist only in the updated snapshot: every function they hold is new.
+     for (Map.Entry<DecoratedKey, ColumnFamily> added : rowsDiff.entriesOnlyOnRight().entrySet())
+     {
+         ColumnFamily functions = added.getValue();
+         if (functions.hasColumns())
+         {
+             for (UFMetaData uf : UFMetaData.fromSchema(new Row(added.getKey(), functions)).values())
+                 addFunction(uf);
+         }
+     }
+
+     // Rows present in both snapshots but with different content.
+     for (Map.Entry<DecoratedKey, MapDifference.ValueDifference<ColumnFamily>> changed : rowsDiff.entriesDiffering().entrySet())
+     {
+         DecoratedKey namespace = changed.getKey();
+         MapDifference.ValueDifference<ColumnFamily> sides = changed.getValue();
+         ColumnFamily before = sides.leftValue(); // state prior to the modification
+         ColumnFamily after = sides.rightValue(); // state after the modification
+
+         if (!before.hasColumns())
+         {
+             // the namespace had been emptied earlier and is populated again: everything is an addition
+             for (UFMetaData uf : UFMetaData.fromSchema(new Row(namespace, after)).values())
+                 addFunction(uf);
+             continue;
+         }
+
+         if (!after.hasColumns())
+         {
+             // the namespace lost all of its functions: everything is a removal
+             for (UFMetaData uf : UFMetaData.fromSchema(new Row(namespace, before)).values())
+                 dropFunction(uf);
+             continue;
+         }
+
+         // Both sides hold functions, so diff them individually to find out what really changed.
+         MapDifference<String, UFMetaData> functionsDiff = Maps.difference(UFMetaData.fromSchema(new Row(namespace, before)),
+                                                                           UFMetaData.fromSchema(new Row(namespace, after)));
+
+         for (UFMetaData created : functionsDiff.entriesOnlyOnRight().values())
+             addFunction(created);
+
+         for (UFMetaData removed : functionsDiff.entriesOnlyOnLeft().values())
+             dropFunction(removed);
+
+         for (MapDifference.ValueDifference<UFMetaData> modified : functionsDiff.entriesDiffering().values())
+             updateFunction(modified.rightValue()); // the right-hand side is the most recent definition
+     }
+ }
+
+
private static void addKeyspace(KSMetaData ksm)
{
assert Schema.instance.getKSMetaData(ksm.name) == null;
@@ -425,6 +478,16 @@ public class DefsTables
MigrationManager.instance.notifyCreateUserType(ut);
}
+ /**
+  * Logs the function, hands it to {@code UDFRegistry.migrateAddFunction} and,
+  * unless running in client mode, notifies the migration listeners of the creation.
+  */
+ private static void addFunction(UFMetaData uf)
+ {
+     logger.info("Loading {}", uf);
+     UDFRegistry.migrateAddFunction(uf);
+
+     if (StorageService.instance.isClientMode())
+         return;
+
+     MigrationManager.instance.notifyCreateFunction(uf);
+ }
+
private static void updateKeyspace(KSMetaData newState)
{
KSMetaData oldKsm = Schema.instance.getKSMetaData(newState.name);
@@ -467,6 +530,16 @@ public class DefsTables
MigrationManager.instance.notifyUpdateUserType(ut);
}
+ /**
+  * Logs the function, hands it to {@code UDFRegistry.migrateUpdateFunction} and,
+  * unless running in client mode, notifies the migration listeners of the update.
+  */
+ private static void updateFunction(UFMetaData uf)
+ {
+     logger.info("Updating {}", uf);
+     UDFRegistry.migrateUpdateFunction(uf);
+
+     if (StorageService.instance.isClientMode())
+         return;
+
+     MigrationManager.instance.notifyUpdateFunction(uf);
+ }
+
private static void dropKeyspace(String ksName)
{
KSMetaData ksm = Schema.instance.getKSMetaData(ksName);
@@ -546,6 +619,16 @@ public class DefsTables
MigrationManager.instance.notifyDropUserType(ut);
}
+ /**
+  * Logs the function, hands it to {@code UDFRegistry.migrateDropFunction} and,
+  * unless running in client mode, notifies the migration listeners of the removal.
+  */
+ private static void dropFunction(UFMetaData uf)
+ {
+     logger.info("Drop {}", uf);
+     UDFRegistry.migrateDropFunction(uf);
+
+     if (StorageService.instance.isClientMode())
+         return;
+
+     MigrationManager.instance.notifyDropFunction(uf);
+ }
+
private static KSMetaData makeNewKeyspaceDefinition(KSMetaData ksm, CFMetaData toExclude)
{
// clone ksm but do not include the new def
http://git-wip-us.apache.org/repos/asf/cassandra/blob/25411bf1/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index 3c647b6..8b62740 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -80,6 +80,7 @@ public class SystemKeyspace
public static final String SCHEMA_COLUMNS_CF = "schema_columns";
public static final String SCHEMA_TRIGGERS_CF = "schema_triggers";
public static final String SCHEMA_USER_TYPES_CF = "schema_usertypes";
+ public static final String SCHEMA_FUNCTIONS_CF = "schema_functions";
public static final String COMPACTION_LOG = "compactions_in_progress";
public static final String PAXOS_CF = "paxos";
public static final String SSTABLE_ACTIVITY_CF = "sstable_activity";
@@ -91,7 +92,8 @@ public class SystemKeyspace
SCHEMA_COLUMNFAMILIES_CF,
SCHEMA_COLUMNS_CF,
SCHEMA_TRIGGERS_CF,
- SCHEMA_USER_TYPES_CF);
+ SCHEMA_USER_TYPES_CF,
+ SCHEMA_FUNCTIONS_CF);
private static volatile Map<UUID, Pair<ReplayPosition, Long>> truncationRecords;
@@ -769,6 +771,16 @@ public class SystemKeyspace
}
}
+ /**
+  * Collects every row returned by {@code serializedSchema(cfName)} into a map
+  * from the row's key to its column family.
+  *
+  * @param cfName name of the schema column family to read
+  * @return a new map holding one entry per schema row
+  */
+ public static Map<DecoratedKey, ColumnFamily> getSchema(String cfName)
+ {
+     Map<DecoratedKey, ColumnFamily> rowsByKey = new HashMap<>();
+     for (Row row : SystemKeyspace.serializedSchema(cfName))
+     {
+         rowsByKey.put(row.key, row.cf);
+     }
+     return rowsByKey;
+ }
+
public static Map<DecoratedKey, ColumnFamily> getSchema(String schemaCfName, Set<String> keyspaces)
{
Map<DecoratedKey, ColumnFamily> schema = new HashMap<>();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/25411bf1/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index 5c88cb1..71cba23 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -33,6 +33,7 @@ import javax.management.StandardMBean;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.Uninterruptibles;
+import org.apache.cassandra.cql3.udf.UDFRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -372,6 +373,9 @@ public class CassandraDaemon
if (!FBUtilities.getBroadcastAddress().equals(InetAddress.getLoopbackAddress()))
waitForGossipToSettle();
+ // UDF
+ UDFRegistry.init();
+
// Thift
InetAddress rpcAddr = DatabaseDescriptor.getRpcAddress();
int rpcPort = DatabaseDescriptor.getRpcPort();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/25411bf1/src/java/org/apache/cassandra/service/IMigrationListener.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/IMigrationListener.java b/src/java/org/apache/cassandra/service/IMigrationListener.java
index 4d142bd..b4eb392 100644
--- a/src/java/org/apache/cassandra/service/IMigrationListener.java
+++ b/src/java/org/apache/cassandra/service/IMigrationListener.java
@@ -22,12 +22,16 @@ public interface IMigrationListener
public void onCreateKeyspace(String ksName);
public void onCreateColumnFamily(String ksName, String cfName);
public void onCreateUserType(String ksName, String typeName);
+ public void onCreateFunction(String namespace, String functionName);
public void onUpdateKeyspace(String ksName);
public void onUpdateColumnFamily(String ksName, String cfName);
public void onUpdateUserType(String ksName, String typeName);
+ public void onUpdateFunction(String namespace, String functionName);
public void onDropKeyspace(String ksName);
public void onDropColumnFamily(String ksName, String cfName);
public void onDropUserType(String ksName, String typeName);
+ public void onDropFunction(String namespace, String functionName);
+
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/25411bf1/src/java/org/apache/cassandra/service/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java
index 5dd2534..28e3e39 100644
--- a/src/java/org/apache/cassandra/service/MigrationManager.java
+++ b/src/java/org/apache/cassandra/service/MigrationManager.java
@@ -29,6 +29,9 @@ import java.util.concurrent.*;
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
+import org.apache.cassandra.config.UFMetaData;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.exceptions.SyntaxException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -173,6 +176,24 @@ public class MigrationManager
listener.onCreateUserType(ut.keyspace, ut.getNameAsString());
}
+ /** Calls {@code onCreateFunction} on every registered listener with the function's namespace and name. */
+ public void notifyCreateFunction(UFMetaData uf)
+ {
+     for (IMigrationListener subscriber : listeners)
+     {
+         subscriber.onCreateFunction(uf.namespace, uf.functionName);
+     }
+ }
+
+ /** Calls {@code onUpdateFunction} on every registered listener with the function's namespace and name. */
+ public void notifyUpdateFunction(UFMetaData uf)
+ {
+     for (IMigrationListener subscriber : listeners)
+     {
+         subscriber.onUpdateFunction(uf.namespace, uf.functionName);
+     }
+ }
+
+ /** Calls {@code onDropFunction} on every registered listener with the function's namespace and name. */
+ public void notifyDropFunction(UFMetaData uf)
+ {
+     for (IMigrationListener subscriber : listeners)
+     {
+         subscriber.onDropFunction(uf.namespace, uf.functionName);
+     }
+ }
+
public void notifyUpdateKeyspace(KSMetaData ksm)
{
for (IMigrationListener listener : listeners)
@@ -352,6 +373,27 @@ public class MigrationManager
announce(addSerializedKeyspace(UTMetaData.dropFromSchema(droppedType, FBUtilities.timestampMicros()), droppedType.keyspace), announceLocally);
}
+ /**
+  * Builds the schema mutation that drops a function and announces it.
+  *
+  * @param namespace       namespace of the function to drop
+  * @param functionName    name of the function to drop
+  * @param announceLocally passed through unchanged to {@code announce}
+  * @throws InvalidRequestException if {@code UFMetaData.dropFunction} produces no mutation,
+  *         which is reported to the caller as a non-existing function
+  */
+ public static void announceFunctionDrop(String namespace, String functionName, boolean announceLocally) throws InvalidRequestException
+ {
+     Mutation mutation = UFMetaData.dropFunction(FBUtilities.timestampMicros(), namespace, functionName);
+     if (mutation == null)
+         // include the namespace so the message identifies the function as precisely as the log line below
+         throw new InvalidRequestException(String.format("Cannot drop non existing function '%s::%s'.", namespace, functionName));
+
+     logger.info("Drop Function '{}::{}'", namespace, functionName);
+     announce(mutation, announceLocally);
+ }
+
+ /**
+  * Builds the schema mutation that creates (or replaces) a function and announces it.
+  *
+  * @param function        metadata of the function to create
+  * @param announceLocally passed through unchanged to {@code announce}
+  * @throws ConfigurationException if {@code UFMetaData.createOrReplaceFunction} produces no mutation,
+  *         which is reported to the caller as an already existing function
+  * @throws SyntaxException declared by this method; presumably raised while building the mutation - TODO confirm
+  */
+ public static void announceNewFunction(UFMetaData function, boolean announceLocally)
+ throws ConfigurationException, SyntaxException
+ {
+     Mutation mutation = UFMetaData.createOrReplaceFunction(FBUtilities.timestampMicros(), function);
+     if (mutation == null)
+         throw new ConfigurationException(String.format("Function '%s' already exists.", function.qualifiedName));
+
+     // parameterized logging produces the same text without eager formatting
+     logger.info("Create Function '{}'", function);
+     announce(mutation, announceLocally);
+ }
+
/**
* actively announce a new version to active hosts via rpc
* @param schema The schema mutation to be applied
http://git-wip-us.apache.org/repos/asf/cassandra/blob/25411bf1/src/java/org/apache/cassandra/transport/Event.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Event.java b/src/java/org/apache/cassandra/transport/Event.java
index b7c5e68..85943cf 100644
--- a/src/java/org/apache/cassandra/transport/Event.java
+++ b/src/java/org/apache/cassandra/transport/Event.java
@@ -208,18 +208,18 @@ public abstract class Event
public final Change change;
public final Target target;
- public final String keyspace;
- public final String tableOrType;
+ public final String keyOrNamespace;
+ public final String tableOrTypeOrFunction;
- public SchemaChange(Change change, Target target, String keyspace, String tableOrType)
+ public SchemaChange(Change change, Target target, String keyOrNamespace, String tableOrTypeOrFunction)
{
super(Type.SCHEMA_CHANGE);
this.change = change;
this.target = target;
- this.keyspace = keyspace;
- this.tableOrType = tableOrType;
+ this.keyOrNamespace = keyOrNamespace;
+ this.tableOrTypeOrFunction = tableOrTypeOrFunction;
if (target != Target.KEYSPACE)
- assert this.tableOrType != null : "Table or type should be set for non-keyspace schema change events";
+ assert this.tableOrTypeOrFunction != null : "Table or type should be set for non-keyspace schema change events";
}
public SchemaChange(Change change, String keyspace)
@@ -252,9 +252,9 @@ public abstract class Event
{
CBUtil.writeEnumValue(change, dest);
CBUtil.writeEnumValue(target, dest);
- CBUtil.writeString(keyspace, dest);
+ CBUtil.writeString(keyOrNamespace, dest);
if (target != Target.KEYSPACE)
- CBUtil.writeString(tableOrType, dest);
+ CBUtil.writeString(tableOrTypeOrFunction, dest);
}
else
{
@@ -263,14 +263,14 @@ public abstract class Event
// For the v1/v2 protocol, we have no way to represent type changes, so we simply say the keyspace
// was updated. See CASSANDRA-7617.
CBUtil.writeEnumValue(Change.UPDATED, dest);
- CBUtil.writeString(keyspace, dest);
+ CBUtil.writeString(keyOrNamespace, dest);
CBUtil.writeString("", dest);
}
else
{
CBUtil.writeEnumValue(change, dest);
- CBUtil.writeString(keyspace, dest);
- CBUtil.writeString(target == Target.KEYSPACE ? "" : tableOrType, dest);
+ CBUtil.writeString(keyOrNamespace, dest);
+ CBUtil.writeString(target == Target.KEYSPACE ? "" : tableOrTypeOrFunction, dest);
}
}
}
@@ -281,10 +281,10 @@ public abstract class Event
{
int size = CBUtil.sizeOfEnumValue(change)
+ CBUtil.sizeOfEnumValue(target)
- + CBUtil.sizeOfString(keyspace);
+ + CBUtil.sizeOfString(keyOrNamespace);
if (target != Target.KEYSPACE)
- size += CBUtil.sizeOfString(tableOrType);
+ size += CBUtil.sizeOfString(tableOrTypeOrFunction);
return size;
}
@@ -293,25 +293,25 @@ public abstract class Event
if (target == Target.TYPE)
{
return CBUtil.sizeOfEnumValue(Change.UPDATED)
- + CBUtil.sizeOfString(keyspace)
+ + CBUtil.sizeOfString(keyOrNamespace)
+ CBUtil.sizeOfString("");
}
return CBUtil.sizeOfEnumValue(change)
- + CBUtil.sizeOfString(keyspace)
- + CBUtil.sizeOfString(target == Target.KEYSPACE ? "" : tableOrType);
+ + CBUtil.sizeOfString(keyOrNamespace)
+ + CBUtil.sizeOfString(target == Target.KEYSPACE ? "" : tableOrTypeOrFunction);
}
}
@Override
public String toString()
{
- return change + " " + target + " " + keyspace + (tableOrType == null ? "" : "." + tableOrType);
+ return change + " " + target + " " + keyOrNamespace + (tableOrTypeOrFunction == null ? "" : "." + tableOrTypeOrFunction);
}
@Override
public int hashCode()
{
- return Objects.hashCode(change, target, keyspace, tableOrType);
+ return Objects.hashCode(change, target, keyOrNamespace, tableOrTypeOrFunction);
}
@Override
@@ -323,8 +323,8 @@ public abstract class Event
SchemaChange scc = (SchemaChange)other;
return Objects.equal(change, scc.change)
&& Objects.equal(target, scc.target)
- && Objects.equal(keyspace, scc.keyspace)
- && Objects.equal(tableOrType, scc.tableOrType);
+ && Objects.equal(keyOrNamespace, scc.keyOrNamespace)
+ && Objects.equal(tableOrTypeOrFunction, scc.tableOrTypeOrFunction);
}
}
}