You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ty...@apache.org on 2014/11/27 00:49:59 UTC

[1/2] cassandra git commit: Support for UDTs, tuples, and collections in UDFs

Repository: cassandra
Updated Branches:
  refs/heads/trunk e13121318 -> 794d68b51


http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/test/unit/org/apache/cassandra/cql3/UFTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/UFTest.java b/test/unit/org/apache/cassandra/cql3/UFTest.java
index 4975ca9..824719b 100644
--- a/test/unit/org/apache/cassandra/cql3/UFTest.java
+++ b/test/unit/org/apache/cassandra/cql3/UFTest.java
@@ -19,50 +19,51 @@ package org.apache.cassandra.cql3;
 
 import java.math.BigDecimal;
 import java.math.BigInteger;
-import java.util.Date;
+import java.util.*;
 
-import org.junit.After;
 import org.junit.Assert;
-import org.junit.Before;
 import org.junit.Test;
 
+import com.datastax.driver.core.*;
 import org.apache.cassandra.cql3.functions.FunctionName;
 import org.apache.cassandra.cql3.functions.Functions;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.transport.Server;
 import org.apache.cassandra.transport.messages.ResultMessage;
 
 public class UFTest extends CQLTester
 {
-    private static final String KS_FOO = "cqltest_foo";
 
-    @Before
-    public void createKsFoo() throws Throwable
+    public static FunctionName parseFunctionName(String qualifiedName)
     {
-        execute("CREATE KEYSPACE IF NOT EXISTS "+KS_FOO+" WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};");
-    }
-
-    @After
-    public void dropKsFoo() throws Throwable
-    {
-        execute("DROP KEYSPACE IF EXISTS "+KS_FOO+";");
+        int i = qualifiedName.indexOf('.');
+        return i == -1
+               ? FunctionName.nativeFunction(qualifiedName)
+               : new FunctionName(qualifiedName.substring(0, i).trim(), qualifiedName.substring(i+1).trim());
     }
 
     @Test
     public void testFunctionDropOnKeyspaceDrop() throws Throwable
     {
-        execute("CREATE FUNCTION " + KS_FOO + ".sin ( input double ) RETURNS double LANGUAGE java AS 'return Double.valueOf(Math.sin(input.doubleValue()));'");
+        String fSin = createFunction(KEYSPACE_PER_TEST, "double",
+                                     "CREATE FUNCTION %s ( input double ) " +
+                                     "RETURNS double " +
+                                     "LANGUAGE java " +
+                                     "AS 'return Double.valueOf(Math.sin(input.doubleValue()));'");
 
-        Assert.assertEquals(1, Functions.find(new FunctionName(KS_FOO, "sin")).size());
+        FunctionName fSinName = parseFunctionName(fSin);
 
-        assertRows(execute("SELECT function_name, language FROM system.schema_functions WHERE keyspace_name=?", KS_FOO),
-                   row("sin", "java"));
+        Assert.assertEquals(1, Functions.find(parseFunctionName(fSin)).size());
 
-        execute("DROP KEYSPACE "+KS_FOO+";");
+        assertRows(execute("SELECT function_name, language FROM system.schema_functions WHERE keyspace_name=?", KEYSPACE_PER_TEST),
+                   row(fSinName.name, "java"));
 
-        assertRows(execute("SELECT function_name, language FROM system.schema_functions WHERE keyspace_name=?", KS_FOO));
+        dropPerTestKeyspace();
 
-        Assert.assertEquals(0, Functions.find(new FunctionName(KS_FOO, "sin")).size());
+        assertRows(execute("SELECT function_name, language FROM system.schema_functions WHERE keyspace_name=?", KEYSPACE_PER_TEST));
+
+        Assert.assertEquals(0, Functions.find(fSinName).size());
     }
 
     @Test
@@ -70,27 +71,40 @@ public class UFTest extends CQLTester
     {
         createTable("CREATE TABLE %s (key int PRIMARY KEY, d double)");
 
-        execute("CREATE FUNCTION " + KS_FOO + ".sin ( input double ) RETURNS double LANGUAGE java AS 'return Double.valueOf(Math.sin(input.doubleValue()));'");
+        String fSin = createFunction(KEYSPACE_PER_TEST, "double",
+                                     "CREATE FUNCTION %s ( input double ) " +
+                                     "RETURNS double " +
+                                     "LANGUAGE java " +
+                                     "AS 'return Double.valueOf(Math.sin(input.doubleValue()));'");
+
+        FunctionName fSinName = parseFunctionName(fSin);
 
-        Assert.assertEquals(1, Functions.find(new FunctionName(KS_FOO, "sin")).size());
+        Assert.assertEquals(1, Functions.find(parseFunctionName(fSin)).size());
 
-        ResultMessage.Prepared prepared = QueryProcessor.prepare("SELECT key, "+KS_FOO+".sin(d) FROM "+KEYSPACE+'.'+currentTable(), ClientState.forInternalCalls(), false);
+        ResultMessage.Prepared prepared = QueryProcessor.prepare(
+                                                    String.format("SELECT key, %s(d) FROM %s.%s", fSin, KEYSPACE, currentTable()),
+                                                    ClientState.forInternalCalls(), false);
         Assert.assertNotNull(QueryProcessor.instance.getPrepared(prepared.statementId));
 
-        execute("DROP FUNCTION " + KS_FOO + ".sin(double);");
+        execute("DROP FUNCTION " + fSin + "(double);");
 
         Assert.assertNull(QueryProcessor.instance.getPrepared(prepared.statementId));
 
         //
 
-        execute("CREATE FUNCTION " + KS_FOO + ".sin ( input double ) RETURNS double LANGUAGE java AS 'return Double.valueOf(Math.sin(input.doubleValue()));'");
+        execute("CREATE FUNCTION " + fSin + " ( input double ) " +
+                "RETURNS double " +
+                "LANGUAGE java " +
+                "AS 'return Double.valueOf(Math.sin(input.doubleValue()));'");
 
-        Assert.assertEquals(1, Functions.find(new FunctionName(KS_FOO, "sin")).size());
+        Assert.assertEquals(1, Functions.find(fSinName).size());
 
-        prepared = QueryProcessor.prepare("SELECT key, "+KS_FOO+".sin(d) FROM "+KEYSPACE+'.'+currentTable(), ClientState.forInternalCalls(), false);
+        prepared = QueryProcessor.prepare(
+                                         String.format("SELECT key, %s(d) FROM %s.%s", fSin, KEYSPACE, currentTable()),
+                                         ClientState.forInternalCalls(), false);
         Assert.assertNotNull(QueryProcessor.instance.getPrepared(prepared.statementId));
 
-        execute("DROP KEYSPACE " + KS_FOO + ";");
+        dropPerTestKeyspace();
 
         Assert.assertNull(QueryProcessor.instance.getPrepared(prepared.statementId));
     }
@@ -105,47 +119,55 @@ public class UFTest extends CQLTester
         execute("INSERT INTO %s(key, d) VALUES (?, ?)", 3, 3d);
 
         // simple creation
-        execute("CREATE FUNCTION "+KS_FOO+".sin ( input double ) RETURNS double LANGUAGE java AS 'return Double.valueOf(Math.sin(input.doubleValue()));'");
+        String fSin = createFunction(KEYSPACE_PER_TEST, "double",
+                                     "CREATE FUNCTION %s ( input double ) " +
+                                     "RETURNS double " +
+                                     "LANGUAGE java " +
+                                     "AS 'return Double.valueOf(Math.sin(input.doubleValue()));'");
         // check we can't recreate the same function
-        assertInvalid("CREATE FUNCTION "+KS_FOO+".sin ( input double ) RETURNS double LANGUAGE java AS 'return Double.valueOf(Math.sin(input.doubleValue()));'");
-        // but that it doesn't complay with "IF NOT EXISTS"
-        execute("CREATE FUNCTION IF NOT EXISTS "+KS_FOO+".sin ( input double ) RETURNS double LANGUAGE java AS 'return Double.valueOf(Math.sin(input.doubleValue()));'");
+        assertInvalid("CREATE FUNCTION " + fSin + " ( input double ) RETURNS double LANGUAGE java AS 'return Double.valueOf(Math.sin(input.doubleValue()));'");
+        // but that it doesn't complain with "IF NOT EXISTS"
+        execute("CREATE FUNCTION IF NOT EXISTS " + fSin + " ( input double ) RETURNS double LANGUAGE java AS 'return Double.valueOf(Math.sin(input.doubleValue()));'");
 
         // Validate that it works as expected
-        assertRows(execute("SELECT key, "+KS_FOO+".sin(d) FROM %s"),
+        assertRows(execute("SELECT key, " + fSin + "(d) FROM %s"),
             row(1, Math.sin(1d)),
             row(2, Math.sin(2d)),
             row(3, Math.sin(3d))
         );
 
         // Replace the method with incompatible return type
-        assertInvalid("CREATE OR REPLACE FUNCTION " + KS_FOO + ".sin ( input double ) RETURNS text LANGUAGE java AS 'return Double.valueOf(42d);'");
+        assertInvalid("CREATE OR REPLACE FUNCTION " + fSin + " ( input double ) RETURNS text LANGUAGE java AS 'return Double.valueOf(42d);'");
         // proper replacement
-        execute("CREATE OR REPLACE FUNCTION "+KS_FOO+".sin ( input double ) RETURNS double LANGUAGE java AS 'return Double.valueOf(42d);'");
+        execute("CREATE OR REPLACE FUNCTION " + fSin + " ( input double ) RETURNS double LANGUAGE java AS 'return Double.valueOf(42d);'");
 
         // Validate the method as been replaced
-        assertRows(execute("SELECT key, "+KS_FOO+".sin(d) FROM %s"),
+        assertRows(execute("SELECT key, " + fSin + "(d) FROM %s"),
             row(1, 42.0),
             row(2, 42.0),
             row(3, 42.0)
         );
 
-        // same function but without namespace
-        execute("CREATE FUNCTION "+KEYSPACE+".sin ( input double ) RETURNS double LANGUAGE java AS 'return Double.valueOf(Math.sin(input.doubleValue()));'");
-        assertRows(execute("SELECT key, "+KEYSPACE+".sin(d) FROM %s"),
+        // same function but other keyspace
+        String fSin2 = createFunction(KEYSPACE, "double",
+                                      "CREATE FUNCTION %s ( input double ) " +
+                                      "RETURNS double " +
+                                      "LANGUAGE java " +
+                                      "AS 'return Double.valueOf(Math.sin(input.doubleValue()));'");
+        assertRows(execute("SELECT key, " + fSin2 + "(d) FROM %s"),
             row(1, Math.sin(1d)),
             row(2, Math.sin(2d)),
             row(3, Math.sin(3d))
         );
 
-        // Drop with and without keyspace
-        execute("DROP FUNCTION "+KS_FOO+".sin");
-        execute("DROP FUNCTION "+KEYSPACE+".sin");
+        // Drop
+        execute("DROP FUNCTION " + fSin);
+        execute("DROP FUNCTION " + fSin2);
 
         // Drop unexisting function
-        assertInvalid("DROP FUNCTION "+KS_FOO+".sin");
+        assertInvalid("DROP FUNCTION " + fSin);
         // but don't complain with "IF EXISTS"
-        execute("DROP FUNCTION IF EXISTS "+KS_FOO+".sin");
+        execute("DROP FUNCTION IF EXISTS " + fSin);
 
         // can't drop native functions
         assertInvalid("DROP FUNCTION dateof");
@@ -162,13 +184,17 @@ public class UFTest extends CQLTester
 
         execute("INSERT INTO %s(v) VALUES (?)", "aaa");
 
-        execute("CREATE FUNCTION "+KEYSPACE+".repeat (v text, n int) RETURNS text LANGUAGE java AS 'StringBuilder sb = new StringBuilder();\n" +
-                "        for (int i = 0; i < n.intValue(); i++)\n" +
-                "            sb.append(v);\n" +
-                "        return sb.toString();'");
-
-        assertRows(execute("SELECT v FROM %s WHERE v="+KEYSPACE+".repeat(?, ?)", "a", 3), row("aaa"));
-        assertEmpty(execute("SELECT v FROM %s WHERE v="+KEYSPACE+".repeat(?, ?)", "a", 2));
+        String fRepeat = createFunction(KEYSPACE_PER_TEST, "text,int",
+                                        "CREATE FUNCTION %s(v text, n int) " +
+                                        "RETURNS text " +
+                                        "LANGUAGE java " +
+                                        "AS 'StringBuilder sb = new StringBuilder();\n" +
+                                        "    for (int i = 0; i < n.intValue(); i++)\n" +
+                                        "        sb.append(v);\n" +
+                                        "    return sb.toString();'");
+
+        assertRows(execute("SELECT v FROM %s WHERE v=" + fRepeat + "(?, ?)", "a", 3), row("aaa"));
+        assertEmpty(execute("SELECT v FROM %s WHERE v=" + fRepeat + "(?, ?)", "a", 2));
     }
 
     @Test
@@ -178,46 +204,56 @@ public class UFTest extends CQLTester
 
         execute("INSERT INTO %s(k, v) VALUES (?, ?)", "f2", 1);
 
-        execute("CREATE FUNCTION "+KEYSPACE+".overloaded(v varchar) RETURNS text LANGUAGE java AS 'return \"f1\";'");
-        execute("CREATE OR REPLACE FUNCTION "+KEYSPACE+".overloaded(i int) RETURNS text LANGUAGE java AS 'return \"f2\";'");
-        execute("CREATE OR REPLACE FUNCTION "+KEYSPACE+".overloaded(v1 text, v2 text) RETURNS text LANGUAGE java AS 'return \"f3\";'");
-        execute("CREATE OR REPLACE FUNCTION "+KEYSPACE+".overloaded(v ascii) RETURNS text LANGUAGE java AS 'return \"f1\";'");
+        String fOverload = createFunction(KEYSPACE_PER_TEST, "varchar",
+                                          "CREATE FUNCTION %s ( input varchar ) " +
+                                          "RETURNS text " +
+                                          "LANGUAGE java " +
+                                          "AS 'return \"f1\";'");
+        createFunctionOverload(fOverload,
+                               "int",
+                               "CREATE OR REPLACE FUNCTION %s(i int) RETURNS text LANGUAGE java AS 'return \"f2\";'");
+        createFunctionOverload(fOverload,
+                               "text,text",
+                               "CREATE OR REPLACE FUNCTION %s(v1 text, v2 text) RETURNS text LANGUAGE java AS 'return \"f3\";'");
+        createFunctionOverload(fOverload,
+                               "ascii",
+                               "CREATE OR REPLACE FUNCTION %s(v ascii) RETURNS text LANGUAGE java AS 'return \"f1\";'");
 
         // text == varchar, so this should be considered as a duplicate
-        assertInvalid("CREATE FUNCTION "+KEYSPACE+".overloaded(v varchar) RETURNS text LANGUAGE java AS 'return \"f1\";'");
+        assertInvalid("CREATE FUNCTION " + fOverload + "(v varchar) RETURNS text LANGUAGE java AS 'return \"f1\";'");
 
-        assertRows(execute("SELECT "+KEYSPACE+".overloaded(k), "+KEYSPACE+".overloaded(v), "+KEYSPACE+".overloaded(k, k) FROM %s"),
+        assertRows(execute("SELECT " + fOverload + "(k), " + fOverload + "(v), " + fOverload + "(k, k) FROM %s"),
             row("f1", "f2", "f3")
         );
 
         forcePreparedValues();
         // This shouldn't work if we use preparation since there no way to know which overload to use
-        assertInvalid("SELECT v FROM %s WHERE k = "+KEYSPACE+".overloaded(?)", "foo");
+        assertInvalid("SELECT v FROM %s WHERE k = " + fOverload + "(?)", "foo");
         stopForcingPreparedValues();
 
         // but those should since we specifically cast
-        assertEmpty(execute("SELECT v FROM %s WHERE k = "+KEYSPACE+".overloaded((text)?)", "foo"));
-        assertRows(execute("SELECT v FROM %s WHERE k = "+KEYSPACE+".overloaded((int)?)", 3), row(1));
-        assertEmpty(execute("SELECT v FROM %s WHERE k = "+KEYSPACE+".overloaded((ascii)?)", "foo"));
+        assertEmpty(execute("SELECT v FROM %s WHERE k = " + fOverload + "((text)?)", "foo"));
+        assertRows(execute("SELECT v FROM %s WHERE k = " + fOverload + "((int)?)", 3), row(1));
+        assertEmpty(execute("SELECT v FROM %s WHERE k = " + fOverload + "((ascii)?)", "foo"));
         // And since varchar == text, this should work too
-        assertEmpty(execute("SELECT v FROM %s WHERE k = "+KEYSPACE+".overloaded((varchar)?)", "foo"));
+        assertEmpty(execute("SELECT v FROM %s WHERE k = " + fOverload + "((varchar)?)", "foo"));
 
         // no such functions exist...
-        assertInvalid("DROP FUNCTION "+KEYSPACE+".overloaded(boolean)");
-        assertInvalid("DROP FUNCTION "+KEYSPACE+".overloaded(bigint)");
+        assertInvalid("DROP FUNCTION " + fOverload + "(boolean)");
+        assertInvalid("DROP FUNCTION " + fOverload + "(bigint)");
 
         // 'overloaded' has multiple overloads - so it has to fail (CASSANDRA-7812)
-        assertInvalid("DROP FUNCTION "+KEYSPACE+".overloaded");
-        execute("DROP FUNCTION " + KEYSPACE + ".overloaded(varchar)");
-        assertInvalid("SELECT v FROM %s WHERE k = " + KEYSPACE + ".overloaded((text)?)", "foo");
-        execute("DROP FUNCTION " + KEYSPACE + ".overloaded(text, text)");
-        assertInvalid("SELECT v FROM %s WHERE k = " + KEYSPACE + ".overloaded((text)?,(text)?)", "foo", "bar");
-        execute("DROP FUNCTION " + KEYSPACE + ".overloaded(ascii)");
-        assertInvalid("SELECT v FROM %s WHERE k = "+KEYSPACE+".overloaded((ascii)?)", "foo");
+        assertInvalid("DROP FUNCTION " + fOverload);
+        execute("DROP FUNCTION " + fOverload + "(varchar)");
+        assertInvalid("SELECT v FROM %s WHERE k = " + fOverload + "((text)?)", "foo");
+        execute("DROP FUNCTION " + fOverload + "(text, text)");
+        assertInvalid("SELECT v FROM %s WHERE k = " + fOverload + "((text)?,(text)?)", "foo", "bar");
+        execute("DROP FUNCTION " + fOverload + "(ascii)");
+        assertInvalid("SELECT v FROM %s WHERE k = " + fOverload + "((ascii)?)", "foo");
         // single-int-overload must still work
-        assertRows(execute("SELECT v FROM %s WHERE k = " + KEYSPACE + ".overloaded((int)?)", 3), row(1));
+        assertRows(execute("SELECT v FROM %s WHERE k = " + fOverload + "((int)?)", 3), row(1));
         // overloaded has just one overload now - so the following DROP FUNCTION is not ambigious (CASSANDRA-7812)
-        execute("DROP FUNCTION "+KEYSPACE+".overloaded");
+        execute("DROP FUNCTION " + fOverload + "");
     }
 
     @Test
@@ -228,7 +264,9 @@ public class UFTest extends CQLTester
         execute("INSERT INTO %s (key, val) VALUES (?, ?)", 2, 2d);
         execute("INSERT INTO %s (key, val) VALUES (?, ?)", 3, 3d);
 
-        execute("create function "+KS_FOO+".corjf ( input double ) returns double language java\n" +
+        String fName = createFunction(KEYSPACE_PER_TEST, "double",
+                "CREATE FUNCTION %s( input double ) " +
+                "RETURNS double LANGUAGE java\n" +
                 "AS '\n" +
                 "  // parameter val is of type java.lang.Double\n" +
                 "  /* return type is of type java.lang.Double */\n" +
@@ -240,19 +278,20 @@ public class UFTest extends CQLTester
                 "';");
 
         // just check created function
-        assertRows(execute("SELECT key, val, "+KS_FOO+".corjf(val) FROM %s"),
+        assertRows(execute("SELECT key, val, " + fName + "(val) FROM %s"),
                    row(1, 1d, Math.sin(1d)),
                    row(2, 2d, Math.sin(2d)),
                    row(3, 3d, Math.sin(3d))
         );
 
-        execute("create or replace function "+KS_FOO+".corjf ( input double ) returns double language java\n" +
+        execute("CREATE OR REPLACE FUNCTION " + fName + "( input double ) " +
+                "RETURNS double LANGUAGE java\n" +
                 "AS '\n" +
                 "  return input;\n" +
                 "';");
 
         // check if replaced function returns correct result
-        assertRows(execute("SELECT key, val, "+KS_FOO+".corjf(val) FROM %s"),
+        assertRows(execute("SELECT key, val, " + fName + "(val) FROM %s"),
                    row(1, 1d, 1d),
                    row(2, 2d, 2d),
                    row(3, 3d, 3d)
@@ -266,18 +305,18 @@ public class UFTest extends CQLTester
 
         String functionBody = "\n  return Long.valueOf(1L);\n";
 
-        String cql = String.format("CREATE OR REPLACE FUNCTION %s.jfnpt() RETURNS bigint LANGUAGE JAVA\n" +
-                     "AS '%s';", KEYSPACE, functionBody);
-
-        execute(cql);
+        String fName = createFunction(KEYSPACE, "",
+                                      "CREATE OR REPLACE FUNCTION %s() RETURNS bigint LANGUAGE JAVA\n" +
+                                      "AS '" +functionBody + "';");
 
-        assertRows(execute("SELECT language, body FROM system.schema_functions WHERE keyspace_name=? AND function_name='jfnpt'", KEYSPACE),
+        assertRows(execute("SELECT language, body FROM system.schema_functions WHERE keyspace_name=? AND function_name=?",
+                           KEYSPACE, parseFunctionName(fName).name),
                    row("java", functionBody));
 
         execute("INSERT INTO %s (key, val) VALUES (?, ?)", 1, 1d);
         execute("INSERT INTO %s (key, val) VALUES (?, ?)", 2, 2d);
         execute("INSERT INTO %s (key, val) VALUES (?, ?)", 3, 3d);
-        assertRows(execute("SELECT key, val, "+KEYSPACE+".jfnpt() FROM %s"),
+        assertRows(execute("SELECT key, val, " + fName + "() FROM %s"),
                    row(1, 1d, 1L),
                    row(2, 2d, 1L),
                    row(3, 3d, 1L)
@@ -289,7 +328,7 @@ public class UFTest extends CQLTester
     {
         try
         {
-            execute("CREATE OR REPLACE FUNCTION "+KEYSPACE+".jfinv() RETURNS bigint LANGUAGE JAVA\n" +
+            execute("CREATE OR REPLACE FUNCTION " + KEYSPACE + ".jfinv() RETURNS bigint LANGUAGE JAVA\n" +
                     "AS '\n" +
                     "foobarbaz" +
                     "\n';");
@@ -303,7 +342,7 @@ public class UFTest extends CQLTester
 
         try
         {
-            execute("CREATE OR REPLACE FUNCTION "+KEYSPACE+".jfinv() RETURNS bigint LANGUAGE JAVA\n" +
+            execute("CREATE OR REPLACE FUNCTION " + KEYSPACE + ".jfinv() RETURNS bigint LANGUAGE JAVA\n" +
                     "AS '\n" +
                     "foobarbaz;" +
                     "\n';");
@@ -319,13 +358,8 @@ public class UFTest extends CQLTester
     @Test
     public void testJavaFunctionInvalidReturn() throws Throwable
     {
-        String functionBody = "\n" +
-                              "  return Long.valueOf(1L);\n";
-
-        String cql = "CREATE OR REPLACE FUNCTION jfir(val double) RETURNS double LANGUAGE JAVA\n" +
-                     "AS '" + functionBody + "';";
-
-        assertInvalid(cql);
+        assertInvalid("CREATE OR REPLACE FUNCTION jfir(val double) RETURNS double LANGUAGE JAVA\n" +
+                      "AS 'return Long.valueOf(1L);';");
     }
 
     @Test
@@ -333,18 +367,15 @@ public class UFTest extends CQLTester
     {
         createTable("CREATE TABLE %s (key int primary key, val bigint)");
 
-        String functionBody = "\n" +
-                              "  return val;\n";
-
-        String cql = "CREATE OR REPLACE FUNCTION "+KEYSPACE+".jft(val double) RETURNS double LANGUAGE JAVA\n" +
-                     "AS '" + functionBody + "';";
-
-        execute(cql);
+        String fName = createFunction(KEYSPACE, "double",
+                                      "CREATE OR REPLACE FUNCTION " + KEYSPACE + ".jft(val double)" +
+                                      "RETURNS double LANGUAGE JAVA " +
+                                      "AS 'return val;';");
 
         execute("INSERT INTO %s (key, val) VALUES (?, ?)", 1, 1L);
         execute("INSERT INTO %s (key, val) VALUES (?, ?)", 2, 2L);
         execute("INSERT INTO %s (key, val) VALUES (?, ?)", 3, 3L);
-        assertInvalid("SELECT key, val, "+KEYSPACE+".jft(val) FROM %s");
+        assertInvalid("SELECT key, val, " + fName + "(val) FROM %s");
     }
 
     @Test
@@ -361,18 +392,22 @@ public class UFTest extends CQLTester
                               "  double v = Math.sin( val.doubleValue() );\n" +
                               "  return Double.valueOf(v);\n";
 
-        String cql = String.format("CREATE OR REPLACE FUNCTION %s.jft(val double) RETURNS double LANGUAGE JAVA\n" +
-                     "AS '%s';", KEYSPACE, functionBody);
+        String fName = createFunction(KEYSPACE, "double",
+                                      "CREATE OR REPLACE FUNCTION %s(val double) " +
+                                      "RETURNS double " +
+                                      "LANGUAGE JAVA " +
+                                      "AS '" + functionBody + "';");
 
-        execute(cql);
+        FunctionName fNameName = parseFunctionName(fName);
 
-        assertRows(execute("SELECT language, body FROM system.schema_functions WHERE keyspace_name=? AND function_name='jft'", KEYSPACE),
+        assertRows(execute("SELECT language, body FROM system.schema_functions WHERE keyspace_name=? AND function_name=?",
+                           fNameName.keyspace, fNameName.name),
                    row("java", functionBody));
 
         execute("INSERT INTO %s (key, val) VALUES (?, ?)", 1, 1d);
         execute("INSERT INTO %s (key, val) VALUES (?, ?)", 2, 2d);
         execute("INSERT INTO %s (key, val) VALUES (?, ?)", 3, 3d);
-        assertRows(execute("SELECT key, val, " + KEYSPACE + ".jft(val) FROM %s"),
+        assertRows(execute("SELECT key, val, " + fName + "(val) FROM %s"),
                    row(1, 1d, Math.sin(1d)),
                    row(2, 2d, Math.sin(2d)),
                    row(3, 3d, Math.sin(3d))
@@ -384,25 +419,21 @@ public class UFTest extends CQLTester
     {
         createTable("CREATE TABLE %s (key int primary key, val double)");
 
-        execute("CREATE TABLE "+KS_FOO+".second_tab (key int primary key, val double)");
+        execute("CREATE TABLE " + KEYSPACE_PER_TEST + ".second_tab (key int primary key, val double)");
 
-        String functionBody = "\n" +
-                              "  return val;\n";
-
-        String cql = "CREATE OR REPLACE FUNCTION "+KS_FOO+".jfitks(val double) RETURNS double LANGUAGE JAVA\n" +
-                     "AS '" + functionBody + "';";
-
-        execute(cql);
+        String fName = createFunction(KEYSPACE_PER_TEST, "double",
+                                      "CREATE OR REPLACE FUNCTION %s(val double) RETURNS double LANGUAGE JAVA " +
+                                      "AS 'return val;';");
 
         execute("INSERT INTO %s (key, val) VALUES (?, ?)", 1, 1d);
         execute("INSERT INTO %s (key, val) VALUES (?, ?)", 2, 2d);
         execute("INSERT INTO %s (key, val) VALUES (?, ?)", 3, 3d);
-        assertInvalid("SELECT key, val, " + KEYSPACE + ".jfitks(val) FROM %s");
+        assertInvalid("SELECT key, val, " + parseFunctionName(fName).name + "(val) FROM %s");
 
-        execute("INSERT INTO "+KS_FOO+".second_tab (key, val) VALUES (?, ?)", 1, 1d);
-        execute("INSERT INTO "+KS_FOO+".second_tab (key, val) VALUES (?, ?)", 2, 2d);
-        execute("INSERT INTO "+KS_FOO+".second_tab (key, val) VALUES (?, ?)", 3, 3d);
-        assertRows(execute("SELECT key, val, jfitks(val) FROM " + KS_FOO + ".second_tab"),
+        execute("INSERT INTO " + KEYSPACE_PER_TEST + ".second_tab (key, val) VALUES (?, ?)", 1, 1d);
+        execute("INSERT INTO " + KEYSPACE_PER_TEST + ".second_tab (key, val) VALUES (?, ?)", 2, 2d);
+        execute("INSERT INTO " + KEYSPACE_PER_TEST + ".second_tab (key, val) VALUES (?, ?)", 3, 3d);
+        assertRows(execute("SELECT key, val, " + fName + "(val) FROM " + KEYSPACE_PER_TEST + ".second_tab"),
                    row(1, 1d, 1d),
                    row(2, 2d, 2d),
                    row(3, 3d, 3d)
@@ -412,26 +443,25 @@ public class UFTest extends CQLTester
     @Test
     public void testFunctionWithReservedName() throws Throwable
     {
-        execute("CREATE TABLE " + KS_FOO + ".second_tab (key int primary key, val double)");
-
-        String cql = "CREATE OR REPLACE FUNCTION "+KS_FOO+".now() RETURNS timestamp LANGUAGE JAVA\n" +
-                     "AS 'return null;';";
+        execute("CREATE TABLE " + KEYSPACE_PER_TEST + ".second_tab (key int primary key, val double)");
 
-        execute(cql);
+        String fName = createFunction(KEYSPACE_PER_TEST, "",
+                                      "CREATE OR REPLACE FUNCTION %s() RETURNS timestamp LANGUAGE JAVA " +
+                                      "AS 'return null;';");
 
-        execute("INSERT INTO "+KS_FOO+".second_tab (key, val) VALUES (?, ?)", 1, 1d);
-        execute("INSERT INTO "+KS_FOO+".second_tab (key, val) VALUES (?, ?)", 2, 2d);
-        execute("INSERT INTO "+KS_FOO+".second_tab (key, val) VALUES (?, ?)", 3, 3d);
+        execute("INSERT INTO " + KEYSPACE_PER_TEST + ".second_tab (key, val) VALUES (?, ?)", 1, 1d);
+        execute("INSERT INTO " + KEYSPACE_PER_TEST + ".second_tab (key, val) VALUES (?, ?)", 2, 2d);
+        execute("INSERT INTO " + KEYSPACE_PER_TEST + ".second_tab (key, val) VALUES (?, ?)", 3, 3d);
 
         // ensure that system now() is executed
-        UntypedResultSet rows = execute("SELECT key, val, now() FROM " + KS_FOO + ".second_tab");
+        UntypedResultSet rows = execute("SELECT key, val, now() FROM " + KEYSPACE_PER_TEST + ".second_tab");
         Assert.assertEquals(3, rows.size());
         UntypedResultSet.Row row = rows.iterator().next();
         Date ts = row.getTimestamp(row.getColumns().get(2).name.toString());
         Assert.assertNotNull(ts);
 
-        // ensure that KS_FOO's now() is executed
-        rows = execute("SELECT key, val, "+KS_FOO+".now() FROM " + KS_FOO + ".second_tab");
+        // ensure that KEYSPACE_PER_TEST's now() is executed
+        rows = execute("SELECT key, val, " + fName + "() FROM " + KEYSPACE_PER_TEST + ".second_tab");
         Assert.assertEquals(3, rows.size());
         row = rows.iterator().next();
         Assert.assertFalse(row.has(row.getColumns().get(2).name.toString()));
@@ -460,19 +490,17 @@ public class UFTest extends CQLTester
     @Test
     public void testFunctionNonExistingKeyspace() throws Throwable
     {
-        String cql = "CREATE OR REPLACE FUNCTION this_ks_does_not_exist.jnft(val double) RETURNS double LANGUAGE JAVA\n" +
-                     "AS 'return null;';";
-        assertInvalid(cql);
+        assertInvalid("CREATE OR REPLACE FUNCTION this_ks_does_not_exist.jnft(val double) RETURNS double LANGUAGE JAVA\n" +
+                      "AS 'return null;';");
     }
 
     @Test
     public void testFunctionAfterOnDropKeyspace() throws Throwable
     {
-        dropKsFoo();
+        dropPerTestKeyspace();
 
-        String cql = "CREATE OR REPLACE FUNCTION "+KS_FOO+".jnft(val double) RETURNS double LANGUAGE JAVA\n" +
-                     "AS 'return null;';";
-        assertInvalid(cql);
+        assertInvalid("CREATE OR REPLACE FUNCTION " + KEYSPACE_PER_TEST + ".jnft(val double) RETURNS double LANGUAGE JAVA\n" +
+                      "AS 'return null;';");
     }
 
     @Test
@@ -489,18 +517,20 @@ public class UFTest extends CQLTester
                               "  double v = Math.sin( val.doubleValue() );\n" +
                               "  return Double.valueOf(v);\n";
 
-        String cql = "CREATE OR REPLACE FUNCTION "+KS_FOO+".jnft(val double) RETURNS double LANGUAGE JAVA\n" +
-                     "AS '" + functionBody + "';";
+        String fName = createFunction(KEYSPACE_PER_TEST, "double",
+                                     "CREATE OR REPLACE FUNCTION %s(val double) RETURNS double LANGUAGE JAVA " +
+                                     "AS '" + functionBody + "';");
 
-        execute(cql);
+        FunctionName fNameName = parseFunctionName(fName);
 
-        assertRows(execute("SELECT language, body FROM system.schema_functions WHERE keyspace_name='"+KS_FOO+"' AND function_name='jnft'"),
+        assertRows(execute("SELECT language, body FROM system.schema_functions WHERE keyspace_name=? AND function_name=?",
+                           fNameName.keyspace, fNameName.name),
                    row("java", functionBody));
 
         execute("INSERT INTO %s (key, val) VALUES (?, ?)", 1, 1d);
         execute("INSERT INTO %s (key, val) VALUES (?, ?)", 2, 2d);
         execute("INSERT INTO %s (key, val) VALUES (?, ?)", 3, 3d);
-        assertRows(execute("SELECT key, val, "+KS_FOO+".jnft(val) FROM %s"),
+        assertRows(execute("SELECT key, val, " + fName + "(val) FROM %s"),
                    row(1, 1d, Math.sin(1d)),
                    row(2, 2d, Math.sin(2d)),
                    row(3, 3d, Math.sin(3d))
@@ -515,12 +545,14 @@ public class UFTest extends CQLTester
         String functionBody = "\n" +
                               "  throw new RuntimeException(\"oh no!\");\n";
 
-        String cql = "CREATE OR REPLACE FUNCTION "+KS_FOO+".jrtef(val double) RETURNS double LANGUAGE JAVA\n" +
-                     "AS '" + functionBody + "';";
+        String fName = createFunction(KEYSPACE_PER_TEST, "double",
+                                      "CREATE OR REPLACE FUNCTION %s(val double) RETURNS double LANGUAGE JAVA\n" +
+                                      "AS '" + functionBody + "';");
 
-        execute(cql);
+        FunctionName fNameName = parseFunctionName(fName);
 
-        assertRows(execute("SELECT language, body FROM system.schema_functions WHERE keyspace_name='"+KS_FOO+"' AND function_name='jrtef'"),
+        assertRows(execute("SELECT language, body FROM system.schema_functions WHERE keyspace_name=? AND function_name=?",
+                           fNameName.keyspace, fNameName.name),
                    row("java", functionBody));
 
         execute("INSERT INTO %s (key, val) VALUES (?, ?)", 1, 1d);
@@ -528,7 +560,7 @@ public class UFTest extends CQLTester
         execute("INSERT INTO %s (key, val) VALUES (?, ?)", 3, 3d);
 
         // function throws a RuntimeException which is wrapped by InvalidRequestException
-        assertInvalid("SELECT key, val, "+KS_FOO+".jrtef(val) FROM %s");
+        assertInvalid("SELECT key, val, " + fName + "(val) FROM %s");
     }
 
     @Test
@@ -541,17 +573,825 @@ public class UFTest extends CQLTester
                               "    return null;\n" +
                               "  }\n" +
                               "  double v = Math.sin( input.doubleValue() );\n" +
-                              "  return \"'\"+Double.valueOf(v)+'\\\'';\n";
+                              "  return \"'\" + Double.valueOf(v)+'\\\'';\n";
 
-        execute("create function "+KS_FOO+".pgfun1 ( input double ) returns text language java\n" +
-                "AS $$" + functionBody + "$$;");
-        execute("CREATE FUNCTION "+KS_FOO+".pgsin ( input double ) RETURNS double LANGUAGE java AS $$return Double.valueOf(Math.sin(input.doubleValue()));$$");
+        String fName = createFunction(KEYSPACE_PER_TEST, "double",
+                                      "CREATE FUNCTION %s( input double ) " +
+                                      "RETURNS text " +
+                                      "LANGUAGE java\n" +
+                                      "AS $$" + functionBody + "$$;");
 
-        assertRows(execute("SELECT language, body FROM system.schema_functions WHERE keyspace_name='"+KS_FOO+"' AND function_name='pgfun1'"),
+        FunctionName fNameName = parseFunctionName(fName);
+
+        assertRows(execute("SELECT language, body FROM system.schema_functions WHERE keyspace_name=? AND function_name=?",
+                           fNameName.keyspace, fNameName.name),
                    row("java", functionBody));
     }
 
     @Test
+    public void testJavaSimpleCollections() throws Throwable
+    {
+        createTable("CREATE TABLE %s (key int primary key, lst list<double>, st set<text>, mp map<int, boolean>)");
+
+        String fList = createFunction(KEYSPACE_PER_TEST, "list<double>",
+                                     "CREATE FUNCTION %s( lst list<double> ) " +
+                                     "RETURNS list<double> LANGUAGE java\n" +
+                                     "AS $$return lst;$$;");
+        String fSet = createFunction(KEYSPACE_PER_TEST, "set<text>",
+                                     "CREATE FUNCTION %s( st set<text> ) " +
+                                     "RETURNS set<text> LANGUAGE java\n" +
+                                     "AS $$return st;$$;");
+        String fMap = createFunction(KEYSPACE_PER_TEST, "map<int, boolean>",
+                                     "CREATE FUNCTION %s( mp map<int, boolean> ) " +
+                                     "RETURNS map<int, boolean> LANGUAGE java\n" +
+                                     "AS $$return mp;$$;");
+
+        List<Double> list = Arrays.asList(1d, 2d, 3d);
+        Set<String> set = new TreeSet<>(Arrays.asList("one", "three", "two"));
+        Map<Integer, Boolean> map = new TreeMap<>();
+        map.put(1, true);
+        map.put(2, false);
+        map.put(3, true);
+
+        execute("INSERT INTO %s (key, lst, st, mp) VALUES (1, ?, ?, ?)", list, set, map);
+
+        assertRows(execute("SELECT " + fList + "(lst), " + fSet + "(st), " + fMap + "(mp) FROM %s WHERE key = 1"),
+                   row(list, set, map));
+
+        // same test - but via native protocol
+        for (int version = Server.VERSION_2; version <= Server.CURRENT_VERSION; version++)
+            assertRowsNet(version,
+                          executeNet(version, "SELECT " + fList + "(lst), " + fSet + "(st), " + fMap + "(mp) FROM %s WHERE key = 1"),
+                          row(list, set, map));
+    }
+
+    @Test
+    public void testComplexNullValues() throws Throwable
+    {
+        String type = KEYSPACE + "." + createType("CREATE TYPE %s (txt text, i int)");
+
+        createTable("CREATE TABLE %s (key int primary key, lst list<double>, st set<text>, mp map<int, boolean>," +
+                    "tup frozen<tuple<double, text, int, boolean>>, udt frozen<" + type + ">)");
+
+        String fList = createFunction(KEYSPACE, "list<double>",
+                                      "CREATE FUNCTION %s( coll list<double> ) " +
+                                      "RETURNS list<double> " +
+                                      "LANGUAGE java\n" +
+                                      "AS $$return coll;$$;");
+        String fSet = createFunction(KEYSPACE, "set<text>",
+                                     "CREATE FUNCTION %s( coll set<text> ) " +
+                                     "RETURNS set<text> " +
+                                     "LANGUAGE java\n" +
+                                     "AS $$return coll;$$;");
+        String fMap = createFunction(KEYSPACE, "map<int, boolean>",
+                                     "CREATE FUNCTION %s( coll map<int, boolean> ) " +
+                                     "RETURNS map<int, boolean> " +
+                                     "LANGUAGE java\n" +
+                                     "AS $$return coll;$$;");
+        String fTup = createFunction(KEYSPACE, "frozen<tuple<double, text, int, boolean>>",
+                                     "CREATE FUNCTION %s( val frozen<tuple<double, text, int, boolean>> ) " +
+                                     "RETURNS frozen<tuple<double, text, int, boolean>> " +
+                                     "LANGUAGE java\n" +
+                                     "AS $$return val;$$;");
+        String fUdt = createFunction(KEYSPACE, "frozen<" + type+'>',
+                                     "CREATE FUNCTION %s( val frozen<" + type + "> ) " +
+                                     "RETURNS frozen<" + type + "> " +
+                                     "LANGUAGE java\n" +
+                                     "AS $$return val;$$;");
+        List<Double> list = Arrays.asList(1d, 2d, 3d);
+        Set<String> set = new TreeSet<>(Arrays.asList("one", "three", "two"));
+        Map<Integer, Boolean> map = new TreeMap<>();
+        map.put(1, true);
+        map.put(2, false);
+        map.put(3, true);
+        Object t = tuple(1d, "one", 42, false);
+
+        execute("INSERT INTO %s (key, lst, st, mp, tup, udt) VALUES (1, ?, ?, ?, ?, {txt: 'one', i:1})", list, set, map, t);
+        execute("INSERT INTO %s (key, lst, st, mp, tup, udt) VALUES (2, ?, ?, ?, ?, null)", null, null, null, null);
+
+        execute("SELECT " +
+                fList + "(lst), " +
+                fSet + "(st), " +
+                fMap + "(mp), " +
+                fTup + "(tup), " +
+                fUdt + "(udt) FROM %s WHERE key = 1");
+        UntypedResultSet.Row row = execute("SELECT " +
+                                           fList + "(lst) as l, " +
+                                           fSet + "(st) as s, " +
+                                           fMap + "(mp) as m, " +
+                                           fTup + "(tup) as t, " +
+                                           fUdt + "(udt) as u " +
+                                           "FROM %s WHERE key = 1").one();
+        Assert.assertNotNull(row.getBytes("l"));
+        Assert.assertNotNull(row.getBytes("s"));
+        Assert.assertNotNull(row.getBytes("m"));
+        Assert.assertNotNull(row.getBytes("t"));
+        Assert.assertNotNull(row.getBytes("u"));
+        row = execute("SELECT " +
+                      fList + "(lst) as l, " +
+                      fSet + "(st) as s, " +
+                      fMap + "(mp) as m, " +
+                      fTup + "(tup) as t, " +
+                      fUdt + "(udt) as u " +
+                      "FROM %s WHERE key = 2").one();
+        Assert.assertNull(row.getBytes("l"));
+        Assert.assertNull(row.getBytes("s"));
+        Assert.assertNull(row.getBytes("m"));
+        Assert.assertNull(row.getBytes("t"));
+        Assert.assertNull(row.getBytes("u"));
+
+        for (int version = Server.VERSION_2; version <= Server.CURRENT_VERSION; version++)
+        {
+            Row r = executeNet(version, "SELECT " +
+                                        fList + "(lst) as l, " +
+                                        fSet + "(st) as s, " +
+                                        fMap + "(mp) as m, " +
+                                        fTup + "(tup) as t, " +
+                                        fUdt + "(udt) as u " +
+                                        "FROM %s WHERE key = 1").one();
+            Assert.assertNotNull(r.getBytesUnsafe("l"));
+            Assert.assertNotNull(r.getBytesUnsafe("s"));
+            Assert.assertNotNull(r.getBytesUnsafe("m"));
+            Assert.assertNotNull(r.getBytesUnsafe("t"));
+            Assert.assertNotNull(r.getBytesUnsafe("u"));
+            r = executeNet(version, "SELECT " +
+                                    fList + "(lst) as l, " +
+                                    fSet + "(st) as s, " +
+                                    fMap + "(mp) as m, " +
+                                    fTup + "(tup) as t, " +
+                                    fUdt + "(udt) as u " +
+                                    "FROM %s WHERE key = 2").one();
+            Assert.assertNull(r.getBytesUnsafe("l"));
+            Assert.assertNull(r.getBytesUnsafe("s"));
+            Assert.assertNull(r.getBytesUnsafe("m"));
+            Assert.assertNull(r.getBytesUnsafe("t"));
+            Assert.assertNull(r.getBytesUnsafe("u"));
+        }
+    }
+
+    @Test
+    public void testJavaTupleType() throws Throwable
+    {
+        createTable("CREATE TABLE %s (key int primary key, tup frozen<tuple<double, text, int, boolean>>)");
+
+        String fName = createFunction(KEYSPACE, "frozen<tuple<double, text, int, boolean>>",
+                                     "CREATE FUNCTION %s( tup frozen<tuple<double, text, int, boolean>> ) " +
+                                     "RETURNS frozen<tuple<double, text, int, boolean>> " +
+                                     "LANGUAGE java\n" +
+                                     "AS $$return tup;$$;");
+
+        Object t = tuple(1d, "foo", 2, true);
+
+        execute("INSERT INTO %s (key, tup) VALUES (1, ?)", t);
+
+        assertRows(execute("SELECT tup FROM %s WHERE key = 1"),
+                   row(t));
+
+        assertRows(execute("SELECT " + fName + "(tup) FROM %s WHERE key = 1"),
+                   row(t));
+    }
+
+    @Test
+    public void testJavaTupleTypeCollection() throws Throwable
+    {
+        String tupleTypeDef = "frozen<tuple<double, list<double>, set<text>, map<int, boolean>>>";
+
+        createTable("CREATE TABLE %s (key int primary key, tup " + tupleTypeDef + ")");
+
+        String fTup0 = createFunction(KEYSPACE_PER_TEST, tupleTypeDef,
+                "CREATE FUNCTION %s( tup " + tupleTypeDef + " ) " +
+                "RETURNS " + tupleTypeDef + " " +
+                "LANGUAGE java\n" +
+                "AS $$return " +
+                "       tup;$$;");
+        String fTup1 = createFunction(KEYSPACE_PER_TEST, tupleTypeDef,
+                "CREATE FUNCTION %s( tup " + tupleTypeDef + " ) " +
+                "RETURNS double " +
+                "LANGUAGE java\n" +
+                "AS $$return " +
+                "       Double.valueOf(tup.getDouble(0));$$;");
+        String fTup2 = createFunction(KEYSPACE_PER_TEST, tupleTypeDef,
+                "CREATE FUNCTION %s( tup " + tupleTypeDef + " ) " +
+                "RETURNS list<double> " +
+                "LANGUAGE java\n" +
+                "AS $$return " +
+                "       tup.getList(1, Double.class);$$;");
+        String fTup3 = createFunction(KEYSPACE_PER_TEST, tupleTypeDef,
+                "CREATE FUNCTION %s( tup " + tupleTypeDef + " ) " +
+                "RETURNS set<text> " +
+                "LANGUAGE java\n" +
+                "AS $$return " +
+                "       tup.getSet(2, String.class);$$;");
+        String fTup4 = createFunction(KEYSPACE_PER_TEST, tupleTypeDef,
+                "CREATE FUNCTION %s( tup " + tupleTypeDef + " ) " +
+                "RETURNS map<int, boolean> " +
+                "LANGUAGE java\n" +
+                "AS $$return " +
+                "       tup.getMap(3, Integer.class, Boolean.class);$$;");
+
+        List<Double> list = Arrays.asList(1d, 2d, 3d);
+        Set<String> set = new TreeSet<>(Arrays.asList("one", "three", "two"));
+        Map<Integer, Boolean> map = new TreeMap<>();
+        map.put(1, true);
+        map.put(2, false);
+        map.put(3, true);
+
+        Object t = tuple(1d, list, set, map);
+
+        execute("INSERT INTO %s (key, tup) VALUES (1, ?)", t);
+
+        assertRows(execute("SELECT " + fTup0 + "(tup) FROM %s WHERE key = 1"),
+                   row(t));
+        assertRows(execute("SELECT " + fTup1 + "(tup) FROM %s WHERE key = 1"),
+                   row(1d));
+        assertRows(execute("SELECT " + fTup2 + "(tup) FROM %s WHERE key = 1"),
+                   row(list));
+        assertRows(execute("SELECT " + fTup3 + "(tup) FROM %s WHERE key = 1"),
+                   row(set));
+        assertRows(execute("SELECT " + fTup4 + "(tup) FROM %s WHERE key = 1"),
+                   row(map));
+
+        TupleType tType = TupleType.of(DataType.cdouble(),
+                                       DataType.list(DataType.cdouble()),
+                                       DataType.set(DataType.text()),
+                                       DataType.map(DataType.cint(), DataType.cboolean()));
+        TupleValue tup = tType.newValue(1d, list, set, map);
+        for (int version = Server.VERSION_2; version <= Server.CURRENT_VERSION; version++)
+        {
+            assertRowsNet(version,
+                          executeNet(version, "SELECT " + fTup0 + "(tup) FROM %s WHERE key = 1"),
+                          row(tup));
+            assertRowsNet(version,
+                          executeNet(version, "SELECT " + fTup1 + "(tup) FROM %s WHERE key = 1"),
+                          row(1d));
+            assertRowsNet(version,
+                          executeNet(version, "SELECT " + fTup2 + "(tup) FROM %s WHERE key = 1"),
+                          row(list));
+            assertRowsNet(version,
+                          executeNet(version, "SELECT " + fTup3 + "(tup) FROM %s WHERE key = 1"),
+                          row(set));
+            assertRowsNet(version,
+                          executeNet(version, "SELECT " + fTup4 + "(tup) FROM %s WHERE key = 1"),
+                          row(map));
+        }
+    }
+
+    @Test
+    public void testJavaUserTypeWithUse() throws Throwable
+    {
+        String type = createType("CREATE TYPE %s (txt text, i int)");
+        createTable("CREATE TABLE %s (key int primary key, udt frozen<" + KEYSPACE + "." + type + ">)");
+        execute("INSERT INTO %s (key, udt) VALUES (1, {txt: 'one', i:1})");
+
+        for (int version = Server.VERSION_2; version <= Server.CURRENT_VERSION; version++)
+        {
+            executeNet(version, "USE " + KEYSPACE);
+
+            executeNet(version,
+                       "CREATE FUNCTION f_use1( udt frozen<" + type + "> ) " +
+                       "RETURNS frozen<" + type + "> " +
+                       "LANGUAGE java " +
+                       "AS $$return " +
+                       "     udt;$$;");
+            try
+            {
+                List<Row> rowsNet = executeNet(version, "SELECT f_use1(udt) FROM %s WHERE key = 1").all();
+                Assert.assertEquals(1, rowsNet.size());
+                UDTValue udtVal = rowsNet.get(0).getUDTValue(0);
+                Assert.assertEquals("one", udtVal.getString("txt"));
+                Assert.assertEquals(1, udtVal.getInt("i"));
+            }
+            finally
+            {
+                executeNet(version, "DROP FUNCTION f_use1");
+            }
+        }
+    }
+
+    @Test
+    public void testJavaUserTypeOtherKeyspace() throws Throwable
+    {
+        String type = KEYSPACE + "." + createType("CREATE TYPE %s (txt text, i int)");
+
+        String fName = createFunction(KEYSPACE_PER_TEST, "frozen<" + type + ">",
+                                      "CREATE FUNCTION %s( udt frozen<" + type + "> ) " +
+                                      "RETURNS frozen<" + type + "> " +
+                                      "LANGUAGE java " +
+                                      "AS $$return " +
+                                      "     udt;$$;");
+
+        execute("DROP FUNCTION " + fName);
+    }
+
+    @Test
+    public void testJavaUserType() throws Throwable
+    {
+        String type = KEYSPACE + "." + createType("CREATE TYPE %s (txt text, i int)");
+
+        createTable("CREATE TABLE %s (key int primary key, udt frozen<" + type + ">)");
+
+        String fUdt0 = createFunction(KEYSPACE, "frozen<" + type + ">",
+                                      "CREATE FUNCTION %s( udt frozen<" + type + "> ) " +
+                                      "RETURNS frozen<" + type + "> " +
+                                      "LANGUAGE java " +
+                                      "AS $$return " +
+                                      "     udt;$$;");
+        String fUdt1 = createFunction(KEYSPACE, "frozen<" + type + ">",
+                                      "CREATE FUNCTION %s( udt frozen<" + type + "> ) " +
+                                      "RETURNS text " +
+                                      "LANGUAGE java " +
+                                      "AS $$return " +
+                                      "     udt.getString(\"txt\");$$;");
+        String fUdt2 = createFunction(KEYSPACE, "frozen<" + type + ">",
+                                      "CREATE FUNCTION %s( udt frozen<" + type + "> ) " +
+                                      "RETURNS int " +
+                                      "LANGUAGE java " +
+                                      "AS $$return " +
+                                      "     Integer.valueOf(udt.getInt(\"i\"));$$;");
+
+        execute("INSERT INTO %s (key, udt) VALUES (1, {txt: 'one', i:1})");
+
+        UntypedResultSet rows = execute("SELECT " + fUdt0 + "(udt) FROM %s WHERE key = 1");
+        Assert.assertEquals(1, rows.size());
+        assertRows(execute("SELECT " + fUdt1 + "(udt) FROM %s WHERE key = 1"),
+                   row("one"));
+        assertRows(execute("SELECT " + fUdt2 + "(udt) FROM %s WHERE key = 1"),
+                   row(1));
+
+        for (int version = Server.VERSION_2; version <= Server.CURRENT_VERSION; version++)
+        {
+            List<Row> rowsNet = executeNet(version, "SELECT " + fUdt0 + "(udt) FROM %s WHERE key = 1").all();
+            Assert.assertEquals(1, rowsNet.size());
+            UDTValue udtVal = rowsNet.get(0).getUDTValue(0);
+            Assert.assertEquals("one", udtVal.getString("txt"));
+            Assert.assertEquals(1, udtVal.getInt("i"));
+            assertRowsNet(version,
+                          executeNet(version, "SELECT " + fUdt1 + "(udt) FROM %s WHERE key = 1"),
+                          row("one"));
+            assertRowsNet(version,
+                          executeNet(version, "SELECT " + fUdt2 + "(udt) FROM %s WHERE key = 1"),
+                          row(1));
+        }
+    }
+
+    @Test
+    public void testUserTypeDrop() throws Throwable
+    {
+        String type = KEYSPACE + "." + createType("CREATE TYPE %s (txt text, i int)");
+
+        createTable("CREATE TABLE %s (key int primary key, udt frozen<" + type + ">)");
+
+        String fName = createFunction(KEYSPACE, "frozen<" + type + ">",
+                                      "CREATE FUNCTION %s( udt frozen<" + type + "> ) " +
+                                      "RETURNS int " +
+                                      "LANGUAGE java " +
+                                      "AS $$return " +
+                                      "     Integer.valueOf(udt.getInt(\"i\"));$$;");
+
+        FunctionName fNameName = parseFunctionName(fName);
+
+        Assert.assertEquals(1, Functions.find(fNameName).size());
+
+        ResultMessage.Prepared prepared = QueryProcessor.prepare(String.format("SELECT key, %s(udt) FROM %s.%s", fName, KEYSPACE, currentTable()),
+                                                                 ClientState.forInternalCalls(), false);
+        Assert.assertNotNull(QueryProcessor.instance.getPrepared(prepared.statementId));
+
+        // the user type is still referenced by the table, so DROP TYPE must be rejected
+        assertInvalid("DROP TYPE " + type);
+
+        execute("DROP TABLE %s");
+
+        // the user type is still referenced by the UDF, so DROP TYPE must be rejected
+        assertInvalid("DROP TYPE " + type);
+
+        Assert.assertNull(QueryProcessor.instance.getPrepared(prepared.statementId));
+
+        // the function itself is still registered
+        Assert.assertEquals(1, Functions.find(fNameName).size());
+    }
+
+    @Test
+    public void testJavaUserTypeRenameField() throws Throwable
+    {
+        String type = KEYSPACE + "." + createType("CREATE TYPE %s (txt text, i int)");
+
+        createTable("CREATE TABLE %s (key int primary key, udt frozen<" + type + ">)");
+
+        String fName = createFunction(KEYSPACE, "frozen<" + type + ">",
+                                      "CREATE FUNCTION %s( udt frozen<" + type + "> ) " +
+                                      "RETURNS text LANGUAGE java\n" +
+                                      "AS $$return udt.getString(\"txt\");$$;");
+
+        execute("INSERT INTO %s (key, udt) VALUES (1, {txt: 'one', i:1})");
+
+        assertRows(execute("SELECT " + fName + "(udt) FROM %s WHERE key = 1"),
+                   row("one"));
+
+        execute("ALTER TYPE " + type + " RENAME txt TO str");
+
+        assertInvalid("SELECT " + fName + "(udt) FROM %s WHERE key = 1");
+
+        execute("ALTER TYPE " + type + " RENAME str TO txt");
+
+        assertRows(execute("SELECT " + fName + "(udt) FROM %s WHERE key = 1"),
+                   row("one"));
+    }
+
+    @Test
+    public void testJavaUserTypeAddFieldWithReplace() throws Throwable
+    {
+        String type = KEYSPACE + "." + createType("CREATE TYPE %s (txt text, i int)");
+
+        createTable("CREATE TABLE %s (key int primary key, udt frozen<" + type + ">)");
+
+        String fName1replace = createFunction(KEYSPACE, "frozen<" + type + ">",
+                                              "CREATE FUNCTION %s( udt frozen<" + type + "> ) " +
+                                              "RETURNS text LANGUAGE java\n" +
+                                              "AS $$return udt.getString(\"txt\");$$;");
+        String fName2replace = createFunction(KEYSPACE, "frozen<" + type + ">",
+                                              "CREATE FUNCTION %s( udt frozen<" + type + "> ) " +
+                                              "RETURNS int LANGUAGE java\n" +
+                                              "AS $$return Integer.valueOf(udt.getInt(\"i\"));$$;");
+        String fName3replace = createFunction(KEYSPACE, "frozen<" + type + ">",
+                                              "CREATE FUNCTION %s( udt frozen<" + type + "> ) " +
+                                              "RETURNS double LANGUAGE java\n" +
+                                              "AS $$return Double.valueOf(udt.getDouble(\"added\"));$$;");
+        String fName4replace = createFunction(KEYSPACE, "frozen<" + type + ">",
+                                              "CREATE FUNCTION %s( udt frozen<" + type + "> ) " +
+                                              "RETURNS frozen<" + type + "> LANGUAGE java\n" +
+                                              "AS $$return udt;$$;");
+
+        String fName1noReplace = createFunction(KEYSPACE, "frozen<" + type + ">",
+                                              "CREATE FUNCTION %s( udt frozen<" + type + "> ) " +
+                                              "RETURNS text LANGUAGE java\n" +
+                                              "AS $$return udt.getString(\"txt\");$$;");
+        String fName2noReplace = createFunction(KEYSPACE, "frozen<" + type + ">",
+                                              "CREATE FUNCTION %s( udt frozen<" + type + "> ) " +
+                                              "RETURNS int LANGUAGE java\n" +
+                                              "AS $$return Integer.valueOf(udt.getInt(\"i\"));$$;");
+        String fName3noReplace = createFunction(KEYSPACE, "frozen<" + type + ">",
+                                                "CREATE FUNCTION %s( udt frozen<" + type + "> ) " +
+                                                "RETURNS double LANGUAGE java\n" +
+                                                "AS $$return Double.valueOf(udt.getDouble(\"added\"));$$;");
+        String fName4noReplace = createFunction(KEYSPACE, "frozen<" + type + ">",
+                                                "CREATE FUNCTION %s( udt frozen<" + type + "> ) " +
+                                                "RETURNS frozen<" + type + "> LANGUAGE java\n" +
+                                                "AS $$return udt;$$;");
+
+        execute("INSERT INTO %s (key, udt) VALUES (1, {txt: 'one', i:1})");
+
+        assertRows(execute("SELECT " + fName1replace + "(udt) FROM %s WHERE key = 1"),
+                   row("one"));
+        assertRows(execute("SELECT " + fName2replace + "(udt) FROM %s WHERE key = 1"),
+                   row(1));
+
+        // add field
+
+        execute("ALTER TYPE " + type + " ADD added double");
+
+        execute("INSERT INTO %s (key, udt) VALUES (2, {txt: 'two', i:2, added: 2})");
+
+        // note: type references of functions remain at the state _before_ the type mutation,
+        // which means we need to recreate the functions
+
+        execute(String.format("CREATE OR REPLACE FUNCTION %s( udt frozen<%s> ) " +
+                              "RETURNS text LANGUAGE java\n" +
+                              "AS $$return " +
+                              "     udt.getString(\"txt\");$$;",
+                              fName1replace, type));
+        Assert.assertEquals(1, Functions.find(parseFunctionName(fName1replace)).size());
+        execute(String.format("CREATE OR REPLACE FUNCTION %s( udt frozen<%s> ) " +
+                              "RETURNS int LANGUAGE java\n" +
+                              "AS $$return " +
+                              "     Integer.valueOf(udt.getInt(\"i\"));$$;",
+                              fName2replace, type));
+        Assert.assertEquals(1, Functions.find(parseFunctionName(fName2replace)).size());
+        execute(String.format("CREATE OR REPLACE FUNCTION %s( udt frozen<%s> ) " +
+                              "RETURNS double LANGUAGE java\n" +
+                              "AS $$return " +
+                              "     Double.valueOf(udt.getDouble(\"added\"));$$;",
+                              fName3replace, type));
+        Assert.assertEquals(1, Functions.find(parseFunctionName(fName3replace)).size());
+        execute(String.format("CREATE OR REPLACE FUNCTION %s( udt frozen<%s> ) " +
+                              "RETURNS frozen<%s> LANGUAGE java\n" +
+                              "AS $$return " +
+                              "     udt;$$;",
+                              fName4replace, type, type));
+        Assert.assertEquals(1, Functions.find(parseFunctionName(fName4replace)).size());
+
+        assertRows(execute("SELECT " + fName1replace + "(udt) FROM %s WHERE key = 2"),
+                   row("two"));
+        assertRows(execute("SELECT " + fName2replace + "(udt) FROM %s WHERE key = 2"),
+                   row(2));
+        assertRows(execute("SELECT " + fName3replace + "(udt) FROM %s WHERE key = 2"),
+                   row(2d));
+        assertRows(execute("SELECT " + fName3replace + "(udt) FROM %s WHERE key = 1"),
+                   row(0d));
+
+        // un-replaced functions keep working as well: when the user type changed,
+        // the UDF exchanged its user type reference
+
+        assertRows(execute("SELECT " + fName1noReplace + "(udt) FROM %s WHERE key = 2"),
+                   row("two"));
+        assertRows(execute("SELECT " + fName2noReplace + "(udt) FROM %s WHERE key = 2"),
+                   row(2));
+        assertRows(execute("SELECT " + fName3noReplace + "(udt) FROM %s WHERE key = 2"),
+                   row(2d));
+        assertRows(execute("SELECT " + fName3noReplace + "(udt) FROM %s WHERE key = 1"),
+                   row(0d));
+
+        execute("DROP FUNCTION " + fName1replace);
+        execute("DROP FUNCTION " + fName2replace);
+        execute("DROP FUNCTION " + fName3replace);
+        execute("DROP FUNCTION " + fName4replace);
+        execute("DROP FUNCTION " + fName1noReplace);
+        execute("DROP FUNCTION " + fName2noReplace);
+        execute("DROP FUNCTION " + fName3noReplace);
+        execute("DROP FUNCTION " + fName4noReplace);
+    }
+
+    @Test
+    public void testJavaUTCollections() throws Throwable
+    {
+        String type = KEYSPACE + "." + createType("CREATE TYPE %s (txt text, i int)");
+
+        createTable(String.format("CREATE TABLE %%s " +
+                                  "(key int primary key, lst list<frozen<%s>>, st set<frozen<%s>>, mp map<int, frozen<%s>>)",
+                                  type, type, type));
+
+        String fName1 = createFunction(KEYSPACE, "list<frozen<" + type + ">>",
+                              "CREATE FUNCTION %s( lst list<frozen<" + type + ">> ) " +
+                              "RETURNS text LANGUAGE java\n" +
+                              "AS $$" +
+                              "     com.datastax.driver.core.UDTValue udtVal = (com.datastax.driver.core.UDTValue)lst.get(1);" +
+                              "     return udtVal.getString(\"txt\");$$;");
+        String fName2 = createFunction(KEYSPACE, "set<frozen<" + type + ">>",
+                              "CREATE FUNCTION %s( st set<frozen<" + type + ">> ) " +
+                              "RETURNS text LANGUAGE java\n" +
+                              "AS $$" +
+                              "     com.datastax.driver.core.UDTValue udtVal = (com.datastax.driver.core.UDTValue)st.iterator().next();" +
+                              "     return udtVal.getString(\"txt\");$$;");
+        String fName3 = createFunction(KEYSPACE, "map<int, frozen<" + type + ">>",
+                              "CREATE FUNCTION %s( mp map<int, frozen<" + type + ">> ) " +
+                              "RETURNS text LANGUAGE java\n" +
+                              "AS $$" +
+                              "     com.datastax.driver.core.UDTValue udtVal = (com.datastax.driver.core.UDTValue)mp.get(Integer.valueOf(3));" +
+                              "     return udtVal.getString(\"txt\");$$;");
+
+        execute("INSERT INTO %s (key, lst, st, mp) values (1, " +
+                "[ {txt: 'one', i:1}, {txt: 'three', i:1}, {txt: 'one', i:1} ] , " +
+                "{ {txt: 'one', i:1}, {txt: 'three', i:3}, {txt: 'two', i:2} }, " +
+                "{ 1: {txt: 'one', i:1}, 2: {txt: 'one', i:3}, 3: {txt: 'two', i:2} })");
+
+        assertRows(execute("SELECT " + fName1 + "(lst), " + fName2 + "(st), " + fName3 + "(mp) FROM %s WHERE key = 1"),
+                   row("three", "one", "two"));
+
+        for (int version = Server.VERSION_2; version <= Server.CURRENT_VERSION; version++)
+            assertRowsNet(version,
+                          executeNet(version, "SELECT " + fName1 + "(lst), " + fName2 + "(st), " + fName3 + "(mp) FROM %s WHERE key = 1"),
+                          row("three", "one", "two"));
+    }
+
+    @Test
+    public void testJavascriptSimpleCollections() throws Throwable
+    {
+        createTable("CREATE TABLE %s (key int primary key, lst list<double>, st set<text>, mp map<int, boolean>)");
+
+        String fName1 = createFunction(KEYSPACE_PER_TEST, "list<double>",
+                "CREATE FUNCTION %s( lst list<double> ) " +
+                "RETURNS list<double> " +
+                "LANGUAGE javascript\n" +
+                "AS 'lst;';");
+        String fName2 = createFunction(KEYSPACE_PER_TEST, "set<text>",
+                "CREATE FUNCTION %s( st set<text> ) " +
+                "RETURNS set<text> " +
+                "LANGUAGE javascript\n" +
+                "AS 'st;';");
+        String fName3 = createFunction(KEYSPACE_PER_TEST, "map<int, boolean>",
+                "CREATE FUNCTION %s( mp map<int, boolean> ) " +
+                "RETURNS map<int, boolean> " +
+                "LANGUAGE javascript\n" +
+                "AS 'mp;';");
+
+        List<Double> list = Arrays.asList(1d, 2d, 3d);
+        Set<String> set = new TreeSet<>(Arrays.asList("one", "three", "two"));
+        Map<Integer, Boolean> map = new TreeMap<>();
+        map.put(1, true);
+        map.put(2, false);
+        map.put(3, true);
+
+        execute("INSERT INTO %s (key, lst, st, mp) VALUES (1, ?, ?, ?)", list, set, map);
+
+        assertRows(execute("SELECT lst, st, mp FROM %s WHERE key = 1"),
+                   row(list, set, map));
+
+        assertRows(execute("SELECT " + fName1 + "(lst), " + fName2 + "(st), " + fName3 + "(mp) FROM %s WHERE key = 1"),
+                   row(list, set, map));
+
+        for (int version = Server.VERSION_2; version <= Server.CURRENT_VERSION; version++)
+            assertRowsNet(version,
+                          executeNet(version, "SELECT " + fName1 + "(lst), " + fName2 + "(st), " + fName3 + "(mp) FROM %s WHERE key = 1"),
+                          row(list, set, map));
+    }
+
+    @Test
+    public void testJavascriptTupleType() throws Throwable
+    {
+        createTable("CREATE TABLE %s (key int primary key, tup frozen<tuple<double, text, int, boolean>>)");
+
+        String fName = createFunction(KEYSPACE_PER_TEST, "frozen<tuple<double, text, int, boolean>>",
+                "CREATE FUNCTION %s( tup frozen<tuple<double, text, int, boolean>> ) " +
+                "RETURNS frozen<tuple<double, text, int, boolean>> " +
+                "LANGUAGE javascript\n" +
+                "AS $$tup;$$;");
+
+        Object t = tuple(1d, "foo", 2, true);
+
+        execute("INSERT INTO %s (key, tup) VALUES (1, ?)", t);
+
+        assertRows(execute("SELECT tup FROM %s WHERE key = 1"),
+                   row(t));
+
+        assertRows(execute("SELECT " + fName + "(tup) FROM %s WHERE key = 1"),
+                   row(t));
+    }
+
+    @Test
+    public void testJavascriptTupleTypeCollection() throws Throwable
+    {
+        String tupleTypeDef = "frozen<tuple<double, list<double>, set<text>, map<int, boolean>>>";
+        createTable("CREATE TABLE %s (key int primary key, tup " + tupleTypeDef + ")");
+
+        String fTup1 = createFunction(KEYSPACE_PER_TEST, tupleTypeDef,
+                "CREATE FUNCTION %s( tup " + tupleTypeDef + " ) " +
+                "RETURNS frozen<tuple<double, list<double>, set<text>, map<int, boolean>>> " +
+                "LANGUAGE javascript\n" +
+                "AS $$" +
+                "       tup;$$;");
+        String fTup2 = createFunction(KEYSPACE_PER_TEST, tupleTypeDef,
+                "CREATE FUNCTION %s( tup " + tupleTypeDef + " ) " +
+                "RETURNS double LANGUAGE " +
+                "javascript\n" +
+                "AS $$" +
+                "       tup.getDouble(0);$$;");
+        String fTup3 = createFunction(KEYSPACE_PER_TEST, tupleTypeDef,
+                "CREATE FUNCTION %s( tup " + tupleTypeDef + " ) " +
+                "RETURNS list<double> " +
+                "LANGUAGE javascript\n" +
+                "AS $$" +
+                "       tup.getList(1, java.lang.Class.forName(\"java.lang.Double\"));$$;");
+        String fTup4 = createFunction(KEYSPACE_PER_TEST, tupleTypeDef,
+                "CREATE FUNCTION %s( tup " + tupleTypeDef + " ) " +
+                "RETURNS set<text> " +
+                "LANGUAGE javascript\n" +
+                "AS $$" +
+                "       tup.getSet(2, java.lang.Class.forName(\"java.lang.String\"));$$;");
+        String fTup5 = createFunction(KEYSPACE_PER_TEST, tupleTypeDef,
+                "CREATE FUNCTION %s( tup " + tupleTypeDef + " ) " +
+                "RETURNS map<int, boolean> " +
+                "LANGUAGE javascript\n" +
+                "AS $$" +
+                "       tup.getMap(3, java.lang.Class.forName(\"java.lang.Integer\"), java.lang.Class.forName(\"java.lang.Boolean\"));$$;");
+
+        List<Double> list = Arrays.asList(1d, 2d, 3d);
+        Set<String> set = new TreeSet<>(Arrays.asList("one", "three", "two"));
+        Map<Integer, Boolean> map = new TreeMap<>();
+        map.put(1, true);
+        map.put(2, false);
+        map.put(3, true);
+
+        Object t = tuple(1d, list, set, map);
+
+        execute("INSERT INTO %s (key, tup) VALUES (1, ?)", t);
+
+        assertRows(execute("SELECT " + fTup1 + "(tup) FROM %s WHERE key = 1"),
+                   row(t));
+        assertRows(execute("SELECT " + fTup2 + "(tup) FROM %s WHERE key = 1"),
+                   row(1d));
+        assertRows(execute("SELECT " + fTup3 + "(tup) FROM %s WHERE key = 1"),
+                   row(list));
+        assertRows(execute("SELECT " + fTup4 + "(tup) FROM %s WHERE key = 1"),
+                   row(set));
+        assertRows(execute("SELECT " + fTup5 + "(tup) FROM %s WHERE key = 1"),
+                   row(map));
+
+        // same test - but via native protocol
+        TupleType tType = TupleType.of(DataType.cdouble(),
+                                       DataType.list(DataType.cdouble()),
+                                       DataType.set(DataType.text()),
+                                       DataType.map(DataType.cint(), DataType.cboolean()));
+        TupleValue tup = tType.newValue(1d, list, set, map);
+        for (int version = Server.VERSION_2; version <= Server.CURRENT_VERSION; version++)
+        {
+            assertRowsNet(version,
+                          executeNet(version, "SELECT " + fTup1 + "(tup) FROM %s WHERE key = 1"),
+                          row(tup));
+            assertRowsNet(version,
+                          executeNet(version, "SELECT " + fTup2 + "(tup) FROM %s WHERE key = 1"),
+                          row(1d));
+            assertRowsNet(version,
+                          executeNet(version, "SELECT " + fTup3 + "(tup) FROM %s WHERE key = 1"),
+                          row(list));
+            assertRowsNet(version,
+                          executeNet(version, "SELECT " + fTup4 + "(tup) FROM %s WHERE key = 1"),
+                          row(set));
+            assertRowsNet(version,
+                          executeNet(version, "SELECT " + fTup5 + "(tup) FROM %s WHERE key = 1"),
+                          row(map));
+        }
+    }
+
+    @Test
+    public void testJavascriptUserType() throws Throwable
+    {
+        String type = createType("CREATE TYPE %s (txt text, i int)");
+
+        createTable("CREATE TABLE %s (key int primary key, udt frozen<" + type + ">)");
+
+        String fUdt1 = createFunction(KEYSPACE, "frozen<" + type + ">",
+                              "CREATE FUNCTION %s( udt frozen<" + type + "> ) " +
+                              "RETURNS frozen<" + type + "> " +
+                              "LANGUAGE javascript\n" +
+                              "AS $$" +
+                              "     udt;$$;");
+        String fUdt2 = createFunction(KEYSPACE, "frozen<" + type + ">",
+                              "CREATE FUNCTION %s( udt frozen<" + type + "> ) " +
+                              "RETURNS text " +
+                              "LANGUAGE javascript\n" +
+                              "AS $$" +
+                              "     udt.getString(\"txt\");$$;");
+        String fUdt3 = createFunction(KEYSPACE, "frozen<" + type + ">",
+                              "CREATE FUNCTION %s( udt frozen<" + type + "> ) " +
+                              "RETURNS int " +
+                              "LANGUAGE javascript\n" +
+                              "AS $$" +
+                              "     udt.getInt(\"i\");$$;");
+
+        execute("INSERT INTO %s (key, udt) VALUES (1, {txt: 'one', i:1})");
+
+        UntypedResultSet rows = execute("SELECT " + fUdt1 + "(udt) FROM %s WHERE key = 1");
+        Assert.assertEquals(1, rows.size());
+        assertRows(execute("SELECT " + fUdt2 + "(udt) FROM %s WHERE key = 1"),
+                   row("one"));
+        assertRows(execute("SELECT " + fUdt3 + "(udt) FROM %s WHERE key = 1"),
+                   row(1));
+    }
+
+    @Test
+    public void testJavascriptUTCollections() throws Throwable
+    {
+        String type = createType("CREATE TYPE %s (txt text, i int)");
+
+        createTable(String.format("CREATE TABLE %%s " +
+                                  "(key int primary key, lst list<frozen<%s>>, st set<frozen<%s>>, mp map<int, frozen<%s>>)",
+                                  type, type, type));
+
+        String fName = createFunction(KEYSPACE, "list<frozen<" + type + ">>",
+                       "CREATE FUNCTION %s( lst list<frozen<" + type + ">> ) " +
+                       "RETURNS text " +
+                       "LANGUAGE javascript\n" +
+                       "AS $$" +
+                       "        lst.get(1).getString(\"txt\");$$;");
+        createFunctionOverload(fName, "set<frozen<" + type + ">>",
+                               "CREATE FUNCTION %s( st set<frozen<" + type + ">> ) " +
+                               "RETURNS text " +
+                               "LANGUAGE javascript\n" +
+                               "AS $$" +
+                               "        st.iterator().next().getString(\"txt\");$$;");
+        createFunctionOverload(fName, "map<int, frozen<" + type + ">>",
+                       "CREATE FUNCTION %s( mp map<int, frozen<" + type + ">> ) " +
+                       "RETURNS text " +
+                       "LANGUAGE javascript\n" +
+                       "AS $$" +
+                       "        mp.get(java.lang.Integer.valueOf(3)).getString(\"txt\");$$;");
+
+        execute("INSERT INTO %s (key, lst, st, mp) values (1, " +
+                // list<frozen<UDT>>
+                "[ {txt: 'one', i:1}, {txt: 'three', i:1}, {txt: 'one', i:1} ] , " +
+                // set<frozen<UDT>>
+                "{ {txt: 'one', i:1}, {txt: 'three', i:3}, {txt: 'two', i:2} }, " +
+                // map<int, frozen<UDT>>
+                "{ 1: {txt: 'one', i:1}, 2: {txt: 'one', i:3}, 3: {txt: 'two', i:2} })");
+
+        assertRows(execute("SELECT " + fName + "(lst) FROM %s WHERE key = 1"),
+                   row("three"));
+        assertRows(execute("SELECT " + fName + "(st) FROM %s WHERE key = 1"),
+                   row("one"));
+        assertRows(execute("SELECT " + fName + "(mp) FROM %s WHERE key = 1"),
+                   row("two"));
+
+        String cqlSelect = "SELECT " + fName + "(lst), " + fName + "(st), " + fName + "(mp) FROM %s WHERE key = 1";
+        assertRows(execute(cqlSelect),
+                   row("three", "one", "two"));
+
+        // same test - but via native protocol
+        for (int version = Server.VERSION_2; version <= Server.CURRENT_VERSION; version++)
+            assertRowsNet(version,
+                          executeNet(version, cqlSelect),
+                          row("three", "one", "two"));
+    }
+
+    @Test
     public void testJavascriptFunction() throws Throwable
     {
         createTable("CREATE TABLE %s (key int primary key, val double)");
@@ -559,18 +1399,21 @@ public class UFTest extends CQLTester
         String functionBody = "\n" +
                               "  Math.sin(val);\n";
 
-        String cql = "CREATE OR REPLACE FUNCTION "+KEYSPACE+".jsft(val double) RETURNS double LANGUAGE javascript\n" +
-                     "AS '" + functionBody + "';";
+        String fName = createFunction(KEYSPACE, "double",
+                                      "CREATE OR REPLACE FUNCTION %s(val double) " +
+                                      "RETURNS double LANGUAGE javascript\n" +
+                                      "AS '" + functionBody + "';");
 
-        execute(cql);
+        FunctionName fNameName = parseFunctionName(fName);
 
-        assertRows(execute("SELECT language, body FROM system.schema_functions WHERE keyspace_name='"+KEYSPACE+"' AND function_name='jsft'"),
+        assertRows(execute("SELECT language, body FROM system.schema_functions WHERE keyspace_name=? AND function_name=?",
+                           fNameName.keyspace, fNameName.name),
                    row("javascript", functionBody));
 
         execute("INSERT INTO %s (key, val) VALUES (?, ?)", 1, 1d);
         execute("INSERT INTO %s (key, val) VALUES (?, ?)", 2, 2d);
         execute("INSERT INTO %s (key, val) VALUES (?, ?)", 3, 3d);
-        assertRows(execute("SELECT key, val, jsft(val) FROM %s"),
+        assertRows(execute("SELECT key, val, " + fName + "(val) FROM %s"),
                    row(1, 1d, Math.sin(1d)),
                    row(2, 2d, Math.sin(2d)),
                    row(3, 3d, Math.sin(3d))
@@ -582,12 +1425,15 @@ public class UFTest extends CQLTester
     {
         createTable("CREATE TABLE %s (key int primary key, val double)");
 
-        execute("CREATE OR REPLACE FUNCTION "+KEYSPACE+".jsft(val double) RETURNS double LANGUAGE javascript\n" +
-                "AS '\"string\";';");
+        String fName = createFunction(KEYSPACE, "double",
+                                      "CREATE OR REPLACE FUNCTION %s(val double) " +
+                                      "RETURNS double " +
+                                      "LANGUAGE javascript\n" +
+                                      "AS '\"string\";';");
 
         execute("INSERT INTO %s (key, val) VALUES (?, ?)", 1, 1d);
         // throws IRE with ClassCastException
-        assertInvalid("SELECT key, val, jsft(val) FROM %s");
+        assertInvalid("SELECT key, val, " + fName + "(val) FROM %s");
     }
 
     @Test
@@ -595,32 +1441,38 @@ public class UFTest extends CQLTester
     {
         createTable("CREATE TABLE %s (key int primary key, val double)");
 
-        execute("CREATE OR REPLACE FUNCTION "+KEYSPACE+".jsft(val double) RETURNS double LANGUAGE javascript\n" +
-                "AS 'throw \"fool\";';");
+        String fName = createFunction(KEYSPACE, "double",
+                       "CREATE OR REPLACE FUNCTION %s(val double) " +
+                       "RETURNS double " +
+                       "LANGUAGE javascript\n" +
+                       "AS 'throw \"fool\";';");
 
         execute("INSERT INTO %s (key, val) VALUES (?, ?)", 1, 1d);
         // throws IRE with ScriptException
-        assertInvalid("SELECT key, val, jsft(val) FROM %s");
+        assertInvalid("SELECT key, val, " + fName + "(val) FROM %s");
     }
 
     @Test
     public void testDuplicateArgNames() throws Throwable
     {
-        assertInvalid("CREATE OR REPLACE FUNCTION "+KEYSPACE+".scrinv(val double, val text) RETURNS text LANGUAGE javascript\n" +
+        assertInvalid("CREATE OR REPLACE FUNCTION " + KEYSPACE + ".scrinv(val double, val text) " +
+                      "RETURNS text LANGUAGE javascript\n" +
                       "AS '\"foo bar\";';");
     }
 
     @Test
     public void testJavascriptCompileFailure() throws Throwable
     {
-        assertInvalid("CREATE OR REPLACE FUNCTION "+KEYSPACE+".scrinv(val double) RETURNS double LANGUAGE javascript\n" +
+        assertInvalid("CREATE OR REPLACE FUNCTION " + KEYSPACE + ".scrinv(val double) " +
+                      "RETURNS double LANGUAGE javascript\n" +
                       "AS 'foo bar';");
     }
 
     @Test
     public void testScriptInvalidLanguage() throws Throwable
     {
-        assertInvalid("CREATE OR REPLACE FUNCTION "+KEYSPACE+".scrinv(val double) RETURNS double LANGUAGE artificial_intelligence\n" +
+        assertInvalid("CREATE OR REPLACE FUNCTION " + KEYSPACE + ".scrinv(val double) " +
+                      "RETURNS double LANGUAGE artificial_intelligence\n" +
                       "AS 'question for 42?';");
     }
 
@@ -630,85 +1482,36 @@ public class UFTest extends CQLTester
         createTable("CREATE TABLE %s (key int primary key, val double)");
         execute("INSERT INTO %s (key, val) VALUES (?, ?)", 1, 1d);
 
-        execute("CREATE OR REPLACE FUNCTION "+KEYSPACE+".js(val double) RETURNS boolean LANGUAGE javascript\n" +
-                "AS 'true;';");
-        assertRows(execute("SELECT key, val, js(val) FROM %s"),
-                   row(1, 1d, true));
-        execute("CREATE OR REPLACE FUNCTION "+KEYSPACE+".js(val double) RETURNS boolean LANGUAGE javascript\n" +
-                "AS 'false;';");
-        assertRows(execute("SELECT key, val, js(val) FROM %s"),
-                   row(1, 1d, false));
-        execute("DROP FUNCTION "+KEYSPACE+".js(double)");
-
-        // declared rtype = int , return type = int
-        execute("CREATE OR REPLACE FUNCTION "+KEYSPACE+".js(val double) RETURNS int LANGUAGE javascript\n" +
-                "AS '100;';");
-        assertRows(execute("SELECT key, val, js(val) FROM %s"),
-                   row(1, 1d, 100));
-        execute("DROP FUNCTION "+KEYSPACE+".js(double)");
-
-        // declared rtype = int , return type = double
-        execute("CREATE OR REPLACE FUNCTION "+KEYSPACE+".js(val double) RETURNS int LANGUAGE javascript\n" +
-                "AS '100.;';");
-        assertRows(execute("SELECT key, val, js(val) FROM %s"),
-                   row(1, 1d, 100));
-        execute("DROP FUNCTION "+KEYSPACE+".js(double)");
-
-        // declared rtype = double , return type = int
-        execute("CREATE OR REPLACE FUNCTION "+KEYSPACE+".js(val double) RETURNS double LANGUAGE javascript\n" +
-                "AS '100;';");
-        assertRows(execute("SELECT key, val, js(val) FROM %s"),
-                   row(1, 1d, 100d));
-        execute("DROP FUNCTION "+KEYSPACE+".js(double)");
-
-        // declared rtype = double , return type = double
-        execute("CREATE OR REPLACE FUNCTION "+KEYSPACE+".js(val double) RETURNS double LANGUAGE javascript\n" +
-                "AS '100.;';");
-        assertRows(execute("SELECT key, val, js(val) FROM %s"),
-                   row(1, 1d, 100d));
-        execute("DROP FUNCTION "+KEYSPACE+".js(double)");
-
-        // declared rtype = bigint , return type = int
-        execute("CREATE OR REPLACE FUNCTION "+KEYSPACE+".js(val double) RETURNS bigint LANGUAGE javascript\n" +
-                "AS '100;';");
-        assertRows(execute("SELECT key, val, js(val) FROM %s"),
-                   row(1, 1d, 100L));
-        execute("DROP FUNCTION "+KEYSPACE+".js(double)");
-
-        // declared rtype = bigint , return type = double
-        execute("CREATE OR REPLACE FUNCTION "+KEYSPACE+".js(val double) RETURNS bigint LANGUAGE javascript\n" +
-                "AS '100.;';");
-        assertRows(execute("SELECT key, val, js(val) FROM %s"),
-                   row(1, 1d, 100L));
-        execute("DROP FUNCTION "+KEYSPACE+".js(double)");
-
-        // declared rtype = varint , return type = int
-        execute("CREATE OR REPLACE FUNCTION "+KEYSPACE+".js(val double) RETURNS varint LANGUAGE javascript\n" +
-                "AS '100;';");
-        assertRows(execute("SELECT key, val, js(val) FROM %s"),
-                   row(1, 1d, BigInteger.valueOf(100L)));
-        execute("DROP FUNCTION "+KEYSPACE+".js(double)");
-
-        // declared rtype = varint , return type = double
-        execute("CREATE OR REPLACE FUNCTION "+KEYSPACE+".js(val double) RETURNS varint LANGUAGE javascript\n" +
-                "AS '100.;';");
-        assertRows(execute("SELECT key, val, js(val) FROM %s"),
-                   row(1, 1d, BigInteger.valueOf(100L)));
-        execute("DROP FUNCTION "+KEYSPACE+".js(double)");
-
-        // declared rtype = decimal , return type = int
-        execute("CREATE OR REPLACE FUNCTION "+KEYSPACE+".js(val double) RETURNS decimal LANGUAGE javascript\n" +
-                "AS 'parseInt(\"100\");';");
-        assertRows(execute("SELECT key, val, js(val) FROM %s"),
-                   row(1, 1d, BigDecimal.valueOf(100d)));
-        execute("DROP FUNCTION "+KEYSPACE+".js(double)");
-
-        // declared rtype = decimal , return type = double
-        execute("CREATE OR REPLACE FUNCTION "+KEYSPACE+".js(val double) RETURNS decimal LANGUAGE javascript\n" +
-                "AS '100.;';");
-        assertRows(execute("SELECT key, val, js(val) FROM %s"),
-                   row(1, 1d, BigDecimal.valueOf(100d)));
-        execute("DROP FUNCTION "+KEYSPACE+".js(double)");
+        Object[][] variations = new Object[][]
+                                {
+                                new Object[]    {   "true",     "boolean",  true    },
+                                new Object[]    {   "false",    "boolean",  false   },
+                                new Object[]    {   "100",      "int",      100     },
+                                new Object[]    {   "100.",     "int",      100     },
+                                new Object[]    {   "100",      "double",   100d    },
+                                new Object[]    {   "100.",     "double",   100d    },
+                                new Object[]    {   "100",      "bigint",   100L    },
+                                new Object[]    {   "100.",     "bigint",   100L    },
+                                new Object[]    {   "100",      "varint",   BigInteger.valueOf(100L)    },
+                                new Object[]    {   "100.",     "varint",   BigInteger.valueOf(100L)    },
+                                new Object[]    {   "parseInt(\"100\");", "decimal",  BigDecimal.valueOf(100d)    },
+                                new Object[]    {   "100.",     "decimal",  BigDecimal.valueOf(100d)    },
+                                };
+
+        for (Object[] variation : variations)
+        {
+            Object functionBody = variation[0];
+            Object returnType = variation[1];
+            Object expectedResult = variation[2];
+
+            String fName = createFunction(KEYSPACE, "double",
+                                          "CREATE OR REPLACE FUNCTION %s(val double) " +
+                                          "RETURNS " +returnType + " " +
+                                          "LANGUAGE javascript " +
+                                          "AS '" + functionBody + ";';");
+            assertRows(execute("SELECT key, val, " + fName + "(val) FROM %s"),
+                       row(1, 1d, expectedResult));
+        }
     }
 
     @Test
@@ -718,46 +1521,29 @@ public class UFTest extends CQLTester
         execute("INSERT INTO %s (key, ival, lval, fval, dval, vval, ddval) VALUES (?, ?, ?, ?, ?, ?, ?)", 1,
                 1, 1L, 1f, 1d, BigInteger.valueOf(1L), BigDecimal.valueOf(1d));
 
-        // type = int
-        execute("CREATE OR REPLACE FUNCTION "+KEYSPACE+".jsint(val int) RETURNS int LANGUAGE javascript\n" +
-                "AS 'val+1;';");
-        assertRows(execute("SELECT key, ival, jsint(ival) FROM %s"),
-                   row(1, 1, 2));
-        execute("DROP FUNCTION "+KEYSPACE+".jsint(int)");
-
-        // bigint
-        execute("CREATE OR REPLACE FUNCTION "+KEYSPACE+".jsbigint(val bigint) RETURNS bigint LANGUAGE javascript\n" +
-                "AS 'val+1;';");
-        assertRows(execute("SELECT key, lval, jsbigint(lval) FROM %s"),
-                   row(1, 1L, 2L));
-        execute("DROP FUNCTION "+KEYSPACE+".jsbigint(bigint)");
-
-        // float
-        execute("CREATE OR REPLACE FUNCTION "+KEYSPACE+".jsfloat(val float) RETURNS float LANGUAGE javascript\n" +
-                "AS 'val+1;';");
-        assertRows(execute("SELECT key, fval, jsfloat(fval) FROM %s"),
-                   row(1, 1f, 2f));
-        execute("DROP FUNCTION "+KEYSPACE+".jsfloat(float)");
-
-        // double
-        execute("CREATE OR REPLACE FUNCTION "+KEYSPACE+".jsdouble(val double) RETURNS double LANGUAGE javascript\n" +
-                "AS 'val+1;';");
-        assertRows(ex

<TRUNCATED>
http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/tools/lib/cassandra-driver-core-2.0.5.jar
----------------------------------------------------------------------
diff --git a/tools/lib/cassandra-driver-core-2.0.5.jar b/tools/lib/cassandra-driver-core-2.0.5.jar
deleted file mode 100644
index 260183e..0000000
Binary files a/tools/lib/cassandra-driver-core-2.0.5.jar and /dev/null differ


[2/2] cassandra git commit: Support for UDTs, tuples, and collections in UDFs

Posted by ty...@apache.org.
Support for UDTs, tuples, and collections in UDFs

Patch by Robert Stupp; reviewed by Tyler Hobbs for CASSANDRA-7563


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/794d68b5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/794d68b5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/794d68b5

Branch: refs/heads/trunk
Commit: 794d68b51b77c2a3cb09374010b6f84231ead604
Parents: e131213
Author: Robert Stupp <sn...@snazy.de>
Authored: Wed Nov 26 17:49:45 2014 -0600
Committer: Tyler Hobbs <ty...@datastax.com>
Committed: Wed Nov 26 17:49:45 2014 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |    2 +
 .../org/apache/cassandra/cql3/CQL3Type.java     |   12 +-
 .../cql3/functions/BytesConversionFcts.java     |    8 +-
 .../cassandra/cql3/functions/FunctionCall.java  |    9 +-
 .../cassandra/cql3/functions/FunctionName.java  |    1 -
 .../cassandra/cql3/functions/Functions.java     |   50 +-
 .../cql3/functions/JavaSourceUDFFactory.java    |   51 +-
 .../cql3/functions/ScalarFunction.java          |    3 +-
 .../cql3/functions/ScriptBasedUDF.java          |   13 +-
 .../cassandra/cql3/functions/TimeuuidFcts.java  |   10 +-
 .../cassandra/cql3/functions/TokenFct.java      |    2 +-
 .../cassandra/cql3/functions/UDFunction.java    |  177 ++-
 .../cassandra/cql3/functions/UuidFcts.java      |    2 +-
 .../selection/AggregateFunctionSelector.java    |    8 +-
 .../cassandra/cql3/selection/FieldSelector.java |    8 +-
 .../cql3/selection/ScalarFunctionSelector.java  |   10 +-
 .../cassandra/cql3/selection/Selection.java     |   30 +-
 .../cassandra/cql3/selection/Selector.java      |    6 +-
 .../cql3/selection/SimpleSelector.java          |    5 +-
 .../cql3/selection/WritetimeOrTTLSelector.java  |    4 +-
 .../statements/CreateFunctionStatement.java     |   23 +-
 .../cql3/statements/DropFunctionStatement.java  |   10 +-
 .../cql3/statements/DropTypeStatement.java      |   11 +
 .../cql3/statements/ModificationStatement.java  |    2 +-
 .../cql3/statements/SelectStatement.java        |    8 +-
 .../cassandra/hadoop/cql3/CqlRecordReader.java  |    2 +-
 .../org/apache/cassandra/transport/Server.java  |    1 +
 .../org/apache/cassandra/cql3/CQLTester.java    |  232 ++-
 test/unit/org/apache/cassandra/cql3/UFTest.java | 1356 ++++++++++++++----
 tools/lib/cassandra-driver-core-2.0.5.jar       |  Bin 544552 -> 0 bytes
 30 files changed, 1643 insertions(+), 413 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 162d579..55c86dd 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,6 @@
 3.0
+ * Support UDTs, tuples, and collections in user-defined
+   functions (CASSANDRA-7563)
  * Fix aggregate fn results on empty selection, result column name,
    and cqlsh parsing (CASSANDRA-8229)
  * Mark sstables as repaired after full repair (CASSANDRA-7586)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/CQL3Type.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/CQL3Type.java b/src/java/org/apache/cassandra/cql3/CQL3Type.java
index b656de8..98d1b15 100644
--- a/src/java/org/apache/cassandra/cql3/CQL3Type.java
+++ b/src/java/org/apache/cassandra/cql3/CQL3Type.java
@@ -315,6 +315,11 @@ public interface CQL3Type
             return false;
         }
 
+        public String keyspace()
+        {
+            return null;
+        }
+
         public void freeze() throws InvalidRequestException
         {
             String message = String.format("frozen<> is only allowed on collections, tuples, and user-defined types (got %s)", this);
@@ -474,6 +479,11 @@ public interface CQL3Type
                 this.name = name;
             }
 
+            public String keyspace()
+            {
+                return name.getKeyspace();
+            }
+
             public void freeze()
             {
                 frozen = true;
@@ -485,7 +495,7 @@ public interface CQL3Type
                 {
                     // The provided keyspace is the one of the current statement this is part of. If it's different from the keyspace of
                     // the UTName, we reject since we want to limit user types to their own keyspace (see #6643)
-                    if (!keyspace.equals(name.getKeyspace()))
+                    if (keyspace != null && !keyspace.equals(name.getKeyspace()))
                         throw new InvalidRequestException(String.format("Statement on keyspace %s cannot refer to a user type in keyspace %s; "
                                                                         + "user types can only be used in the keyspace they are defined in",
                                                                         keyspace, name.getKeyspace()));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/functions/BytesConversionFcts.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/BytesConversionFcts.java b/src/java/org/apache/cassandra/cql3/functions/BytesConversionFcts.java
index 1cd1d69..ddb33fc 100644
--- a/src/java/org/apache/cassandra/cql3/functions/BytesConversionFcts.java
+++ b/src/java/org/apache/cassandra/cql3/functions/BytesConversionFcts.java
@@ -36,7 +36,7 @@ public abstract class BytesConversionFcts
         String name = fromType.asCQL3Type() + "asblob";
         return new NativeScalarFunction(name, BytesType.instance, fromType)
         {
-            public ByteBuffer execute(List<ByteBuffer> parameters)
+            public ByteBuffer execute(int protocolVersion, List<ByteBuffer> parameters)
             {
                 return parameters.get(0);
             }
@@ -48,7 +48,7 @@ public abstract class BytesConversionFcts
         final String name = "blobas" + toType.asCQL3Type();
         return new NativeScalarFunction(name, toType, BytesType.instance)
         {
-            public ByteBuffer execute(List<ByteBuffer> parameters) throws InvalidRequestException
+            public ByteBuffer execute(int protocolVersion, List<ByteBuffer> parameters) throws InvalidRequestException
             {
                 ByteBuffer val = parameters.get(0);
                 try
@@ -68,7 +68,7 @@ public abstract class BytesConversionFcts
 
     public static final Function VarcharAsBlobFct = new NativeScalarFunction("varcharasblob", BytesType.instance, UTF8Type.instance)
     {
-        public ByteBuffer execute(List<ByteBuffer> parameters)
+        public ByteBuffer execute(int protocolVersion, List<ByteBuffer> parameters)
         {
             return parameters.get(0);
         }
@@ -76,7 +76,7 @@ public abstract class BytesConversionFcts
 
     public static final Function BlobAsVarcharFact = new NativeScalarFunction("blobasvarchar", UTF8Type.instance, BytesType.instance)
     {
-        public ByteBuffer execute(List<ByteBuffer> parameters)
+        public ByteBuffer execute(int protocolVersion, List<ByteBuffer> parameters)
         {
             return parameters.get(0);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java b/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java
index efaa12a..01443d2 100644
--- a/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java
+++ b/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java
@@ -27,6 +27,7 @@ import org.apache.cassandra.db.marshal.ListType;
 import org.apache.cassandra.db.marshal.MapType;
 import org.apache.cassandra.db.marshal.SetType;
 import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.transport.Server;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.serializers.MarshalException;
 
@@ -69,12 +70,12 @@ public class FunctionCall extends Term.NonTerminal
                 throw new InvalidRequestException(String.format("Invalid null value for argument to %s", fun));
             buffers.add(val);
         }
-        return executeInternal(fun, buffers);
+        return executeInternal(options.getProtocolVersion(), fun, buffers);
     }
 
-    private static ByteBuffer executeInternal(ScalarFunction fun, List<ByteBuffer> params) throws InvalidRequestException
+    private static ByteBuffer executeInternal(int protocolVersion, ScalarFunction fun, List<ByteBuffer> params) throws InvalidRequestException
     {
-        ByteBuffer result = fun.execute(params);
+        ByteBuffer result = fun.execute(protocolVersion, params);
         try
         {
             // Check the method didn't lied on it's declared return type
@@ -172,7 +173,7 @@ public class FunctionCall extends Term.NonTerminal
                 buffers.add(((Term.Terminal)t).get(QueryOptions.DEFAULT));
             }
 
-            return executeInternal(fun, buffers);
+            return executeInternal(Server.CURRENT_VERSION, fun, buffers);
         }
 
         public AssignmentTestable.TestResult testAssignment(String keyspace, ColumnSpecification receiver)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/functions/FunctionName.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/FunctionName.java b/src/java/org/apache/cassandra/cql3/functions/FunctionName.java
index 460e7a6..bb30040 100644
--- a/src/java/org/apache/cassandra/cql3/functions/FunctionName.java
+++ b/src/java/org/apache/cassandra/cql3/functions/FunctionName.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.cql3.functions;
 
 import com.google.common.base.Objects;
 
-import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.SystemKeyspace;
 
 public final class FunctionName

http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/functions/Functions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/Functions.java b/src/java/org/apache/cassandra/cql3/functions/Functions.java
index 7021475..a8fdf0f 100644
--- a/src/java/org/apache/cassandra/cql3/functions/Functions.java
+++ b/src/java/org/apache/cassandra/cql3/functions/Functions.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.cql3.functions;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 
 import com.google.common.collect.ArrayListMultimap;
@@ -29,6 +30,8 @@ import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.service.IMigrationListener;
+import org.apache.cassandra.service.MigrationManager;
 
 public abstract class Functions
 {
@@ -83,6 +86,8 @@ public abstract class Functions
         declare(AggregateFcts.avgFunctionForDouble);
         declare(AggregateFcts.avgFunctionForVarint);
         declare(AggregateFcts.avgFunctionForDecimal);
+
+        MigrationManager.instance.register(new FunctionsMigrationListener());
     }
 
     private static void declare(Function fun)
@@ -188,7 +193,7 @@ public abstract class Functions
         assert name.hasKeyspace() : "function name not fully qualified";
         for (Function f : declared.get(name))
         {
-            if (f.argTypes().equals(argTypes))
+            if (typeEquals(f.argTypes(), argTypes))
                 return f;
         }
         return null;
@@ -284,4 +289,47 @@ public abstract class Functions
         removeFunction(fun.name(), fun.argTypes());
         addFunction(fun);
     }
+
+    public static Collection<Function> all()
+    {
+        return declared.values();
+    }
+
+    public static boolean typeEquals(AbstractType<?> t1, AbstractType<?> t2)
+    {
+        return t1.asCQL3Type().toString().equals(t2.asCQL3Type().toString());
+    }
+
+    public static boolean typeEquals(List<AbstractType<?>> t1, List<AbstractType<?>> t2)
+    {
+        if (t1.size() != t2.size())
+            return false;
+        for (int i = 0; i < t1.size(); i ++)
+            if (!typeEquals(t1.get(i), t2.get(i)))
+                return false;
+        return true;
+    }
+
+    private static class FunctionsMigrationListener implements IMigrationListener
+    {
+        public void onCreateKeyspace(String ksName) { }
+        public void onCreateColumnFamily(String ksName, String cfName) { }
+        public void onCreateUserType(String ksName, String typeName) { }
+        public void onCreateFunction(String ksName, String functionName) { }
+
+        public void onUpdateKeyspace(String ksName) { }
+        public void onUpdateColumnFamily(String ksName, String cfName) { }
+        public void onUpdateUserType(String ksName, String typeName) {
+            for (Function function : all())
+                if (function instanceof UDFunction)
+                    ((UDFunction)function).userTypeUpdated(ksName, typeName);
+        }
+        public void onUpdateFunction(String ksName, String functionName) { }
+
+        public void onDropKeyspace(String ksName) { }
+        public void onDropColumnFamily(String ksName, String cfName) { }
+        public void onDropUserType(String ksName, String typeName) { }
+        public void onDropFunction(String ksName, String functionName) { }
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/functions/JavaSourceUDFFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/JavaSourceUDFFactory.java b/src/java/org/apache/cassandra/cql3/functions/JavaSourceUDFFactory.java
index 0f5fe48..560f077 100644
--- a/src/java/org/apache/cassandra/cql3/functions/JavaSourceUDFFactory.java
+++ b/src/java/org/apache/cassandra/cql3/functions/JavaSourceUDFFactory.java
@@ -20,13 +20,13 @@ package org.apache.cassandra.cql3.functions;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Modifier;
-import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.datastax.driver.core.DataType;
 import javassist.CannotCompileException;
 import javassist.ClassPool;
 import javassist.CtClass;
@@ -56,8 +56,14 @@ public final class JavaSourceUDFFactory
                                boolean deterministic)
     throws InvalidRequestException
     {
-        Class<?> javaReturnType = UDFunction.javaType(returnType);
-        Class<?>[] javaParamTypes = UDFunction.javaParamTypes(argTypes);
+        // argDataTypes is just the C* internal argTypes converted to the Java Driver DataType
+        DataType[] argDataTypes = UDFunction.driverTypes(argTypes);
+        // returnDataType is just the C* internal returnType converted to the Java Driver DataType
+        DataType returnDataType = UDFunction.driverType(returnType);
+        // javaParamTypes holds the Java class for each entry of argTypes (equivalently, of argDataTypes)
+        Class<?>[] javaParamTypes = UDFunction.javaTypes(argDataTypes);
+        // javaReturnType is the Java class for returnType (equivalently, for returnDataType)
+        Class<?> javaReturnType = returnDataType.asJavaClass();
 
         String clsName = generateClassName(name);
 
@@ -92,9 +98,13 @@ public final class JavaSourceUDFFactory
 
             Constructor ctor =
                 cc.toClass().getDeclaredConstructor(
-                   FunctionName.class, List.class, List.class,
-                   AbstractType.class, String.class, boolean.class);
-            return (UDFunction) ctor.newInstance(name, argNames, argTypes, returnType, body, deterministic);
+                   FunctionName.class, List.class, List.class, DataType[].class,
+                   AbstractType.class, DataType.class,
+                   String.class, boolean.class);
+            return (UDFunction) ctor.newInstance(
+                   name, argNames, argTypes, argDataTypes,
+                   returnType, returnDataType,
+                   body, deterministic);
         }
         catch (NotFoundException | CannotCompileException | NoSuchMethodException | LinkageError | InstantiationException | IllegalAccessException e)
         {
@@ -133,10 +143,12 @@ public final class JavaSourceUDFFactory
                "(org.apache.cassandra.cql3.functions.FunctionName name, " +
                "java.util.List argNames, " +
                "java.util.List argTypes, " +
+               "com.datastax.driver.core.DataType[] argDataTypes, " +
                "org.apache.cassandra.db.marshal.AbstractType returnType, " +
+               "com.datastax.driver.core.DataType returnDataType, " +
                "String body," +
                "boolean deterministic)\n{" +
-               "  super(name, argNames, argTypes, returnType, \"java\", body, deterministic);\n" +
+               "  super(name, argNames, argTypes, argDataTypes, returnType, returnDataType, \"java\", body, deterministic);\n" +
                "}";
     }
 
@@ -177,15 +189,17 @@ public final class JavaSourceUDFFactory
      * Generated looks like this:
      * <code><pre>
      *
-     * public java.nio.ByteBuffer execute(java.util.List params)
+     * public java.nio.ByteBuffer execute(int protocolVersion, java.util.List params)
      * throws org.apache.cassandra.exceptions.InvalidRequestException
      * {
      *     try
      *     {
      *         Object result = executeInternal(
-     *             (<cast to JAVA_ARG_TYPE>)org.apache.cassandra.cql3.functions.JavaSourceUDFFactory.compose(argTypes, params, 0)
+     *             (<cast to JAVA_ARG_TYPE>)compose(protocolVersion, 0, (java.nio.ByteBuffer)params.get(0)),
+     *             (<cast to JAVA_ARG_TYPE>)compose(protocolVersion, 1, (java.nio.ByteBuffer)params.get(1)),
+     *             ...
      *         );
-     *         return result != null ? returnType.decompose(result) : null;
+     *         return decompose(protocolVersion, result);
      *     }
      *     catch (Throwable t)
      *     {
@@ -202,7 +216,7 @@ public final class JavaSourceUDFFactory
         // usual methods are 700-800 chars long (prevent temp object allocations)
         StringBuilder code = new StringBuilder(1024);
         // overrides org.apache.cassandra.cql3.functions.Function.execute(java.util.List)
-        code.append("public java.nio.ByteBuffer execute(java.util.List params)\n" +
+        code.append("public java.nio.ByteBuffer execute(int protocolVersion, java.util.List params)\n" +
                     "throws org.apache.cassandra.exceptions.InvalidRequestException\n" +
                     "{\n" +
                     "  try\n" +
@@ -219,13 +233,13 @@ public final class JavaSourceUDFFactory
             code.
                  // cast to Java type
                  append("\n      (").append(paramTypes[i].getName()).append(")").
-                 // generate object representation of input parameter
-                 append("org.apache.cassandra.cql3.functions.JavaSourceUDFFactory.compose(argTypes, params, ").append(i).append(')');
+                 // generate object representation of input parameter (call UDFunction.compose)
+                 append("compose(protocolVersion, ").append(i).append(", (java.nio.ByteBuffer)params.get(").append(i).append("))");
         }
 
         code.append("\n    );\n" +
-                    // generate serialized return value (returnType is a field in AbstractFunction class)
-                    "    return result != null ? returnType.decompose(result) : null;\n" +
+                    // generate serialized return value (call UDFunction.decompose, which serializes via returnDataType)
+                    "    return decompose(protocolVersion, result);\n" +
                     //
                     // error handling ...
                     "  }\n" +
@@ -242,11 +256,4 @@ public final class JavaSourceUDFFactory
         return code.toString();
     }
 
-    // Used by execute() implementations of generated java source UDFs.
-    public static Object compose(List<AbstractType<?>> argTypes, List<ByteBuffer> parameters, int param)
-    {
-        ByteBuffer bb = parameters.get(param);
-        return bb == null ? null : argTypes.get(param).compose(bb);
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/functions/ScalarFunction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/ScalarFunction.java b/src/java/org/apache/cassandra/cql3/functions/ScalarFunction.java
index ba2a374..f00faf7 100644
--- a/src/java/org/apache/cassandra/cql3/functions/ScalarFunction.java
+++ b/src/java/org/apache/cassandra/cql3/functions/ScalarFunction.java
@@ -30,9 +30,10 @@ public interface ScalarFunction extends Function
     /**
      * Applies this function to the specified parameter.
      *
+     * @param protocolVersion protocol version used for parameters and return value
      * @param parameters the input parameters
      * @return the result of applying this function to the parameter
      * @throws InvalidRequestException if this function cannot not be applied to the parameter
      */
-    public ByteBuffer execute(List<ByteBuffer> parameters) throws InvalidRequestException;
+    public ByteBuffer execute(int protocolVersion, List<ByteBuffer> parameters) throws InvalidRequestException;
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/functions/ScriptBasedUDF.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/ScriptBasedUDF.java b/src/java/org/apache/cassandra/cql3/functions/ScriptBasedUDF.java
index 73fc43b..059a612 100644
--- a/src/java/org/apache/cassandra/cql3/functions/ScriptBasedUDF.java
+++ b/src/java/org/apache/cassandra/cql3/functions/ScriptBasedUDF.java
@@ -88,15 +88,11 @@ public class ScriptBasedUDF extends UDFunction
         }
     }
 
-    public ByteBuffer execute(List<ByteBuffer> parameters) throws InvalidRequestException
+    public ByteBuffer execute(int protocolVersion, List<ByteBuffer> parameters) throws InvalidRequestException
     {
         Object[] params = new Object[argTypes.size()];
         for (int i = 0; i < params.length; i++)
-        {
-            ByteBuffer bb = parameters.get(i);
-            if (bb != null)
-                params[i] = argTypes.get(i).compose(bb);
-        }
+            params[i] = compose(protocolVersion, i, parameters.get(i));
 
         try
         {
@@ -108,7 +104,7 @@ public class ScriptBasedUDF extends UDFunction
             if (result == null)
                 return null;
 
-            Class<?> javaReturnType = returnType.getSerializer().getType();
+            Class<?> javaReturnType = returnDataType.asJavaClass();
             Class<?> resultType = result.getClass();
             if (!javaReturnType.isAssignableFrom(resultType))
             {
@@ -138,8 +134,7 @@ public class ScriptBasedUDF extends UDFunction
                 }
             }
 
-            @SuppressWarnings("unchecked") ByteBuffer r = ((AbstractType) returnType).decompose(result);
-            return r;
+            return decompose(protocolVersion, result);
         }
         catch (RuntimeException | ScriptException e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/functions/TimeuuidFcts.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/TimeuuidFcts.java b/src/java/org/apache/cassandra/cql3/functions/TimeuuidFcts.java
index e481cf5..c1c3490 100644
--- a/src/java/org/apache/cassandra/cql3/functions/TimeuuidFcts.java
+++ b/src/java/org/apache/cassandra/cql3/functions/TimeuuidFcts.java
@@ -31,7 +31,7 @@ public abstract class TimeuuidFcts
 {
     public static final Function nowFct = new NativeScalarFunction("now", TimeUUIDType.instance)
     {
-        public ByteBuffer execute(List<ByteBuffer> parameters)
+        public ByteBuffer execute(int protocolVersion, List<ByteBuffer> parameters)
         {
             return ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes());
         }
@@ -45,7 +45,7 @@ public abstract class TimeuuidFcts
 
     public static final Function minTimeuuidFct = new NativeScalarFunction("mintimeuuid", TimeUUIDType.instance, TimestampType.instance)
     {
-        public ByteBuffer execute(List<ByteBuffer> parameters)
+        public ByteBuffer execute(int protocolVersion, List<ByteBuffer> parameters)
         {
             ByteBuffer bb = parameters.get(0);
             if (bb == null)
@@ -57,7 +57,7 @@ public abstract class TimeuuidFcts
 
     public static final Function maxTimeuuidFct = new NativeScalarFunction("maxtimeuuid", TimeUUIDType.instance, TimestampType.instance)
     {
-        public ByteBuffer execute(List<ByteBuffer> parameters)
+        public ByteBuffer execute(int protocolVersion, List<ByteBuffer> parameters)
         {
             ByteBuffer bb = parameters.get(0);
             if (bb == null)
@@ -69,7 +69,7 @@ public abstract class TimeuuidFcts
 
     public static final Function dateOfFct = new NativeScalarFunction("dateof", TimestampType.instance, TimeUUIDType.instance)
     {
-        public ByteBuffer execute(List<ByteBuffer> parameters)
+        public ByteBuffer execute(int protocolVersion, List<ByteBuffer> parameters)
         {
             ByteBuffer bb = parameters.get(0);
             if (bb == null)
@@ -81,7 +81,7 @@ public abstract class TimeuuidFcts
 
     public static final Function unixTimestampOfFct = new NativeScalarFunction("unixtimestampof", LongType.instance, TimeUUIDType.instance)
     {
-        public ByteBuffer execute(List<ByteBuffer> parameters)
+        public ByteBuffer execute(int protocolVersion, List<ByteBuffer> parameters)
         {
             ByteBuffer bb = parameters.get(0);
             if (bb == null)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/functions/TokenFct.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/TokenFct.java b/src/java/org/apache/cassandra/cql3/functions/TokenFct.java
index ca4d473..9d50a97 100644
--- a/src/java/org/apache/cassandra/cql3/functions/TokenFct.java
+++ b/src/java/org/apache/cassandra/cql3/functions/TokenFct.java
@@ -50,7 +50,7 @@ public class TokenFct extends NativeScalarFunction
         return types;
     }
 
-    public ByteBuffer execute(List<ByteBuffer> parameters) throws InvalidRequestException
+    public ByteBuffer execute(int protocolVersion, List<ByteBuffer> parameters) throws InvalidRequestException
     {
         CBuilder builder = cfm.getKeyValidatorAsCType().builder();
         for (int i = 0; i < parameters.size(); i++)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/functions/UDFunction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/UDFunction.java b/src/java/org/apache/cassandra/cql3/functions/UDFunction.java
index 42418c6..973c70a 100644
--- a/src/java/org/apache/cassandra/cql3/functions/UDFunction.java
+++ b/src/java/org/apache/cassandra/cql3/functions/UDFunction.java
@@ -17,6 +17,9 @@
  */
 package org.apache.cassandra.cql3.functions;
 
+import java.lang.invoke.MethodHandle;
+import java.lang.invoke.MethodHandles;
+import java.lang.reflect.Method;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.security.MessageDigest;
@@ -26,6 +29,11 @@ import com.google.common.base.Objects;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.ProtocolVersion;
+import com.datastax.driver.core.UserType;
+import org.apache.cassandra.config.KSMetaData;
+import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.composites.Composite;
@@ -33,6 +41,8 @@ import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.db.marshal.TypeParser;
 import org.apache.cassandra.exceptions.*;
+import org.apache.cassandra.service.MigrationManager;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 
 /**
@@ -42,11 +52,83 @@ public abstract class UDFunction extends AbstractFunction implements ScalarFunct
 {
     protected static final Logger logger = LoggerFactory.getLogger(UDFunction.class);
 
+    // TODO make these c'tors and methods public in Java-Driver - see https://datastax-oss.atlassian.net/browse/JAVA-502
+    static final MethodHandle methodParseOne;
+    static
+    {
+        try
+        {
+            Class<?> cls = Class.forName("com.datastax.driver.core.CassandraTypeParser");
+            Method m = cls.getDeclaredMethod("parseOne", String.class);
+            m.setAccessible(true);
+            methodParseOne = MethodHandles.lookup().unreflect(m);
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Construct an array containing the Java classes for the given Java Driver {@link com.datastax.driver.core.DataType}s.
+     *
+     * @param dataTypes array with UDF argument types
+     * @return array of the same size holding the Java class of each UDF argument type
+     */
+    public static Class<?>[] javaTypes(DataType[] dataTypes)
+    {
+        Class<?> paramTypes[] = new Class[dataTypes.length];
+        for (int i = 0; i < paramTypes.length; i++)
+            paramTypes[i] = dataTypes[i].asJavaClass();
+        return paramTypes;
+    }
+
+    /**
+     * Construct an array containing the Java Driver {@link com.datastax.driver.core.DataType}s for the
+     * C* internal types.
+     *
+     * @param abstractTypes list with UDF argument types
+     * @return array with argument types as {@link com.datastax.driver.core.DataType}
+     */
+    public static DataType[] driverTypes(List<AbstractType<?>> abstractTypes)
+    {
+        DataType[] argDataTypes = new DataType[abstractTypes.size()];
+        for (int i = 0; i < argDataTypes.length; i++)
+            argDataTypes[i] = driverType(abstractTypes.get(i));
+        return argDataTypes;
+    }
+
+    /**
+     * Returns the Java Driver {@link com.datastax.driver.core.DataType} for the C* internal type.
+     */
+    public static DataType driverType(AbstractType abstractType)
+    {
+        CQL3Type cqlType = abstractType.asCQL3Type();
+        try
+        {
+            return (DataType) methodParseOne.invoke(cqlType.getType().toString());
+        }
+        catch (RuntimeException | Error e)
+        {
+            // immediately rethrow these...
+            throw e;
+        }
+        catch (Throwable e)
+        {
+            throw new RuntimeException("cannot parse driver type " + cqlType.getType().toString(), e);
+        }
+    }
+
+    // instance vars
+
     protected final List<ColumnIdentifier> argNames;
 
     protected final String language;
     protected final String body;
-    private final boolean deterministic;
+    protected final boolean deterministic;
+
+    protected final DataType[] argDataTypes;
+    protected final DataType returnDataType;
 
     protected UDFunction(FunctionName name,
                          List<ColumnIdentifier> argNames,
@@ -56,12 +138,53 @@ public abstract class UDFunction extends AbstractFunction implements ScalarFunct
                          String body,
                          boolean deterministic)
     {
+        this(name, argNames, argTypes, driverTypes(argTypes), returnType,
+             driverType(returnType), language, body, deterministic);
+    }
+
+    protected UDFunction(FunctionName name,
+                         List<ColumnIdentifier> argNames,
+                         List<AbstractType<?>> argTypes,
+                         DataType[] argDataTypes,
+                         AbstractType<?> returnType,
+                         DataType returnDataType,
+                         String language,
+                         String body,
+                         boolean deterministic)
+        {
         super(name, argTypes, returnType);
         assert new HashSet<>(argNames).size() == argNames.size() : "duplicate argument names";
         this.argNames = argNames;
         this.language = language;
         this.body = body;
         this.deterministic = deterministic;
+        this.argDataTypes = argDataTypes;
+        this.returnDataType = returnDataType;
+    }
+
+    /**
+     * Used by UDF implementations (both Java code generated by {@link org.apache.cassandra.cql3.functions.JavaSourceUDFFactory}
+     * and script executor {@link org.apache.cassandra.cql3.functions.ScriptBasedUDF}) to convert the C*
+     * serialized representation to the Java object representation.
+     *
+     * @param protocolVersion the native protocol version used for serialization
+     * @param argIndex        index of the UDF input argument
+     */
+    protected Object compose(int protocolVersion, int argIndex, ByteBuffer value)
+    {
+        return value == null ? null : argDataTypes[argIndex].deserialize(value, ProtocolVersion.fromInt(protocolVersion));
+    }
+
+    /**
+     * Used by UDF implementations (both Java code generated by {@link org.apache.cassandra.cql3.functions.JavaSourceUDFFactory}
+     * and script executor {@link org.apache.cassandra.cql3.functions.ScriptBasedUDF}) to convert the Java
+     * object representation for the return value to the C* serialized representation.
+     *
+     * @param protocolVersion the native protocol version used for serialization
+     */
+    protected ByteBuffer decompose(int protocolVersion, Object value)
+    {
+        return value == null ? null : returnDataType.serialize(value, ProtocolVersion.fromInt(protocolVersion));
     }
 
     public boolean isAggregate()
@@ -85,19 +208,6 @@ public abstract class UDFunction extends AbstractFunction implements ScalarFunct
         }
     }
 
-    static Class<?>[] javaParamTypes(List<AbstractType<?>> argTypes)
-    {
-        Class<?> paramTypes[] = new Class[argTypes.size()];
-        for (int i = 0; i < paramTypes.length; i++)
-            paramTypes[i] = javaType(argTypes.get(i));
-        return paramTypes;
-    }
-
-    static Class<?> javaType(AbstractType<?> type)
-    {
-        return type.getSerializer().getType();
-    }
-
     /**
      * It can happen that a function has been declared (is listed in the scheam) but cannot
      * be loaded (maybe only on some nodes). This is the case for instance if the class defining
@@ -117,7 +227,7 @@ public abstract class UDFunction extends AbstractFunction implements ScalarFunct
     {
         return new UDFunction(name, argNames, argTypes, returnType, language, body, true)
         {
-            public ByteBuffer execute(List<ByteBuffer> parameters) throws InvalidRequestException
+            public ByteBuffer execute(int protocolVersion, List<ByteBuffer> parameters) throws InvalidRequestException
             {
                 throw new InvalidRequestException(String.format("Function '%s' exists but hasn't been loaded successfully for the following reason: %s. "
                                                               + "Please see the server log for more details", this, reason.getMessage()));
@@ -135,7 +245,7 @@ public abstract class UDFunction extends AbstractFunction implements ScalarFunct
     {
         MessageDigest digest = FBUtilities.newMessageDigest("SHA-1");
         for (AbstractType<?> type : argTypes)
-            digest.update(type.toString().getBytes(StandardCharsets.UTF_8));
+            digest.update(type.asCQL3Type().toString().getBytes(StandardCharsets.UTF_8));
         return ByteBuffer.wrap(digest.digest());
     }
 
@@ -268,8 +378,8 @@ public abstract class UDFunction extends AbstractFunction implements ScalarFunct
         UDFunction that = (UDFunction)o;
         return Objects.equal(this.name, that.name)
             && Objects.equal(this.argNames, that.argNames)
-            && Objects.equal(this.argTypes, that.argTypes)
-            && Objects.equal(this.returnType, that.returnType)
+            && Functions.typeEquals(this.argTypes, that.argTypes)
+            && Functions.typeEquals(this.returnType, that.returnType)
             && Objects.equal(this.language, that.language)
             && Objects.equal(this.body, that.body)
             && Objects.equal(this.deterministic, that.deterministic);
@@ -280,4 +390,35 @@ public abstract class UDFunction extends AbstractFunction implements ScalarFunct
     {
         return Objects.hashCode(name, argNames, argTypes, returnType, language, body, deterministic);
     }
+
+    /**
+     * Re-resolves, from the current schema, every argument of this function whose type is the
+     * user type {@code ksName.typeName}.
+     * <p>
+     * Each matching argument is replaced in both {@code argDataTypes} and {@code argTypes} by the
+     * definition currently found in the keyspace metadata. If at least one argument was replaced,
+     * the function is announced again through {@code MigrationManager.announceNewFunction}.
+     * <p>
+     * Only top-level argument types are inspected; the return type and user types nested inside
+     * other types are not touched by this method.
+     *
+     * @param ksName   keyspace of the user type to look for
+     * @param typeName name of the user type to look for
+     */
+    public void userTypeUpdated(String ksName, String typeName)
+    {
+        boolean updated = false;
+
+        for (int i = 0; i < argDataTypes.length; i++)
+        {
+            DataType dataType = argDataTypes[i];
+            // NOTE(review): the unqualified UserType is presumably the Java driver's class, since the
+            // marshal UserType is written with its full package name below - confirm against the imports
+            if (dataType instanceof UserType)
+            {
+                UserType userType = (UserType) dataType;
+                if (userType.getKeyspace().equals(ksName) && userType.getTypeName().equals(typeName))
+                {
+                    KSMetaData ksm = Schema.instance.getKSMetaData(ksName);
+                    assert ksm != null;
+
+                    // current definition of the type as stored in the keyspace metadata
+                    org.apache.cassandra.db.marshal.UserType ut = ksm.userTypes.getType(ByteBufferUtil.bytes(typeName));
+
+                    // keep both representations of the argument (DataType and marshal type) in sync
+                    DataType newUserType = driverType(ut);
+                    argDataTypes[i] = newUserType;
+
+                    argTypes.set(i, ut);
+
+                    updated = true;
+                }
+            }
+        }
+
+        // NOTE(review): the second argument is presumably the "announce locally only" flag - confirm
+        // against MigrationManager.announceNewFunction
+        if (updated)
+            MigrationManager.announceNewFunction(this, true);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/functions/UuidFcts.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/UuidFcts.java b/src/java/org/apache/cassandra/cql3/functions/UuidFcts.java
index b3cef85..afb5aae 100644
--- a/src/java/org/apache/cassandra/cql3/functions/UuidFcts.java
+++ b/src/java/org/apache/cassandra/cql3/functions/UuidFcts.java
@@ -28,7 +28,7 @@ public abstract class UuidFcts
 {
     public static final Function uuidFct = new NativeScalarFunction("uuid", UUIDType.instance)
     {
-        public ByteBuffer execute(List<ByteBuffer> parameters)
+        public ByteBuffer execute(int protocolVersion, List<ByteBuffer> parameters)
         {
             return UUIDSerializer.instance.serialize(UUID.randomUUID());
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/selection/AggregateFunctionSelector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/selection/AggregateFunctionSelector.java b/src/java/org/apache/cassandra/cql3/selection/AggregateFunctionSelector.java
index 6ea9716..7702796 100644
--- a/src/java/org/apache/cassandra/cql3/selection/AggregateFunctionSelector.java
+++ b/src/java/org/apache/cassandra/cql3/selection/AggregateFunctionSelector.java
@@ -34,20 +34,20 @@ final class AggregateFunctionSelector extends AbstractFunctionSelector<Aggregate
         return true;
     }
 
-    public void addInput(ResultSetBuilder rs) throws InvalidRequestException
+    public void addInput(int protocolVersion, ResultSetBuilder rs) throws InvalidRequestException
     {
         // Aggregation of aggregation is not supported
         for (int i = 0, m = argSelectors.size(); i < m; i++)
         {
             Selector s = argSelectors.get(i);
-            s.addInput(rs);
-            args.set(i, s.getOutput());
+            s.addInput(protocolVersion, rs);
+            args.set(i, s.getOutput(protocolVersion));
             s.reset();
         }
         this.aggregate.addInput(args);
     }
 
-    public ByteBuffer getOutput() throws InvalidRequestException
+    public ByteBuffer getOutput(int protocolVersion) throws InvalidRequestException
     {
         return aggregate.compute();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/selection/FieldSelector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/selection/FieldSelector.java b/src/java/org/apache/cassandra/cql3/selection/FieldSelector.java
index 7e14486..d695598 100644
--- a/src/java/org/apache/cassandra/cql3/selection/FieldSelector.java
+++ b/src/java/org/apache/cassandra/cql3/selection/FieldSelector.java
@@ -64,14 +64,14 @@ final class FieldSelector extends Selector
         return false;
     }
 
-    public void addInput(ResultSetBuilder rs) throws InvalidRequestException
+    public void addInput(int protocolVersion, ResultSetBuilder rs) throws InvalidRequestException
     {
-        selected.addInput(rs);
+        selected.addInput(protocolVersion, rs);
     }
 
-    public ByteBuffer getOutput() throws InvalidRequestException
+    public ByteBuffer getOutput(int protocolVersion) throws InvalidRequestException
     {
-        ByteBuffer value = selected.getOutput();
+        ByteBuffer value = selected.getOutput(protocolVersion);
         if (value == null)
             return null;
         ByteBuffer[] buffers = type.split(value);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/selection/ScalarFunctionSelector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/selection/ScalarFunctionSelector.java b/src/java/org/apache/cassandra/cql3/selection/ScalarFunctionSelector.java
index 4ceadb9..bb56bb8 100644
--- a/src/java/org/apache/cassandra/cql3/selection/ScalarFunctionSelector.java
+++ b/src/java/org/apache/cassandra/cql3/selection/ScalarFunctionSelector.java
@@ -36,12 +36,12 @@ final class ScalarFunctionSelector extends AbstractFunctionSelector<ScalarFuncti
         return argSelectors.get(0).isAggregate();
     }
 
-    public void addInput(ResultSetBuilder rs) throws InvalidRequestException
+    public void addInput(int protocolVersion, ResultSetBuilder rs) throws InvalidRequestException
     {
         for (int i = 0, m = argSelectors.size(); i < m; i++)
         {
             Selector s = argSelectors.get(i);
-            s.addInput(rs);
+            s.addInput(protocolVersion, rs);
         }
     }
 
@@ -49,15 +49,15 @@ final class ScalarFunctionSelector extends AbstractFunctionSelector<ScalarFuncti
     {
     }
 
-    public ByteBuffer getOutput() throws InvalidRequestException
+    public ByteBuffer getOutput(int protocolVersion) throws InvalidRequestException
     {
         for (int i = 0, m = argSelectors.size(); i < m; i++)
         {
             Selector s = argSelectors.get(i);
-            args.set(i, s.getOutput());
+            args.set(i, s.getOutput(protocolVersion));
             s.reset();
         }
-        return fun.execute(args);
+        return fun.execute(protocolVersion, args);
     }
 
     ScalarFunctionSelector(Function fun, List<Selector> argSelectors)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/selection/Selection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/selection/Selection.java b/src/java/org/apache/cassandra/cql3/selection/Selection.java
index 888d96d..6ad36e9 100644
--- a/src/java/org/apache/cassandra/cql3/selection/Selection.java
+++ b/src/java/org/apache/cassandra/cql3/selection/Selection.java
@@ -219,33 +219,33 @@ public abstract class Selection
             return c == null || !c.isLive(now);
         }
 
-        public void newRow() throws InvalidRequestException
+        public void newRow(int protocolVersion) throws InvalidRequestException
         {
             if (current != null)
             {
-                selectors.addInputRow(this);
+                selectors.addInputRow(protocolVersion, this);
                 if (!selectors.isAggregate())
                 {
-                    resultSet.addRow(selectors.getOutputRow());
+                    resultSet.addRow(selectors.getOutputRow(protocolVersion));
                     selectors.reset();
                 }
             }
             current = new ArrayList<ByteBuffer>(columns.size());
         }
 
-        public ResultSet build() throws InvalidRequestException
+        public ResultSet build(int protocolVersion) throws InvalidRequestException
         {
             if (current != null)
             {
-                selectors.addInputRow(this);
-                resultSet.addRow(selectors.getOutputRow());
+                selectors.addInputRow(protocolVersion, this);
+                resultSet.addRow(selectors.getOutputRow(protocolVersion));
                 selectors.reset();
                 current = null;
             }
 
             if (resultSet.isEmpty() && selectors.isAggregate())
             {
-                resultSet.addRow(selectors.getOutputRow());
+                resultSet.addRow(selectors.getOutputRow(protocolVersion));
             }
             return resultSet;
         }
@@ -268,9 +268,9 @@ public abstract class Selection
          * @param rs the <code>ResultSetBuilder</code>
          * @throws InvalidRequestException
          */
-        public void addInputRow(ResultSetBuilder rs) throws InvalidRequestException;
+        public void addInputRow(int protocolVersion, ResultSetBuilder rs) throws InvalidRequestException;
 
-        public List<ByteBuffer> getOutputRow() throws InvalidRequestException;
+        public List<ByteBuffer> getOutputRow(int protocolVersion) throws InvalidRequestException;
 
         public void reset();
     }
@@ -318,12 +318,12 @@ public abstract class Selection
                     current = null;
                 }
 
-                public List<ByteBuffer> getOutputRow()
+                public List<ByteBuffer> getOutputRow(int protocolVersion)
                 {
                     return current;
                 }
 
-                public void addInputRow(ResultSetBuilder rs) throws InvalidRequestException
+                public void addInputRow(int protocolVersion, ResultSetBuilder rs) throws InvalidRequestException
                 {
                     current = rs.current;
                 }
@@ -388,22 +388,22 @@ public abstract class Selection
                     return factories.containsOnlyAggregateFunctions();
                 }
 
-                public List<ByteBuffer> getOutputRow() throws InvalidRequestException
+                public List<ByteBuffer> getOutputRow(int protocolVersion) throws InvalidRequestException
                 {
                     List<ByteBuffer> outputRow = new ArrayList<>(selectors.size());
 
                     for (int i = 0, m = selectors.size(); i < m; i++)
                     {
-                        outputRow.add(selectors.get(i).getOutput());
+                        outputRow.add(selectors.get(i).getOutput(protocolVersion));
                     }
                     return outputRow;
                 }
 
-                public void addInputRow(ResultSetBuilder rs) throws InvalidRequestException
+                public void addInputRow(int protocolVersion, ResultSetBuilder rs) throws InvalidRequestException
                 {
                     for (int i = 0, m = selectors.size(); i < m; i++)
                     {
-                        selectors.get(i).addInput(rs);
+                        selectors.get(i).addInput(protocolVersion, rs);
                     }
                 }
             };

http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/selection/Selector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/selection/Selector.java b/src/java/org/apache/cassandra/cql3/selection/Selector.java
index f2c729b..0c1933f 100644
--- a/src/java/org/apache/cassandra/cql3/selection/Selector.java
+++ b/src/java/org/apache/cassandra/cql3/selection/Selector.java
@@ -120,18 +120,20 @@ public abstract class Selector implements AssignmentTestable
     /**
      * Add the current value from the specified <code>ResultSetBuilder</code>.
      *
+     * @param protocolVersion protocol version used for serialization
      * @param rs the <code>ResultSetBuilder</code>
      * @throws InvalidRequestException if a problem occurs while add the input value
      */
-    public abstract void addInput(ResultSetBuilder rs) throws InvalidRequestException;
+    public abstract void addInput(int protocolVersion, ResultSetBuilder rs) throws InvalidRequestException;
 
     /**
      * Returns the selector output.
      *
+     * @param protocolVersion protocol version used for serialization
      * @return the selector output
      * @throws InvalidRequestException if a problem occurs while computing the output value
      */
-    public abstract ByteBuffer getOutput() throws InvalidRequestException;
+    public abstract ByteBuffer getOutput(int protocolVersion) throws InvalidRequestException;
 
     /**
      * Returns the <code>Selector</code> output type.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/selection/SimpleSelector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/selection/SimpleSelector.java b/src/java/org/apache/cassandra/cql3/selection/SimpleSelector.java
index a5ff4cd..c2edaed 100644
--- a/src/java/org/apache/cassandra/cql3/selection/SimpleSelector.java
+++ b/src/java/org/apache/cassandra/cql3/selection/SimpleSelector.java
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer;
 
 import org.apache.cassandra.cql3.selection.Selection.ResultSetBuilder;
 import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.exceptions.InvalidRequestException;
 
 public final class SimpleSelector extends Selector
 {
@@ -54,13 +55,13 @@ public final class SimpleSelector extends Selector
     }
 
     @Override
-    public void addInput(ResultSetBuilder rs)
+    public void addInput(int protocolVersion, ResultSetBuilder rs) throws InvalidRequestException
     {
         current = rs.current.get(idx);
     }
 
     @Override
-    public ByteBuffer getOutput()
+    public ByteBuffer getOutput(int protocolVersion) throws InvalidRequestException
     {
         return current;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/selection/WritetimeOrTTLSelector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/selection/WritetimeOrTTLSelector.java b/src/java/org/apache/cassandra/cql3/selection/WritetimeOrTTLSelector.java
index 2494334..a1ecd3d 100644
--- a/src/java/org/apache/cassandra/cql3/selection/WritetimeOrTTLSelector.java
+++ b/src/java/org/apache/cassandra/cql3/selection/WritetimeOrTTLSelector.java
@@ -63,7 +63,7 @@ final class WritetimeOrTTLSelector extends Selector
         };
     }
 
-    public void addInput(ResultSetBuilder rs)
+    public void addInput(int protocolVersion, ResultSetBuilder rs)
     {
         if (isWritetime)
         {
@@ -77,7 +77,7 @@ final class WritetimeOrTTLSelector extends Selector
         }
     }
 
-    public ByteBuffer getOutput()
+    public ByteBuffer getOutput(int protocolVersion)
     {
         return current;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java
index c41fb08..8d8c27a 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java
@@ -50,6 +50,7 @@ public final class CreateFunctionStatement extends SchemaAlteringStatement
     private final List<ColumnIdentifier> argNames;
     private final List<CQL3Type.Raw> argRawTypes;
     private final CQL3Type.Raw rawReturnType;
+    private String currentKeyspace;
 
     public CreateFunctionStatement(FunctionName functionName,
                                    String language,
@@ -74,8 +75,10 @@ public final class CreateFunctionStatement extends SchemaAlteringStatement
 
     public void prepareKeyspace(ClientState state) throws InvalidRequestException
     {
-        if (!functionName.hasKeyspace() && state.getRawKeyspace() != null)
-            functionName = new FunctionName(state.getKeyspace(), functionName.name);
+        currentKeyspace = state.getRawKeyspace();
+
+        if (!functionName.hasKeyspace() && currentKeyspace != null)
+            functionName = new FunctionName(currentKeyspace, functionName.name);
 
         if (!functionName.hasKeyspace())
             throw new InvalidRequestException("You need to be logged in a keyspace or use a fully qualified function name");
@@ -112,11 +115,9 @@ public final class CreateFunctionStatement extends SchemaAlteringStatement
 
         List<AbstractType<?>> argTypes = new ArrayList<>(argRawTypes.size());
         for (CQL3Type.Raw rawType : argRawTypes)
-            // We have no proper keyspace to give, which means that this will break (NPE currently)
-            // for UDT: #7791 is open to fix this
-            argTypes.add(rawType.prepare(functionName.keyspace).getType());
+            argTypes.add(rawType.prepare(typeKeyspace(rawType)).getType());
 
-        AbstractType<?> returnType = rawReturnType.prepare(null).getType();
+        AbstractType<?> returnType = rawReturnType.prepare(typeKeyspace(rawReturnType)).getType();
 
         Function old = Functions.find(functionName, argTypes);
         if (old != null)
@@ -126,7 +127,7 @@ public final class CreateFunctionStatement extends SchemaAlteringStatement
             if (!orReplace)
                 throw new InvalidRequestException(String.format("Function %s already exists", old));
 
-            if (!old.returnType().isValueCompatibleWith(returnType))
+            if (!Functions.typeEquals(old.returnType(), returnType))
                 throw new InvalidRequestException(String.format("Cannot replace function %s, the new return type %s is not compatible with the return type %s of existing function",
                                                                 functionName, returnType.asCQL3Type(), old.returnType().asCQL3Type()));
         }
@@ -134,4 +135,12 @@ public final class CreateFunctionStatement extends SchemaAlteringStatement
         MigrationManager.announceNewFunction(UDFunction.create(functionName, argNames, argTypes, returnType, language, body, deterministic), isLocalOnly);
         return true;
     }
+
+    /**
+     * Picks the keyspace in which {@code rawType} is prepared: the keyspace the raw type itself
+     * reports through {@code keyspace()} when that is non-null, otherwise the keyspace of the
+     * function name.
+     *
+     * @param rawType the unprepared CQL type of an argument or of the return value
+     * @return the raw type's own keyspace if it has one, else {@code functionName.keyspace}
+     */
+    private String typeKeyspace(CQL3Type.Raw rawType)
+    {
+        String ks = rawType.keyspace();
+        if (ks != null)
+            return ks;
+        return functionName.keyspace;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/statements/DropFunctionStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DropFunctionStatement.java b/src/java/org/apache/cassandra/cql3/statements/DropFunctionStatement.java
index 4cd9460..0ba3721 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DropFunctionStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DropFunctionStatement.java
@@ -103,7 +103,7 @@ public final class DropFunctionStatement extends SchemaAlteringStatement
 
         List<AbstractType<?>> argTypes = new ArrayList<>(argRawTypes.size());
         for (CQL3Type.Raw rawType : argRawTypes)
-            argTypes.add(rawType.prepare(functionName.keyspace).getType());
+            argTypes.add(rawType.prepare(typeKeyspace(rawType)).getType());
 
         Function old;
         if (argsPresent)
@@ -139,4 +139,12 @@ public final class DropFunctionStatement extends SchemaAlteringStatement
         MigrationManager.announceFunctionDrop((UDFunction)old, isLocalOnly);
         return true;
     }
+
+    /**
+     * Picks the keyspace in which {@code rawType} is prepared: the keyspace the raw type itself
+     * reports through {@code keyspace()} when that is non-null, otherwise the keyspace of the
+     * function name.
+     *
+     * @param rawType the unprepared CQL type of a function argument
+     * @return the raw type's own keyspace if it has one, else {@code functionName.keyspace}
+     */
+    private String typeKeyspace(CQL3Type.Raw rawType)
+    {
+        String ks = rawType.keyspace();
+        if (ks != null)
+            return ks;
+        return functionName.keyspace;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/statements/DropTypeStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DropTypeStatement.java b/src/java/org/apache/cassandra/cql3/statements/DropTypeStatement.java
index a3b82a4..ed21957 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DropTypeStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DropTypeStatement.java
@@ -20,6 +20,8 @@ package org.apache.cassandra.cql3.statements;
 import org.apache.cassandra.auth.Permission;
 import org.apache.cassandra.config.*;
 import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.cql3.functions.Function;
+import org.apache.cassandra.cql3.functions.Functions;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.service.ClientState;
@@ -72,6 +74,15 @@ public class DropTypeStatement extends SchemaAlteringStatement
         // we drop and 2) existing tables referencing the type (maybe in a nested
         // way).
 
+        for (Function function : Functions.all())
+        {
+            if (isUsedBy(function.returnType()))
+                throw new InvalidRequestException(String.format("Cannot drop user type %s as it is still used by function %s", name, function));
+            for (AbstractType<?> argType : function.argTypes())
+                if (isUsedBy(argType))
+                    throw new InvalidRequestException(String.format("Cannot drop user type %s as it is still used by function %s", name, function));
+        }
+
         for (KSMetaData ksm2 : Schema.instance.getKeyspaceDefinitions())
         {
             for (UserType ut : ksm2.userTypes.getAllTypes().values())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 2607e12..4e39614 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -630,7 +630,7 @@ public abstract class ModificationStatement implements CQLStatement
         Selection.ResultSetBuilder builder = selection.resultSetBuilder(now);
         SelectStatement.forSelection(cfm, selection).processColumnFamily(key, cf, options, now, builder);
 
-        return builder.build();
+        return builder.build(options.getProtocolVersion());
     }
 
     public ResultMessage executeInternal(QueryState queryState, QueryOptions options) throws RequestValidationException, RequestExecutionException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 2d28b71..3360d40 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -280,7 +280,7 @@ public class SelectStatement implements CQLStatement
                 processColumnFamily(row.key.getKey(), row.cf, options, now, result);
             }
         }
-        return new ResultMessage.Rows(result.build());
+        return new ResultMessage.Rows(result.build(options.getProtocolVersion()));
     }
 
     public ResultMessage.Rows processResults(List<Row> rows, QueryOptions options, int limit, long now) throws RequestValidationException
@@ -1149,7 +1149,7 @@ public class SelectStatement implements CQLStatement
             processColumnFamily(row.key.getKey(), row.cf, options, now, result);
         }
 
-        ResultSet cqlRows = result.build();
+        ResultSet cqlRows = result.build(options.getProtocolVersion());
 
         orderResults(cqlRows);
 
@@ -1189,7 +1189,7 @@ public class SelectStatement implements CQLStatement
         CQL3Row staticRow = iter.getStaticRow();
         if (staticRow != null && !iter.hasNext() && !usesSecondaryIndexing && hasNoClusteringColumnsRestriction())
         {
-            result.newRow();
+            result.newRow(options.getProtocolVersion());
             for (ColumnDefinition def : selection.getColumns())
             {
                 switch (def.kind)
@@ -1212,7 +1212,7 @@ public class SelectStatement implements CQLStatement
             CQL3Row cql3Row = iter.next();
 
             // Respect requested order
-            result.newRow();
+            result.newRow(options.getProtocolVersion());
             // Respect selection order
             for (ColumnDefinition def : selection.getColumns())
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
index 555c1cd..21e30e2 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
@@ -236,7 +236,7 @@ public class CqlRecordReader extends RecordReader<Long, Row>
 
     public Long createKey()
     {
-        return new Long(0L);
+        return Long.valueOf(0L);
     }
 
     public Row createValue()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/transport/Server.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java
index 15fad88..cc071b1 100644
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@ -64,6 +64,7 @@ public class Server implements CassandraDaemon.Server
     private static final Logger logger = LoggerFactory.getLogger(Server.class);
     private static final boolean enableEpoll = Boolean.valueOf(System.getProperty("cassandra.native.epoll.enabled", "true"));
 
+    public static final int VERSION_2 = 2;
     public static final int VERSION_3 = 3;
     public static final int CURRENT_VERSION = VERSION_3;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/test/unit/org/apache/cassandra/cql3/CQLTester.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java
index 9105b9d..68f90bd 100644
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@ -18,8 +18,11 @@
 package org.apache.cassandra.cql3;
 
 import java.io.File;
+import java.io.IOException;
 import java.math.BigDecimal;
 import java.math.BigInteger;
+import java.net.InetAddress;
+import java.net.ServerSocket;
 import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.CountDownLatch;
@@ -29,23 +32,31 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.base.Objects;
 import com.google.common.collect.ImmutableSet;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.junit.AfterClass;
 import org.junit.After;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.datastax.driver.core.*;
+import com.datastax.driver.core.ResultSet;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.db.marshal.TupleType;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.serializers.TypeSerializer;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.transport.Server;
 
 /**
  * Base class for CQL tests.
@@ -55,17 +66,38 @@ public abstract class CQLTester
     protected static final Logger logger = LoggerFactory.getLogger(CQLTester.class);
 
     public static final String KEYSPACE = "cql_test_keyspace";
+    public static final String KEYSPACE_PER_TEST = "cql_test_keyspace_alt";
     private static final boolean USE_PREPARED_VALUES = Boolean.valueOf(System.getProperty("cassandra.test.use_prepared", "true"));
     private static final AtomicInteger seqNumber = new AtomicInteger();
 
+    private static org.apache.cassandra.transport.Server server;
+    private static final int nativePort;
+    private static final InetAddress nativeAddr;
+    private static final Cluster cluster[] = new Cluster[Server.CURRENT_VERSION];
+    private static final Session session[] = new Session[Server.CURRENT_VERSION];
+
     static
     {
         // Once per-JVM is enough
         SchemaLoader.prepareServer();
+
+        nativeAddr = InetAddress.getLoopbackAddress();
+
+        // Ask the OS for a free port, then close the probe socket so the native server can bind that port later
+        try (ServerSocket serverSocket = new ServerSocket(0))
+        {
+            nativePort = serverSocket.getLocalPort();
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
     }
 
     private String currentTable;
     private final Set<String> currentTypes = new HashSet<>();
+    private final Set<String> currentFunctions = new HashSet<>();
+    private final Set<String> currentAggregates = new HashSet<>();
 
     // We don't use USE_PREPARED_VALUES in the code below so some test can foce value preparation (if the result
     // is not expected to be the same without preparation)
@@ -80,11 +112,28 @@ public abstract class CQLTester
     @AfterClass
     public static void tearDownClass()
     {
+        for (Session sess : session)
+            if (sess != null)
+                sess.close();
+        for (Cluster cl : cluster)
+            if (cl != null)
+                cl.close();
+
+        if (server != null)
+            server.stop();
+    }
+
+    @Before
+    public void beforeTest() throws Throwable
+    {
+        schemaChange(String.format("CREATE KEYSPACE IF NOT EXISTS %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}", KEYSPACE_PER_TEST));
     }
 
     @After
     public void afterTest() throws Throwable
     {
+        dropPerTestKeyspace();
+
         // Restore standard behavior in case it was changed
         usePrepared = USE_PREPARED_VALUES;
 
@@ -93,8 +142,12 @@ public abstract class CQLTester
 
         final String tableToDrop = currentTable;
         final Set<String> typesToDrop = currentTypes.isEmpty() ? Collections.emptySet() : new HashSet(currentTypes);
+        final Set<String> functionsToDrop = currentFunctions.isEmpty() ? Collections.emptySet() : new HashSet(currentFunctions);
+        final Set<String> aggregatesToDrop = currentAggregates.isEmpty() ? Collections.emptySet() : new HashSet(currentAggregates);
         currentTable = null;
         currentTypes.clear();
+        currentFunctions.clear();
+        currentAggregates.clear();
 
         // We want to clean up after the test, but dropping a table is rather long so just do that asynchronously
         ScheduledExecutors.optionalTasks.execute(new Runnable()
@@ -105,6 +158,12 @@ public abstract class CQLTester
                 {
                     schemaChange(String.format("DROP TABLE IF EXISTS %s.%s", KEYSPACE, tableToDrop));
 
+                    for (String aggregateName : aggregatesToDrop)
+                        schemaChange(String.format("DROP AGGREGATE IF EXISTS %s", aggregateName));
+
+                    for (String functionName : functionsToDrop)
+                        schemaChange(String.format("DROP FUNCTION IF EXISTS %s", functionName));
+
                     for (String typeName : typesToDrop)
                         schemaChange(String.format("DROP TYPE IF EXISTS %s.%s", KEYSPACE, typeName));
 
@@ -133,6 +192,40 @@ public abstract class CQLTester
         });
     }
 
+    // Lazily starts what the Java Driver based tests need: the transport Server on nativeAddr:nativePort plus
+    // one driver cluster/session per protocol version. Returns at once when the server field is already set.
+    private static void requireNetwork() throws ConfigurationException
+    {
+        if (server != null)
+            return;
+
+        SystemKeyspace.finishStartup();
+        StorageService.instance.initServer();
+        SchemaLoader.startGossiper();
+
+        server = new Server(nativeAddr, nativePort);
+        server.start();
+
+        // slot i of the cluster/session arrays serves protocol version i + 1
+        for (int slot = 0; slot < Server.CURRENT_VERSION; slot++)
+        {
+            if (cluster[slot] != null)
+                continue;
+
+            int version = slot + 1;
+            cluster[slot] = Cluster.builder().addContactPoints(nativeAddr).withClusterName("Test Cluster")
+                                   .withPort(nativePort).withProtocolVersion(ProtocolVersion.fromInt(version))
+                                   .build();
+            session[slot] = cluster[slot].connect();
+            logger.info("Started Java Driver instance for protocol version {}", version);
+        }
+    }
+
+    protected void dropPerTestKeyspace() throws Throwable // drops KEYSPACE_PER_TEST; invoked first thing in afterTest()
+    {
+        execute(String.format("DROP KEYSPACE IF EXISTS %s", KEYSPACE_PER_TEST)); // IF EXISTS: fine when a test already dropped it
+    }
+
     public void flush()
     {
         try
@@ -183,7 +276,7 @@ public abstract class CQLTester
 
     protected String createType(String query)
     {
-        String typeName = "type_" + seqNumber.getAndIncrement();
+        String typeName = callerName() + "_type_" + seqNumber.getAndIncrement();
         String fullQuery = String.format(query, KEYSPACE + "." + typeName);
         currentTypes.add(typeName);
         logger.info(fullQuery);
@@ -191,18 +284,48 @@ public abstract class CQLTester
         return typeName;
     }
 
+    protected String createFunction(String keyspace, String argTypes, String query) throws Throwable
+    {
+        final String qualifiedName = keyspace + "." + callerName() + "_function_" + seqNumber.getAndIncrement(); // callerName() must be called from this frame
+        createFunctionOverload(qualifiedName, argTypes, query); // formats the query with the name, registers it for cleanup, executes it
+        return qualifiedName;
+    }
+
+    protected void createFunctionOverload(String functionName, String argTypes, String query) throws Throwable
+    {
+        final String cql = String.format(query, functionName);
+        currentFunctions.add(functionName + '(' + argTypes + ')'); // recorded before executing, so afterTest() issues the DROP even if creation fails
+        logger.info(cql);
+        execute(cql);
+    }
+
+    protected String createAggregate(String keyspace, String argTypes, String query) throws Throwable
+    {
+        final String qualifiedName = keyspace + "." + callerName() + "_aggregate_" + seqNumber.getAndIncrement(); // callerName() must be called from this frame
+        createAggregateOverload(qualifiedName, argTypes, query); // formats the query with the name, registers it for cleanup, executes it
+        return qualifiedName;
+    }
+
+    protected void createAggregateOverload(String aggregateName, String argTypes, String query) throws Throwable
+    {
+        final String cql = String.format(query, aggregateName);
+        currentAggregates.add(aggregateName + '(' + argTypes + ')'); // recorded before executing, so afterTest() issues the DROP even if creation fails
+        logger.info(cql);
+        execute(cql);
+    }
+
     protected void createTable(String query)
     {
-        currentTable = "table_" + seqNumber.getAndIncrement();
-        String fullQuery = String.format(query, KEYSPACE + "." + currentTable);
+        currentTable = callerName() + "_table_" + seqNumber.getAndIncrement(); // table name now starts with the calling method's lower-cased name
+        String fullQuery = formatQuery(query); // fills the query's placeholder with KEYSPACE.currentTable
         logger.info(fullQuery);
         schemaChange(fullQuery);
     }
 
     protected void createTableMayThrow(String query) throws Throwable
     {
-        currentTable = "table_" + seqNumber.getAndIncrement();
-        String fullQuery = String.format(query, KEYSPACE + "." + currentTable);
+        currentTable = callerName() + "_table_" + seqNumber.getAndIncrement();
+        String fullQuery = formatQuery(query);
         logger.info(fullQuery);
         try
         {
@@ -216,14 +339,14 @@ public abstract class CQLTester
 
     protected void alterTable(String query)
     {
-        String fullQuery = String.format(query, KEYSPACE + "." + currentTable);
+        String fullQuery = formatQuery(query); // same substitution as before, except the query is left untouched while currentTable is null
         logger.info(fullQuery);
         schemaChange(fullQuery);
     }
 
     protected void alterTableMayThrow(String query) throws Throwable
     {
-        String fullQuery = String.format(query, KEYSPACE + "." + currentTable);
+        String fullQuery = formatQuery(query);
         logger.info(fullQuery);
         try
         {
@@ -244,14 +367,14 @@ public abstract class CQLTester
 
     protected void createIndex(String query)
     {
-        String fullQuery = String.format(query, KEYSPACE + "." + currentTable);
+        String fullQuery = formatQuery(query); // same substitution as before, except the query is left untouched while currentTable is null
         logger.info(fullQuery);
         schemaChange(fullQuery);
     }
 
     protected void createIndexMayThrow(String query) throws Throwable
     {
-        String fullQuery = String.format(query, KEYSPACE + "." + currentTable);
+        String fullQuery = formatQuery(query);
         logger.info(fullQuery);
         try
         {
@@ -270,6 +393,11 @@ public abstract class CQLTester
         schemaChange(fullQuery);
     }
 
+    private static String callerName() // lower-cased name of the method two frames up, i.e. whoever called the create* helper that called us
+    {
+        return new Exception().getStackTrace()[2].getMethodName().toLowerCase(java.util.Locale.ROOT); // ROOT: the name lands in CQL identifiers, so it must not vary with the default locale
+    }
+
     private static void schemaChange(String query)
     {
         try
@@ -288,11 +416,24 @@ public abstract class CQLTester
         return Schema.instance.getCFMetaData(KEYSPACE, currentTable);
     }
 
+    protected com.datastax.driver.core.ResultSet executeNet(int protocolVersion, String query, Object... values) throws Throwable
+    {
+        requireNetwork(); // brings up the server and the driver sessions on first use
+        Session bySlot = session[protocolVersion - 1]; // sessions are stored at index (protocol version - 1)
+        return bySlot.execute(formatQuery(query), values);
+    }
+
+    private String formatQuery(String query)
+    {
+        if (currentTable == null) // no table has been created yet: hand the query back untouched
+            return query;
+        return String.format(query, KEYSPACE + "." + currentTable);
+    }
+
     protected UntypedResultSet execute(String query, Object... values) throws Throwable
     {
         try
         {
-            query = currentTable == null ? query : String.format(query, KEYSPACE + "." + currentTable);
+            query = formatQuery(query);
 
             UntypedResultSet rs;
             if (usePrepared)
@@ -318,6 +459,64 @@ public abstract class CQLTester
         }
     }
 
+    /**
+     * Asserts that a Java Driver result contains exactly the given rows, in order. Values are compared in their
+     * serialized form for {@code protocolVersion}; a {@code null} result counts as zero rows, and a null
+     * expected value matches only a null column.
+     */
+    protected void assertRowsNet(int protocolVersion, ResultSet result, Object[]... rows)
+    {
+        if (result == null)
+        {
+            if (rows.length > 0)
+                Assert.fail(String.format("No rows returned by query but %d expected", rows.length));
+            return;
+        }
+
+        ProtocolVersion version = ProtocolVersion.fromInt(protocolVersion);
+        ColumnDefinitions meta = result.getColumnDefinitions();
+        Iterator<Row> iter = result.iterator();
+        int i = 0;
+        while (iter.hasNext() && i < rows.length)
+        {
+            Object[] expected = rows[i];
+            Row actual = iter.next();
+
+            Assert.assertEquals(String.format("Invalid number of (expected) values provided for row %d (using protocol version %d)",
+                                              i, protocolVersion),
+                                meta.size(), expected.length);
+
+            for (int j = 0; j < meta.size(); j++)
+            {
+                DataType type = meta.getType(j);
+                // a null expectation or a null column has no serialized form: keep it null and report -1 bytes
+                ByteBuffer expectedByteValue = expected[j] == null ? null : type.serialize(expected[j], version);
+                int expectedBytes = expectedByteValue == null ? -1 : expectedByteValue.remaining();
+                ByteBuffer actualValue = actual.getBytesUnsafe(meta.getName(j));
+                int actualBytes = actualValue == null ? -1 : actualValue.remaining();
+
+                if (!Objects.equal(expectedByteValue, actualValue))
+                    Assert.fail(String.format("Invalid value for row %d column %d (%s of type %s), " +
+                                              "expected <%s> (%d bytes) but got <%s> (%d bytes) " +
+                                              "(using protocol version %d)",
+                                              i, j, meta.getName(j), type,
+                                              expected[j] == null ? "null" : type.format(expected[j]),
+                                              expectedBytes,
+                                              actualValue == null ? "null" : type.format(type.deserialize(actualValue, version)),
+                                              actualBytes,
+                                              protocolVersion));
+            }
+            i++;
+        }
+
+        // count any rows beyond the expected ones, so the assertion below reports the real total as "more"
+        for (; iter.hasNext(); iter.next())
+            i++;
+
+        Assert.assertTrue(String.format("Got %s rows than expected. Expected %d but got %d (using protocol version %d)",
+                                        rows.length>i ? "less" : "more", rows.length, i, protocolVersion), i == rows.length);
+    }
+
     protected void assertRows(UntypedResultSet result, Object[]... rows)
     {
         if (result == null)
@@ -394,7 +593,7 @@ public abstract class CQLTester
     protected void assertEmpty(UntypedResultSet result) throws Throwable
     {
         if (result != null && result.size() != 0)
-            throw new InvalidRequestException(String.format("Expected empty result but got %d rows", result.size()));
+            throw new AssertionError(String.format("Expected empty result but got %d rows", result.size())); // a failed expectation is now raised as an assertion failure, not as a request error
     }
 
     protected void assertInvalid(String query, Object... values) throws Throwable
@@ -406,7 +605,16 @@ public abstract class CQLTester
     {
         try
         {
-            execute(query, values);
+            try
+            {
+                execute(query, values);
+            }
+            catch (RuntimeException e)
+            {
+                Throwable cause = e.getCause();
+                if (cause instanceof InvalidRequestException)
+                    throw cause;
+            }
             String q = USE_PREPARED_VALUES
                      ? query + " (values: " + formatAllValues(values) + ")"
                      : replaceValues(query, values);