You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ty...@apache.org on 2014/11/27 00:50:00 UTC

[2/2] cassandra git commit: Support for UDTs, tuples, and collections in UDFs

Support for UDTs, tuples, and collections in UDFs

Patch by Robert Stupp; reviewed by Tyler Hobbs for CASSANDRA-7563


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/794d68b5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/794d68b5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/794d68b5

Branch: refs/heads/trunk
Commit: 794d68b51b77c2a3cb09374010b6f84231ead604
Parents: e131213
Author: Robert Stupp <sn...@snazy.de>
Authored: Wed Nov 26 17:49:45 2014 -0600
Committer: Tyler Hobbs <ty...@datastax.com>
Committed: Wed Nov 26 17:49:45 2014 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |    2 +
 .../org/apache/cassandra/cql3/CQL3Type.java     |   12 +-
 .../cql3/functions/BytesConversionFcts.java     |    8 +-
 .../cassandra/cql3/functions/FunctionCall.java  |    9 +-
 .../cassandra/cql3/functions/FunctionName.java  |    1 -
 .../cassandra/cql3/functions/Functions.java     |   50 +-
 .../cql3/functions/JavaSourceUDFFactory.java    |   51 +-
 .../cql3/functions/ScalarFunction.java          |    3 +-
 .../cql3/functions/ScriptBasedUDF.java          |   13 +-
 .../cassandra/cql3/functions/TimeuuidFcts.java  |   10 +-
 .../cassandra/cql3/functions/TokenFct.java      |    2 +-
 .../cassandra/cql3/functions/UDFunction.java    |  177 ++-
 .../cassandra/cql3/functions/UuidFcts.java      |    2 +-
 .../selection/AggregateFunctionSelector.java    |    8 +-
 .../cassandra/cql3/selection/FieldSelector.java |    8 +-
 .../cql3/selection/ScalarFunctionSelector.java  |   10 +-
 .../cassandra/cql3/selection/Selection.java     |   30 +-
 .../cassandra/cql3/selection/Selector.java      |    6 +-
 .../cql3/selection/SimpleSelector.java          |    5 +-
 .../cql3/selection/WritetimeOrTTLSelector.java  |    4 +-
 .../statements/CreateFunctionStatement.java     |   23 +-
 .../cql3/statements/DropFunctionStatement.java  |   10 +-
 .../cql3/statements/DropTypeStatement.java      |   11 +
 .../cql3/statements/ModificationStatement.java  |    2 +-
 .../cql3/statements/SelectStatement.java        |    8 +-
 .../cassandra/hadoop/cql3/CqlRecordReader.java  |    2 +-
 .../org/apache/cassandra/transport/Server.java  |    1 +
 .../org/apache/cassandra/cql3/CQLTester.java    |  232 ++-
 test/unit/org/apache/cassandra/cql3/UFTest.java | 1356 ++++++++++++++----
 tools/lib/cassandra-driver-core-2.0.5.jar       |  Bin 544552 -> 0 bytes
 30 files changed, 1643 insertions(+), 413 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 162d579..55c86dd 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,6 @@
 3.0
+ * Support UDTs, tuples, and collections in user-defined
+   functions (CASSANDRA-7563)
  * Fix aggregate fn results on empty selection, result column name,
    and cqlsh parsing (CASSANDRA-8229)
  * Mark sstables as repaired after full repair (CASSANDRA-7586)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/CQL3Type.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/CQL3Type.java b/src/java/org/apache/cassandra/cql3/CQL3Type.java
index b656de8..98d1b15 100644
--- a/src/java/org/apache/cassandra/cql3/CQL3Type.java
+++ b/src/java/org/apache/cassandra/cql3/CQL3Type.java
@@ -315,6 +315,11 @@ public interface CQL3Type
             return false;
         }
 
+        public String keyspace()
+        {
+            return null;
+        }
+
         public void freeze() throws InvalidRequestException
         {
             String message = String.format("frozen<> is only allowed on collections, tuples, and user-defined types (got %s)", this);
@@ -474,6 +479,11 @@ public interface CQL3Type
                 this.name = name;
             }
 
+            public String keyspace()
+            {
+                return name.getKeyspace();
+            }
+
             public void freeze()
             {
                 frozen = true;
@@ -485,7 +495,7 @@ public interface CQL3Type
                 {
                     // The provided keyspace is the one of the current statement this is part of. If it's different from the keyspace of
                     // the UTName, we reject since we want to limit user types to their own keyspace (see #6643)
-                    if (!keyspace.equals(name.getKeyspace()))
+                    if (keyspace != null && !keyspace.equals(name.getKeyspace()))
                         throw new InvalidRequestException(String.format("Statement on keyspace %s cannot refer to a user type in keyspace %s; "
                                                                         + "user types can only be used in the keyspace they are defined in",
                                                                         keyspace, name.getKeyspace()));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/functions/BytesConversionFcts.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/BytesConversionFcts.java b/src/java/org/apache/cassandra/cql3/functions/BytesConversionFcts.java
index 1cd1d69..ddb33fc 100644
--- a/src/java/org/apache/cassandra/cql3/functions/BytesConversionFcts.java
+++ b/src/java/org/apache/cassandra/cql3/functions/BytesConversionFcts.java
@@ -36,7 +36,7 @@ public abstract class BytesConversionFcts
         String name = fromType.asCQL3Type() + "asblob";
         return new NativeScalarFunction(name, BytesType.instance, fromType)
         {
-            public ByteBuffer execute(List<ByteBuffer> parameters)
+            public ByteBuffer execute(int protocolVersion, List<ByteBuffer> parameters)
             {
                 return parameters.get(0);
             }
@@ -48,7 +48,7 @@ public abstract class BytesConversionFcts
         final String name = "blobas" + toType.asCQL3Type();
         return new NativeScalarFunction(name, toType, BytesType.instance)
         {
-            public ByteBuffer execute(List<ByteBuffer> parameters) throws InvalidRequestException
+            public ByteBuffer execute(int protocolVersion, List<ByteBuffer> parameters) throws InvalidRequestException
             {
                 ByteBuffer val = parameters.get(0);
                 try
@@ -68,7 +68,7 @@ public abstract class BytesConversionFcts
 
     public static final Function VarcharAsBlobFct = new NativeScalarFunction("varcharasblob", BytesType.instance, UTF8Type.instance)
     {
-        public ByteBuffer execute(List<ByteBuffer> parameters)
+        public ByteBuffer execute(int protocolVersion, List<ByteBuffer> parameters)
         {
             return parameters.get(0);
         }
@@ -76,7 +76,7 @@ public abstract class BytesConversionFcts
 
     public static final Function BlobAsVarcharFact = new NativeScalarFunction("blobasvarchar", UTF8Type.instance, BytesType.instance)
     {
-        public ByteBuffer execute(List<ByteBuffer> parameters)
+        public ByteBuffer execute(int protocolVersion, List<ByteBuffer> parameters)
         {
             return parameters.get(0);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java b/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java
index efaa12a..01443d2 100644
--- a/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java
+++ b/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java
@@ -27,6 +27,7 @@ import org.apache.cassandra.db.marshal.ListType;
 import org.apache.cassandra.db.marshal.MapType;
 import org.apache.cassandra.db.marshal.SetType;
 import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.transport.Server;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.serializers.MarshalException;
 
@@ -69,12 +70,12 @@ public class FunctionCall extends Term.NonTerminal
                 throw new InvalidRequestException(String.format("Invalid null value for argument to %s", fun));
             buffers.add(val);
         }
-        return executeInternal(fun, buffers);
+        return executeInternal(options.getProtocolVersion(), fun, buffers);
     }
 
-    private static ByteBuffer executeInternal(ScalarFunction fun, List<ByteBuffer> params) throws InvalidRequestException
+    private static ByteBuffer executeInternal(int protocolVersion, ScalarFunction fun, List<ByteBuffer> params) throws InvalidRequestException
     {
-        ByteBuffer result = fun.execute(params);
+        ByteBuffer result = fun.execute(protocolVersion, params);
         try
         {
             // Check the method didn't lied on it's declared return type
@@ -172,7 +173,7 @@ public class FunctionCall extends Term.NonTerminal
                 buffers.add(((Term.Terminal)t).get(QueryOptions.DEFAULT));
             }
 
-            return executeInternal(fun, buffers);
+            return executeInternal(Server.CURRENT_VERSION, fun, buffers);
         }
 
         public AssignmentTestable.TestResult testAssignment(String keyspace, ColumnSpecification receiver)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/functions/FunctionName.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/FunctionName.java b/src/java/org/apache/cassandra/cql3/functions/FunctionName.java
index 460e7a6..bb30040 100644
--- a/src/java/org/apache/cassandra/cql3/functions/FunctionName.java
+++ b/src/java/org/apache/cassandra/cql3/functions/FunctionName.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.cql3.functions;
 
 import com.google.common.base.Objects;
 
-import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.SystemKeyspace;
 
 public final class FunctionName

http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/functions/Functions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/Functions.java b/src/java/org/apache/cassandra/cql3/functions/Functions.java
index 7021475..a8fdf0f 100644
--- a/src/java/org/apache/cassandra/cql3/functions/Functions.java
+++ b/src/java/org/apache/cassandra/cql3/functions/Functions.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.cql3.functions;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 
 import com.google.common.collect.ArrayListMultimap;
@@ -29,6 +30,8 @@ import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.service.IMigrationListener;
+import org.apache.cassandra.service.MigrationManager;
 
 public abstract class Functions
 {
@@ -83,6 +86,8 @@ public abstract class Functions
         declare(AggregateFcts.avgFunctionForDouble);
         declare(AggregateFcts.avgFunctionForVarint);
         declare(AggregateFcts.avgFunctionForDecimal);
+
+        MigrationManager.instance.register(new FunctionsMigrationListener());
     }
 
     private static void declare(Function fun)
@@ -188,7 +193,7 @@ public abstract class Functions
         assert name.hasKeyspace() : "function name not fully qualified";
         for (Function f : declared.get(name))
         {
-            if (f.argTypes().equals(argTypes))
+            if (typeEquals(f.argTypes(), argTypes))
                 return f;
         }
         return null;
@@ -284,4 +289,47 @@ public abstract class Functions
         removeFunction(fun.name(), fun.argTypes());
         addFunction(fun);
     }
+
+    public static Collection<Function> all()
+    {
+        return declared.values();
+    }
+
+    public static boolean typeEquals(AbstractType<?> t1, AbstractType<?> t2)
+    {
+        return t1.asCQL3Type().toString().equals(t2.asCQL3Type().toString());
+    }
+
+    public static boolean typeEquals(List<AbstractType<?>> t1, List<AbstractType<?>> t2)
+    {
+        if (t1.size() != t2.size())
+            return false;
+        for (int i = 0; i < t1.size(); i ++)
+            if (!typeEquals(t1.get(i), t2.get(i)))
+                return false;
+        return true;
+    }
+
+    private static class FunctionsMigrationListener implements IMigrationListener
+    {
+        public void onCreateKeyspace(String ksName) { }
+        public void onCreateColumnFamily(String ksName, String cfName) { }
+        public void onCreateUserType(String ksName, String typeName) { }
+        public void onCreateFunction(String ksName, String functionName) { }
+
+        public void onUpdateKeyspace(String ksName) { }
+        public void onUpdateColumnFamily(String ksName, String cfName) { }
+        public void onUpdateUserType(String ksName, String typeName) {
+            for (Function function : all())
+                if (function instanceof UDFunction)
+                    ((UDFunction)function).userTypeUpdated(ksName, typeName);
+        }
+        public void onUpdateFunction(String ksName, String functionName) { }
+
+        public void onDropKeyspace(String ksName) { }
+        public void onDropColumnFamily(String ksName, String cfName) { }
+        public void onDropUserType(String ksName, String typeName) { }
+        public void onDropFunction(String ksName, String functionName) { }
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/functions/JavaSourceUDFFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/JavaSourceUDFFactory.java b/src/java/org/apache/cassandra/cql3/functions/JavaSourceUDFFactory.java
index 0f5fe48..560f077 100644
--- a/src/java/org/apache/cassandra/cql3/functions/JavaSourceUDFFactory.java
+++ b/src/java/org/apache/cassandra/cql3/functions/JavaSourceUDFFactory.java
@@ -20,13 +20,13 @@ package org.apache.cassandra.cql3.functions;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Modifier;
-import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.datastax.driver.core.DataType;
 import javassist.CannotCompileException;
 import javassist.ClassPool;
 import javassist.CtClass;
@@ -56,8 +56,14 @@ public final class JavaSourceUDFFactory
                                boolean deterministic)
     throws InvalidRequestException
     {
-        Class<?> javaReturnType = UDFunction.javaType(returnType);
-        Class<?>[] javaParamTypes = UDFunction.javaParamTypes(argTypes);
+        // argDataTypes is just the C* internal argTypes converted to the Java Driver DataType
+        DataType[] argDataTypes = UDFunction.driverTypes(argTypes);
+        // returnDataType is just the C* internal returnType converted to the Java Driver DataType
+        DataType returnDataType = UDFunction.driverType(returnType);
+        // javaParamTypes is just the Java representation for argTypes resp. argDataTypes
+        Class<?>[] javaParamTypes = UDFunction.javaTypes(argDataTypes);
+        // javaReturnType is just the Java representation for returnType resp. returnDataType
+        Class<?> javaReturnType = returnDataType.asJavaClass();
 
         String clsName = generateClassName(name);
 
@@ -92,9 +98,13 @@ public final class JavaSourceUDFFactory
 
             Constructor ctor =
                 cc.toClass().getDeclaredConstructor(
-                   FunctionName.class, List.class, List.class,
-                   AbstractType.class, String.class, boolean.class);
-            return (UDFunction) ctor.newInstance(name, argNames, argTypes, returnType, body, deterministic);
+                   FunctionName.class, List.class, List.class, DataType[].class,
+                   AbstractType.class, DataType.class,
+                   String.class, boolean.class);
+            return (UDFunction) ctor.newInstance(
+                   name, argNames, argTypes, argDataTypes,
+                   returnType, returnDataType,
+                   body, deterministic);
         }
         catch (NotFoundException | CannotCompileException | NoSuchMethodException | LinkageError | InstantiationException | IllegalAccessException e)
         {
@@ -133,10 +143,12 @@ public final class JavaSourceUDFFactory
                "(org.apache.cassandra.cql3.functions.FunctionName name, " +
                "java.util.List argNames, " +
                "java.util.List argTypes, " +
+               "com.datastax.driver.core.DataType[] argDataTypes, " +
                "org.apache.cassandra.db.marshal.AbstractType returnType, " +
+               "com.datastax.driver.core.DataType returnDataType, " +
                "String body," +
                "boolean deterministic)\n{" +
-               "  super(name, argNames, argTypes, returnType, \"java\", body, deterministic);\n" +
+               "  super(name, argNames, argTypes, argDataTypes, returnType, returnDataType, \"java\", body, deterministic);\n" +
                "}";
     }
 
@@ -177,15 +189,17 @@ public final class JavaSourceUDFFactory
      * Generated looks like this:
      * <code><pre>
      *
-     * public java.nio.ByteBuffer execute(java.util.List params)
+     * public java.nio.ByteBuffer execute(int protocolVersion, java.util.List params)
      * throws org.apache.cassandra.exceptions.InvalidRequestException
      * {
      *     try
      *     {
      *         Object result = executeInternal(
-     *             (<cast to JAVA_ARG_TYPE>)org.apache.cassandra.cql3.functions.JavaSourceUDFFactory.compose(argTypes, params, 0)
+     *             (<cast to JAVA_ARG_TYPE>)compose(protocolVersion, 0, (java.nio.ByteBuffer)params.get(0)),
+     *             (<cast to JAVA_ARG_TYPE>)compose(protocolVersion, 1, (java.nio.ByteBuffer)params.get(1)),
+     *             ...
      *         );
-     *         return result != null ? returnType.decompose(result) : null;
+     *         return decompose(protocolVersion, result);
      *     }
      *     catch (Throwable t)
      *     {
@@ -202,7 +216,7 @@ public final class JavaSourceUDFFactory
         // usual methods are 700-800 chars long (prevent temp object allocations)
         StringBuilder code = new StringBuilder(1024);
         // overrides org.apache.cassandra.cql3.functions.Function.execute(java.util.List)
-        code.append("public java.nio.ByteBuffer execute(java.util.List params)\n" +
+        code.append("public java.nio.ByteBuffer execute(int protocolVersion, java.util.List params)\n" +
                     "throws org.apache.cassandra.exceptions.InvalidRequestException\n" +
                     "{\n" +
                     "  try\n" +
@@ -219,13 +233,13 @@ public final class JavaSourceUDFFactory
             code.
                  // cast to Java type
                  append("\n      (").append(paramTypes[i].getName()).append(")").
-                 // generate object representation of input parameter
-                 append("org.apache.cassandra.cql3.functions.JavaSourceUDFFactory.compose(argTypes, params, ").append(i).append(')');
+                 // generate object representation of input parameter (call UDFunction.compose)
+                 append("compose(protocolVersion, ").append(i).append(", (java.nio.ByteBuffer)params.get(").append(i).append("))");
         }
 
         code.append("\n    );\n" +
-                    // generate serialized return value (returnType is a field in AbstractFunction class)
-                    "    return result != null ? returnType.decompose(result) : null;\n" +
+                    // generate serialized return value (returnType is a field in AbstractFunction class), (call UDFunction.decompose)
+                    "    return decompose(protocolVersion, result);\n" +
                     //
                     // error handling ...
                     "  }\n" +
@@ -242,11 +256,4 @@ public final class JavaSourceUDFFactory
         return code.toString();
     }
 
-    // Used by execute() implementations of generated java source UDFs.
-    public static Object compose(List<AbstractType<?>> argTypes, List<ByteBuffer> parameters, int param)
-    {
-        ByteBuffer bb = parameters.get(param);
-        return bb == null ? null : argTypes.get(param).compose(bb);
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/functions/ScalarFunction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/ScalarFunction.java b/src/java/org/apache/cassandra/cql3/functions/ScalarFunction.java
index ba2a374..f00faf7 100644
--- a/src/java/org/apache/cassandra/cql3/functions/ScalarFunction.java
+++ b/src/java/org/apache/cassandra/cql3/functions/ScalarFunction.java
@@ -30,9 +30,10 @@ public interface ScalarFunction extends Function
     /**
      * Applies this function to the specified parameter.
      *
+     * @param protocolVersion protocol version used for parameters and return value
      * @param parameters the input parameters
      * @return the result of applying this function to the parameter
      * @throws InvalidRequestException if this function cannot not be applied to the parameter
      */
-    public ByteBuffer execute(List<ByteBuffer> parameters) throws InvalidRequestException;
+    public ByteBuffer execute(int protocolVersion, List<ByteBuffer> parameters) throws InvalidRequestException;
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/functions/ScriptBasedUDF.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/ScriptBasedUDF.java b/src/java/org/apache/cassandra/cql3/functions/ScriptBasedUDF.java
index 73fc43b..059a612 100644
--- a/src/java/org/apache/cassandra/cql3/functions/ScriptBasedUDF.java
+++ b/src/java/org/apache/cassandra/cql3/functions/ScriptBasedUDF.java
@@ -88,15 +88,11 @@ public class ScriptBasedUDF extends UDFunction
         }
     }
 
-    public ByteBuffer execute(List<ByteBuffer> parameters) throws InvalidRequestException
+    public ByteBuffer execute(int protocolVersion, List<ByteBuffer> parameters) throws InvalidRequestException
     {
         Object[] params = new Object[argTypes.size()];
         for (int i = 0; i < params.length; i++)
-        {
-            ByteBuffer bb = parameters.get(i);
-            if (bb != null)
-                params[i] = argTypes.get(i).compose(bb);
-        }
+            params[i] = compose(protocolVersion, i, parameters.get(i));
 
         try
         {
@@ -108,7 +104,7 @@ public class ScriptBasedUDF extends UDFunction
             if (result == null)
                 return null;
 
-            Class<?> javaReturnType = returnType.getSerializer().getType();
+            Class<?> javaReturnType = returnDataType.asJavaClass();
             Class<?> resultType = result.getClass();
             if (!javaReturnType.isAssignableFrom(resultType))
             {
@@ -138,8 +134,7 @@ public class ScriptBasedUDF extends UDFunction
                 }
             }
 
-            @SuppressWarnings("unchecked") ByteBuffer r = ((AbstractType) returnType).decompose(result);
-            return r;
+            return decompose(protocolVersion, result);
         }
         catch (RuntimeException | ScriptException e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/functions/TimeuuidFcts.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/TimeuuidFcts.java b/src/java/org/apache/cassandra/cql3/functions/TimeuuidFcts.java
index e481cf5..c1c3490 100644
--- a/src/java/org/apache/cassandra/cql3/functions/TimeuuidFcts.java
+++ b/src/java/org/apache/cassandra/cql3/functions/TimeuuidFcts.java
@@ -31,7 +31,7 @@ public abstract class TimeuuidFcts
 {
     public static final Function nowFct = new NativeScalarFunction("now", TimeUUIDType.instance)
     {
-        public ByteBuffer execute(List<ByteBuffer> parameters)
+        public ByteBuffer execute(int protocolVersion, List<ByteBuffer> parameters)
         {
             return ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes());
         }
@@ -45,7 +45,7 @@ public abstract class TimeuuidFcts
 
     public static final Function minTimeuuidFct = new NativeScalarFunction("mintimeuuid", TimeUUIDType.instance, TimestampType.instance)
     {
-        public ByteBuffer execute(List<ByteBuffer> parameters)
+        public ByteBuffer execute(int protocolVersion, List<ByteBuffer> parameters)
         {
             ByteBuffer bb = parameters.get(0);
             if (bb == null)
@@ -57,7 +57,7 @@ public abstract class TimeuuidFcts
 
     public static final Function maxTimeuuidFct = new NativeScalarFunction("maxtimeuuid", TimeUUIDType.instance, TimestampType.instance)
     {
-        public ByteBuffer execute(List<ByteBuffer> parameters)
+        public ByteBuffer execute(int protocolVersion, List<ByteBuffer> parameters)
         {
             ByteBuffer bb = parameters.get(0);
             if (bb == null)
@@ -69,7 +69,7 @@ public abstract class TimeuuidFcts
 
     public static final Function dateOfFct = new NativeScalarFunction("dateof", TimestampType.instance, TimeUUIDType.instance)
     {
-        public ByteBuffer execute(List<ByteBuffer> parameters)
+        public ByteBuffer execute(int protocolVersion, List<ByteBuffer> parameters)
         {
             ByteBuffer bb = parameters.get(0);
             if (bb == null)
@@ -81,7 +81,7 @@ public abstract class TimeuuidFcts
 
     public static final Function unixTimestampOfFct = new NativeScalarFunction("unixtimestampof", LongType.instance, TimeUUIDType.instance)
     {
-        public ByteBuffer execute(List<ByteBuffer> parameters)
+        public ByteBuffer execute(int protocolVersion, List<ByteBuffer> parameters)
         {
             ByteBuffer bb = parameters.get(0);
             if (bb == null)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/functions/TokenFct.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/TokenFct.java b/src/java/org/apache/cassandra/cql3/functions/TokenFct.java
index ca4d473..9d50a97 100644
--- a/src/java/org/apache/cassandra/cql3/functions/TokenFct.java
+++ b/src/java/org/apache/cassandra/cql3/functions/TokenFct.java
@@ -50,7 +50,7 @@ public class TokenFct extends NativeScalarFunction
         return types;
     }
 
-    public ByteBuffer execute(List<ByteBuffer> parameters) throws InvalidRequestException
+    public ByteBuffer execute(int protocolVersion, List<ByteBuffer> parameters) throws InvalidRequestException
     {
         CBuilder builder = cfm.getKeyValidatorAsCType().builder();
         for (int i = 0; i < parameters.size(); i++)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/functions/UDFunction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/UDFunction.java b/src/java/org/apache/cassandra/cql3/functions/UDFunction.java
index 42418c6..973c70a 100644
--- a/src/java/org/apache/cassandra/cql3/functions/UDFunction.java
+++ b/src/java/org/apache/cassandra/cql3/functions/UDFunction.java
@@ -17,6 +17,9 @@
  */
 package org.apache.cassandra.cql3.functions;
 
+import java.lang.invoke.MethodHandle;
+import java.lang.invoke.MethodHandles;
+import java.lang.reflect.Method;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.security.MessageDigest;
@@ -26,6 +29,11 @@ import com.google.common.base.Objects;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.ProtocolVersion;
+import com.datastax.driver.core.UserType;
+import org.apache.cassandra.config.KSMetaData;
+import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.composites.Composite;
@@ -33,6 +41,8 @@ import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.db.marshal.TypeParser;
 import org.apache.cassandra.exceptions.*;
+import org.apache.cassandra.service.MigrationManager;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 
 /**
@@ -42,11 +52,83 @@ public abstract class UDFunction extends AbstractFunction implements ScalarFunct
 {
     protected static final Logger logger = LoggerFactory.getLogger(UDFunction.class);
 
+    // TODO make these c'tors and methods public in Java-Driver - see https://datastax-oss.atlassian.net/browse/JAVA-502
+    static final MethodHandle methodParseOne;
+    static
+    {
+        try
+        {
+            Class<?> cls = Class.forName("com.datastax.driver.core.CassandraTypeParser");
+            Method m = cls.getDeclaredMethod("parseOne", String.class);
+            m.setAccessible(true);
+            methodParseOne = MethodHandles.lookup().unreflect(m);
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Construct an array containing the Java classes for the given Java Driver {@link com.datastax.driver.core.DataType}s.
+     *
+     * @param dataTypes array with UDF argument types
+     * @return array of same size with UDF arguments
+     */
+    public static Class<?>[] javaTypes(DataType[] dataTypes)
+    {
+        Class<?> paramTypes[] = new Class[dataTypes.length];
+        for (int i = 0; i < paramTypes.length; i++)
+            paramTypes[i] = dataTypes[i].asJavaClass();
+        return paramTypes;
+    }
+
+    /**
+     * Construct an array containing the Java Driver {@link com.datastax.driver.core.DataType}s for the
+     * C* internal types.
+     *
+     * @param abstractTypes list with UDF argument types
+     * @return array with argument types as {@link com.datastax.driver.core.DataType}
+     */
+    public static DataType[] driverTypes(List<AbstractType<?>> abstractTypes)
+    {
+        DataType[] argDataTypes = new DataType[abstractTypes.size()];
+        for (int i = 0; i < argDataTypes.length; i++)
+            argDataTypes[i] = driverType(abstractTypes.get(i));
+        return argDataTypes;
+    }
+
+    /**
+     * Returns the Java Driver {@link com.datastax.driver.core.DataType} for the C* internal type.
+     */
+    public static DataType driverType(AbstractType abstractType)
+    {
+        CQL3Type cqlType = abstractType.asCQL3Type();
+        try
+        {
+            return (DataType) methodParseOne.invoke(cqlType.getType().toString());
+        }
+        catch (RuntimeException | Error e)
+        {
+            // immediately rethrow these...
+            throw e;
+        }
+        catch (Throwable e)
+        {
+            throw new RuntimeException("cannot parse driver type " + cqlType.getType().toString(), e);
+        }
+    }
+
+    // instance vars
+
     protected final List<ColumnIdentifier> argNames;
 
     protected final String language;
     protected final String body;
-    private final boolean deterministic;
+    protected final boolean deterministic;
+
+    protected final DataType[] argDataTypes;
+    protected final DataType returnDataType;
 
     protected UDFunction(FunctionName name,
                          List<ColumnIdentifier> argNames,
@@ -56,12 +138,53 @@ public abstract class UDFunction extends AbstractFunction implements ScalarFunct
                          String body,
                          boolean deterministic)
     {
+        this(name, argNames, argTypes, driverTypes(argTypes), returnType,
+             driverType(returnType), language, body, deterministic);
+    }
+
+    protected UDFunction(FunctionName name,
+                         List<ColumnIdentifier> argNames,
+                         List<AbstractType<?>> argTypes,
+                         DataType[] argDataTypes,
+                         AbstractType<?> returnType,
+                         DataType returnDataType,
+                         String language,
+                         String body,
+                         boolean deterministic)
+        {
         super(name, argTypes, returnType);
         assert new HashSet<>(argNames).size() == argNames.size() : "duplicate argument names";
         this.argNames = argNames;
         this.language = language;
         this.body = body;
         this.deterministic = deterministic;
+        this.argDataTypes = argDataTypes;
+        this.returnDataType = returnDataType;
+    }
+
+    /**
+     * Used by UDF implementations (both Java code generated by {@link org.apache.cassandra.cql3.functions.JavaSourceUDFFactory}
+     * and script executor {@link org.apache.cassandra.cql3.functions.ScriptBasedUDF}) to convert the C*
+     * serialized representation to the Java object representation.
+     *
+     * @param protocolVersion the native protocol version used for serialization
+     * @param argIndex        index of the UDF input argument
+     */
+    protected Object compose(int protocolVersion, int argIndex, ByteBuffer value)
+    {
+        return value == null ? null : argDataTypes[argIndex].deserialize(value, ProtocolVersion.fromInt(protocolVersion));
+    }
+
+    /**
+     * Used by UDF implementations (both Java code generated by {@link org.apache.cassandra.cql3.functions.JavaSourceUDFFactory}
+     * and script executor {@link org.apache.cassandra.cql3.functions.ScriptBasedUDF}) to convert the Java
+     * object representation for the return value to the C* serialized representation.
+     *
+     * @param protocolVersion the native protocol version used for serialization
+     */
+    protected ByteBuffer decompose(int protocolVersion, Object value)
+    {
+        return value == null ? null : returnDataType.serialize(value, ProtocolVersion.fromInt(protocolVersion));
     }
 
     public boolean isAggregate()
@@ -85,19 +208,6 @@ public abstract class UDFunction extends AbstractFunction implements ScalarFunct
         }
     }
 
-    static Class<?>[] javaParamTypes(List<AbstractType<?>> argTypes)
-    {
-        Class<?> paramTypes[] = new Class[argTypes.size()];
-        for (int i = 0; i < paramTypes.length; i++)
-            paramTypes[i] = javaType(argTypes.get(i));
-        return paramTypes;
-    }
-
-    static Class<?> javaType(AbstractType<?> type)
-    {
-        return type.getSerializer().getType();
-    }
-
     /**
      * It can happen that a function has been declared (is listed in the scheam) but cannot
      * be loaded (maybe only on some nodes). This is the case for instance if the class defining
@@ -117,7 +227,7 @@ public abstract class UDFunction extends AbstractFunction implements ScalarFunct
     {
         return new UDFunction(name, argNames, argTypes, returnType, language, body, true)
         {
-            public ByteBuffer execute(List<ByteBuffer> parameters) throws InvalidRequestException
+            public ByteBuffer execute(int protocolVersion, List<ByteBuffer> parameters) throws InvalidRequestException
             {
                 throw new InvalidRequestException(String.format("Function '%s' exists but hasn't been loaded successfully for the following reason: %s. "
                                                               + "Please see the server log for more details", this, reason.getMessage()));
@@ -135,7 +245,7 @@ public abstract class UDFunction extends AbstractFunction implements ScalarFunct
     {
         MessageDigest digest = FBUtilities.newMessageDigest("SHA-1");
         for (AbstractType<?> type : argTypes)
-            digest.update(type.toString().getBytes(StandardCharsets.UTF_8));
+            digest.update(type.asCQL3Type().toString().getBytes(StandardCharsets.UTF_8));
         return ByteBuffer.wrap(digest.digest());
     }
 
@@ -268,8 +378,8 @@ public abstract class UDFunction extends AbstractFunction implements ScalarFunct
         UDFunction that = (UDFunction)o;
         return Objects.equal(this.name, that.name)
             && Objects.equal(this.argNames, that.argNames)
-            && Objects.equal(this.argTypes, that.argTypes)
-            && Objects.equal(this.returnType, that.returnType)
+            && Functions.typeEquals(this.argTypes, that.argTypes)
+            && Functions.typeEquals(this.returnType, that.returnType)
             && Objects.equal(this.language, that.language)
             && Objects.equal(this.body, that.body)
             && Objects.equal(this.deterministic, that.deterministic);
@@ -280,4 +390,35 @@ public abstract class UDFunction extends AbstractFunction implements ScalarFunct
     {
         return Objects.hashCode(name, argNames, argTypes, returnType, language, body, deterministic);
     }
+
+    public void userTypeUpdated(String ksName, String typeName)
+    {
+        boolean updated = false;
+
+        for (int i = 0; i < argDataTypes.length; i++)
+        {
+            DataType dataType = argDataTypes[i];
+            if (dataType instanceof UserType)
+            {
+                UserType userType = (UserType) dataType;
+                if (userType.getKeyspace().equals(ksName) && userType.getTypeName().equals(typeName))
+                {
+                    KSMetaData ksm = Schema.instance.getKSMetaData(ksName);
+                    assert ksm != null;
+
+                    org.apache.cassandra.db.marshal.UserType ut = ksm.userTypes.getType(ByteBufferUtil.bytes(typeName));
+
+                    DataType newUserType = driverType(ut);
+                    argDataTypes[i] = newUserType;
+
+                    argTypes.set(i, ut);
+
+                    updated = true;
+                }
+            }
+        }
+
+        if (updated)
+            MigrationManager.announceNewFunction(this, true);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/functions/UuidFcts.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/UuidFcts.java b/src/java/org/apache/cassandra/cql3/functions/UuidFcts.java
index b3cef85..afb5aae 100644
--- a/src/java/org/apache/cassandra/cql3/functions/UuidFcts.java
+++ b/src/java/org/apache/cassandra/cql3/functions/UuidFcts.java
@@ -28,7 +28,7 @@ public abstract class UuidFcts
 {
     public static final Function uuidFct = new NativeScalarFunction("uuid", UUIDType.instance)
     {
-        public ByteBuffer execute(List<ByteBuffer> parameters)
+        public ByteBuffer execute(int protocolVersion, List<ByteBuffer> parameters)
         {
             return UUIDSerializer.instance.serialize(UUID.randomUUID());
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/selection/AggregateFunctionSelector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/selection/AggregateFunctionSelector.java b/src/java/org/apache/cassandra/cql3/selection/AggregateFunctionSelector.java
index 6ea9716..7702796 100644
--- a/src/java/org/apache/cassandra/cql3/selection/AggregateFunctionSelector.java
+++ b/src/java/org/apache/cassandra/cql3/selection/AggregateFunctionSelector.java
@@ -34,20 +34,20 @@ final class AggregateFunctionSelector extends AbstractFunctionSelector<Aggregate
         return true;
     }
 
-    public void addInput(ResultSetBuilder rs) throws InvalidRequestException
+    public void addInput(int protocolVersion, ResultSetBuilder rs) throws InvalidRequestException
     {
         // Aggregation of aggregation is not supported
         for (int i = 0, m = argSelectors.size(); i < m; i++)
         {
             Selector s = argSelectors.get(i);
-            s.addInput(rs);
-            args.set(i, s.getOutput());
+            s.addInput(protocolVersion, rs);
+            args.set(i, s.getOutput(protocolVersion));
             s.reset();
         }
         this.aggregate.addInput(args);
     }
 
-    public ByteBuffer getOutput() throws InvalidRequestException
+    public ByteBuffer getOutput(int protocolVersion) throws InvalidRequestException
     {
         return aggregate.compute();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/selection/FieldSelector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/selection/FieldSelector.java b/src/java/org/apache/cassandra/cql3/selection/FieldSelector.java
index 7e14486..d695598 100644
--- a/src/java/org/apache/cassandra/cql3/selection/FieldSelector.java
+++ b/src/java/org/apache/cassandra/cql3/selection/FieldSelector.java
@@ -64,14 +64,14 @@ final class FieldSelector extends Selector
         return false;
     }
 
-    public void addInput(ResultSetBuilder rs) throws InvalidRequestException
+    public void addInput(int protocolVersion, ResultSetBuilder rs) throws InvalidRequestException
     {
-        selected.addInput(rs);
+        selected.addInput(protocolVersion, rs);
     }
 
-    public ByteBuffer getOutput() throws InvalidRequestException
+    public ByteBuffer getOutput(int protocolVersion) throws InvalidRequestException
     {
-        ByteBuffer value = selected.getOutput();
+        ByteBuffer value = selected.getOutput(protocolVersion);
         if (value == null)
             return null;
         ByteBuffer[] buffers = type.split(value);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/selection/ScalarFunctionSelector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/selection/ScalarFunctionSelector.java b/src/java/org/apache/cassandra/cql3/selection/ScalarFunctionSelector.java
index 4ceadb9..bb56bb8 100644
--- a/src/java/org/apache/cassandra/cql3/selection/ScalarFunctionSelector.java
+++ b/src/java/org/apache/cassandra/cql3/selection/ScalarFunctionSelector.java
@@ -36,12 +36,12 @@ final class ScalarFunctionSelector extends AbstractFunctionSelector<ScalarFuncti
         return argSelectors.get(0).isAggregate();
     }
 
-    public void addInput(ResultSetBuilder rs) throws InvalidRequestException
+    public void addInput(int protocolVersion, ResultSetBuilder rs) throws InvalidRequestException
     {
         for (int i = 0, m = argSelectors.size(); i < m; i++)
         {
             Selector s = argSelectors.get(i);
-            s.addInput(rs);
+            s.addInput(protocolVersion, rs);
         }
     }
 
@@ -49,15 +49,15 @@ final class ScalarFunctionSelector extends AbstractFunctionSelector<ScalarFuncti
     {
     }
 
-    public ByteBuffer getOutput() throws InvalidRequestException
+    public ByteBuffer getOutput(int protocolVersion) throws InvalidRequestException
     {
         for (int i = 0, m = argSelectors.size(); i < m; i++)
         {
             Selector s = argSelectors.get(i);
-            args.set(i, s.getOutput());
+            args.set(i, s.getOutput(protocolVersion));
             s.reset();
         }
-        return fun.execute(args);
+        return fun.execute(protocolVersion, args);
     }
 
     ScalarFunctionSelector(Function fun, List<Selector> argSelectors)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/selection/Selection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/selection/Selection.java b/src/java/org/apache/cassandra/cql3/selection/Selection.java
index 888d96d..6ad36e9 100644
--- a/src/java/org/apache/cassandra/cql3/selection/Selection.java
+++ b/src/java/org/apache/cassandra/cql3/selection/Selection.java
@@ -219,33 +219,33 @@ public abstract class Selection
             return c == null || !c.isLive(now);
         }
 
-        public void newRow() throws InvalidRequestException
+        public void newRow(int protocolVersion) throws InvalidRequestException
         {
             if (current != null)
             {
-                selectors.addInputRow(this);
+                selectors.addInputRow(protocolVersion, this);
                 if (!selectors.isAggregate())
                 {
-                    resultSet.addRow(selectors.getOutputRow());
+                    resultSet.addRow(selectors.getOutputRow(protocolVersion));
                     selectors.reset();
                 }
             }
             current = new ArrayList<ByteBuffer>(columns.size());
         }
 
-        public ResultSet build() throws InvalidRequestException
+        public ResultSet build(int protocolVersion) throws InvalidRequestException
         {
             if (current != null)
             {
-                selectors.addInputRow(this);
-                resultSet.addRow(selectors.getOutputRow());
+                selectors.addInputRow(protocolVersion, this);
+                resultSet.addRow(selectors.getOutputRow(protocolVersion));
                 selectors.reset();
                 current = null;
             }
 
             if (resultSet.isEmpty() && selectors.isAggregate())
             {
-                resultSet.addRow(selectors.getOutputRow());
+                resultSet.addRow(selectors.getOutputRow(protocolVersion));
             }
             return resultSet;
         }
@@ -268,9 +268,9 @@ public abstract class Selection
          * @param rs the <code>ResultSetBuilder</code>
          * @throws InvalidRequestException
          */
-        public void addInputRow(ResultSetBuilder rs) throws InvalidRequestException;
+        public void addInputRow(int protocolVersion, ResultSetBuilder rs) throws InvalidRequestException;
 
-        public List<ByteBuffer> getOutputRow() throws InvalidRequestException;
+        public List<ByteBuffer> getOutputRow(int protocolVersion) throws InvalidRequestException;
 
         public void reset();
     }
@@ -318,12 +318,12 @@ public abstract class Selection
                     current = null;
                 }
 
-                public List<ByteBuffer> getOutputRow()
+                public List<ByteBuffer> getOutputRow(int protocolVersion)
                 {
                     return current;
                 }
 
-                public void addInputRow(ResultSetBuilder rs) throws InvalidRequestException
+                public void addInputRow(int protocolVersion, ResultSetBuilder rs) throws InvalidRequestException
                 {
                     current = rs.current;
                 }
@@ -388,22 +388,22 @@ public abstract class Selection
                     return factories.containsOnlyAggregateFunctions();
                 }
 
-                public List<ByteBuffer> getOutputRow() throws InvalidRequestException
+                public List<ByteBuffer> getOutputRow(int protocolVersion) throws InvalidRequestException
                 {
                     List<ByteBuffer> outputRow = new ArrayList<>(selectors.size());
 
                     for (int i = 0, m = selectors.size(); i < m; i++)
                     {
-                        outputRow.add(selectors.get(i).getOutput());
+                        outputRow.add(selectors.get(i).getOutput(protocolVersion));
                     }
                     return outputRow;
                 }
 
-                public void addInputRow(ResultSetBuilder rs) throws InvalidRequestException
+                public void addInputRow(int protocolVersion, ResultSetBuilder rs) throws InvalidRequestException
                 {
                     for (int i = 0, m = selectors.size(); i < m; i++)
                     {
-                        selectors.get(i).addInput(rs);
+                        selectors.get(i).addInput(protocolVersion, rs);
                     }
                 }
             };

http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/selection/Selector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/selection/Selector.java b/src/java/org/apache/cassandra/cql3/selection/Selector.java
index f2c729b..0c1933f 100644
--- a/src/java/org/apache/cassandra/cql3/selection/Selector.java
+++ b/src/java/org/apache/cassandra/cql3/selection/Selector.java
@@ -120,18 +120,20 @@ public abstract class Selector implements AssignmentTestable
     /**
      * Add the current value from the specified <code>ResultSetBuilder</code>.
      *
+     * @param protocolVersion protocol version used for serialization
      * @param rs the <code>ResultSetBuilder</code>
      * @throws InvalidRequestException if a problem occurs while add the input value
      */
-    public abstract void addInput(ResultSetBuilder rs) throws InvalidRequestException;
+    public abstract void addInput(int protocolVersion, ResultSetBuilder rs) throws InvalidRequestException;
 
     /**
      * Returns the selector output.
      *
+     * @param protocolVersion protocol version used for serialization
      * @return the selector output
      * @throws InvalidRequestException if a problem occurs while computing the output value
      */
-    public abstract ByteBuffer getOutput() throws InvalidRequestException;
+    public abstract ByteBuffer getOutput(int protocolVersion) throws InvalidRequestException;
 
     /**
      * Returns the <code>Selector</code> output type.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/selection/SimpleSelector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/selection/SimpleSelector.java b/src/java/org/apache/cassandra/cql3/selection/SimpleSelector.java
index a5ff4cd..c2edaed 100644
--- a/src/java/org/apache/cassandra/cql3/selection/SimpleSelector.java
+++ b/src/java/org/apache/cassandra/cql3/selection/SimpleSelector.java
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer;
 
 import org.apache.cassandra.cql3.selection.Selection.ResultSetBuilder;
 import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.exceptions.InvalidRequestException;
 
 public final class SimpleSelector extends Selector
 {
@@ -54,13 +55,13 @@ public final class SimpleSelector extends Selector
     }
 
     @Override
-    public void addInput(ResultSetBuilder rs)
+    public void addInput(int protocolVersion, ResultSetBuilder rs) throws InvalidRequestException
     {
         current = rs.current.get(idx);
     }
 
     @Override
-    public ByteBuffer getOutput()
+    public ByteBuffer getOutput(int protocolVersion) throws InvalidRequestException
     {
         return current;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/selection/WritetimeOrTTLSelector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/selection/WritetimeOrTTLSelector.java b/src/java/org/apache/cassandra/cql3/selection/WritetimeOrTTLSelector.java
index 2494334..a1ecd3d 100644
--- a/src/java/org/apache/cassandra/cql3/selection/WritetimeOrTTLSelector.java
+++ b/src/java/org/apache/cassandra/cql3/selection/WritetimeOrTTLSelector.java
@@ -63,7 +63,7 @@ final class WritetimeOrTTLSelector extends Selector
         };
     }
 
-    public void addInput(ResultSetBuilder rs)
+    public void addInput(int protocolVersion, ResultSetBuilder rs)
     {
         if (isWritetime)
         {
@@ -77,7 +77,7 @@ final class WritetimeOrTTLSelector extends Selector
         }
     }
 
-    public ByteBuffer getOutput()
+    public ByteBuffer getOutput(int protocolVersion)
     {
         return current;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java
index c41fb08..8d8c27a 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java
@@ -50,6 +50,7 @@ public final class CreateFunctionStatement extends SchemaAlteringStatement
     private final List<ColumnIdentifier> argNames;
     private final List<CQL3Type.Raw> argRawTypes;
     private final CQL3Type.Raw rawReturnType;
+    private String currentKeyspace;
 
     public CreateFunctionStatement(FunctionName functionName,
                                    String language,
@@ -74,8 +75,10 @@ public final class CreateFunctionStatement extends SchemaAlteringStatement
 
     public void prepareKeyspace(ClientState state) throws InvalidRequestException
     {
-        if (!functionName.hasKeyspace() && state.getRawKeyspace() != null)
-            functionName = new FunctionName(state.getKeyspace(), functionName.name);
+        currentKeyspace = state.getRawKeyspace();
+
+        if (!functionName.hasKeyspace() && currentKeyspace != null)
+            functionName = new FunctionName(currentKeyspace, functionName.name);
 
         if (!functionName.hasKeyspace())
             throw new InvalidRequestException("You need to be logged in a keyspace or use a fully qualified function name");
@@ -112,11 +115,9 @@ public final class CreateFunctionStatement extends SchemaAlteringStatement
 
         List<AbstractType<?>> argTypes = new ArrayList<>(argRawTypes.size());
         for (CQL3Type.Raw rawType : argRawTypes)
-            // We have no proper keyspace to give, which means that this will break (NPE currently)
-            // for UDT: #7791 is open to fix this
-            argTypes.add(rawType.prepare(functionName.keyspace).getType());
+            argTypes.add(rawType.prepare(typeKeyspace(rawType)).getType());
 
-        AbstractType<?> returnType = rawReturnType.prepare(null).getType();
+        AbstractType<?> returnType = rawReturnType.prepare(typeKeyspace(rawReturnType)).getType();
 
         Function old = Functions.find(functionName, argTypes);
         if (old != null)
@@ -126,7 +127,7 @@ public final class CreateFunctionStatement extends SchemaAlteringStatement
             if (!orReplace)
                 throw new InvalidRequestException(String.format("Function %s already exists", old));
 
-            if (!old.returnType().isValueCompatibleWith(returnType))
+            if (!Functions.typeEquals(old.returnType(), returnType))
                 throw new InvalidRequestException(String.format("Cannot replace function %s, the new return type %s is not compatible with the return type %s of existing function",
                                                                 functionName, returnType.asCQL3Type(), old.returnType().asCQL3Type()));
         }
@@ -134,4 +135,12 @@ public final class CreateFunctionStatement extends SchemaAlteringStatement
         MigrationManager.announceNewFunction(UDFunction.create(functionName, argNames, argTypes, returnType, language, body, deterministic), isLocalOnly);
         return true;
     }
+
+    private String typeKeyspace(CQL3Type.Raw rawType)
+    {
+        String ks = rawType.keyspace();
+        if (ks != null)
+            return ks;
+        return functionName.keyspace;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/statements/DropFunctionStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DropFunctionStatement.java b/src/java/org/apache/cassandra/cql3/statements/DropFunctionStatement.java
index 4cd9460..0ba3721 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DropFunctionStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DropFunctionStatement.java
@@ -103,7 +103,7 @@ public final class DropFunctionStatement extends SchemaAlteringStatement
 
         List<AbstractType<?>> argTypes = new ArrayList<>(argRawTypes.size());
         for (CQL3Type.Raw rawType : argRawTypes)
-            argTypes.add(rawType.prepare(functionName.keyspace).getType());
+            argTypes.add(rawType.prepare(typeKeyspace(rawType)).getType());
 
         Function old;
         if (argsPresent)
@@ -139,4 +139,12 @@ public final class DropFunctionStatement extends SchemaAlteringStatement
         MigrationManager.announceFunctionDrop((UDFunction)old, isLocalOnly);
         return true;
     }
+
+    private String typeKeyspace(CQL3Type.Raw rawType)
+    {
+        String ks = rawType.keyspace();
+        if (ks != null)
+            return ks;
+        return functionName.keyspace;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/statements/DropTypeStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DropTypeStatement.java b/src/java/org/apache/cassandra/cql3/statements/DropTypeStatement.java
index a3b82a4..ed21957 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DropTypeStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DropTypeStatement.java
@@ -20,6 +20,8 @@ package org.apache.cassandra.cql3.statements;
 import org.apache.cassandra.auth.Permission;
 import org.apache.cassandra.config.*;
 import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.cql3.functions.Function;
+import org.apache.cassandra.cql3.functions.Functions;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.service.ClientState;
@@ -72,6 +74,15 @@ public class DropTypeStatement extends SchemaAlteringStatement
         // we drop and 2) existing tables referencing the type (maybe in a nested
         // way).
 
+        for (Function function : Functions.all())
+        {
+            if (isUsedBy(function.returnType()))
+                throw new InvalidRequestException(String.format("Cannot drop user type %s as it is still used by function %s", name, function));
+            for (AbstractType<?> argType : function.argTypes())
+                if (isUsedBy(argType))
+                    throw new InvalidRequestException(String.format("Cannot drop user type %s as it is still used by function %s", name, function));
+        }
+
         for (KSMetaData ksm2 : Schema.instance.getKeyspaceDefinitions())
         {
             for (UserType ut : ksm2.userTypes.getAllTypes().values())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 2607e12..4e39614 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -630,7 +630,7 @@ public abstract class ModificationStatement implements CQLStatement
         Selection.ResultSetBuilder builder = selection.resultSetBuilder(now);
         SelectStatement.forSelection(cfm, selection).processColumnFamily(key, cf, options, now, builder);
 
-        return builder.build();
+        return builder.build(options.getProtocolVersion());
     }
 
     public ResultMessage executeInternal(QueryState queryState, QueryOptions options) throws RequestValidationException, RequestExecutionException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 2d28b71..3360d40 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -280,7 +280,7 @@ public class SelectStatement implements CQLStatement
                 processColumnFamily(row.key.getKey(), row.cf, options, now, result);
             }
         }
-        return new ResultMessage.Rows(result.build());
+        return new ResultMessage.Rows(result.build(options.getProtocolVersion()));
     }
 
     public ResultMessage.Rows processResults(List<Row> rows, QueryOptions options, int limit, long now) throws RequestValidationException
@@ -1149,7 +1149,7 @@ public class SelectStatement implements CQLStatement
             processColumnFamily(row.key.getKey(), row.cf, options, now, result);
         }
 
-        ResultSet cqlRows = result.build();
+        ResultSet cqlRows = result.build(options.getProtocolVersion());
 
         orderResults(cqlRows);
 
@@ -1189,7 +1189,7 @@ public class SelectStatement implements CQLStatement
         CQL3Row staticRow = iter.getStaticRow();
         if (staticRow != null && !iter.hasNext() && !usesSecondaryIndexing && hasNoClusteringColumnsRestriction())
         {
-            result.newRow();
+            result.newRow(options.getProtocolVersion());
             for (ColumnDefinition def : selection.getColumns())
             {
                 switch (def.kind)
@@ -1212,7 +1212,7 @@ public class SelectStatement implements CQLStatement
             CQL3Row cql3Row = iter.next();
 
             // Respect requested order
-            result.newRow();
+            result.newRow(options.getProtocolVersion());
             // Respect selection order
             for (ColumnDefinition def : selection.getColumns())
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
index 555c1cd..21e30e2 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
@@ -236,7 +236,7 @@ public class CqlRecordReader extends RecordReader<Long, Row>
 
     public Long createKey()
     {
-        return new Long(0L);
+        return Long.valueOf(0L);
     }
 
     public Row createValue()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/transport/Server.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java
index 15fad88..cc071b1 100644
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@ -64,6 +64,7 @@ public class Server implements CassandraDaemon.Server
     private static final Logger logger = LoggerFactory.getLogger(Server.class);
     private static final boolean enableEpoll = Boolean.valueOf(System.getProperty("cassandra.native.epoll.enabled", "true"));
 
+    public static final int VERSION_2 = 2;
     public static final int VERSION_3 = 3;
     public static final int CURRENT_VERSION = VERSION_3;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/test/unit/org/apache/cassandra/cql3/CQLTester.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java
index 9105b9d..68f90bd 100644
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@ -18,8 +18,11 @@
 package org.apache.cassandra.cql3;
 
 import java.io.File;
+import java.io.IOException;
 import java.math.BigDecimal;
 import java.math.BigInteger;
+import java.net.InetAddress;
+import java.net.ServerSocket;
 import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.CountDownLatch;
@@ -29,23 +32,31 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.base.Objects;
 import com.google.common.collect.ImmutableSet;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.junit.AfterClass;
 import org.junit.After;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.datastax.driver.core.*;
+import com.datastax.driver.core.ResultSet;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.db.marshal.TupleType;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.serializers.TypeSerializer;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.transport.Server;
 
 /**
  * Base class for CQL tests.
@@ -55,17 +66,38 @@ public abstract class CQLTester
     protected static final Logger logger = LoggerFactory.getLogger(CQLTester.class);
 
     public static final String KEYSPACE = "cql_test_keyspace";
+    public static final String KEYSPACE_PER_TEST = "cql_test_keyspace_alt";
     private static final boolean USE_PREPARED_VALUES = Boolean.valueOf(System.getProperty("cassandra.test.use_prepared", "true"));
     private static final AtomicInteger seqNumber = new AtomicInteger();
 
+    private static org.apache.cassandra.transport.Server server;
+    private static final int nativePort;
+    private static final InetAddress nativeAddr;
+    private static final Cluster cluster[] = new Cluster[Server.CURRENT_VERSION];
+    private static final Session session[] = new Session[Server.CURRENT_VERSION];
+
     static
     {
         // Once per-JVM is enough
         SchemaLoader.prepareServer();
+
+        nativeAddr = InetAddress.getLoopbackAddress();
+
+        try
+        {
+            ServerSocket serverSocket = new ServerSocket(0);
+            nativePort = serverSocket.getLocalPort();
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
     }
 
     private String currentTable;
     private final Set<String> currentTypes = new HashSet<>();
+    private final Set<String> currentFunctions = new HashSet<>();
+    private final Set<String> currentAggregates = new HashSet<>();
 
     // We don't use USE_PREPARED_VALUES in the code below so some test can foce value preparation (if the result
     // is not expected to be the same without preparation)
@@ -80,11 +112,28 @@ public abstract class CQLTester
     @AfterClass
     public static void tearDownClass()
     {
+        for (Session sess : session)
+            if (sess != null)
+                sess.close();
+        for (Cluster cl : cluster)
+            if (cl != null)
+                cl.close();
+
+        if (server != null)
+            server.stop();
+    }
+
+    @Before
+    public void beforeTest() throws Throwable
+    {
+        schemaChange(String.format("CREATE KEYSPACE IF NOT EXISTS %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}", KEYSPACE_PER_TEST));
     }
 
     @After
     public void afterTest() throws Throwable
     {
+        dropPerTestKeyspace();
+
         // Restore standard behavior in case it was changed
         usePrepared = USE_PREPARED_VALUES;
 
@@ -93,8 +142,12 @@ public abstract class CQLTester
 
         final String tableToDrop = currentTable;
         final Set<String> typesToDrop = currentTypes.isEmpty() ? Collections.emptySet() : new HashSet(currentTypes);
+        final Set<String> functionsToDrop = currentFunctions.isEmpty() ? Collections.emptySet() : new HashSet(currentFunctions);
+        final Set<String> aggregatesToDrop = currentAggregates.isEmpty() ? Collections.emptySet() : new HashSet(currentAggregates);
         currentTable = null;
         currentTypes.clear();
+        currentFunctions.clear();
+        currentAggregates.clear();
 
         // We want to clean up after the test, but dropping a table is rather long so just do that asynchronously
         ScheduledExecutors.optionalTasks.execute(new Runnable()
@@ -105,6 +158,12 @@ public abstract class CQLTester
                 {
                     schemaChange(String.format("DROP TABLE IF EXISTS %s.%s", KEYSPACE, tableToDrop));
 
+                    for (String aggregateName : aggregatesToDrop)
+                        schemaChange(String.format("DROP AGGREGATE IF EXISTS %s", aggregateName));
+
+                    for (String functionName : functionsToDrop)
+                        schemaChange(String.format("DROP FUNCTION IF EXISTS %s", functionName));
+
                     for (String typeName : typesToDrop)
                         schemaChange(String.format("DROP TYPE IF EXISTS %s.%s", KEYSPACE, typeName));
 
@@ -133,6 +192,40 @@ public abstract class CQLTester
         });
     }
 
+    // lazy initialization for all tests that require Java Driver
+    private static void requireNetwork() throws ConfigurationException
+    {
+        if (server != null)
+            return;
+
+        SystemKeyspace.finishStartup();
+        StorageService.instance.initServer();
+        SchemaLoader.startGossiper();
+
+        server = new org.apache.cassandra.transport.Server(nativeAddr, nativePort);
+        server.start();
+
+        for (int version = 1; version <= Server.CURRENT_VERSION; version++)
+        {
+            if (cluster[version-1] != null)
+                continue;
+
+            cluster[version-1] = Cluster.builder().addContactPoints(nativeAddr)
+                                  .withClusterName("Test Cluster")
+                                  .withPort(nativePort)
+                                  .withProtocolVersion(ProtocolVersion.fromInt(version))
+                                  .build();
+            session[version-1] = cluster[version-1].connect();
+
+            logger.info("Started Java Driver instance for protocol version {}", version);
+        }
+    }
+
+    protected void dropPerTestKeyspace() throws Throwable
+    {
+        execute(String.format("DROP KEYSPACE IF EXISTS %s", KEYSPACE_PER_TEST));
+    }
+
     public void flush()
     {
         try
@@ -183,7 +276,7 @@ public abstract class CQLTester
 
     protected String createType(String query)
     {
-        String typeName = "type_" + seqNumber.getAndIncrement();
+        String typeName = callerName() + "_type_" + seqNumber.getAndIncrement();
         String fullQuery = String.format(query, KEYSPACE + "." + typeName);
         currentTypes.add(typeName);
         logger.info(fullQuery);
@@ -191,18 +284,48 @@ public abstract class CQLTester
         return typeName;
     }
 
+    protected String createFunction(String keyspace, String argTypes, String query) throws Throwable
+    {
+        String functionName = keyspace + "." + callerName() + "_function_" + seqNumber.getAndIncrement();
+        createFunctionOverload(functionName, argTypes, query);
+        return functionName;
+    }
+
+    protected void createFunctionOverload(String functionName, String argTypes, String query) throws Throwable
+    {
+        String fullQuery = String.format(query, functionName);
+        currentFunctions.add(functionName + '(' + argTypes + ')');
+        logger.info(fullQuery);
+        execute(fullQuery);
+    }
+
+    protected String createAggregate(String keyspace, String argTypes, String query) throws Throwable
+    {
+        String aggregateName = keyspace + "." + callerName() + "_aggregate_" + seqNumber.getAndIncrement();
+        createAggregateOverload(aggregateName, argTypes, query);
+        return aggregateName;
+    }
+
+    protected void createAggregateOverload(String aggregateName, String argTypes, String query) throws Throwable
+    {
+        String fullQuery = String.format(query, aggregateName);
+        currentAggregates.add(aggregateName + '(' + argTypes + ')');
+        logger.info(fullQuery);
+        execute(fullQuery);
+    }
+
     protected void createTable(String query)
     {
-        currentTable = "table_" + seqNumber.getAndIncrement();
-        String fullQuery = String.format(query, KEYSPACE + "." + currentTable);
+        currentTable = callerName() + "_table_" + seqNumber.getAndIncrement();
+        String fullQuery = formatQuery(query);
         logger.info(fullQuery);
         schemaChange(fullQuery);
     }
 
     protected void createTableMayThrow(String query) throws Throwable
     {
-        currentTable = "table_" + seqNumber.getAndIncrement();
-        String fullQuery = String.format(query, KEYSPACE + "." + currentTable);
+        currentTable = callerName() + "_table_" + seqNumber.getAndIncrement();
+        String fullQuery = formatQuery(query);
         logger.info(fullQuery);
         try
         {
@@ -216,14 +339,14 @@ public abstract class CQLTester
 
     protected void alterTable(String query)
     {
-        String fullQuery = String.format(query, KEYSPACE + "." + currentTable);
+        String fullQuery = formatQuery(query);
         logger.info(fullQuery);
         schemaChange(fullQuery);
     }
 
     protected void alterTableMayThrow(String query) throws Throwable
     {
-        String fullQuery = String.format(query, KEYSPACE + "." + currentTable);
+        String fullQuery = formatQuery(query);
         logger.info(fullQuery);
         try
         {
@@ -244,14 +367,14 @@ public abstract class CQLTester
 
     protected void createIndex(String query)
     {
-        String fullQuery = String.format(query, KEYSPACE + "." + currentTable);
+        String fullQuery = formatQuery(query);
         logger.info(fullQuery);
         schemaChange(fullQuery);
     }
 
     protected void createIndexMayThrow(String query) throws Throwable
     {
-        String fullQuery = String.format(query, KEYSPACE + "." + currentTable);
+        String fullQuery = formatQuery(query);
         logger.info(fullQuery);
         try
         {
@@ -270,6 +393,11 @@ public abstract class CQLTester
         schemaChange(fullQuery);
     }
 
+    private static String callerName()
+    {
+        return new Exception().getStackTrace()[2].getMethodName().toLowerCase();
+    }
+
     private static void schemaChange(String query)
     {
         try
@@ -288,11 +416,24 @@ public abstract class CQLTester
         return Schema.instance.getCFMetaData(KEYSPACE, currentTable);
     }
 
+    protected com.datastax.driver.core.ResultSet executeNet(int protocolVersion, String query, Object... values) throws Throwable
+    {
+        requireNetwork();
+
+        return session[protocolVersion-1].execute(formatQuery(query), values);
+    }
+
+    private String formatQuery(String query)
+    {
+        query = currentTable == null ? query : String.format(query, KEYSPACE + "." + currentTable);
+        return query;
+    }
+
     protected UntypedResultSet execute(String query, Object... values) throws Throwable
     {
         try
         {
-            query = currentTable == null ? query : String.format(query, KEYSPACE + "." + currentTable);
+            query = formatQuery(query);
 
             UntypedResultSet rs;
             if (usePrepared)
@@ -318,6 +459,64 @@ public abstract class CQLTester
         }
     }
 
+    protected void assertRowsNet(int protocolVersion, ResultSet result, Object[]... rows)
+    {
+        if (result == null)
+        {
+            if (rows.length > 0)
+                Assert.fail(String.format("No rows returned by query but %d expected", rows.length));
+            return;
+        }
+
+        ColumnDefinitions meta = result.getColumnDefinitions();
+        Iterator<Row> iter = result.iterator();
+        int i = 0;
+        while (iter.hasNext() && i < rows.length)
+        {
+            Object[] expected = rows[i];
+            Row actual = iter.next();
+
+            Assert.assertEquals(String.format("Invalid number of (expected) values provided for row %d (using protocol version %d)",
+                                              i, protocolVersion),
+                                meta.size(), expected.length);
+
+            for (int j = 0; j < meta.size(); j++)
+            {
+                DataType type = meta.getType(j);
+                ByteBuffer expectedByteValue = type.serialize(expected[j], ProtocolVersion.fromInt(protocolVersion));
+                int expectedBytes = expectedByteValue.remaining();
+                ByteBuffer actualValue = actual.getBytesUnsafe(meta.getName(j));
+                int actualBytes = actualValue.remaining();
+
+                if (!Objects.equal(expectedByteValue, actualValue))
+                    Assert.fail(String.format("Invalid value for row %d column %d (%s of type %s), " +
+                                              "expected <%s> (%d bytes) but got <%s> (%d bytes) " +
+                                              "(using protocol version %d)",
+                                              i, j, meta.getName(j), type,
+                                              type.format(expected[j]),
+                                              expectedBytes,
+                                              type.format(type.deserialize(actualValue, ProtocolVersion.fromInt(protocolVersion))),
+                                              actualBytes,
+                                              protocolVersion));
+            }
+            i++;
+        }
+
+        if (iter.hasNext())
+        {
+            while (iter.hasNext())
+            {
+                iter.next();
+                i++;
+            }
+            Assert.fail(String.format("Got less rows than expected. Expected %d but got %d (using protocol version %d).",
+                                      rows.length, i, protocolVersion));
+        }
+
+        Assert.assertTrue(String.format("Got %s rows than expected. Expected %d but got %d (using protocol version %d)",
+                                        rows.length>i ? "less" : "more", rows.length, i, protocolVersion), i == rows.length);
+    }
+
     protected void assertRows(UntypedResultSet result, Object[]... rows)
     {
         if (result == null)
@@ -394,7 +593,7 @@ public abstract class CQLTester
     protected void assertEmpty(UntypedResultSet result) throws Throwable
     {
         if (result != null && result.size() != 0)
-            throw new InvalidRequestException(String.format("Expected empty result but got %d rows", result.size()));
+            throw new AssertionError(String.format("Expected empty result but got %d rows", result.size()));
     }
 
     protected void assertInvalid(String query, Object... values) throws Throwable
@@ -406,7 +605,16 @@ public abstract class CQLTester
     {
         try
         {
-            execute(query, values);
+            try
+            {
+                execute(query, values);
+            }
+            catch (RuntimeException e)
+            {
+                Throwable cause = e.getCause();
+                if (cause instanceof InvalidRequestException)
+                    throw cause;
+            }
             String q = USE_PREPARED_VALUES
                      ? query + " (values: " + formatAllValues(values) + ")"
                      : replaceValues(query, values);