You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ty...@apache.org on 2014/11/27 00:50:00 UTC
[2/2] cassandra git commit: Support for UDTs, tuples, and collections in UDFs
Support for UDTs, tuples, and collections in UDFs
Patch by Robert Stupp; reviewed by Tyler Hobbs for CASSANDRA-7563
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/794d68b5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/794d68b5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/794d68b5
Branch: refs/heads/trunk
Commit: 794d68b51b77c2a3cb09374010b6f84231ead604
Parents: e131213
Author: Robert Stupp <sn...@snazy.de>
Authored: Wed Nov 26 17:49:45 2014 -0600
Committer: Tyler Hobbs <ty...@datastax.com>
Committed: Wed Nov 26 17:49:45 2014 -0600
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../org/apache/cassandra/cql3/CQL3Type.java | 12 +-
.../cql3/functions/BytesConversionFcts.java | 8 +-
.../cassandra/cql3/functions/FunctionCall.java | 9 +-
.../cassandra/cql3/functions/FunctionName.java | 1 -
.../cassandra/cql3/functions/Functions.java | 50 +-
.../cql3/functions/JavaSourceUDFFactory.java | 51 +-
.../cql3/functions/ScalarFunction.java | 3 +-
.../cql3/functions/ScriptBasedUDF.java | 13 +-
.../cassandra/cql3/functions/TimeuuidFcts.java | 10 +-
.../cassandra/cql3/functions/TokenFct.java | 2 +-
.../cassandra/cql3/functions/UDFunction.java | 177 ++-
.../cassandra/cql3/functions/UuidFcts.java | 2 +-
.../selection/AggregateFunctionSelector.java | 8 +-
.../cassandra/cql3/selection/FieldSelector.java | 8 +-
.../cql3/selection/ScalarFunctionSelector.java | 10 +-
.../cassandra/cql3/selection/Selection.java | 30 +-
.../cassandra/cql3/selection/Selector.java | 6 +-
.../cql3/selection/SimpleSelector.java | 5 +-
.../cql3/selection/WritetimeOrTTLSelector.java | 4 +-
.../statements/CreateFunctionStatement.java | 23 +-
.../cql3/statements/DropFunctionStatement.java | 10 +-
.../cql3/statements/DropTypeStatement.java | 11 +
.../cql3/statements/ModificationStatement.java | 2 +-
.../cql3/statements/SelectStatement.java | 8 +-
.../cassandra/hadoop/cql3/CqlRecordReader.java | 2 +-
.../org/apache/cassandra/transport/Server.java | 1 +
.../org/apache/cassandra/cql3/CQLTester.java | 232 ++-
test/unit/org/apache/cassandra/cql3/UFTest.java | 1356 ++++++++++++++----
tools/lib/cassandra-driver-core-2.0.5.jar | Bin 544552 -> 0 bytes
30 files changed, 1643 insertions(+), 413 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 162d579..55c86dd 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,6 @@
3.0
+ * Support UDTs, tuples, and collections in user-defined
+ functions (CASSANDRA-7563)
* Fix aggregate fn results on empty selection, result column name,
and cqlsh parsing (CASSANDRA-8229)
* Mark sstables as repaired after full repair (CASSANDRA-7586)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/CQL3Type.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/CQL3Type.java b/src/java/org/apache/cassandra/cql3/CQL3Type.java
index b656de8..98d1b15 100644
--- a/src/java/org/apache/cassandra/cql3/CQL3Type.java
+++ b/src/java/org/apache/cassandra/cql3/CQL3Type.java
@@ -315,6 +315,11 @@ public interface CQL3Type
return false;
}
+ public String keyspace()
+ {
+ return null;
+ }
+
public void freeze() throws InvalidRequestException
{
String message = String.format("frozen<> is only allowed on collections, tuples, and user-defined types (got %s)", this);
@@ -474,6 +479,11 @@ public interface CQL3Type
this.name = name;
}
+ public String keyspace()
+ {
+ return name.getKeyspace();
+ }
+
public void freeze()
{
frozen = true;
@@ -485,7 +495,7 @@ public interface CQL3Type
{
// The provided keyspace is the one of the current statement this is part of. If it's different from the keyspace of
// the UTName, we reject since we want to limit user types to their own keyspace (see #6643)
- if (!keyspace.equals(name.getKeyspace()))
+ if (keyspace != null && !keyspace.equals(name.getKeyspace()))
throw new InvalidRequestException(String.format("Statement on keyspace %s cannot refer to a user type in keyspace %s; "
+ "user types can only be used in the keyspace they are defined in",
keyspace, name.getKeyspace()));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/functions/BytesConversionFcts.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/BytesConversionFcts.java b/src/java/org/apache/cassandra/cql3/functions/BytesConversionFcts.java
index 1cd1d69..ddb33fc 100644
--- a/src/java/org/apache/cassandra/cql3/functions/BytesConversionFcts.java
+++ b/src/java/org/apache/cassandra/cql3/functions/BytesConversionFcts.java
@@ -36,7 +36,7 @@ public abstract class BytesConversionFcts
String name = fromType.asCQL3Type() + "asblob";
return new NativeScalarFunction(name, BytesType.instance, fromType)
{
- public ByteBuffer execute(List<ByteBuffer> parameters)
+ public ByteBuffer execute(int protocolVersion, List<ByteBuffer> parameters)
{
return parameters.get(0);
}
@@ -48,7 +48,7 @@ public abstract class BytesConversionFcts
final String name = "blobas" + toType.asCQL3Type();
return new NativeScalarFunction(name, toType, BytesType.instance)
{
- public ByteBuffer execute(List<ByteBuffer> parameters) throws InvalidRequestException
+ public ByteBuffer execute(int protocolVersion, List<ByteBuffer> parameters) throws InvalidRequestException
{
ByteBuffer val = parameters.get(0);
try
@@ -68,7 +68,7 @@ public abstract class BytesConversionFcts
public static final Function VarcharAsBlobFct = new NativeScalarFunction("varcharasblob", BytesType.instance, UTF8Type.instance)
{
- public ByteBuffer execute(List<ByteBuffer> parameters)
+ public ByteBuffer execute(int protocolVersion, List<ByteBuffer> parameters)
{
return parameters.get(0);
}
@@ -76,7 +76,7 @@ public abstract class BytesConversionFcts
public static final Function BlobAsVarcharFact = new NativeScalarFunction("blobasvarchar", UTF8Type.instance, BytesType.instance)
{
- public ByteBuffer execute(List<ByteBuffer> parameters)
+ public ByteBuffer execute(int protocolVersion, List<ByteBuffer> parameters)
{
return parameters.get(0);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java b/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java
index efaa12a..01443d2 100644
--- a/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java
+++ b/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java
@@ -27,6 +27,7 @@ import org.apache.cassandra.db.marshal.ListType;
import org.apache.cassandra.db.marshal.MapType;
import org.apache.cassandra.db.marshal.SetType;
import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.transport.Server;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.serializers.MarshalException;
@@ -69,12 +70,12 @@ public class FunctionCall extends Term.NonTerminal
throw new InvalidRequestException(String.format("Invalid null value for argument to %s", fun));
buffers.add(val);
}
- return executeInternal(fun, buffers);
+ return executeInternal(options.getProtocolVersion(), fun, buffers);
}
- private static ByteBuffer executeInternal(ScalarFunction fun, List<ByteBuffer> params) throws InvalidRequestException
+ private static ByteBuffer executeInternal(int protocolVersion, ScalarFunction fun, List<ByteBuffer> params) throws InvalidRequestException
{
- ByteBuffer result = fun.execute(params);
+ ByteBuffer result = fun.execute(protocolVersion, params);
try
{
// Check the method didn't lied on it's declared return type
@@ -172,7 +173,7 @@ public class FunctionCall extends Term.NonTerminal
buffers.add(((Term.Terminal)t).get(QueryOptions.DEFAULT));
}
- return executeInternal(fun, buffers);
+ return executeInternal(Server.CURRENT_VERSION, fun, buffers);
}
public AssignmentTestable.TestResult testAssignment(String keyspace, ColumnSpecification receiver)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/functions/FunctionName.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/FunctionName.java b/src/java/org/apache/cassandra/cql3/functions/FunctionName.java
index 460e7a6..bb30040 100644
--- a/src/java/org/apache/cassandra/cql3/functions/FunctionName.java
+++ b/src/java/org/apache/cassandra/cql3/functions/FunctionName.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.cql3.functions;
import com.google.common.base.Objects;
-import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.SystemKeyspace;
public final class FunctionName
http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/functions/Functions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/Functions.java b/src/java/org/apache/cassandra/cql3/functions/Functions.java
index 7021475..a8fdf0f 100644
--- a/src/java/org/apache/cassandra/cql3/functions/Functions.java
+++ b/src/java/org/apache/cassandra/cql3/functions/Functions.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.cql3.functions;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import com.google.common.collect.ArrayListMultimap;
@@ -29,6 +30,8 @@ import org.apache.cassandra.cql3.*;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.service.IMigrationListener;
+import org.apache.cassandra.service.MigrationManager;
public abstract class Functions
{
@@ -83,6 +86,8 @@ public abstract class Functions
declare(AggregateFcts.avgFunctionForDouble);
declare(AggregateFcts.avgFunctionForVarint);
declare(AggregateFcts.avgFunctionForDecimal);
+
+ MigrationManager.instance.register(new FunctionsMigrationListener());
}
private static void declare(Function fun)
@@ -188,7 +193,7 @@ public abstract class Functions
assert name.hasKeyspace() : "function name not fully qualified";
for (Function f : declared.get(name))
{
- if (f.argTypes().equals(argTypes))
+ if (typeEquals(f.argTypes(), argTypes))
return f;
}
return null;
@@ -284,4 +289,47 @@ public abstract class Functions
removeFunction(fun.name(), fun.argTypes());
addFunction(fun);
}
+
+ public static Collection<Function> all()
+ {
+ return declared.values();
+ }
+
+ public static boolean typeEquals(AbstractType<?> t1, AbstractType<?> t2)
+ {
+ return t1.asCQL3Type().toString().equals(t2.asCQL3Type().toString());
+ }
+
+ public static boolean typeEquals(List<AbstractType<?>> t1, List<AbstractType<?>> t2)
+ {
+ if (t1.size() != t2.size())
+ return false;
+ for (int i = 0; i < t1.size(); i ++)
+ if (!typeEquals(t1.get(i), t2.get(i)))
+ return false;
+ return true;
+ }
+
+ private static class FunctionsMigrationListener implements IMigrationListener
+ {
+ public void onCreateKeyspace(String ksName) { }
+ public void onCreateColumnFamily(String ksName, String cfName) { }
+ public void onCreateUserType(String ksName, String typeName) { }
+ public void onCreateFunction(String ksName, String functionName) { }
+
+ public void onUpdateKeyspace(String ksName) { }
+ public void onUpdateColumnFamily(String ksName, String cfName) { }
+ public void onUpdateUserType(String ksName, String typeName) {
+ for (Function function : all())
+ if (function instanceof UDFunction)
+ ((UDFunction)function).userTypeUpdated(ksName, typeName);
+ }
+ public void onUpdateFunction(String ksName, String functionName) { }
+
+ public void onDropKeyspace(String ksName) { }
+ public void onDropColumnFamily(String ksName, String cfName) { }
+ public void onDropUserType(String ksName, String typeName) { }
+ public void onDropFunction(String ksName, String functionName) { }
+
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/functions/JavaSourceUDFFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/JavaSourceUDFFactory.java b/src/java/org/apache/cassandra/cql3/functions/JavaSourceUDFFactory.java
index 0f5fe48..560f077 100644
--- a/src/java/org/apache/cassandra/cql3/functions/JavaSourceUDFFactory.java
+++ b/src/java/org/apache/cassandra/cql3/functions/JavaSourceUDFFactory.java
@@ -20,13 +20,13 @@ package org.apache.cassandra.cql3.functions;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Modifier;
-import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.datastax.driver.core.DataType;
import javassist.CannotCompileException;
import javassist.ClassPool;
import javassist.CtClass;
@@ -56,8 +56,14 @@ public final class JavaSourceUDFFactory
boolean deterministic)
throws InvalidRequestException
{
- Class<?> javaReturnType = UDFunction.javaType(returnType);
- Class<?>[] javaParamTypes = UDFunction.javaParamTypes(argTypes);
+ // argDataTypes is just the C* internal argTypes converted to the Java Driver DataType
+ DataType[] argDataTypes = UDFunction.driverTypes(argTypes);
+ // returnDataType is just the C* internal returnType converted to the Java Driver DataType
+ DataType returnDataType = UDFunction.driverType(returnType);
+ // javaParamTypes holds the Java classes corresponding to argTypes (equivalently, to argDataTypes)
+ Class<?>[] javaParamTypes = UDFunction.javaTypes(argDataTypes);
+ // javaReturnType is the Java class corresponding to returnType (equivalently, to returnDataType)
+ Class<?> javaReturnType = returnDataType.asJavaClass();
String clsName = generateClassName(name);
@@ -92,9 +98,13 @@ public final class JavaSourceUDFFactory
Constructor ctor =
cc.toClass().getDeclaredConstructor(
- FunctionName.class, List.class, List.class,
- AbstractType.class, String.class, boolean.class);
- return (UDFunction) ctor.newInstance(name, argNames, argTypes, returnType, body, deterministic);
+ FunctionName.class, List.class, List.class, DataType[].class,
+ AbstractType.class, DataType.class,
+ String.class, boolean.class);
+ return (UDFunction) ctor.newInstance(
+ name, argNames, argTypes, argDataTypes,
+ returnType, returnDataType,
+ body, deterministic);
}
catch (NotFoundException | CannotCompileException | NoSuchMethodException | LinkageError | InstantiationException | IllegalAccessException e)
{
@@ -133,10 +143,12 @@ public final class JavaSourceUDFFactory
"(org.apache.cassandra.cql3.functions.FunctionName name, " +
"java.util.List argNames, " +
"java.util.List argTypes, " +
+ "com.datastax.driver.core.DataType[] argDataTypes, " +
"org.apache.cassandra.db.marshal.AbstractType returnType, " +
+ "com.datastax.driver.core.DataType returnDataType, " +
"String body," +
"boolean deterministic)\n{" +
- " super(name, argNames, argTypes, returnType, \"java\", body, deterministic);\n" +
+ " super(name, argNames, argTypes, argDataTypes, returnType, returnDataType, \"java\", body, deterministic);\n" +
"}";
}
@@ -177,15 +189,17 @@ public final class JavaSourceUDFFactory
* Generated looks like this:
* <code><pre>
*
- * public java.nio.ByteBuffer execute(java.util.List params)
+ * public java.nio.ByteBuffer execute(int protocolVersion, java.util.List params)
* throws org.apache.cassandra.exceptions.InvalidRequestException
* {
* try
* {
* Object result = executeInternal(
- * (<cast to JAVA_ARG_TYPE>)org.apache.cassandra.cql3.functions.JavaSourceUDFFactory.compose(argTypes, params, 0)
+ * (<cast to JAVA_ARG_TYPE>)compose(protocolVersion, 0, (java.nio.ByteBuffer)params.get(0)),
+ * (<cast to JAVA_ARG_TYPE>)compose(protocolVersion, 1, (java.nio.ByteBuffer)params.get(1)),
+ * ...
* );
- * return result != null ? returnType.decompose(result) : null;
+ * return decompose(protocolVersion, result);
* }
* catch (Throwable t)
* {
@@ -202,7 +216,7 @@ public final class JavaSourceUDFFactory
// usual methods are 700-800 chars long (prevent temp object allocations)
StringBuilder code = new StringBuilder(1024);
// overrides org.apache.cassandra.cql3.functions.Function.execute(java.util.List)
- code.append("public java.nio.ByteBuffer execute(java.util.List params)\n" +
+ code.append("public java.nio.ByteBuffer execute(int protocolVersion, java.util.List params)\n" +
"throws org.apache.cassandra.exceptions.InvalidRequestException\n" +
"{\n" +
" try\n" +
@@ -219,13 +233,13 @@ public final class JavaSourceUDFFactory
code.
// cast to Java type
append("\n (").append(paramTypes[i].getName()).append(")").
- // generate object representation of input parameter
- append("org.apache.cassandra.cql3.functions.JavaSourceUDFFactory.compose(argTypes, params, ").append(i).append(')');
+ // generate object representation of input parameter (call UDFunction.compose)
+ append("compose(protocolVersion, ").append(i).append(", (java.nio.ByteBuffer)params.get(").append(i).append("))");
}
code.append("\n );\n" +
- // generate serialized return value (returnType is a field in AbstractFunction class)
- " return result != null ? returnType.decompose(result) : null;\n" +
+ // generate serialized return value (returnType is a field in AbstractFunction class), (call UDFunction.decompose)
+ " return decompose(protocolVersion, result);\n" +
//
// error handling ...
" }\n" +
@@ -242,11 +256,4 @@ public final class JavaSourceUDFFactory
return code.toString();
}
- // Used by execute() implementations of generated java source UDFs.
- public static Object compose(List<AbstractType<?>> argTypes, List<ByteBuffer> parameters, int param)
- {
- ByteBuffer bb = parameters.get(param);
- return bb == null ? null : argTypes.get(param).compose(bb);
- }
-
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/functions/ScalarFunction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/ScalarFunction.java b/src/java/org/apache/cassandra/cql3/functions/ScalarFunction.java
index ba2a374..f00faf7 100644
--- a/src/java/org/apache/cassandra/cql3/functions/ScalarFunction.java
+++ b/src/java/org/apache/cassandra/cql3/functions/ScalarFunction.java
@@ -30,9 +30,10 @@ public interface ScalarFunction extends Function
/**
* Applies this function to the specified parameter.
*
+ * @param protocolVersion protocol version used for parameters and return value
* @param parameters the input parameters
* @return the result of applying this function to the parameter
* @throws InvalidRequestException if this function cannot not be applied to the parameter
*/
- public ByteBuffer execute(List<ByteBuffer> parameters) throws InvalidRequestException;
+ public ByteBuffer execute(int protocolVersion, List<ByteBuffer> parameters) throws InvalidRequestException;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/functions/ScriptBasedUDF.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/ScriptBasedUDF.java b/src/java/org/apache/cassandra/cql3/functions/ScriptBasedUDF.java
index 73fc43b..059a612 100644
--- a/src/java/org/apache/cassandra/cql3/functions/ScriptBasedUDF.java
+++ b/src/java/org/apache/cassandra/cql3/functions/ScriptBasedUDF.java
@@ -88,15 +88,11 @@ public class ScriptBasedUDF extends UDFunction
}
}
- public ByteBuffer execute(List<ByteBuffer> parameters) throws InvalidRequestException
+ public ByteBuffer execute(int protocolVersion, List<ByteBuffer> parameters) throws InvalidRequestException
{
Object[] params = new Object[argTypes.size()];
for (int i = 0; i < params.length; i++)
- {
- ByteBuffer bb = parameters.get(i);
- if (bb != null)
- params[i] = argTypes.get(i).compose(bb);
- }
+ params[i] = compose(protocolVersion, i, parameters.get(i));
try
{
@@ -108,7 +104,7 @@ public class ScriptBasedUDF extends UDFunction
if (result == null)
return null;
- Class<?> javaReturnType = returnType.getSerializer().getType();
+ Class<?> javaReturnType = returnDataType.asJavaClass();
Class<?> resultType = result.getClass();
if (!javaReturnType.isAssignableFrom(resultType))
{
@@ -138,8 +134,7 @@ public class ScriptBasedUDF extends UDFunction
}
}
- @SuppressWarnings("unchecked") ByteBuffer r = ((AbstractType) returnType).decompose(result);
- return r;
+ return decompose(protocolVersion, result);
}
catch (RuntimeException | ScriptException e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/functions/TimeuuidFcts.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/TimeuuidFcts.java b/src/java/org/apache/cassandra/cql3/functions/TimeuuidFcts.java
index e481cf5..c1c3490 100644
--- a/src/java/org/apache/cassandra/cql3/functions/TimeuuidFcts.java
+++ b/src/java/org/apache/cassandra/cql3/functions/TimeuuidFcts.java
@@ -31,7 +31,7 @@ public abstract class TimeuuidFcts
{
public static final Function nowFct = new NativeScalarFunction("now", TimeUUIDType.instance)
{
- public ByteBuffer execute(List<ByteBuffer> parameters)
+ public ByteBuffer execute(int protocolVersion, List<ByteBuffer> parameters)
{
return ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes());
}
@@ -45,7 +45,7 @@ public abstract class TimeuuidFcts
public static final Function minTimeuuidFct = new NativeScalarFunction("mintimeuuid", TimeUUIDType.instance, TimestampType.instance)
{
- public ByteBuffer execute(List<ByteBuffer> parameters)
+ public ByteBuffer execute(int protocolVersion, List<ByteBuffer> parameters)
{
ByteBuffer bb = parameters.get(0);
if (bb == null)
@@ -57,7 +57,7 @@ public abstract class TimeuuidFcts
public static final Function maxTimeuuidFct = new NativeScalarFunction("maxtimeuuid", TimeUUIDType.instance, TimestampType.instance)
{
- public ByteBuffer execute(List<ByteBuffer> parameters)
+ public ByteBuffer execute(int protocolVersion, List<ByteBuffer> parameters)
{
ByteBuffer bb = parameters.get(0);
if (bb == null)
@@ -69,7 +69,7 @@ public abstract class TimeuuidFcts
public static final Function dateOfFct = new NativeScalarFunction("dateof", TimestampType.instance, TimeUUIDType.instance)
{
- public ByteBuffer execute(List<ByteBuffer> parameters)
+ public ByteBuffer execute(int protocolVersion, List<ByteBuffer> parameters)
{
ByteBuffer bb = parameters.get(0);
if (bb == null)
@@ -81,7 +81,7 @@ public abstract class TimeuuidFcts
public static final Function unixTimestampOfFct = new NativeScalarFunction("unixtimestampof", LongType.instance, TimeUUIDType.instance)
{
- public ByteBuffer execute(List<ByteBuffer> parameters)
+ public ByteBuffer execute(int protocolVersion, List<ByteBuffer> parameters)
{
ByteBuffer bb = parameters.get(0);
if (bb == null)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/functions/TokenFct.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/TokenFct.java b/src/java/org/apache/cassandra/cql3/functions/TokenFct.java
index ca4d473..9d50a97 100644
--- a/src/java/org/apache/cassandra/cql3/functions/TokenFct.java
+++ b/src/java/org/apache/cassandra/cql3/functions/TokenFct.java
@@ -50,7 +50,7 @@ public class TokenFct extends NativeScalarFunction
return types;
}
- public ByteBuffer execute(List<ByteBuffer> parameters) throws InvalidRequestException
+ public ByteBuffer execute(int protocolVersion, List<ByteBuffer> parameters) throws InvalidRequestException
{
CBuilder builder = cfm.getKeyValidatorAsCType().builder();
for (int i = 0; i < parameters.size(); i++)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/functions/UDFunction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/UDFunction.java b/src/java/org/apache/cassandra/cql3/functions/UDFunction.java
index 42418c6..973c70a 100644
--- a/src/java/org/apache/cassandra/cql3/functions/UDFunction.java
+++ b/src/java/org/apache/cassandra/cql3/functions/UDFunction.java
@@ -17,6 +17,9 @@
*/
package org.apache.cassandra.cql3.functions;
+import java.lang.invoke.MethodHandle;
+import java.lang.invoke.MethodHandles;
+import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
@@ -26,6 +29,11 @@ import com.google.common.base.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.ProtocolVersion;
+import com.datastax.driver.core.UserType;
+import org.apache.cassandra.config.KSMetaData;
+import org.apache.cassandra.config.Schema;
import org.apache.cassandra.cql3.*;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.composites.Composite;
@@ -33,6 +41,8 @@ import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.db.marshal.TypeParser;
import org.apache.cassandra.exceptions.*;
+import org.apache.cassandra.service.MigrationManager;
+import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
/**
@@ -42,11 +52,83 @@ public abstract class UDFunction extends AbstractFunction implements ScalarFunct
{
protected static final Logger logger = LoggerFactory.getLogger(UDFunction.class);
+ // TODO make these c'tors and methods public in Java-Driver - see https://datastax-oss.atlassian.net/browse/JAVA-502
+ static final MethodHandle methodParseOne;
+ static
+ {
+ try
+ {
+ Class<?> cls = Class.forName("com.datastax.driver.core.CassandraTypeParser");
+ Method m = cls.getDeclaredMethod("parseOne", String.class);
+ m.setAccessible(true);
+ methodParseOne = MethodHandles.lookup().unreflect(m);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Construct an array containing the Java classes for the given Java Driver {@link com.datastax.driver.core.DataType}s.
+ *
+ * @param dataTypes array with UDF argument types
+ * @return array of the same size containing the Java class of each UDF argument type
+ */
+ public static Class<?>[] javaTypes(DataType[] dataTypes)
+ {
+ Class<?> paramTypes[] = new Class[dataTypes.length];
+ for (int i = 0; i < paramTypes.length; i++)
+ paramTypes[i] = dataTypes[i].asJavaClass();
+ return paramTypes;
+ }
+
+ /**
+ * Construct an array containing the Java Driver {@link com.datastax.driver.core.DataType}s for the
+ * C* internal types.
+ *
+ * @param abstractTypes list with UDF argument types
+ * @return array with argument types as {@link com.datastax.driver.core.DataType}
+ */
+ public static DataType[] driverTypes(List<AbstractType<?>> abstractTypes)
+ {
+ DataType[] argDataTypes = new DataType[abstractTypes.size()];
+ for (int i = 0; i < argDataTypes.length; i++)
+ argDataTypes[i] = driverType(abstractTypes.get(i));
+ return argDataTypes;
+ }
+
+ /**
+ * Returns the Java Driver {@link com.datastax.driver.core.DataType} for the C* internal type.
+ */
+ public static DataType driverType(AbstractType abstractType)
+ {
+ CQL3Type cqlType = abstractType.asCQL3Type();
+ try
+ {
+ return (DataType) methodParseOne.invoke(cqlType.getType().toString());
+ }
+ catch (RuntimeException | Error e)
+ {
+ // immediately rethrow these...
+ throw e;
+ }
+ catch (Throwable e)
+ {
+ throw new RuntimeException("cannot parse driver type " + cqlType.getType().toString(), e);
+ }
+ }
+
+ // instance vars
+
protected final List<ColumnIdentifier> argNames;
protected final String language;
protected final String body;
- private final boolean deterministic;
+ protected final boolean deterministic;
+
+ protected final DataType[] argDataTypes;
+ protected final DataType returnDataType;
protected UDFunction(FunctionName name,
List<ColumnIdentifier> argNames,
@@ -56,12 +138,53 @@ public abstract class UDFunction extends AbstractFunction implements ScalarFunct
String body,
boolean deterministic)
{
+ this(name, argNames, argTypes, driverTypes(argTypes), returnType,
+ driverType(returnType), language, body, deterministic);
+ }
+
+ protected UDFunction(FunctionName name,
+ List<ColumnIdentifier> argNames,
+ List<AbstractType<?>> argTypes,
+ DataType[] argDataTypes,
+ AbstractType<?> returnType,
+ DataType returnDataType,
+ String language,
+ String body,
+ boolean deterministic)
+ {
super(name, argTypes, returnType);
assert new HashSet<>(argNames).size() == argNames.size() : "duplicate argument names";
this.argNames = argNames;
this.language = language;
this.body = body;
this.deterministic = deterministic;
+ this.argDataTypes = argDataTypes;
+ this.returnDataType = returnDataType;
+ }
+
+ /**
+ * Used by UDF implementations (both Java code generated by {@link org.apache.cassandra.cql3.functions.JavaSourceUDFFactory}
+ * and script executor {@link org.apache.cassandra.cql3.functions.ScriptBasedUDF}) to convert the C*
+ * serialized representation to the Java object representation.
+ *
+ * @param protocolVersion the native protocol version used for serialization
+ * @param argIndex index of the UDF input argument
+ */
+ protected Object compose(int protocolVersion, int argIndex, ByteBuffer value)
+ {
+ return value == null ? null : argDataTypes[argIndex].deserialize(value, ProtocolVersion.fromInt(protocolVersion));
+ }
+
+ /**
+ * Used by UDF implementations (both Java code generated by {@link org.apache.cassandra.cql3.functions.JavaSourceUDFFactory}
+ * and script executor {@link org.apache.cassandra.cql3.functions.ScriptBasedUDF}) to convert the Java
+ * object representation for the return value to the C* serialized representation.
+ *
+ * @param protocolVersion the native protocol version used for serialization
+ */
+ protected ByteBuffer decompose(int protocolVersion, Object value)
+ {
+ return value == null ? null : returnDataType.serialize(value, ProtocolVersion.fromInt(protocolVersion));
}
public boolean isAggregate()
@@ -85,19 +208,6 @@ public abstract class UDFunction extends AbstractFunction implements ScalarFunct
}
}
- static Class<?>[] javaParamTypes(List<AbstractType<?>> argTypes)
- {
- Class<?> paramTypes[] = new Class[argTypes.size()];
- for (int i = 0; i < paramTypes.length; i++)
- paramTypes[i] = javaType(argTypes.get(i));
- return paramTypes;
- }
-
- static Class<?> javaType(AbstractType<?> type)
- {
- return type.getSerializer().getType();
- }
-
/**
* It can happen that a function has been declared (is listed in the scheam) but cannot
* be loaded (maybe only on some nodes). This is the case for instance if the class defining
@@ -117,7 +227,7 @@ public abstract class UDFunction extends AbstractFunction implements ScalarFunct
{
return new UDFunction(name, argNames, argTypes, returnType, language, body, true)
{
- public ByteBuffer execute(List<ByteBuffer> parameters) throws InvalidRequestException
+ public ByteBuffer execute(int protocolVersion, List<ByteBuffer> parameters) throws InvalidRequestException
{
throw new InvalidRequestException(String.format("Function '%s' exists but hasn't been loaded successfully for the following reason: %s. "
+ "Please see the server log for more details", this, reason.getMessage()));
@@ -135,7 +245,7 @@ public abstract class UDFunction extends AbstractFunction implements ScalarFunct
{
MessageDigest digest = FBUtilities.newMessageDigest("SHA-1");
for (AbstractType<?> type : argTypes)
- digest.update(type.toString().getBytes(StandardCharsets.UTF_8));
+ digest.update(type.asCQL3Type().toString().getBytes(StandardCharsets.UTF_8));
return ByteBuffer.wrap(digest.digest());
}
@@ -268,8 +378,8 @@ public abstract class UDFunction extends AbstractFunction implements ScalarFunct
UDFunction that = (UDFunction)o;
return Objects.equal(this.name, that.name)
&& Objects.equal(this.argNames, that.argNames)
- && Objects.equal(this.argTypes, that.argTypes)
- && Objects.equal(this.returnType, that.returnType)
+ && Functions.typeEquals(this.argTypes, that.argTypes)
+ && Functions.typeEquals(this.returnType, that.returnType)
&& Objects.equal(this.language, that.language)
&& Objects.equal(this.body, that.body)
&& Objects.equal(this.deterministic, that.deterministic);
@@ -280,4 +390,35 @@ public abstract class UDFunction extends AbstractFunction implements ScalarFunct
{
return Objects.hashCode(name, argNames, argTypes, returnType, language, body, deterministic);
}
+
+ public void userTypeUpdated(String ksName, String typeName)
+ {
+ boolean updated = false;
+
+ for (int i = 0; i < argDataTypes.length; i++)
+ {
+ DataType dataType = argDataTypes[i];
+ if (dataType instanceof UserType)
+ {
+ UserType userType = (UserType) dataType;
+ if (userType.getKeyspace().equals(ksName) && userType.getTypeName().equals(typeName))
+ {
+ KSMetaData ksm = Schema.instance.getKSMetaData(ksName);
+ assert ksm != null;
+
+ org.apache.cassandra.db.marshal.UserType ut = ksm.userTypes.getType(ByteBufferUtil.bytes(typeName));
+
+ DataType newUserType = driverType(ut);
+ argDataTypes[i] = newUserType;
+
+ argTypes.set(i, ut);
+
+ updated = true;
+ }
+ }
+ }
+
+ if (updated)
+ MigrationManager.announceNewFunction(this, true);
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/functions/UuidFcts.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/UuidFcts.java b/src/java/org/apache/cassandra/cql3/functions/UuidFcts.java
index b3cef85..afb5aae 100644
--- a/src/java/org/apache/cassandra/cql3/functions/UuidFcts.java
+++ b/src/java/org/apache/cassandra/cql3/functions/UuidFcts.java
@@ -28,7 +28,7 @@ public abstract class UuidFcts
{
public static final Function uuidFct = new NativeScalarFunction("uuid", UUIDType.instance)
{
- public ByteBuffer execute(List<ByteBuffer> parameters)
+ public ByteBuffer execute(int protocolVersion, List<ByteBuffer> parameters)
{
return UUIDSerializer.instance.serialize(UUID.randomUUID());
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/selection/AggregateFunctionSelector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/selection/AggregateFunctionSelector.java b/src/java/org/apache/cassandra/cql3/selection/AggregateFunctionSelector.java
index 6ea9716..7702796 100644
--- a/src/java/org/apache/cassandra/cql3/selection/AggregateFunctionSelector.java
+++ b/src/java/org/apache/cassandra/cql3/selection/AggregateFunctionSelector.java
@@ -34,20 +34,20 @@ final class AggregateFunctionSelector extends AbstractFunctionSelector<Aggregate
return true;
}
- public void addInput(ResultSetBuilder rs) throws InvalidRequestException
+ public void addInput(int protocolVersion, ResultSetBuilder rs) throws InvalidRequestException
{
// Aggregation of aggregation is not supported
for (int i = 0, m = argSelectors.size(); i < m; i++)
{
Selector s = argSelectors.get(i);
- s.addInput(rs);
- args.set(i, s.getOutput());
+ s.addInput(protocolVersion, rs);
+ args.set(i, s.getOutput(protocolVersion));
s.reset();
}
this.aggregate.addInput(args);
}
- public ByteBuffer getOutput() throws InvalidRequestException
+ public ByteBuffer getOutput(int protocolVersion) throws InvalidRequestException
{
return aggregate.compute();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/selection/FieldSelector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/selection/FieldSelector.java b/src/java/org/apache/cassandra/cql3/selection/FieldSelector.java
index 7e14486..d695598 100644
--- a/src/java/org/apache/cassandra/cql3/selection/FieldSelector.java
+++ b/src/java/org/apache/cassandra/cql3/selection/FieldSelector.java
@@ -64,14 +64,14 @@ final class FieldSelector extends Selector
return false;
}
- public void addInput(ResultSetBuilder rs) throws InvalidRequestException
+ public void addInput(int protocolVersion, ResultSetBuilder rs) throws InvalidRequestException
{
- selected.addInput(rs);
+ selected.addInput(protocolVersion, rs);
}
- public ByteBuffer getOutput() throws InvalidRequestException
+ public ByteBuffer getOutput(int protocolVersion) throws InvalidRequestException
{
- ByteBuffer value = selected.getOutput();
+ ByteBuffer value = selected.getOutput(protocolVersion);
if (value == null)
return null;
ByteBuffer[] buffers = type.split(value);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/selection/ScalarFunctionSelector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/selection/ScalarFunctionSelector.java b/src/java/org/apache/cassandra/cql3/selection/ScalarFunctionSelector.java
index 4ceadb9..bb56bb8 100644
--- a/src/java/org/apache/cassandra/cql3/selection/ScalarFunctionSelector.java
+++ b/src/java/org/apache/cassandra/cql3/selection/ScalarFunctionSelector.java
@@ -36,12 +36,12 @@ final class ScalarFunctionSelector extends AbstractFunctionSelector<ScalarFuncti
return argSelectors.get(0).isAggregate();
}
- public void addInput(ResultSetBuilder rs) throws InvalidRequestException
+ public void addInput(int protocolVersion, ResultSetBuilder rs) throws InvalidRequestException
{
for (int i = 0, m = argSelectors.size(); i < m; i++)
{
Selector s = argSelectors.get(i);
- s.addInput(rs);
+ s.addInput(protocolVersion, rs);
}
}
@@ -49,15 +49,15 @@ final class ScalarFunctionSelector extends AbstractFunctionSelector<ScalarFuncti
{
}
- public ByteBuffer getOutput() throws InvalidRequestException
+ public ByteBuffer getOutput(int protocolVersion) throws InvalidRequestException
{
for (int i = 0, m = argSelectors.size(); i < m; i++)
{
Selector s = argSelectors.get(i);
- args.set(i, s.getOutput());
+ args.set(i, s.getOutput(protocolVersion));
s.reset();
}
- return fun.execute(args);
+ return fun.execute(protocolVersion, args);
}
ScalarFunctionSelector(Function fun, List<Selector> argSelectors)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/selection/Selection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/selection/Selection.java b/src/java/org/apache/cassandra/cql3/selection/Selection.java
index 888d96d..6ad36e9 100644
--- a/src/java/org/apache/cassandra/cql3/selection/Selection.java
+++ b/src/java/org/apache/cassandra/cql3/selection/Selection.java
@@ -219,33 +219,33 @@ public abstract class Selection
return c == null || !c.isLive(now);
}
- public void newRow() throws InvalidRequestException
+ public void newRow(int protocolVersion) throws InvalidRequestException
{
if (current != null)
{
- selectors.addInputRow(this);
+ selectors.addInputRow(protocolVersion, this);
if (!selectors.isAggregate())
{
- resultSet.addRow(selectors.getOutputRow());
+ resultSet.addRow(selectors.getOutputRow(protocolVersion));
selectors.reset();
}
}
current = new ArrayList<ByteBuffer>(columns.size());
}
- public ResultSet build() throws InvalidRequestException
+ public ResultSet build(int protocolVersion) throws InvalidRequestException
{
if (current != null)
{
- selectors.addInputRow(this);
- resultSet.addRow(selectors.getOutputRow());
+ selectors.addInputRow(protocolVersion, this);
+ resultSet.addRow(selectors.getOutputRow(protocolVersion));
selectors.reset();
current = null;
}
if (resultSet.isEmpty() && selectors.isAggregate())
{
- resultSet.addRow(selectors.getOutputRow());
+ resultSet.addRow(selectors.getOutputRow(protocolVersion));
}
return resultSet;
}
@@ -268,9 +268,9 @@ public abstract class Selection
* @param rs the <code>ResultSetBuilder</code>
* @throws InvalidRequestException
*/
- public void addInputRow(ResultSetBuilder rs) throws InvalidRequestException;
+ public void addInputRow(int protocolVersion, ResultSetBuilder rs) throws InvalidRequestException;
- public List<ByteBuffer> getOutputRow() throws InvalidRequestException;
+ public List<ByteBuffer> getOutputRow(int protocolVersion) throws InvalidRequestException;
public void reset();
}
@@ -318,12 +318,12 @@ public abstract class Selection
current = null;
}
- public List<ByteBuffer> getOutputRow()
+ public List<ByteBuffer> getOutputRow(int protocolVersion)
{
return current;
}
- public void addInputRow(ResultSetBuilder rs) throws InvalidRequestException
+ public void addInputRow(int protocolVersion, ResultSetBuilder rs) throws InvalidRequestException
{
current = rs.current;
}
@@ -388,22 +388,22 @@ public abstract class Selection
return factories.containsOnlyAggregateFunctions();
}
- public List<ByteBuffer> getOutputRow() throws InvalidRequestException
+ public List<ByteBuffer> getOutputRow(int protocolVersion) throws InvalidRequestException
{
List<ByteBuffer> outputRow = new ArrayList<>(selectors.size());
for (int i = 0, m = selectors.size(); i < m; i++)
{
- outputRow.add(selectors.get(i).getOutput());
+ outputRow.add(selectors.get(i).getOutput(protocolVersion));
}
return outputRow;
}
- public void addInputRow(ResultSetBuilder rs) throws InvalidRequestException
+ public void addInputRow(int protocolVersion, ResultSetBuilder rs) throws InvalidRequestException
{
for (int i = 0, m = selectors.size(); i < m; i++)
{
- selectors.get(i).addInput(rs);
+ selectors.get(i).addInput(protocolVersion, rs);
}
}
};
http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/selection/Selector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/selection/Selector.java b/src/java/org/apache/cassandra/cql3/selection/Selector.java
index f2c729b..0c1933f 100644
--- a/src/java/org/apache/cassandra/cql3/selection/Selector.java
+++ b/src/java/org/apache/cassandra/cql3/selection/Selector.java
@@ -120,18 +120,20 @@ public abstract class Selector implements AssignmentTestable
/**
* Add the current value from the specified <code>ResultSetBuilder</code>.
*
+ * @param protocolVersion protocol version used for serialization
* @param rs the <code>ResultSetBuilder</code>
* @throws InvalidRequestException if a problem occurs while adding the input value
*/
- public abstract void addInput(ResultSetBuilder rs) throws InvalidRequestException;
+ public abstract void addInput(int protocolVersion, ResultSetBuilder rs) throws InvalidRequestException;
/**
* Returns the selector output.
*
+ * @param protocolVersion protocol version used for serialization
* @return the selector output
* @throws InvalidRequestException if a problem occurs while computing the output value
*/
- public abstract ByteBuffer getOutput() throws InvalidRequestException;
+ public abstract ByteBuffer getOutput(int protocolVersion) throws InvalidRequestException;
/**
* Returns the <code>Selector</code> output type.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/selection/SimpleSelector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/selection/SimpleSelector.java b/src/java/org/apache/cassandra/cql3/selection/SimpleSelector.java
index a5ff4cd..c2edaed 100644
--- a/src/java/org/apache/cassandra/cql3/selection/SimpleSelector.java
+++ b/src/java/org/apache/cassandra/cql3/selection/SimpleSelector.java
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer;
import org.apache.cassandra.cql3.selection.Selection.ResultSetBuilder;
import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.exceptions.InvalidRequestException;
public final class SimpleSelector extends Selector
{
@@ -54,13 +55,13 @@ public final class SimpleSelector extends Selector
}
@Override
- public void addInput(ResultSetBuilder rs)
+ public void addInput(int protocolVersion, ResultSetBuilder rs) throws InvalidRequestException
{
current = rs.current.get(idx);
}
@Override
- public ByteBuffer getOutput()
+ public ByteBuffer getOutput(int protocolVersion) throws InvalidRequestException
{
return current;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/selection/WritetimeOrTTLSelector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/selection/WritetimeOrTTLSelector.java b/src/java/org/apache/cassandra/cql3/selection/WritetimeOrTTLSelector.java
index 2494334..a1ecd3d 100644
--- a/src/java/org/apache/cassandra/cql3/selection/WritetimeOrTTLSelector.java
+++ b/src/java/org/apache/cassandra/cql3/selection/WritetimeOrTTLSelector.java
@@ -63,7 +63,7 @@ final class WritetimeOrTTLSelector extends Selector
};
}
- public void addInput(ResultSetBuilder rs)
+ public void addInput(int protocolVersion, ResultSetBuilder rs)
{
if (isWritetime)
{
@@ -77,7 +77,7 @@ final class WritetimeOrTTLSelector extends Selector
}
}
- public ByteBuffer getOutput()
+ public ByteBuffer getOutput(int protocolVersion)
{
return current;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java
index c41fb08..8d8c27a 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java
@@ -50,6 +50,7 @@ public final class CreateFunctionStatement extends SchemaAlteringStatement
private final List<ColumnIdentifier> argNames;
private final List<CQL3Type.Raw> argRawTypes;
private final CQL3Type.Raw rawReturnType;
+ private String currentKeyspace;
public CreateFunctionStatement(FunctionName functionName,
String language,
@@ -74,8 +75,10 @@ public final class CreateFunctionStatement extends SchemaAlteringStatement
public void prepareKeyspace(ClientState state) throws InvalidRequestException
{
- if (!functionName.hasKeyspace() && state.getRawKeyspace() != null)
- functionName = new FunctionName(state.getKeyspace(), functionName.name);
+ currentKeyspace = state.getRawKeyspace();
+
+ if (!functionName.hasKeyspace() && currentKeyspace != null)
+ functionName = new FunctionName(currentKeyspace, functionName.name);
if (!functionName.hasKeyspace())
throw new InvalidRequestException("You need to be logged in a keyspace or use a fully qualified function name");
@@ -112,11 +115,9 @@ public final class CreateFunctionStatement extends SchemaAlteringStatement
List<AbstractType<?>> argTypes = new ArrayList<>(argRawTypes.size());
for (CQL3Type.Raw rawType : argRawTypes)
- // We have no proper keyspace to give, which means that this will break (NPE currently)
- // for UDT: #7791 is open to fix this
- argTypes.add(rawType.prepare(functionName.keyspace).getType());
+ argTypes.add(rawType.prepare(typeKeyspace(rawType)).getType());
- AbstractType<?> returnType = rawReturnType.prepare(null).getType();
+ AbstractType<?> returnType = rawReturnType.prepare(typeKeyspace(rawReturnType)).getType();
Function old = Functions.find(functionName, argTypes);
if (old != null)
@@ -126,7 +127,7 @@ public final class CreateFunctionStatement extends SchemaAlteringStatement
if (!orReplace)
throw new InvalidRequestException(String.format("Function %s already exists", old));
- if (!old.returnType().isValueCompatibleWith(returnType))
+ if (!Functions.typeEquals(old.returnType(), returnType))
throw new InvalidRequestException(String.format("Cannot replace function %s, the new return type %s is not compatible with the return type %s of existing function",
functionName, returnType.asCQL3Type(), old.returnType().asCQL3Type()));
}
@@ -134,4 +135,12 @@ public final class CreateFunctionStatement extends SchemaAlteringStatement
MigrationManager.announceNewFunction(UDFunction.create(functionName, argNames, argTypes, returnType, language, body, deterministic), isLocalOnly);
return true;
}
+
+ private String typeKeyspace(CQL3Type.Raw rawType)
+ {
+ String ks = rawType.keyspace();
+ if (ks != null)
+ return ks;
+ return functionName.keyspace;
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/statements/DropFunctionStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DropFunctionStatement.java b/src/java/org/apache/cassandra/cql3/statements/DropFunctionStatement.java
index 4cd9460..0ba3721 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DropFunctionStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DropFunctionStatement.java
@@ -103,7 +103,7 @@ public final class DropFunctionStatement extends SchemaAlteringStatement
List<AbstractType<?>> argTypes = new ArrayList<>(argRawTypes.size());
for (CQL3Type.Raw rawType : argRawTypes)
- argTypes.add(rawType.prepare(functionName.keyspace).getType());
+ argTypes.add(rawType.prepare(typeKeyspace(rawType)).getType());
Function old;
if (argsPresent)
@@ -139,4 +139,12 @@ public final class DropFunctionStatement extends SchemaAlteringStatement
MigrationManager.announceFunctionDrop((UDFunction)old, isLocalOnly);
return true;
}
+
+ private String typeKeyspace(CQL3Type.Raw rawType)
+ {
+ String ks = rawType.keyspace();
+ if (ks != null)
+ return ks;
+ return functionName.keyspace;
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/statements/DropTypeStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DropTypeStatement.java b/src/java/org/apache/cassandra/cql3/statements/DropTypeStatement.java
index a3b82a4..ed21957 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DropTypeStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DropTypeStatement.java
@@ -20,6 +20,8 @@ package org.apache.cassandra.cql3.statements;
import org.apache.cassandra.auth.Permission;
import org.apache.cassandra.config.*;
import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.cql3.functions.Function;
+import org.apache.cassandra.cql3.functions.Functions;
import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.exceptions.*;
import org.apache.cassandra.service.ClientState;
@@ -72,6 +74,15 @@ public class DropTypeStatement extends SchemaAlteringStatement
// we drop and 2) existing tables referencing the type (maybe in a nested
// way).
+ for (Function function : Functions.all())
+ {
+ if (isUsedBy(function.returnType()))
+ throw new InvalidRequestException(String.format("Cannot drop user type %s as it is still used by function %s", name, function));
+ for (AbstractType<?> argType : function.argTypes())
+ if (isUsedBy(argType))
+ throw new InvalidRequestException(String.format("Cannot drop user type %s as it is still used by function %s", name, function));
+ }
+
for (KSMetaData ksm2 : Schema.instance.getKeyspaceDefinitions())
{
for (UserType ut : ksm2.userTypes.getAllTypes().values())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 2607e12..4e39614 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -630,7 +630,7 @@ public abstract class ModificationStatement implements CQLStatement
Selection.ResultSetBuilder builder = selection.resultSetBuilder(now);
SelectStatement.forSelection(cfm, selection).processColumnFamily(key, cf, options, now, builder);
- return builder.build();
+ return builder.build(options.getProtocolVersion());
}
public ResultMessage executeInternal(QueryState queryState, QueryOptions options) throws RequestValidationException, RequestExecutionException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 2d28b71..3360d40 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -280,7 +280,7 @@ public class SelectStatement implements CQLStatement
processColumnFamily(row.key.getKey(), row.cf, options, now, result);
}
}
- return new ResultMessage.Rows(result.build());
+ return new ResultMessage.Rows(result.build(options.getProtocolVersion()));
}
public ResultMessage.Rows processResults(List<Row> rows, QueryOptions options, int limit, long now) throws RequestValidationException
@@ -1149,7 +1149,7 @@ public class SelectStatement implements CQLStatement
processColumnFamily(row.key.getKey(), row.cf, options, now, result);
}
- ResultSet cqlRows = result.build();
+ ResultSet cqlRows = result.build(options.getProtocolVersion());
orderResults(cqlRows);
@@ -1189,7 +1189,7 @@ public class SelectStatement implements CQLStatement
CQL3Row staticRow = iter.getStaticRow();
if (staticRow != null && !iter.hasNext() && !usesSecondaryIndexing && hasNoClusteringColumnsRestriction())
{
- result.newRow();
+ result.newRow(options.getProtocolVersion());
for (ColumnDefinition def : selection.getColumns())
{
switch (def.kind)
@@ -1212,7 +1212,7 @@ public class SelectStatement implements CQLStatement
CQL3Row cql3Row = iter.next();
// Respect requested order
- result.newRow();
+ result.newRow(options.getProtocolVersion());
// Respect selection order
for (ColumnDefinition def : selection.getColumns())
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
index 555c1cd..21e30e2 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
@@ -236,7 +236,7 @@ public class CqlRecordReader extends RecordReader<Long, Row>
public Long createKey()
{
- return new Long(0L);
+ return Long.valueOf(0L);
}
public Row createValue()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/transport/Server.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java
index 15fad88..cc071b1 100644
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@ -64,6 +64,7 @@ public class Server implements CassandraDaemon.Server
private static final Logger logger = LoggerFactory.getLogger(Server.class);
private static final boolean enableEpoll = Boolean.valueOf(System.getProperty("cassandra.native.epoll.enabled", "true"));
+ public static final int VERSION_2 = 2;
public static final int VERSION_3 = 3;
public static final int CURRENT_VERSION = VERSION_3;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/test/unit/org/apache/cassandra/cql3/CQLTester.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java
index 9105b9d..68f90bd 100644
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@ -18,8 +18,11 @@
package org.apache.cassandra.cql3;
import java.io.File;
+import java.io.IOException;
import java.math.BigDecimal;
import java.math.BigInteger;
+import java.net.InetAddress;
+import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.CountDownLatch;
@@ -29,23 +32,31 @@ import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.base.Objects;
import com.google.common.collect.ImmutableSet;
+import org.apache.cassandra.utils.ByteBufferUtil;
import org.junit.AfterClass;
import org.junit.After;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.datastax.driver.core.*;
+import com.datastax.driver.core.ResultSet;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.db.marshal.TupleType;
import org.apache.cassandra.exceptions.*;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.serializers.TypeSerializer;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.transport.Server;
/**
* Base class for CQL tests.
@@ -55,17 +66,38 @@ public abstract class CQLTester
protected static final Logger logger = LoggerFactory.getLogger(CQLTester.class);
public static final String KEYSPACE = "cql_test_keyspace";
+ public static final String KEYSPACE_PER_TEST = "cql_test_keyspace_alt";
private static final boolean USE_PREPARED_VALUES = Boolean.valueOf(System.getProperty("cassandra.test.use_prepared", "true"));
private static final AtomicInteger seqNumber = new AtomicInteger();
+ private static org.apache.cassandra.transport.Server server;
+ private static final int nativePort;
+ private static final InetAddress nativeAddr;
+ private static final Cluster cluster[] = new Cluster[Server.CURRENT_VERSION];
+ private static final Session session[] = new Session[Server.CURRENT_VERSION];
+
static
{
// Once per-JVM is enough
SchemaLoader.prepareServer();
+
+ nativeAddr = InetAddress.getLoopbackAddress();
+
+ try
+ {
+ ServerSocket serverSocket = new ServerSocket(0);
+ nativePort = serverSocket.getLocalPort();
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
}
private String currentTable;
private final Set<String> currentTypes = new HashSet<>();
+ private final Set<String> currentFunctions = new HashSet<>();
+ private final Set<String> currentAggregates = new HashSet<>();
// We don't use USE_PREPARED_VALUES in the code below so some tests can force value preparation (if the result
// is not expected to be the same without preparation)
@@ -80,11 +112,28 @@ public abstract class CQLTester
@AfterClass
public static void tearDownClass()
{
+ for (Session sess : session)
+ if (sess != null)
+ sess.close();
+ for (Cluster cl : cluster)
+ if (cl != null)
+ cl.close();
+
+ if (server != null)
+ server.stop();
+ }
+
+ @Before
+ public void beforeTest() throws Throwable
+ {
+ schemaChange(String.format("CREATE KEYSPACE IF NOT EXISTS %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}", KEYSPACE_PER_TEST));
}
@After
public void afterTest() throws Throwable
{
+ dropPerTestKeyspace();
+
// Restore standard behavior in case it was changed
usePrepared = USE_PREPARED_VALUES;
@@ -93,8 +142,12 @@ public abstract class CQLTester
final String tableToDrop = currentTable;
final Set<String> typesToDrop = currentTypes.isEmpty() ? Collections.emptySet() : new HashSet(currentTypes);
+ final Set<String> functionsToDrop = currentFunctions.isEmpty() ? Collections.emptySet() : new HashSet(currentFunctions);
+ final Set<String> aggregatesToDrop = currentAggregates.isEmpty() ? Collections.emptySet() : new HashSet(currentAggregates);
currentTable = null;
currentTypes.clear();
+ currentFunctions.clear();
+ currentAggregates.clear();
// We want to clean up after the test, but dropping a table is rather long so just do that asynchronously
ScheduledExecutors.optionalTasks.execute(new Runnable()
@@ -105,6 +158,12 @@ public abstract class CQLTester
{
schemaChange(String.format("DROP TABLE IF EXISTS %s.%s", KEYSPACE, tableToDrop));
+ for (String aggregateName : aggregatesToDrop)
+ schemaChange(String.format("DROP AGGREGATE IF EXISTS %s", aggregateName));
+
+ for (String functionName : functionsToDrop)
+ schemaChange(String.format("DROP FUNCTION IF EXISTS %s", functionName));
+
for (String typeName : typesToDrop)
schemaChange(String.format("DROP TYPE IF EXISTS %s.%s", KEYSPACE, typeName));
@@ -133,6 +192,40 @@ public abstract class CQLTester
});
}
+ // lazy initialization for all tests that require Java Driver
+ private static void requireNetwork() throws ConfigurationException
+ {
+ if (server != null)
+ return;
+
+ SystemKeyspace.finishStartup();
+ StorageService.instance.initServer();
+ SchemaLoader.startGossiper();
+
+ server = new org.apache.cassandra.transport.Server(nativeAddr, nativePort);
+ server.start();
+
+ for (int version = 1; version <= Server.CURRENT_VERSION; version++)
+ {
+ if (cluster[version-1] != null)
+ continue;
+
+ cluster[version-1] = Cluster.builder().addContactPoints(nativeAddr)
+ .withClusterName("Test Cluster")
+ .withPort(nativePort)
+ .withProtocolVersion(ProtocolVersion.fromInt(version))
+ .build();
+ session[version-1] = cluster[version-1].connect();
+
+ logger.info("Started Java Driver instance for protocol version {}", version);
+ }
+ }
+
+ protected void dropPerTestKeyspace() throws Throwable
+ {
+ execute(String.format("DROP KEYSPACE IF EXISTS %s", KEYSPACE_PER_TEST));
+ }
+
public void flush()
{
try
@@ -183,7 +276,7 @@ public abstract class CQLTester
protected String createType(String query)
{
- String typeName = "type_" + seqNumber.getAndIncrement();
+ String typeName = callerName() + "_type_" + seqNumber.getAndIncrement();
String fullQuery = String.format(query, KEYSPACE + "." + typeName);
currentTypes.add(typeName);
logger.info(fullQuery);
@@ -191,18 +284,48 @@ public abstract class CQLTester
return typeName;
}
+ protected String createFunction(String keyspace, String argTypes, String query) throws Throwable
+ {
+ String functionName = keyspace + "." + callerName() + "_function_" + seqNumber.getAndIncrement();
+ createFunctionOverload(functionName, argTypes, query);
+ return functionName;
+ }
+
+ protected void createFunctionOverload(String functionName, String argTypes, String query) throws Throwable
+ {
+ String fullQuery = String.format(query, functionName);
+ currentFunctions.add(functionName + '(' + argTypes + ')');
+ logger.info(fullQuery);
+ execute(fullQuery);
+ }
+
+ protected String createAggregate(String keyspace, String argTypes, String query) throws Throwable
+ {
+ String aggregateName = keyspace + "." + callerName() + "_aggregate_" + seqNumber.getAndIncrement();
+ createAggregateOverload(aggregateName, argTypes, query);
+ return aggregateName;
+ }
+
+ protected void createAggregateOverload(String aggregateName, String argTypes, String query) throws Throwable
+ {
+ String fullQuery = String.format(query, aggregateName);
+ currentAggregates.add(aggregateName + '(' + argTypes + ')');
+ logger.info(fullQuery);
+ execute(fullQuery);
+ }
+
protected void createTable(String query) // picks a fresh table name, formats it into the query, logs it and applies it via schemaChange
{
- currentTable = "table_" + seqNumber.getAndIncrement();
- String fullQuery = String.format(query, KEYSPACE + "." + currentTable);
+ currentTable = callerName() + "_table_" + seqNumber.getAndIncrement(); // name = lower-cased name of the method calling createTable + "_table_" + next sequence number
+ String fullQuery = formatQuery(query); // substitutes KEYSPACE + "." + currentTable into the query's format placeholder
logger.info(fullQuery);
schemaChange(fullQuery);
}
protected void createTableMayThrow(String query) throws Throwable
{
- currentTable = "table_" + seqNumber.getAndIncrement();
- String fullQuery = String.format(query, KEYSPACE + "." + currentTable);
+ currentTable = callerName() + "_table_" + seqNumber.getAndIncrement();
+ String fullQuery = formatQuery(query);
logger.info(fullQuery);
try
{
@@ -216,14 +339,14 @@ public abstract class CQLTester
protected void alterTable(String query) // formats the query against the current table, logs it and applies it via schemaChange
{
- String fullQuery = String.format(query, KEYSPACE + "." + currentTable);
+ String fullQuery = formatQuery(query); // unlike the removed line, formatQuery returns the query untouched when currentTable is null
logger.info(fullQuery);
schemaChange(fullQuery);
}
protected void alterTableMayThrow(String query) throws Throwable
{
- String fullQuery = String.format(query, KEYSPACE + "." + currentTable);
+ String fullQuery = formatQuery(query);
logger.info(fullQuery);
try
{
@@ -244,14 +367,14 @@ public abstract class CQLTester
protected void createIndex(String query) // formats the query against the current table, logs it and applies it via schemaChange
{
- String fullQuery = String.format(query, KEYSPACE + "." + currentTable);
+ String fullQuery = formatQuery(query); // unlike the removed line, formatQuery returns the query untouched when currentTable is null
logger.info(fullQuery);
schemaChange(fullQuery);
}
protected void createIndexMayThrow(String query) throws Throwable
{
- String fullQuery = String.format(query, KEYSPACE + "." + currentTable);
+ String fullQuery = formatQuery(query);
logger.info(fullQuery);
try
{
@@ -270,6 +393,11 @@ public abstract class CQLTester
schemaChange(fullQuery);
}
+ /**
+  * Returns the lower-cased name of the method two frames up the stack, i.e. the method
+  * that called the method calling {@code callerName()}.
+  * <p>
+  * The frame index is fixed, so this must be invoked directly from the helper whose caller
+  * is wanted; wrapping the call in another method or lambda shifts the result.
+  * Lower-casing uses {@code Locale.ROOT} so the result does not depend on the JVM's
+  * default locale (e.g. Turkish would turn {@code I} into a dotless i).
+  *
+  * @return the caller's caller method name in lower case
+  */
+ private static String callerName()
+ {
+     // frame 0 = callerName, frame 1 = the helper invoking it, frame 2 = that helper's caller
+     StackTraceElement[] frames = new Exception().getStackTrace();
+     return frames[2].getMethodName().toLowerCase(java.util.Locale.ROOT);
+ }
+
private static void schemaChange(String query)
{
try
@@ -288,11 +416,24 @@ public abstract class CQLTester
return Schema.instance.getCFMetaData(KEYSPACE, currentTable);
}
+ protected com.datastax.driver.core.ResultSet executeNet(int protocolVersion, String query, Object... values) throws Throwable
+ {
+ requireNetwork();
+
+ return session[protocolVersion-1].execute(formatQuery(query), values);
+ }
+
+ /**
+  * Substitutes {@code KEYSPACE + "." + currentTable} into the query's format placeholder.
+  * When no current table is set the query is returned unchanged.
+  *
+  * @param query query text, treated as a format string when a current table exists
+  * @return the formatted query, or {@code query} itself if {@code currentTable} is null
+  */
+ private String formatQuery(String query)
+ {
+     if (currentTable == null)
+         return query;
+
+     return String.format(query, KEYSPACE + "." + currentTable);
+ }
+
protected UntypedResultSet execute(String query, Object... values) throws Throwable
{
try
{
- query = currentTable == null ? query : String.format(query, KEYSPACE + "." + currentTable);
+ query = formatQuery(query);
UntypedResultSet rs;
if (usePrepared)
@@ -318,6 +459,64 @@ public abstract class CQLTester
}
}
+ /**
+  * Asserts that a Java Driver result set contains exactly the expected rows, in order.
+  * <p>
+  * Each expected value is serialized with the column's driver type for the given protocol
+  * version and compared against the raw bytes returned for that column.
+  * A {@code null} result is accepted only when no rows are expected.
+  *
+  * @param protocolVersion native protocol version used for (de)serialization and in failure messages
+  * @param result          result set to check; may be {@code null}
+  * @param rows            expected rows, one {@code Object[]} of column values per row
+  */
+ protected void assertRowsNet(int protocolVersion, ResultSet result, Object[]... rows)
+ {
+     if (result == null)
+     {
+         if (rows.length > 0)
+             Assert.fail(String.format("No rows returned by query but %d expected", rows.length));
+         return;
+     }
+
+     ColumnDefinitions meta = result.getColumnDefinitions();
+     Iterator<Row> iter = result.iterator();
+     int i = 0;
+     while (iter.hasNext() && i < rows.length)
+     {
+         Object[] expected = rows[i];
+         Row actual = iter.next();
+
+         Assert.assertEquals(String.format("Invalid number of (expected) values provided for row %d (using protocol version %d)",
+                                           i, protocolVersion),
+                             meta.size(), expected.length);
+
+         for (int j = 0; j < meta.size(); j++)
+         {
+             DataType type = meta.getType(j);
+             ByteBuffer expectedByteValue = type.serialize(expected[j], ProtocolVersion.fromInt(protocolVersion));
+             // Either buffer can be null (e.g. a NULL column value); report -1 bytes rather than
+             // letting a NullPointerException hide the real mismatch.
+             int expectedBytes = expectedByteValue == null ? -1 : expectedByteValue.remaining();
+             ByteBuffer actualValue = actual.getBytesUnsafe(meta.getName(j));
+             int actualBytes = actualValue == null ? -1 : actualValue.remaining();
+
+             if (!Objects.equal(expectedByteValue, actualValue))
+                 Assert.fail(String.format("Invalid value for row %d column %d (%s of type %s), " +
+                                           "expected <%s> (%d bytes) but got <%s> (%d bytes) " +
+                                           "(using protocol version %d)",
+                                           i, j, meta.getName(j), type,
+                                           type.format(expected[j]),
+                                           expectedBytes,
+                                           actualValue == null
+                                               ? null
+                                               : type.format(type.deserialize(actualValue, ProtocolVersion.fromInt(protocolVersion))),
+                                           actualBytes,
+                                           protocolVersion));
+         }
+         i++;
+     }
+
+     if (iter.hasNext())
+     {
+         // The result still has rows after all expected rows were matched: count the surplus
+         // so the message reports the real total.
+         while (iter.hasNext())
+         {
+             iter.next();
+             i++;
+         }
+         Assert.fail(String.format("Got more rows than expected. Expected %d but got %d (using protocol version %d).",
+                                   rows.length, i, protocolVersion));
+     }
+
+     // Reaching here with i != rows.length means the result ran out before the expected rows did.
+     Assert.assertTrue(String.format("Got %s rows than expected. Expected %d but got %d (using protocol version %d)",
+                                     rows.length>i ? "less" : "more", rows.length, i, protocolVersion), i == rows.length);
+ }
+
protected void assertRows(UntypedResultSet result, Object[]... rows)
{
if (result == null)
@@ -394,7 +593,7 @@ public abstract class CQLTester
protected void assertEmpty(UntypedResultSet result) throws Throwable // passes when result is null or has zero rows
{
if (result != null && result.size() != 0)
- throw new InvalidRequestException(String.format("Expected empty result but got %d rows", result.size()));
+ throw new AssertionError(String.format("Expected empty result but got %d rows", result.size())); // a non-empty result is reported as an assertion failure rather than an InvalidRequestException
}
protected void assertInvalid(String query, Object... values) throws Throwable
@@ -406,7 +605,16 @@ public abstract class CQLTester
{
try
{
- execute(query, values);
+ try
+ {
+ execute(query, values);
+ }
+ catch (RuntimeException e)
+ {
+ Throwable cause = e.getCause();
+ if (cause instanceof InvalidRequestException)
+ throw cause;
+ }
String q = USE_PREPARED_VALUES
? query + " (values: " + formatAllValues(values) + ")"
: replaceValues(query, values);