You are viewing a plain text version of this content. The canonical link to the original message was a hyperlink and is not preserved in this plain-text copy.
Posted to commits@cassandra.apache.org by ty...@apache.org on 2014/09/03 21:38:17 UTC

git commit: Accept Java source code for user-defined functions

Repository: cassandra
Updated Branches:
  refs/heads/trunk 83bf0d5d5 -> 511129ddb


Accept Java source code for user-defined functions

Patch by Robert Stupp; reviewed by Tyler Hobbs for CASSANDRA-7562


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/511129dd
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/511129dd
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/511129dd

Branch: refs/heads/trunk
Commit: 511129ddbda694bebcd7502d1760f3587b8adf8b
Parents: 83bf0d5
Author: Robert Stupp <sn...@snazy.de>
Authored: Wed Sep 3 14:37:15 2014 -0500
Committer: Tyler Hobbs <ty...@datastax.com>
Committed: Wed Sep 3 14:37:15 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 NOTICE.txt                                      |   3 +
 build.xml                                       |   5 +-
 lib/javassist-3.18.2-GA.jar                     | Bin 0 -> 714524 bytes
 .../cql3/functions/AbstractJavaUDF.java         |  91 +++++++++
 .../cql3/functions/JavaSourceBasedUDF.java      | 112 ++++++++++
 .../cql3/functions/ReflectionBasedUDF.java      |  46 +----
 .../cassandra/cql3/functions/UDFunction.java    |  25 ++-
 test/unit/org/apache/cassandra/cql3/UFTest.java | 204 +++++++++++++++++++
 9 files changed, 443 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/511129dd/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3c87abe..6f87cf9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0
+ * Support Java source code for user-defined functions (CASSANDRA-7562)
  * Require arg types to disambiguate UDF drops (CASSANDRA-7812)
  * Do anticompaction in groups (CASSANDRA-6851)
  * Verify that UDF class methods are static (CASSANDRA-7781)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/511129dd/NOTICE.txt
----------------------------------------------------------------------
diff --git a/NOTICE.txt b/NOTICE.txt
index cf7b8dc..bbe21d7 100644
--- a/NOTICE.txt
+++ b/NOTICE.txt
@@ -60,3 +60,6 @@ Copyright 2010, Cedric Beust cedric@beust.com
 
 HLL++ support provided by stream-lib
 (https://github.com/addthis/stream-lib)
+
+Javassist
+(http://www.csg.ci.i.u-tokyo.ac.jp/~chiba/javassist/)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/511129dd/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index 7f030b0..00d1c86 100644
--- a/build.xml
+++ b/build.xml
@@ -391,6 +391,7 @@
           <dependency groupId="com.google.code.findbugs" artifactId="jsr305" version="2.0.2" />
           <dependency groupId="com.clearspring.analytics" artifactId="stream" version="2.5.2" />
           <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" version="2.0.5" />
+          <dependency groupId="org.javassist" artifactId="javassist" version="3.18.2-GA" />
           <dependency groupId="net.sf.supercsv" artifactId="super-csv" version="2.1.0" />
 	  <dependency groupId="net.ju-n.compile-command-annotations" artifactId="compile-command-annotations" version="1.2.0" />
         </dependencyManagement>
@@ -436,7 +437,8 @@
       	<dependency groupId="com.google.code.findbugs" artifactId="jsr305"/>
         <dependency groupId="org.antlr" artifactId="antlr"/>
         <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core"/>
-	<dependency groupId="net.ju-n.compile-command-annotations" artifactId="compile-command-annotations"/>
+        <dependency groupId="net.ju-n.compile-command-annotations" artifactId="compile-command-annotations"/>
+        <dependency groupId="org.javassist" artifactId="javassist"/>
       </artifact:pom>
 
       <artifact:pom id="coverage-deps-pom"
@@ -494,6 +496,7 @@
         <dependency groupId="com.thinkaurelius.thrift" artifactId="thrift-server" version="0.3.6"/>
         <dependency groupId="com.clearspring.analytics" artifactId="stream" version="2.5.2" />
         <dependency groupId="net.sf.supercsv" artifactId="super-csv" version="2.1.0" />
+        <dependency groupId="org.javassist" artifactId="javassist"/>
 
         <dependency groupId="ch.qos.logback" artifactId="logback-core"/>
         <dependency groupId="ch.qos.logback" artifactId="logback-classic"/>

http://git-wip-us.apache.org/repos/asf/cassandra/blob/511129dd/lib/javassist-3.18.2-GA.jar
----------------------------------------------------------------------
diff --git a/lib/javassist-3.18.2-GA.jar b/lib/javassist-3.18.2-GA.jar
new file mode 100644
index 0000000..c8761c8
Binary files /dev/null and b/lib/javassist-3.18.2-GA.jar differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/511129dd/src/java/org/apache/cassandra/cql3/functions/AbstractJavaUDF.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/AbstractJavaUDF.java b/src/java/org/apache/cassandra/cql3/functions/AbstractJavaUDF.java
new file mode 100644
index 0000000..f147f00
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/functions/AbstractJavaUDF.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.functions;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+
+/**
+ * UDF implementation base class for reflection and Java source UDFs.
+ */
+abstract class AbstractJavaUDF extends UDFunction
+{
+    private final Method method;
+
+    AbstractJavaUDF(FunctionName name,
+                    List<ColumnIdentifier> argNames,
+                    List<AbstractType<?>> argTypes,
+                    AbstractType<?> returnType,
+                    String language,
+                    String body,
+                    boolean deterministic)
+    throws InvalidRequestException
+    {
+        super(name, argNames, argTypes, returnType, language, body, deterministic);
+        assert language.equals(requiredLanguage());
+        this.method = resolveMethod();
+    }
+
+    abstract String requiredLanguage();
+
+    abstract Method resolveMethod() throws InvalidRequestException;
+
+    protected Class<?>[] javaParamTypes()
+    {
+        Class<?> paramTypes[] = new Class[argTypes.size()];
+        for (int i = 0; i < paramTypes.length; i++)
+            paramTypes[i] = argTypes.get(i).getSerializer().getType();
+        return paramTypes;
+    }
+
+    protected Class<?> javaReturnType()
+    {
+        return returnType.getSerializer().getType();
+    }
+
+    public ByteBuffer execute(List<ByteBuffer> parameters) throws InvalidRequestException
+    {
+        Object[] parms = new Object[argTypes.size()];
+        for (int i = 0; i < parms.length; i++)
+        {
+            ByteBuffer bb = parameters.get(i);
+            if (bb != null)
+                parms[i] = argTypes.get(i).compose(bb);
+        }
+
+        Object result;
+        try
+        {
+            result = method.invoke(null, parms);
+            @SuppressWarnings("unchecked") ByteBuffer r = result != null ? ((AbstractType) returnType).decompose(result) : null;
+            return r;
+        }
+        catch (InvocationTargetException | IllegalAccessException e)
+        {
+            Throwable c = e.getCause();
+            logger.error("Invocation of function '{}' failed", this, c);
+            throw new InvalidRequestException("Invocation of function '" + this + "' failed: " + c);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/511129dd/src/java/org/apache/cassandra/cql3/functions/JavaSourceBasedUDF.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/JavaSourceBasedUDF.java b/src/java/org/apache/cassandra/cql3/functions/JavaSourceBasedUDF.java
new file mode 100644
index 0000000..7e483a0
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/functions/JavaSourceBasedUDF.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.functions;
+
+import java.lang.reflect.Method;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javassist.CannotCompileException;
+import javassist.ClassPool;
+import javassist.CtClass;
+import javassist.CtNewMethod;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+
+/**
+ * User-defined function using Java source code in UDF body.
+ * <p/>
+ * This is used when the LANGUAGE of the UDF definition is "java".
+ */
+final class JavaSourceBasedUDF extends AbstractJavaUDF
+{
+    static final AtomicInteger clsIdgen = new AtomicInteger();
+
+    JavaSourceBasedUDF(FunctionName name,
+                       List<ColumnIdentifier> argNames,
+                       List<AbstractType<?>> argTypes,
+                       AbstractType<?> returnType,
+                       String language,
+                       String body,
+                       boolean deterministic)
+    throws InvalidRequestException
+    {
+        super(name, argNames, argTypes, returnType, language, body, deterministic);
+    }
+
+    String requiredLanguage()
+    {
+        return "java";
+    }
+
+    Method resolveMethod() throws InvalidRequestException
+    {
+        Class<?> jReturnType = javaReturnType();
+        Class<?>[] paramTypes = javaParamTypes();
+
+        StringBuilder code = new StringBuilder();
+        code.append("public static ").
+             append(jReturnType.getName()).append(' ').
+             append(name.name).append('(');
+        for (int i = 0; i < paramTypes.length; i++)
+        {
+            if (i > 0)
+                code.append(", ");
+            code.append(paramTypes[i].getName()).
+                 append(' ').
+                 append(argNames.get(i));
+        }
+        code.append(") { ");
+        code.append(body);
+        code.append('}');
+
+        ClassPool classPool = ClassPool.getDefault();
+        CtClass cc = classPool.makeClass("org.apache.cassandra.cql3.udf.gen.C" + javaIdentifierPart(name.toString()) + '_' + clsIdgen.incrementAndGet());
+        try
+        {
+            cc.addMethod(CtNewMethod.make(code.toString(), cc));
+            Class<?> clazz = cc.toClass();
+            return clazz.getMethod(name.name, paramTypes);
+        }
+        catch (LinkageError e)
+        {
+            throw new InvalidRequestException("Could not compile function '" + name + "' from Java source: " + e.getMessage());
+        }
+        catch (CannotCompileException e)
+        {
+            throw new InvalidRequestException("Could not compile function '" + name + "' from Java source: " + e.getReason());
+        }
+        catch (NoSuchMethodException e)
+        {
+            throw new InvalidRequestException("Could not build function '" + name + "' from Java source");
+        }
+    }
+
+    private static String javaIdentifierPart(String qualifiedName)
+    {
+        StringBuilder sb = new StringBuilder(qualifiedName.length());
+        for (int i = 0; i < qualifiedName.length(); i++)
+        {
+            char c = qualifiedName.charAt(i);
+            if (Character.isJavaIdentifierPart(c))
+                sb.append(c);
+        }
+        return sb.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/511129dd/src/java/org/apache/cassandra/cql3/functions/ReflectionBasedUDF.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/ReflectionBasedUDF.java b/src/java/org/apache/cassandra/cql3/functions/ReflectionBasedUDF.java
index 68e388d..e02147a 100644
--- a/src/java/org/apache/cassandra/cql3/functions/ReflectionBasedUDF.java
+++ b/src/java/org/apache/cassandra/cql3/functions/ReflectionBasedUDF.java
@@ -17,10 +17,8 @@
  */
 package org.apache.cassandra.cql3.functions;
 
-import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.lang.reflect.Modifier;
-import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.List;
 
@@ -34,10 +32,8 @@ import org.apache.cassandra.exceptions.InvalidRequestException;
  *
  * This is used when the LANGUAGE of the UDF definition is "class".
  */
-class ReflectionBasedUDF extends UDFunction
+final class ReflectionBasedUDF extends AbstractJavaUDF
 {
-    public final Method method;
-
     ReflectionBasedUDF(FunctionName name,
                        List<ColumnIdentifier> argNames,
                        List<AbstractType<?>> argTypes,
@@ -48,16 +44,17 @@ class ReflectionBasedUDF extends UDFunction
     throws InvalidRequestException
     {
         super(name, argNames, argTypes, returnType, language, body, deterministic);
-        assert language.equals("class");
-        this.method = resolveClassMethod();
     }
 
-    private Method resolveClassMethod() throws InvalidRequestException
+    String requiredLanguage()
+    {
+        return "class";
+    }
+
+    Method resolveMethod() throws InvalidRequestException
     {
-        Class<?> jReturnType = returnType.getSerializer().getType();
-        Class<?> paramTypes[] = new Class[argTypes.size()];
-        for (int i = 0; i < paramTypes.length; i++)
-            paramTypes[i] = argTypes.get(i).getSerializer().getType();
+        Class<?> jReturnType = javaReturnType();
+        Class<?>[] paramTypes = javaParamTypes();
 
         String className;
         String methodName;
@@ -98,29 +95,4 @@ class ReflectionBasedUDF extends UDFunction
             throw new InvalidRequestException("Method " + className + '.' + methodName + '(' + Arrays.toString(paramTypes) + ") does not exist");
         }
     }
-
-    public ByteBuffer execute(List<ByteBuffer> parameters) throws InvalidRequestException
-    {
-        Object[] parms = new Object[argTypes.size()];
-        for (int i = 0; i < parms.length; i++)
-        {
-            ByteBuffer bb = parameters.get(i);
-            if (bb != null)
-                parms[i] = argTypes.get(i).compose(bb);
-        }
-
-        Object result;
-        try
-        {
-            result = method.invoke(null, parms);
-            @SuppressWarnings("unchecked") ByteBuffer r = result != null ? ((AbstractType) returnType).decompose(result) : null;
-            return r;
-        }
-        catch (InvocationTargetException | IllegalAccessException e)
-        {
-            Throwable c = e.getCause();
-            logger.error("Invocation of function {} failed", name, c);
-            throw new InvalidRequestException("Invocation of function " + name + " failed: " + c);
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/511129dd/src/java/org/apache/cassandra/cql3/functions/UDFunction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/UDFunction.java b/src/java/org/apache/cassandra/cql3/functions/UDFunction.java
index aeefe41..b4a706d 100644
--- a/src/java/org/apache/cassandra/cql3/functions/UDFunction.java
+++ b/src/java/org/apache/cassandra/cql3/functions/UDFunction.java
@@ -77,6 +77,7 @@ public abstract class UDFunction extends AbstractFunction
         switch (language)
         {
             case "class": return new ReflectionBasedUDF(name, argNames, argTypes, returnType, language, body, deterministic);
+            case "java": return new JavaSourceBasedUDF(name, argNames, argTypes, returnType, language, body, deterministic);
             default: throw new InvalidRequestException(String.format("Invalid language %s for '%s'", language, name));
         }
     }
@@ -183,13 +184,25 @@ public abstract class UDFunction extends AbstractFunction
         List<String> names = row.getList("argument_names", UTF8Type.instance);
         List<String> types = row.getList("argument_types", UTF8Type.instance);
 
-        List<ColumnIdentifier> argNames = new ArrayList<>(names.size());
-        for (String arg : names)
-            argNames.add(new ColumnIdentifier(arg, true));
+        List<ColumnIdentifier> argNames;
+        if (names == null)
+            argNames = Collections.emptyList();
+        else
+        {
+            argNames = new ArrayList<>(names.size());
+            for (String arg : names)
+                argNames.add(new ColumnIdentifier(arg, true));
+        }
 
-        List<AbstractType<?>> argTypes = new ArrayList<>(types.size());
-        for (String type : types)
-            argTypes.add(parseType(type));
+        List<AbstractType<?>> argTypes;
+        if (types == null)
+            argTypes = Collections.emptyList();
+        else
+        {
+            argTypes = new ArrayList<>(types.size());
+            for (String type : types)
+                argTypes.add(parseType(type));
+        }
 
         AbstractType<?> returnType = parseType(row.getString("return_type"));
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/511129dd/test/unit/org/apache/cassandra/cql3/UFTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/UFTest.java b/test/unit/org/apache/cassandra/cql3/UFTest.java
index 60a7a68..b2e4b84 100644
--- a/test/unit/org/apache/cassandra/cql3/UFTest.java
+++ b/test/unit/org/apache/cassandra/cql3/UFTest.java
@@ -17,8 +17,11 @@
  */
 package org.apache.cassandra.cql3;
 
+import org.junit.Assert;
 import org.junit.Test;
 
+import org.apache.cassandra.exceptions.InvalidRequestException;
+
 public class UFTest extends CQLTester
 {
     public static Double sin(Double val)
@@ -206,4 +209,205 @@ public class UFTest extends CQLTester
         // overloaded has just one overload now - so the following DROP FUNCTION is not ambigious (CASSANDRA-7812)
         execute("DROP FUNCTION overloaded");
     }
+
+    @Test
+    public void testCreateOrReplaceJavaFunction() throws Throwable
+    {
+        execute("create function foo::corjf ( input double ) returns double language java\n" +
+                "AS '\n" +
+                "  // parameter val is of type java.lang.Double\n" +
+                "  /* return type is of type java.lang.Double */\n" +
+                "  if (input == null) {\n" +
+                "    return null;\n" +
+                "  }\n" +
+                "  double v = Math.sin( input.doubleValue() );\n" +
+                "  return Double.valueOf(v);\n" +
+                "';");
+        execute("create or replace function foo::corjf ( input double ) returns double language java\n" +
+                "AS '\n" +
+                "  // parameter val is of type java.lang.Double\n" +
+                "  /* return type is of type java.lang.Double */\n" +
+                "  if (input == null) {\n" +
+                "    return null;\n" +
+                "  }\n" +
+                "  double v = Math.sin( input.doubleValue() );\n" +
+                "  return Double.valueOf(v);\n" +
+                "';");
+    }
+
+    @Test
+    public void testJavaFunctionNoParameters() throws Throwable
+    {
+        createTable("CREATE TABLE %s (key int primary key, val double)");
+
+        String functionBody = "\n  return Long.valueOf(1L);\n";
+
+        String cql = "CREATE OR REPLACE FUNCTION jfnpt() RETURNS bigint LANGUAGE JAVA\n" +
+                     "AS '" + functionBody + "';";
+
+        execute(cql);
+
+        assertRows(execute("SELECT language, body FROM system.schema_functions WHERE namespace='' AND name='jfnpt'"),
+                   row("java", functionBody));
+
+        execute("INSERT INTO %s (key, val) VALUES (?, ?)", 1, 1d);
+        execute("INSERT INTO %s (key, val) VALUES (?, ?)", 2, 2d);
+        execute("INSERT INTO %s (key, val) VALUES (?, ?)", 3, 3d);
+        assertRows(execute("SELECT key, val, jfnpt() FROM %s"),
+                   row(1, 1d, 1L),
+                   row(2, 2d, 1L),
+                   row(3, 3d, 1L)
+        );
+    }
+
+    @Test
+    public void testJavaFunctionInvalidBodies() throws Throwable
+    {
+        try
+        {
+            execute("CREATE OR REPLACE FUNCTION jfinv() RETURNS bigint LANGUAGE JAVA\n" +
+                    "AS '\n" +
+                    "foobarbaz" +
+                    "\n';");
+            Assert.fail();
+        }
+        catch (InvalidRequestException e)
+        {
+            Assert.assertTrue(e.getMessage(), e.getMessage().contains("[source error]"));
+            Assert.assertTrue(e.getMessage(), e.getMessage().contains("; is missing"));
+        }
+
+        try
+        {
+            execute("CREATE OR REPLACE FUNCTION jfinv() RETURNS bigint LANGUAGE JAVA\n" +
+                    "AS '\n" +
+                    "foobarbaz;" +
+                    "\n';");
+            Assert.fail();
+        }
+        catch (InvalidRequestException e)
+        {
+            Assert.assertTrue(e.getMessage(), e.getMessage().contains("[source error]"));
+            Assert.assertTrue(e.getMessage(), e.getMessage().contains("no such field: foobarbaz"));
+        }
+    }
+
+    @Test
+    public void testJavaFunctionInvalidReturn() throws Throwable
+    {
+        String functionBody = "\n" +
+                              "  return Long.valueOf(1L);\n";
+
+        String cql = "CREATE OR REPLACE FUNCTION jfir(val double) RETURNS double LANGUAGE JAVA\n" +
+                     "AS '" + functionBody + "';";
+
+        assertInvalid(cql);
+    }
+
+    @Test
+    public void testJavaFunctionArgumentTypeMismatch() throws Throwable
+    {
+        createTable("CREATE TABLE %s (key int primary key, val bigint)");
+
+        String functionBody = "\n" +
+                              "  return val;\n";
+
+        String cql = "CREATE OR REPLACE FUNCTION jft(val double) RETURNS double LANGUAGE JAVA\n" +
+                     "AS '" + functionBody + "';";
+
+        execute(cql);
+
+        execute("INSERT INTO %s (key, val) VALUES (?, ?)", 1, 1L);
+        execute("INSERT INTO %s (key, val) VALUES (?, ?)", 2, 2L);
+        execute("INSERT INTO %s (key, val) VALUES (?, ?)", 3, 3L);
+        assertInvalid("SELECT key, val, jft(val) FROM %s");
+    }
+
+    @Test
+    public void testJavaFunction() throws Throwable
+    {
+        createTable("CREATE TABLE %s (key int primary key, val double)");
+
+        String functionBody = "\n" +
+                              "  // parameter val is of type java.lang.Double\n" +
+                              "  /* return type is of type java.lang.Double */\n" +
+                              "  if (val == null) {\n" +
+                              "    return null;\n" +
+                              "  }\n" +
+                              "  double v = Math.sin( val.doubleValue() );\n" +
+                              "  return Double.valueOf(v);\n";
+
+        String cql = "CREATE OR REPLACE FUNCTION jft(val double) RETURNS double LANGUAGE JAVA\n" +
+                     "AS '" + functionBody + "';";
+
+        execute(cql);
+
+        assertRows(execute("SELECT language, body FROM system.schema_functions WHERE namespace='' AND name='jft'"),
+                   row("java", functionBody));
+
+        execute("INSERT INTO %s (key, val) VALUES (?, ?)", 1, 1d);
+        execute("INSERT INTO %s (key, val) VALUES (?, ?)", 2, 2d);
+        execute("INSERT INTO %s (key, val) VALUES (?, ?)", 3, 3d);
+        assertRows(execute("SELECT key, val, jft(val) FROM %s"),
+                   row(1, 1d, Math.sin(1d)),
+                   row(2, 2d, Math.sin(2d)),
+                   row(3, 3d, Math.sin(3d))
+        );
+    }
+
+    @Test
+    public void testJavaNamespaceFunction() throws Throwable
+    {
+        createTable("CREATE TABLE %s (key int primary key, val double)");
+
+        String functionBody = "\n" +
+                              "  // parameter val is of type java.lang.Double\n" +
+                              "  /* return type is of type java.lang.Double */\n" +
+                              "  if (val == null) {\n" +
+                              "    return null;\n" +
+                              "  }\n" +
+                              "  double v = Math.sin( val.doubleValue() );\n" +
+                              "  return Double.valueOf(v);\n";
+
+        String cql = "CREATE OR REPLACE FUNCTION foo::jnft(val double) RETURNS double LANGUAGE JAVA\n" +
+                     "AS '" + functionBody + "';";
+
+        execute(cql);
+
+        assertRows(execute("SELECT language, body FROM system.schema_functions WHERE namespace='foo' AND name='jnft'"),
+                   row("java", functionBody));
+
+        execute("INSERT INTO %s (key, val) VALUES (?, ?)", 1, 1d);
+        execute("INSERT INTO %s (key, val) VALUES (?, ?)", 2, 2d);
+        execute("INSERT INTO %s (key, val) VALUES (?, ?)", 3, 3d);
+        assertRows(execute("SELECT key, val, foo::jnft(val) FROM %s"),
+                   row(1, 1d, Math.sin(1d)),
+                   row(2, 2d, Math.sin(2d)),
+                   row(3, 3d, Math.sin(3d))
+        );
+    }
+
+    @Test
+    public void testJavaRuntimeException() throws Throwable
+    {
+        createTable("CREATE TABLE %s (key int primary key, val double)");
+
+        String functionBody = "\n" +
+                              "  throw new RuntimeException(\"oh no!\");\n";
+
+        String cql = "CREATE OR REPLACE FUNCTION foo::jrtef(val double) RETURNS double LANGUAGE JAVA\n" +
+                     "AS '" + functionBody + "';";
+
+        execute(cql);
+
+        assertRows(execute("SELECT language, body FROM system.schema_functions WHERE namespace='foo' AND name='jrtef'"),
+                   row("java", functionBody));
+
+        execute("INSERT INTO %s (key, val) VALUES (?, ?)", 1, 1d);
+        execute("INSERT INTO %s (key, val) VALUES (?, ?)", 2, 2d);
+        execute("INSERT INTO %s (key, val) VALUES (?, ?)", 3, 3d);
+
+        // function throws a RuntimeException which is wrapped by InvalidRequestException
+        assertInvalid("SELECT key, val, foo::jrtef(val) FROM %s");
+    }
 }